This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 08a49c0  Allow to configure different implementations for Pulsar 
functions state store (#12646)
08a49c0 is described below

commit 08a49c06bff4a52d26319a114961aed6cb6c4791
Author: Matteo Merli <[email protected]>
AuthorDate: Sat Nov 6 16:51:49 2021 -0700

    Allow to configure different implementations for Pulsar functions state 
store (#12646)
---
 .../worker/PulsarFunctionLocalRunTest.java         |   6 +-
 .../worker/PulsarFunctionMetadataStoreTest.java    | 122 ++++++++++++++++++
 pulsar-functions/instance/pom.xml                  |   6 +
 .../functions/instance/JavaInstanceRunnable.java   |  15 ++-
 .../instance/state/BKStateStoreProviderImpl.java   |   2 -
 .../state/PulsarMetadataStateStoreImpl.java        | 142 +++++++++++++++++++++
 .../PulsarMetadataStateStoreProviderImpl.java      |  67 ++++++++++
 .../instance/state/StateStoreProvider.java         |   2 +
 .../instance/JavaInstanceRunnableTest.java         |   2 +-
 .../state/PulsarMetadataStateStoreImplTest.java    | 120 +++++++++++++++++
 .../org/apache/pulsar/functions/LocalRunner.java   |   9 +-
 .../functions/runtime/JavaInstanceStarter.java     |   4 +
 .../functions/runtime/thread/ThreadRuntime.java    |   5 +-
 .../runtime/thread/ThreadRuntimeFactory.java       |  18 ++-
 .../pulsar/functions/worker/WorkerConfig.java      |   8 ++
 .../worker/rest/api/FunctionsImplTest.java         |   4 +-
 16 files changed, 515 insertions(+), 17 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
index ed6c4aa..13ac623 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
@@ -304,7 +304,7 @@ public class PulsarFunctionLocalRunTest {
         }
     }
 
-    private WorkerConfig createWorkerConfig(ServiceConfiguration config) {
+    protected WorkerConfig createWorkerConfig(ServiceConfiguration config) {
 
         System.setProperty(JAVA_INSTANCE_JAR_PROPERTY,
                 
FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath());
@@ -560,7 +560,7 @@ public class PulsarFunctionLocalRunTest {
         }
     }
 
-    private void testE2EPulsarFunctionLocalRun(String jarFilePathUrl) throws 
Exception {
+    protected void testE2EPulsarFunctionLocalRun(String jarFilePathUrl) throws 
Exception {
         testE2EPulsarFunctionLocalRun(jarFilePathUrl, 1);
     }
 
@@ -1133,7 +1133,7 @@ public class PulsarFunctionLocalRunTest {
         }
     }
 
-    private void runWithPulsarFunctionsClassLoader(Assert.ThrowingRunnable 
throwingRunnable) throws Throwable {
+    protected void runWithPulsarFunctionsClassLoader(Assert.ThrowingRunnable 
throwingRunnable) throws Throwable {
         ClassLoader originalClassLoader = 
Thread.currentThread().getContextClassLoader();
         try {
             
Thread.currentThread().setContextClassLoader(pulsarApiExamplesClassLoader);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionMetadataStoreTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionMetadataStoreTest.java
new file mode 100644
index 0000000..715837f
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionMetadataStoreTest.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker;
+
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static 
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
+import static 
org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry.JAVA_INSTANCE_JAR_PROPERTY;
+import static org.mockito.Mockito.spy;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.ServiceConfigurationUtils;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
+import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
+import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
+import org.apache.pulsar.client.admin.BrokerStats;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.client.impl.auth.AuthenticationTls;
+import org.apache.pulsar.common.functions.ConsumerConfig;
+import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.common.functions.Utils;
+import org.apache.pulsar.common.io.SinkConfig;
+import org.apache.pulsar.common.io.SourceConfig;
+import org.apache.pulsar.common.nar.NarClassLoader;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.ConsumerStats;
+import org.apache.pulsar.common.policies.data.PublisherStats;
+import org.apache.pulsar.common.policies.data.SubscriptionStats;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.policies.data.TopicStats;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.functions.LocalRunner;
+import org.apache.pulsar.functions.api.Record;
+import 
org.apache.pulsar.functions.instance.state.PulsarMetadataStateStoreProviderImpl;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig;
+import org.apache.pulsar.functions.utils.FunctionCommon;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
+import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+/**
+ * Configures the worker with {@link PulsarMetadataStateStoreProviderImpl} on "memory://local" and runs the inherited local-run test.
+ */
+@Slf4j
+@Test
+public class PulsarFunctionMetadataStoreTest extends 
PulsarFunctionLocalRunTest {
+
+
+    protected WorkerConfig createWorkerConfig(ServiceConfiguration config) {
+        WorkerConfig wc = super.createWorkerConfig(config);
+        
wc.setStateStorageProviderImplementation(PulsarMetadataStateStoreProviderImpl.class.getName());
+        wc.setStateStorageServiceUrl("memory://local");
+        return wc;
+    }
+
+    @Test
+    public void testE2EPulsarFunctionLocalRun() throws Throwable {
+        runWithPulsarFunctionsClassLoader(() -> 
testE2EPulsarFunctionLocalRun(null));
+    }
+}
diff --git a/pulsar-functions/instance/pom.xml 
b/pulsar-functions/instance/pom.xml
index 6c2ae1b..ece9af7 100644
--- a/pulsar-functions/instance/pom.xml
+++ b/pulsar-functions/instance/pom.xml
@@ -55,6 +55,12 @@
 
     <dependency>
       <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-metadata</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
       <artifactId>pulsar-io-core</artifactId>
       <version>${project.version}</version>
     </dependency>
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index cfdcb08..50994e5 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -103,6 +103,7 @@ public class JavaInstanceRunnable implements AutoCloseable, 
Runnable {
     private LogAppender logAppender;
 
     // provide tables for storing states
+    private final String stateStorageImplClass;
     private final String stateStorageServiceUrl;
     private StateStoreProvider stateStoreProvider;
     private StateManager stateManager;
@@ -144,6 +145,7 @@ public class JavaInstanceRunnable implements AutoCloseable, 
Runnable {
                                 ClientBuilder clientBuilder,
                                 PulsarClient pulsarClient,
                                 PulsarAdmin pulsarAdmin,
+                                String stateStorageImplClass,
                                 String stateStorageServiceUrl,
                                 SecretsProvider secretsProvider,
                                 FunctionCollectorRegistry collectorRegistry,
@@ -152,6 +154,7 @@ public class JavaInstanceRunnable implements AutoCloseable, 
Runnable {
         this.clientBuilder = clientBuilder;
         this.client = (PulsarClientImpl) pulsarClient;
         this.pulsarAdmin = pulsarAdmin;
+        this.stateStorageImplClass = stateStorageImplClass;
         this.stateStorageServiceUrl = stateStorageServiceUrl;
         this.secretsProvider = secretsProvider;
         this.functionClassLoader = functionClassLoader;
@@ -322,8 +325,8 @@ public class JavaInstanceRunnable implements AutoCloseable, 
Runnable {
         if (null == stateStorageServiceUrl) {
             stateStoreProvider = StateStoreProvider.NULL;
         } else {
-            stateStoreProvider = new BKStateStoreProviderImpl();
-            Map<String, Object> stateStoreProviderConfig = new HashMap();
+            stateStoreProvider = getStateStoreProvider();
+            Map<String, Object> stateStoreProviderConfig = new HashMap<>();
             
stateStoreProviderConfig.put(BKStateStoreProviderImpl.STATE_STORAGE_SERVICE_URL,
 stateStorageServiceUrl);
             stateStoreProvider.init(stateStoreProviderConfig, 
instanceConfig.getFunctionDetails());
 
@@ -339,6 +342,14 @@ public class JavaInstanceRunnable implements 
AutoCloseable, Runnable {
         }
     }
 
+    // Falls back to the BookKeeper provider unless an implementation class was configured.
+    private StateStoreProvider getStateStoreProvider() throws Exception {
+        if (stateStorageImplClass != null) {
+            return (StateStoreProvider) Class.forName(stateStorageImplClass).getConstructor().newInstance();
+        }
+        return new BKStateStoreProviderImpl();
+    }
+
     private void handleResult(Record srcRecord, JavaExecutionResult result) 
throws Exception {
         if (result.getUserException() != null) {
             Exception t = result.getUserException();
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/BKStateStoreProviderImpl.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/BKStateStoreProviderImpl.java
index fd4228a..32901bd 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/BKStateStoreProviderImpl.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/BKStateStoreProviderImpl.java
@@ -56,8 +56,6 @@ import org.apache.pulsar.functions.utils.FunctionCommon;
 @Slf4j
 public class BKStateStoreProviderImpl implements StateStoreProvider {
 
-    public static final String STATE_STORAGE_SERVICE_URL = 
"stateStorageServiceUrl";
-
     private String stateStorageServiceUrl;
     private Map<String, StorageClient> clients;
 
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/PulsarMetadataStateStoreImpl.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/PulsarMetadataStateStoreImpl.java
new file mode 100644
index 0000000..1c1df7f
--- /dev/null
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/PulsarMetadataStateStoreImpl.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.instance.state;
+
+import java.nio.ByteBuffer;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.functions.api.StateStoreContext;
+import org.apache.pulsar.metadata.api.MetadataCache;
+import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+
+/**
+ * {@link DefaultStateStore} backed by a Pulsar {@link MetadataStore}.
+ * Each key is stored at the path {@code prefix/tenant/namespace/name/key}.
+ */
+public class PulsarMetadataStateStoreImpl implements DefaultStateStore {
+
+    private final MetadataStore store;
+    private final String prefixPath;
+    private final MetadataCache<Long> countersCache;
+    private final String namespace;
+    private final String tenant;
+    private final String name;
+    private final String fqsn;
+
+    PulsarMetadataStateStoreImpl(MetadataStore store, String prefix, String tenant, String namespace, String name) {
+        this.store = store;
+        this.tenant = tenant;
+        this.namespace = namespace;
+        this.name = name;
+        this.fqsn = tenant + '/' + namespace + '/' + name;
+        this.prefixPath = prefix + '/' + fqsn + '/';
+        this.countersCache = store.getMetadataCache(Long.class);
+    }
+
+    @Override
+    public String tenant() {
+        return tenant;
+    }
+
+    @Override
+    public String namespace() {
+        return namespace;
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    @Override
+    public String fqsn() {
+        return fqsn;
+    }
+
+    @Override
+    public void init(StateStoreContext ctx) {
+        // Nothing to set up: the metadata store is handed in through the constructor.
+    }
+
+    @Override
+    public void close() {
+        // Nothing to release here: this class never creates the metadata store itself.
+    }
+
+    @Override
+    public void put(String key, ByteBuffer value) {
+        putAsync(key, value).join();
+    }
+
+    @Override
+    public CompletableFuture<Void> putAsync(String key, ByteBuffer value) {
+        byte[] bytes = new byte[value.remaining()];
+        value.get(bytes);
+        return store.put(getPath(key), bytes, Optional.empty()).thenApply(__ -> null);
+    }
+
+    @Override
+    public void delete(String key) {
+        deleteAsync(key).join();
+    }
+
+    @Override
+    public CompletableFuture<Void> deleteAsync(String key) {
+        return store.delete(getPath(key), Optional.empty());
+    }
+
+    @Override
+    public ByteBuffer get(String key) {
+        return getAsync(key).join();
+    }
+
+    @Override
+    public CompletableFuture<ByteBuffer> getAsync(String key) {
+        return store.get(getPath(key))
+                .thenApply(optRes -> optRes.map(x -> ByteBuffer.wrap(x.getValue())).orElse(null));
+    }
+
+    @Override
+    public void incrCounter(String key, long amount) {
+        // Wait for the update like put()/delete() do; otherwise failures are silently dropped.
+        incrCounterAsync(key, amount).join();
+    }
+
+    @Override
+    public CompletableFuture<Void> incrCounterAsync(String key, long amount) {
+        return countersCache.readModifyUpdateOrCreate(getPath(key), optValue -> optValue.orElse(0L) + amount)
+                .thenApply(__ -> null);
+    }
+
+    @Override
+    public long getCounter(String key) {
+        return getCounterAsync(key).join();
+    }
+
+    @Override
+    public CompletableFuture<Long> getCounterAsync(String key) {
+        return countersCache.get(getPath(key)).thenApply(optValue -> optValue.orElse(0L));
+    }
+
+    private String getPath(String key) {
+        return prefixPath + key;
+    }
+}
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/PulsarMetadataStateStoreProviderImpl.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/PulsarMetadataStateStoreProviderImpl.java
new file mode 100644
index 0000000..819bfd9
--- /dev/null
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/PulsarMetadataStateStoreProviderImpl.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.instance.state;
+
+import java.util.Map;
+import lombok.SneakyThrows;
+import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.MetadataStoreFactory;
+
+/**
+ * {@link StateStoreProvider} backed by a {@link MetadataStore}, injected via config or created from a URL.
+ */
+public class PulsarMetadataStateStoreProviderImpl implements StateStoreProvider {
+    private static final String METADATA_URL = "METADATA_URL";
+    private static final String METADATA_STORE_INSTANCE = "METADATA_STORE_INSTANCE";
+    private static final String METADATA_PREFIX = "METADATA_PREFIX";
+    private static final String METADATA_DEFAULT_PREFIX = "/state-store";
+
+    private MetadataStore store;
+    private String prefix;
+    private boolean shouldCloseStore;
+
+    @Override
+    public void init(Map<String, Object> config, Function.FunctionDetails functionDetails) throws Exception {
+        prefix = (String) config.getOrDefault(METADATA_PREFIX, METADATA_DEFAULT_PREFIX);
+        if (config.containsKey(METADATA_STORE_INSTANCE)) {
+            store = (MetadataStore) config.get(METADATA_STORE_INSTANCE);
+            shouldCloseStore = false;
+        } else {
+            // METADATA_URL wins when present; otherwise use the key JavaInstanceRunnable populates.
+            String metadataUrl = (String) config.getOrDefault(METADATA_URL, config.get(STATE_STORAGE_SERVICE_URL));
+            store = MetadataStoreFactory.create(metadataUrl, MetadataStoreConfig.builder().build());
+            shouldCloseStore = true;
+        }
+    }
+
+    @Override
+    public DefaultStateStore getStateStore(String tenant, String namespace, String name) throws Exception {
+        return new PulsarMetadataStateStoreImpl(store, prefix, tenant, namespace, name);
+    }
+
+    @SneakyThrows
+    @Override
+    public void close() {
+        if (shouldCloseStore) {
+            store.close();
+        }
+    }
+}
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateStoreProvider.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateStoreProvider.java
index db3c6b3..4c01e9d 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateStoreProvider.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateStoreProvider.java
@@ -27,6 +27,8 @@ import 
org.apache.pulsar.functions.proto.Function.FunctionDetails;
  */
 public interface StateStoreProvider extends AutoCloseable {
 
+    String STATE_STORAGE_SERVICE_URL = "stateStorageServiceUrl";
+
     /**
      * The state store provider returns `null` state stores.
      */
diff --git 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
index 59a4f5b..b96e813 100644
--- 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
+++ 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
@@ -71,7 +71,7 @@ public class JavaInstanceRunnableTest {
         ClientBuilder clientBuilder = mock(ClientBuilder.class);
         when(clientBuilder.build()).thenReturn(null);
         JavaInstanceRunnable javaInstanceRunnable = new JavaInstanceRunnable(
-                config, clientBuilder, null, null, null, null, null, null);
+                config, clientBuilder, null, null, null,null, null, null, 
null);
         return javaInstanceRunnable;
     }
 
diff --git 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/PulsarMetadataStateStoreImplTest.java
 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/PulsarMetadataStateStoreImplTest.java
new file mode 100644
index 0000000..a6fb3eb
--- /dev/null
+++ 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/PulsarMetadataStateStoreImplTest.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.instance.state;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyLong;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.*;
+import io.kubernetes.client.proto.Meta;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+import lombok.SneakyThrows;
+import org.apache.bookkeeper.api.kv.Table;
+import org.apache.bookkeeper.api.kv.options.Options;
+import org.apache.bookkeeper.api.kv.result.DeleteResult;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.pulsar.metadata.api.MetadataCache;
+import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.MetadataStoreFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Unit test {@link PulsarMetadataStateStoreImpl} on a {@code memory://local} {@link MetadataStore}.
+ */
+public class PulsarMetadataStateStoreImplTest {
+
+    private static final String TENANT = "test-tenant";
+    private static final String NS = "test-ns";
+    private static final String NAME = "test-name";
+    private static final String FQSN = "test-tenant/test-ns/test-name";
+    private static final String PREFIX = "/prefix";
+    private static final String PREFIX_PATH = PREFIX + '/' + FQSN + '/';
+
+    private MetadataStore store;
+    private MetadataCache<Long> countersCache;
+    private DefaultStateStore stateContext;
+
+    @BeforeMethod
+    public void setup() throws Exception {
+        this.store = MetadataStoreFactory.create("memory://local", MetadataStoreConfig.builder().build());
+        this.countersCache = store.getMetadataCache(Long.class);
+        this.stateContext = new PulsarMetadataStateStoreImpl(store, PREFIX, TENANT, NS, NAME);
+    }
+
+    @AfterMethod
+    public void cleanup() throws Exception {
+        this.store.close();
+    }
+
+    @Test
+    public void testGetter() {
+        assertEquals(stateContext.tenant(), TENANT);
+        assertEquals(stateContext.namespace(), NS);
+        assertEquals(stateContext.name(), NAME);
+        assertEquals(stateContext.fqsn(), FQSN);
+    }
+
+    @Test
+    public void testIncr() throws Exception {
+        stateContext.incrCounter("test-key", 10L);
+        assertEquals(countersCache.get(PREFIX_PATH + "test-key").join().get().longValue(), 10);
+    }
+
+    @Test
+    public void testPut() throws Exception {
+        stateContext.put("test-key", ByteBuffer.wrap("test-value".getBytes(UTF_8)));
+        assertEquals(store.get(PREFIX_PATH + "test-key").join().get().getValue(), "test-value".getBytes(UTF_8));
+    }
+
+    @Test
+    public void testDelete() throws Exception {
+        stateContext.put("test-key", ByteBuffer.wrap("test-value".getBytes(UTF_8)));
+        assertEquals(store.get(PREFIX_PATH + "test-key").join().get().getValue(), "test-value".getBytes(UTF_8));
+        stateContext.delete("test-key");
+        assertFalse(store.get(PREFIX_PATH + "test-key").join().isPresent());
+    }
+
+    @Test
+    public void testGetAmount() throws Exception {
+        // A counter that was never incremented reads as 0.
+        assertEquals(stateContext.getCounter("test-key"), 0);
+        stateContext.incrCounter("test-key", 10L);
+        assertEquals(countersCache.get(PREFIX_PATH + "test-key").join().get().longValue(), 10);
+        assertEquals(stateContext.getCounter("test-key"), 10);
+    }
+
+    @Test
+    public void testGetKeyNotPresent() throws Exception {
+        CompletableFuture<ByteBuffer> result = stateContext.getAsync("test-key");
+        assertNotNull(result);
+        assertNull(result.get());
+    }
+}
diff --git 
a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
 
b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
index 0bedb74..295199c 100644
--- 
a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
+++ 
b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
@@ -151,6 +151,8 @@ public class LocalRunner implements AutoCloseable {
     protected SourceConfig sourceConfig;
     @Parameter(names = "--sinkConfig", description = "The json representation 
of SinkConfig", hidden = true, converter = SinkConfigConverter.class)
     protected SinkConfig sinkConfig;
+    @Parameter(names = "--stateStorageImplClass", description = "The implementation class for the state storage service (by default Apache BookKeeper)", hidden = true, required = false)
+    protected String stateStorageImplClass;
     @Parameter(names = "--stateStorageServiceUrl", description = "The URL for 
the state storage service (by default Apache BookKeeper)", hidden = true)
     protected String stateStorageServiceUrl;
     @Parameter(names = "--brokerServiceUrl", description = "The URL for the 
Pulsar broker", hidden = true)
@@ -201,8 +203,9 @@ public class LocalRunner implements AutoCloseable {
     }
 
     @Builder
-    public LocalRunner(FunctionConfig functionConfig, SourceConfig 
sourceConfig, SinkConfig sinkConfig, String
-            stateStorageServiceUrl, String brokerServiceUrl, String 
clientAuthPlugin, String clientAuthParams,
+    public LocalRunner(FunctionConfig functionConfig, SourceConfig 
sourceConfig, SinkConfig sinkConfig,
+                       String stateStorageImplClass, String 
stateStorageServiceUrl, String brokerServiceUrl,
+                       String clientAuthPlugin, String clientAuthParams,
                        boolean useTls, boolean tlsAllowInsecureConnection, 
boolean tlsHostNameVerificationEnabled,
                        String tlsTrustCertFilePath, int instanceIdOffset, 
RuntimeEnv runtimeEnv,
                        String secretsProviderClassName, String 
secretsProviderConfig, String narExtractionDirectory,
@@ -210,6 +213,7 @@ public class LocalRunner implements AutoCloseable {
         this.functionConfig = functionConfig;
         this.sourceConfig = sourceConfig;
         this.sinkConfig = sinkConfig;
+        this.stateStorageImplClass = stateStorageImplClass;
         this.stateStorageServiceUrl = stateStorageServiceUrl;
         this.brokerServiceUrl = brokerServiceUrl;
         this.clientAuthPlugin = clientAuthPlugin;
@@ -614,6 +618,7 @@ public class LocalRunner implements AutoCloseable {
             }
             runtimeFactory = new ThreadRuntimeFactory("LocalRunnerThreadGroup",
                     serviceUrl,
+                    stateStorageImplClass,
                     stateStorageServiceUrl,
                     authConfig,
                     secretsProvider,
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
index 1881b55..5995b87 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
@@ -94,6 +94,9 @@ public class JavaInstanceStarter implements AutoCloseable {
     @Parameter(names = "--tls_trust_cert_path", description = "tls trust cert 
file path")
     public String tlsTrustCertFilePath;
 
+    @Parameter(names = "--state_storage_impl_class", description = "State 
Storage Service Implementation class\n", required= false)
+    public String stateStorageImplClass;
+
     @Parameter(names = "--state_storage_serviceurl", description = "State 
Storage Service Url\n", required= false)
     public String stateStorageServiceUrl;
 
@@ -196,6 +199,7 @@ public class JavaInstanceStarter implements AutoCloseable {
         RuntimeUtils.registerDefaultCollectors(collectorRegistry);
 
         containerFactory = new ThreadRuntimeFactory("LocalRunnerThreadGroup", 
pulsarServiceUrl,
+                stateStorageImplClass,
                 stateStorageServiceUrl,
                 
AuthenticationConfig.builder().clientAuthenticationPlugin(clientAuthenticationPlugin)
                         
.clientAuthenticationParameters(clientAuthenticationParameters).useTls(isTrue(useTls))
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
index b6dd019..3f76863 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
@@ -26,7 +26,6 @@ import java.util.Collections;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
-import io.prometheus.client.CollectorRegistry;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.PulsarServerException;
@@ -66,6 +65,7 @@ public class ThreadRuntime implements Runtime {
     private ClientBuilder clientBuilder;
     private PulsarClient pulsarClient;
     private PulsarAdmin pulsarAdmin;
+    private String stateStorageImplClass;
     private String stateStorageServiceUrl;
     private SecretsProvider secretsProvider;
     private FunctionCollectorRegistry collectorRegistry;
@@ -79,6 +79,7 @@ public class ThreadRuntime implements Runtime {
                   PulsarClient client,
                   ClientBuilder clientBuilder,
                   PulsarAdmin pulsarAdmin,
+                  String stateStorageImplClass,
                   String stateStorageServiceUrl,
                   SecretsProvider secretsProvider,
                   FunctionCollectorRegistry collectorRegistry,
@@ -95,6 +96,7 @@ public class ThreadRuntime implements Runtime {
         this.clientBuilder = clientBuilder;
         this.pulsarClient = client;
         this.pulsarAdmin = pulsarAdmin;
+        this.stateStorageImplClass = stateStorageImplClass;
         this.stateStorageServiceUrl = stateStorageServiceUrl;
         this.secretsProvider = secretsProvider;
         this.collectorRegistry = collectorRegistry;
@@ -174,6 +176,7 @@ public class ThreadRuntime implements Runtime {
                 clientBuilder,
                 pulsarClient,
                 pulsarAdmin,
+                stateStorageImplClass,
                 stateStorageServiceUrl,
                 secretsProvider,
                 collectorRegistry,
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
index 864a067..1e8c96a 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
@@ -63,6 +63,7 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
     private ClientBuilder clientBuilder;
     private PulsarClient pulsarClient;
     private PulsarAdmin pulsarAdmin;
+    private String stateStorageImplClass;
     private String storageServiceUrl;
     private SecretsProvider defaultSecretsProvider;
     private FunctionCollectorRegistry collectorRegistry;
@@ -76,21 +77,27 @@ public class ThreadRuntimeFactory implements RuntimeFactory 
{
      * This constructor is used by other runtimes (e.g. ProcessRuntime and 
KubernetesRuntime) that rely on ThreadRuntime to actually run an instance of 
the function.
      * When used by other runtimes, the arguments such as secretsProvider and 
rootClassLoader will be provided.
      */
-    public ThreadRuntimeFactory(String threadGroupName, String 
pulsarServiceUrl, String storageServiceUrl,
+    public ThreadRuntimeFactory(String threadGroupName, String 
pulsarServiceUrl,
+                                String stateStorageImplClass,
+                                String storageServiceUrl,
                                 AuthenticationConfig authConfig, 
SecretsProvider secretsProvider,
                                 FunctionCollectorRegistry collectorRegistry, 
String narExtractionDirectory,
                                 ClassLoader rootClassLoader, boolean 
exposePulsarAdminClientEnabled,
                                 String pulsarWebServiceUrl) throws Exception {
         initialize(threadGroupName, Optional.empty(), pulsarServiceUrl, 
authConfig,
-                storageServiceUrl, null, secretsProvider, collectorRegistry, 
narExtractionDirectory,
+                stateStorageImplClass, storageServiceUrl, null, 
secretsProvider, collectorRegistry,
+                narExtractionDirectory,
                 rootClassLoader, exposePulsarAdminClientEnabled, 
pulsarWebServiceUrl, Optional.empty());
     }
 
-    private void initialize(String threadGroupName, 
Optional<ThreadRuntimeFactoryConfig.MemoryLimit> memoryLimit, String 
pulsarServiceUrl, AuthenticationConfig authConfig, String storageServiceUrl,
+    private void initialize(String threadGroupName, 
Optional<ThreadRuntimeFactoryConfig.MemoryLimit> memoryLimit,
+                            String pulsarServiceUrl, AuthenticationConfig 
authConfig, String stateStorageImplClass,
+                            String storageServiceUrl,
                             SecretsProviderConfigurator 
secretsProviderConfigurator, SecretsProvider secretsProvider,
                             FunctionCollectorRegistry collectorRegistry, 
String narExtractionDirectory,
                             ClassLoader rootClassLoader, boolean 
exposePulsarAdminClientEnabled,
-                            String pulsarWebServiceUrl, 
Optional<ConnectorsManager> connectorsManager) throws PulsarClientException {
+                            String pulsarWebServiceUrl, 
Optional<ConnectorsManager> connectorsManager)
+            throws PulsarClientException {
 
         if (rootClassLoader == null) {
             rootClassLoader = Thread.currentThread().getContextClassLoader();
@@ -104,6 +111,7 @@ public class ThreadRuntimeFactory implements RuntimeFactory 
{
         this.pulsarAdmin = exposePulsarAdminClientEnabled ? 
InstanceUtils.createPulsarAdminClient(pulsarWebServiceUrl, authConfig) : null;
         this.clientBuilder = 
InstanceUtils.createPulsarClientBuilder(pulsarServiceUrl, authConfig, 
calculateClientMemoryLimit(memoryLimit));
         this.pulsarClient = this.clientBuilder.build();
+        this.stateStorageImplClass = stateStorageImplClass;
         this.storageServiceUrl = storageServiceUrl;
         this.collectorRegistry = collectorRegistry;
         this.narExtractionDirectory = narExtractionDirectory;
@@ -153,6 +161,7 @@ public class ThreadRuntimeFactory implements RuntimeFactory 
{
 
         initialize(factoryConfig.getThreadGroupName(), 
Optional.ofNullable(factoryConfig.getPulsarClientMemoryLimit()),
                 workerConfig.getPulsarServiceUrl(), authenticationConfig,
+                workerConfig.getStateStorageProviderImplementation(),
                 workerConfig.getStateStorageServiceUrl(), 
secretsProviderConfigurator, null,
                 null, workerConfig.getNarExtractionDirectory(), null,
                 workerConfig.isExposeAdminClientEnabled(), 
workerConfig.getPulsarWebServiceUrl(), Optional.of(connectorsManager));
@@ -179,6 +188,7 @@ public class ThreadRuntimeFactory implements RuntimeFactory 
{
             pulsarClient,
             clientBuilder,
             pulsarAdmin,
+            stateStorageImplClass,
             storageServiceUrl,
             secretsProvider,
             collectorRegistry,
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index 0cf6a22..eb40ae3 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -50,6 +50,7 @@ import lombok.Data;
 import lombok.experimental.Accessors;
 import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.functions.auth.KubernetesSecretsTokenAuthProvider;
+import org.apache.pulsar.functions.instance.state.BKStateStoreProviderImpl;
 import org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntimeFactory;
 import 
org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntimeFactoryConfig;
 import org.apache.pulsar.functions.runtime.process.ProcessRuntimeFactoryConfig;
@@ -234,6 +235,13 @@ public class WorkerConfig implements Serializable, 
PulsarConfiguration {
         doc = "The service URL of state storage"
     )
     private String stateStorageServiceUrl;
+
+    @FieldContext(
+            category = CATEGORY_STATE,
+            doc = "The implementation class for the state store"
+    )
+    private String stateStorageProviderImplementation = 
BKStateStoreProviderImpl.class.getName();
+
     @FieldContext(
         category = CATEGORY_FUNC_RUNTIME_MNG,
         doc = "The Pulsar topic used for storing function assignment 
informations"
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
index 4bea15e..e2bb784 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
@@ -167,7 +167,7 @@ public class FunctionsImplTest {
         instanceConfig.setMaxBufferedTuples(1024);
 
         JavaInstanceRunnable javaInstanceRunnable = new JavaInstanceRunnable(
-                instanceConfig, null, null, null, null, null, null, null);
+                instanceConfig, null, null, null, null, null, null, null, 
null);
         CompletableFuture<InstanceCommunication.MetricsData> 
metricsDataCompletableFuture = new 
CompletableFuture<InstanceCommunication.MetricsData>();
         
metricsDataCompletableFuture.complete(javaInstanceRunnable.getMetrics());
         Runtime runtime = mock(Runtime.class);
@@ -222,7 +222,7 @@ public class FunctionsImplTest {
         instanceConfig.setMaxBufferedTuples(1024);
 
         JavaInstanceRunnable javaInstanceRunnable = new JavaInstanceRunnable(
-                instanceConfig, null, null, null, null, null, null, null);
+                instanceConfig, null, null, null, null, null, null, null, 
null);
         CompletableFuture<InstanceCommunication.MetricsData> completableFuture 
= new CompletableFuture<InstanceCommunication.MetricsData>();
         completableFuture.complete(javaInstanceRunnable.getMetrics());
         Runtime runtime = mock(Runtime.class);

Reply via email to