This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 08a49c0 Allow to configure different implementations for Pulsar
functions state store (#12646)
08a49c0 is described below
commit 08a49c06bff4a52d26319a114961aed6cb6c4791
Author: Matteo Merli <[email protected]>
AuthorDate: Sat Nov 6 16:51:49 2021 -0700
Allow to configure different implementations for Pulsar functions state
store (#12646)
---
.../worker/PulsarFunctionLocalRunTest.java | 6 +-
.../worker/PulsarFunctionMetadataStoreTest.java | 122 ++++++++++++++++++
pulsar-functions/instance/pom.xml | 6 +
.../functions/instance/JavaInstanceRunnable.java | 15 ++-
.../instance/state/BKStateStoreProviderImpl.java | 2 -
.../state/PulsarMetadataStateStoreImpl.java | 142 +++++++++++++++++++++
.../PulsarMetadataStateStoreProviderImpl.java | 67 ++++++++++
.../instance/state/StateStoreProvider.java | 2 +
.../instance/JavaInstanceRunnableTest.java | 2 +-
.../state/PulsarMetadataStateStoreImplTest.java | 120 +++++++++++++++++
.../org/apache/pulsar/functions/LocalRunner.java | 9 +-
.../functions/runtime/JavaInstanceStarter.java | 4 +
.../functions/runtime/thread/ThreadRuntime.java | 5 +-
.../runtime/thread/ThreadRuntimeFactory.java | 18 ++-
.../pulsar/functions/worker/WorkerConfig.java | 8 ++
.../worker/rest/api/FunctionsImplTest.java | 4 +-
16 files changed, 515 insertions(+), 17 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
index ed6c4aa..13ac623 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
@@ -304,7 +304,7 @@ public class PulsarFunctionLocalRunTest {
}
}
- private WorkerConfig createWorkerConfig(ServiceConfiguration config) {
+ protected WorkerConfig createWorkerConfig(ServiceConfiguration config) {
System.setProperty(JAVA_INSTANCE_JAR_PROPERTY,
FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath());
@@ -560,7 +560,7 @@ public class PulsarFunctionLocalRunTest {
}
}
- private void testE2EPulsarFunctionLocalRun(String jarFilePathUrl) throws
Exception {
+ protected void testE2EPulsarFunctionLocalRun(String jarFilePathUrl) throws
Exception {
testE2EPulsarFunctionLocalRun(jarFilePathUrl, 1);
}
@@ -1133,7 +1133,7 @@ public class PulsarFunctionLocalRunTest {
}
}
- private void runWithPulsarFunctionsClassLoader(Assert.ThrowingRunnable
throwingRunnable) throws Throwable {
+ protected void runWithPulsarFunctionsClassLoader(Assert.ThrowingRunnable
throwingRunnable) throws Throwable {
ClassLoader originalClassLoader =
Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(pulsarApiExamplesClassLoader);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionMetadataStoreTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionMetadataStoreTest.java
new file mode 100644
index 0000000..715837f
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionMetadataStoreTest.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker;
+
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
+import static
org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry.JAVA_INSTANCE_JAR_PROPERTY;
+import static org.mockito.Mockito.spy;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.ServiceConfigurationUtils;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
+import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
+import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
+import org.apache.pulsar.client.admin.BrokerStats;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.client.impl.auth.AuthenticationTls;
+import org.apache.pulsar.common.functions.ConsumerConfig;
+import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.common.functions.Utils;
+import org.apache.pulsar.common.io.SinkConfig;
+import org.apache.pulsar.common.io.SourceConfig;
+import org.apache.pulsar.common.nar.NarClassLoader;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.ConsumerStats;
+import org.apache.pulsar.common.policies.data.PublisherStats;
+import org.apache.pulsar.common.policies.data.SubscriptionStats;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.policies.data.TopicStats;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.functions.LocalRunner;
+import org.apache.pulsar.functions.api.Record;
+import
org.apache.pulsar.functions.instance.state.PulsarMetadataStateStoreProviderImpl;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig;
+import org.apache.pulsar.functions.utils.FunctionCommon;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
+import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
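+/**
+ * Runs the local-run end-to-end function test with the worker configured to use {@link PulsarMetadataStateStoreProviderImpl} for state storage.
+ */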
+@Slf4j
+@Test
+public class PulsarFunctionMetadataStoreTest extends
PulsarFunctionLocalRunTest {
+
+
+ protected WorkerConfig createWorkerConfig(ServiceConfiguration config) {
+ WorkerConfig wc = super.createWorkerConfig(config);
+
wc.setStateStorageProviderImplementation(PulsarMetadataStateStoreProviderImpl.class.getName());
+ wc.setStateStorageServiceUrl("memory://local");
+ return wc;
+ }
+
+ @Test
+ public void testE2EPulsarFunctionLocalRun() throws Throwable {
+ runWithPulsarFunctionsClassLoader(() ->
testE2EPulsarFunctionLocalRun(null));
+ }
+}
diff --git a/pulsar-functions/instance/pom.xml
b/pulsar-functions/instance/pom.xml
index 6c2ae1b..ece9af7 100644
--- a/pulsar-functions/instance/pom.xml
+++ b/pulsar-functions/instance/pom.xml
@@ -55,6 +55,12 @@
<dependency>
<groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-metadata</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-core</artifactId>
<version>${project.version}</version>
</dependency>
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index cfdcb08..50994e5 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -103,6 +103,7 @@ public class JavaInstanceRunnable implements AutoCloseable,
Runnable {
private LogAppender logAppender;
// provide tables for storing states
+ private final String stateStorageImplClass;
private final String stateStorageServiceUrl;
private StateStoreProvider stateStoreProvider;
private StateManager stateManager;
@@ -144,6 +145,7 @@ public class JavaInstanceRunnable implements AutoCloseable,
Runnable {
ClientBuilder clientBuilder,
PulsarClient pulsarClient,
PulsarAdmin pulsarAdmin,
+ String stateStorageImplClass,
String stateStorageServiceUrl,
SecretsProvider secretsProvider,
FunctionCollectorRegistry collectorRegistry,
@@ -152,6 +154,7 @@ public class JavaInstanceRunnable implements AutoCloseable,
Runnable {
this.clientBuilder = clientBuilder;
this.client = (PulsarClientImpl) pulsarClient;
this.pulsarAdmin = pulsarAdmin;
+ this.stateStorageImplClass = stateStorageImplClass;
this.stateStorageServiceUrl = stateStorageServiceUrl;
this.secretsProvider = secretsProvider;
this.functionClassLoader = functionClassLoader;
@@ -322,8 +325,8 @@ public class JavaInstanceRunnable implements AutoCloseable,
Runnable {
if (null == stateStorageServiceUrl) {
stateStoreProvider = StateStoreProvider.NULL;
} else {
- stateStoreProvider = new BKStateStoreProviderImpl();
- Map<String, Object> stateStoreProviderConfig = new HashMap();
+ stateStoreProvider = getStateStoreProvider();
+ Map<String, Object> stateStoreProviderConfig = new HashMap<>();
stateStoreProviderConfig.put(BKStateStoreProviderImpl.STATE_STORAGE_SERVICE_URL,
stateStorageServiceUrl);
stateStoreProvider.init(stateStoreProviderConfig,
instanceConfig.getFunctionDetails());
@@ -339,6 +342,14 @@ public class JavaInstanceRunnable implements
AutoCloseable, Runnable {
}
}
+ private StateStoreProvider getStateStoreProvider() throws Exception {
+ if (stateStorageImplClass == null) {
+ return new BKStateStoreProviderImpl();
+ } else {
+ return (StateStoreProvider)
Class.forName(stateStorageImplClass).getConstructor().newInstance();
+ }
+ }
+
private void handleResult(Record srcRecord, JavaExecutionResult result)
throws Exception {
if (result.getUserException() != null) {
Exception t = result.getUserException();
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/BKStateStoreProviderImpl.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/BKStateStoreProviderImpl.java
index fd4228a..32901bd 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/BKStateStoreProviderImpl.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/BKStateStoreProviderImpl.java
@@ -56,8 +56,6 @@ import org.apache.pulsar.functions.utils.FunctionCommon;
@Slf4j
public class BKStateStoreProviderImpl implements StateStoreProvider {
- public static final String STATE_STORAGE_SERVICE_URL =
"stateStorageServiceUrl";
-
private String stateStorageServiceUrl;
private Map<String, StorageClient> clients;
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/PulsarMetadataStateStoreImpl.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/PulsarMetadataStateStoreImpl.java
new file mode 100644
index 0000000..1c1df7f
--- /dev/null
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/PulsarMetadataStateStoreImpl.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.instance.state;
+
+import java.nio.ByteBuffer;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.functions.api.StateStoreContext;
+import org.apache.pulsar.metadata.api.MetadataCache;
+import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+
+public class PulsarMetadataStateStoreImpl implements DefaultStateStore {
+
+ private final MetadataStore store;
+ private final String prefixPath;
+ private final MetadataCache<Long> countersCache;
+
+ private final String namespace;
+ private final String tenant;
+ private final String name;
+ private final String fqsn;
+
+ PulsarMetadataStateStoreImpl(MetadataStore store, String prefix, String
tenant, String namespace, String name) {
+ this.store = store;
+ this.tenant = tenant;
+ this.namespace = namespace;
+ this.name = name;
+ this.fqsn = tenant + '/' + namespace + '/' + name;
+
+ this.prefixPath = prefix + '/' + fqsn + '/';
+ this.countersCache = store.getMetadataCache(Long.class);
+ }
+
+ @Override
+ public String tenant() {
+ return tenant;
+ }
+
+ @Override
+ public String namespace() {
+ return namespace;
+ }
+
+ @Override
+ public String name() {
+ return name;
+ }
+
+ @Override
+ public String fqsn() {
+ return fqsn;
+ }
+
+ @Override
+ public void init(StateStoreContext ctx) {
+ }
+
+ @Override
+ public void close() {
+ }
+
+ @Override
+ public void put(String key, ByteBuffer value) {
+ putAsync(key, value).join();
+ }
+
+ @Override
+ public CompletableFuture<Void> putAsync(String key, ByteBuffer value) {
+ byte[] bytes = new byte[value.remaining()];
+ value.get(bytes);
+ return store.put(getPath(key), bytes, Optional.empty())
+ .thenApply(__ -> null);
+ }
+
+ @Override
+ public void delete(String key) {
+ deleteAsync(key).join();
+ }
+
+ @Override
+ public CompletableFuture<Void> deleteAsync(String key) {
+ return store.delete(getPath(key), Optional.empty());
+ }
+
+ @Override
+ public ByteBuffer get(String key) {
+ return getAsync(key).join();
+ }
+
+ @Override
+ public CompletableFuture<ByteBuffer> getAsync(String key) {
+ return store.get(getPath(key))
+ .thenApply(optRes ->
+ optRes.map(x -> ByteBuffer.wrap(x.getValue()))
+ .orElse(null));
+ }
+
+ /**
+ * Increments the counter stored under {@code key} by {@code amount}, blocking until the update completes.
+ */
+ @Override
+ public void incrCounter(String key, long amount) {
+ // join() so that completion and failures are observed here, as put/delete/get/getCounter already do.
+ incrCounterAsync(key, amount).join();
+ }
+
+ @Override
+ public CompletableFuture<Void> incrCounterAsync(String key, long amount) {
+ return countersCache.readModifyUpdateOrCreate(getPath(key), optValue ->
+ optValue.orElse(0L) + amount
+ ).thenApply(__ -> null);
+ }
+
+ @Override
+ public long getCounter(String key) {
+ return getCounterAsync(key).join();
+ }
+
+ @Override
+ public CompletableFuture<Long> getCounterAsync(String key) {
+ return countersCache.get(getPath(key))
+ .thenApply(optValue -> optValue.orElse(0L));
+ }
+
+ private String getPath(String key) {
+ return prefixPath + key;
+ }
+}
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/PulsarMetadataStateStoreProviderImpl.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/PulsarMetadataStateStoreProviderImpl.java
new file mode 100644
index 0000000..819bfd9
--- /dev/null
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/PulsarMetadataStateStoreProviderImpl.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.instance.state;
+
+import java.util.Map;
+import lombok.SneakyThrows;
+import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.MetadataStoreFactory;
+
+public class PulsarMetadataStateStoreProviderImpl implements
StateStoreProvider {
+
+ private static final String METADATA_URL = "METADATA_URL";
+ private static final String METADATA_STORE_INSTANCE =
"METADATA_STORE_INSTANCE";
+
+ private static final String METADATA_PREFIX = "METADATA_PREFIX";
+ private static final String METADATA_DEFAULT_PREFIX = "/state-store";
+
+ private MetadataStore store;
+ private String prefix;
+ private boolean shouldCloseStore;
+
+ /**
+ * Initializes the provider from the given configuration.
+ *
+ * <p>When the config contains a {@code METADATA_STORE_INSTANCE} entry, that store is used as-is and is
+ * not closed by {@link #close()}. Otherwise a new store is created from the configured URL and is
+ * closed by this provider.
+ *
+ * @param config provider configuration; recognized keys are METADATA_PREFIX, METADATA_STORE_INSTANCE,
+ * METADATA_URL and, as a fallback for the URL, STATE_STORAGE_SERVICE_URL
+ * @param functionDetails details of the function (not read by this implementation)
+ * @throws Exception propagated from {@code MetadataStoreFactory.create}
+ */
+ @Override
+ public void init(Map<String, Object> config, Function.FunctionDetails functionDetails) throws Exception {
+
+ prefix = (String) config.getOrDefault(METADATA_PREFIX, METADATA_DEFAULT_PREFIX);
+
+ if (config.containsKey(METADATA_STORE_INSTANCE)) {
+ store = (MetadataStore) config.get(METADATA_STORE_INSTANCE);
+ shouldCloseStore = false;
+ } else {
+ // JavaInstanceRunnable supplies the URL only under STATE_STORAGE_SERVICE_URL, so fall back to
+ // that key when METADATA_URL is absent instead of handing a null URL to the factory.
+ String metadataUrl = (String) config.get(METADATA_URL);
+ if (metadataUrl == null) {
+ metadataUrl = (String) config.get(STATE_STORAGE_SERVICE_URL);
+ }
+ store = MetadataStoreFactory.create(metadataUrl, MetadataStoreConfig.builder().build());
+ shouldCloseStore = true;
+ }
+ }
+
+ @Override
+ public DefaultStateStore getStateStore(String tenant, String namespace,
String name) throws Exception {
+ return new PulsarMetadataStateStoreImpl(store, prefix, tenant,
namespace, name);
+ }
+
+ @SneakyThrows
+ @Override
+ public void close() {
+ if (shouldCloseStore) {
+ store.close();
+ }
+ }
+}
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateStoreProvider.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateStoreProvider.java
index db3c6b3..4c01e9d 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateStoreProvider.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateStoreProvider.java
@@ -27,6 +27,8 @@ import
org.apache.pulsar.functions.proto.Function.FunctionDetails;
*/
public interface StateStoreProvider extends AutoCloseable {
+ String STATE_STORAGE_SERVICE_URL = "stateStorageServiceUrl";
+
/**
* The state store provider returns `null` state stores.
*/
diff --git
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
index 59a4f5b..b96e813 100644
---
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
+++
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
@@ -71,7 +71,7 @@ public class JavaInstanceRunnableTest {
ClientBuilder clientBuilder = mock(ClientBuilder.class);
when(clientBuilder.build()).thenReturn(null);
JavaInstanceRunnable javaInstanceRunnable = new JavaInstanceRunnable(
- config, clientBuilder, null, null, null, null, null, null);
+ config, clientBuilder, null, null, null,null, null, null,
null);
return javaInstanceRunnable;
}
diff --git
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/PulsarMetadataStateStoreImplTest.java
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/PulsarMetadataStateStoreImplTest.java
new file mode 100644
index 0000000..a6fb3eb
--- /dev/null
+++
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/PulsarMetadataStateStoreImplTest.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.instance.state;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyLong;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.*;
+import io.kubernetes.client.proto.Meta;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+import lombok.SneakyThrows;
+import org.apache.bookkeeper.api.kv.Table;
+import org.apache.bookkeeper.api.kv.options.Options;
+import org.apache.bookkeeper.api.kv.result.DeleteResult;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.pulsar.metadata.api.MetadataCache;
+import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.MetadataStoreFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Unit test {@link PulsarMetadataStateStoreImpl}.
+ */
+public class PulsarMetadataStateStoreImplTest {
+
+ private static final String TENANT = "test-tenant";
+ private static final String NS = "test-ns";
+ private static final String NAME = "test-name";
+ private static final String FQSN = "test-tenant/test-ns/test-name";
+ private static final String PREFIX = "/prefix";
+ private static final String PREFIX_PATH = PREFIX + '/' + FQSN + '/';
+
+ private MetadataStore store;
+ private MetadataCache<Long> countersCache;
+ private DefaultStateStore stateContext;
+
+ @BeforeMethod
+ public void setup() throws Exception {
+ this.store = MetadataStoreFactory.create("memory://local",
MetadataStoreConfig.builder().build());
+ this.countersCache = store.getMetadataCache(Long.class);
+ this.stateContext = new PulsarMetadataStateStoreImpl(store, "/prefix",
TENANT, NS, NAME);
+ }
+
+ @AfterMethod
+ public void cleanup() throws Exception {
+ this.store.close();
+ }
+
+ @Test
+ public void testGetter() {
+ assertEquals(stateContext.tenant(), TENANT);
+ assertEquals(stateContext.namespace(), NS);
+ assertEquals(stateContext.name(), NAME);
+ assertEquals(stateContext.fqsn(), FQSN);
+ }
+
+ @Test
+ public void testIncr() throws Exception {
+ stateContext.incrCounter("test-key", 10L);
+ assertEquals(countersCache.get(PREFIX_PATH +
"test-key").join().get().longValue(), 10);
+ }
+
+ @Test
+ public void testPut() throws Exception {
+ stateContext.put("test-key",
ByteBuffer.wrap("test-value".getBytes(UTF_8)));
+ assertEquals(store.get(PREFIX_PATH +
"test-key").join().get().getValue(), "test-value".getBytes(UTF_8));
+ }
+
+ @Test
+ public void testDelete() throws Exception {
+ stateContext.put("test-key",
ByteBuffer.wrap("test-value".getBytes(UTF_8)));
+ assertEquals("test-value".getBytes(UTF_8), store.get(PREFIX_PATH +
"test-key").join().get().getValue());
+ stateContext.delete("test-key");
+ assertFalse(store.get(PREFIX_PATH + "test-key").join().isPresent());
+ }
+
+ @Test
+ public void testGetAmount() throws Exception {
+ assertEquals(stateContext.getCounter("test-key"), 0);
+ stateContext.incrCounter("test-key", 10L);
+ assertEquals(countersCache.get(PREFIX_PATH +
"test-key").join().get().longValue(), 10);
+ assertEquals(stateContext.getCounter("test-key"), 10);
+ }
+
+ @Test
+ public void testGetKeyNotPresent() throws Exception {
+ CompletableFuture<ByteBuffer> result =
stateContext.getAsync("test-key");
+ assertTrue(result != null);
+ assertEquals(result.get(), null);
+ }
+
+}
diff --git
a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
index 0bedb74..295199c 100644
---
a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
+++
b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
@@ -151,6 +151,8 @@ public class LocalRunner implements AutoCloseable {
protected SourceConfig sourceConfig;
@Parameter(names = "--sinkConfig", description = "The json representation
of SinkConfig", hidden = true, converter = SinkConfigConverter.class)
protected SinkConfig sinkConfig;
+ // Class name of the state storage implementation; handed to ThreadRuntimeFactory when the runtime is created.
+ @Parameter(names = "--stateStorageImplClass", description = "The implementation class for the state storage service (by default Apache BookKeeper)", hidden = true, required = false)
+ protected String stateStorageImplClass;
@Parameter(names = "--stateStorageServiceUrl", description = "The URL for
the state storage service (by default Apache BookKeeper)", hidden = true)
protected String stateStorageServiceUrl;
@Parameter(names = "--brokerServiceUrl", description = "The URL for the
Pulsar broker", hidden = true)
@@ -201,8 +203,9 @@ public class LocalRunner implements AutoCloseable {
}
@Builder
- public LocalRunner(FunctionConfig functionConfig, SourceConfig
sourceConfig, SinkConfig sinkConfig, String
- stateStorageServiceUrl, String brokerServiceUrl, String
clientAuthPlugin, String clientAuthParams,
+ public LocalRunner(FunctionConfig functionConfig, SourceConfig
sourceConfig, SinkConfig sinkConfig,
+ String stateStorageImplClass, String
stateStorageServiceUrl, String brokerServiceUrl,
+ String clientAuthPlugin, String clientAuthParams,
boolean useTls, boolean tlsAllowInsecureConnection,
boolean tlsHostNameVerificationEnabled,
String tlsTrustCertFilePath, int instanceIdOffset,
RuntimeEnv runtimeEnv,
String secretsProviderClassName, String
secretsProviderConfig, String narExtractionDirectory,
@@ -210,6 +213,7 @@ public class LocalRunner implements AutoCloseable {
this.functionConfig = functionConfig;
this.sourceConfig = sourceConfig;
this.sinkConfig = sinkConfig;
+ this.stateStorageImplClass = stateStorageImplClass;
this.stateStorageServiceUrl = stateStorageServiceUrl;
this.brokerServiceUrl = brokerServiceUrl;
this.clientAuthPlugin = clientAuthPlugin;
@@ -614,6 +618,7 @@ public class LocalRunner implements AutoCloseable {
}
runtimeFactory = new ThreadRuntimeFactory("LocalRunnerThreadGroup",
serviceUrl,
+ stateStorageImplClass,
stateStorageServiceUrl,
authConfig,
secretsProvider,
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
index 1881b55..5995b87 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
@@ -94,6 +94,9 @@ public class JavaInstanceStarter implements AutoCloseable {
@Parameter(names = "--tls_trust_cert_path", description = "tls trust cert
file path")
public String tlsTrustCertFilePath;
+ @Parameter(names = "--state_storage_impl_class", description = "State
Storage Service Implementation class\n", required= false)
+ public String stateStorageImplClass;
+
@Parameter(names = "--state_storage_serviceurl", description = "State
Storage Service Url\n", required= false)
public String stateStorageServiceUrl;
@@ -196,6 +199,7 @@ public class JavaInstanceStarter implements AutoCloseable {
RuntimeUtils.registerDefaultCollectors(collectorRegistry);
containerFactory = new ThreadRuntimeFactory("LocalRunnerThreadGroup",
pulsarServiceUrl,
+ stateStorageImplClass,
stateStorageServiceUrl,
AuthenticationConfig.builder().clientAuthenticationPlugin(clientAuthenticationPlugin)
.clientAuthenticationParameters(clientAuthenticationParameters).useTls(isTrue(useTls))
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
index b6dd019..3f76863 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
@@ -26,7 +26,6 @@ import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
-import io.prometheus.client.CollectorRegistry;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarServerException;
@@ -66,6 +65,7 @@ public class ThreadRuntime implements Runtime {
private ClientBuilder clientBuilder;
private PulsarClient pulsarClient;
private PulsarAdmin pulsarAdmin;
+ private String stateStorageImplClass;
private String stateStorageServiceUrl;
private SecretsProvider secretsProvider;
private FunctionCollectorRegistry collectorRegistry;
@@ -79,6 +79,7 @@ public class ThreadRuntime implements Runtime {
PulsarClient client,
ClientBuilder clientBuilder,
PulsarAdmin pulsarAdmin,
+ String stateStorageImplClass,
String stateStorageServiceUrl,
SecretsProvider secretsProvider,
FunctionCollectorRegistry collectorRegistry,
@@ -95,6 +96,7 @@ public class ThreadRuntime implements Runtime {
this.clientBuilder = clientBuilder;
this.pulsarClient = client;
this.pulsarAdmin = pulsarAdmin;
+ this.stateStorageImplClass = stateStorageImplClass;
this.stateStorageServiceUrl = stateStorageServiceUrl;
this.secretsProvider = secretsProvider;
this.collectorRegistry = collectorRegistry;
@@ -174,6 +176,7 @@ public class ThreadRuntime implements Runtime {
clientBuilder,
pulsarClient,
pulsarAdmin,
+ stateStorageImplClass,
stateStorageServiceUrl,
secretsProvider,
collectorRegistry,
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
index 864a067..1e8c96a 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
@@ -63,6 +63,7 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
private ClientBuilder clientBuilder;
private PulsarClient pulsarClient;
private PulsarAdmin pulsarAdmin;
+ private String stateStorageImplClass;
private String storageServiceUrl;
private SecretsProvider defaultSecretsProvider;
private FunctionCollectorRegistry collectorRegistry;
@@ -76,21 +77,27 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
* This constructor is used by other runtimes (e.g. ProcessRuntime and KubernetesRuntime) that rely on ThreadRuntime to actually run an instance of the function.
* When used by other runtimes, the arguments such as secretsProvider and rootClassLoader will be provided.
*/
- public ThreadRuntimeFactory(String threadGroupName, String pulsarServiceUrl, String storageServiceUrl,
+ public ThreadRuntimeFactory(String threadGroupName, String pulsarServiceUrl,
+ String stateStorageImplClass,
+ String storageServiceUrl,
AuthenticationConfig authConfig,
SecretsProvider secretsProvider,
FunctionCollectorRegistry collectorRegistry,
String narExtractionDirectory,
ClassLoader rootClassLoader, boolean exposePulsarAdminClientEnabled,
String pulsarWebServiceUrl) throws Exception {
initialize(threadGroupName, Optional.empty(), pulsarServiceUrl, authConfig,
- storageServiceUrl, null, secretsProvider, collectorRegistry, narExtractionDirectory,
+ stateStorageImplClass, storageServiceUrl, null, secretsProvider, collectorRegistry,
+ narExtractionDirectory,
rootClassLoader, exposePulsarAdminClientEnabled, pulsarWebServiceUrl, Optional.empty());
}
- private void initialize(String threadGroupName, Optional<ThreadRuntimeFactoryConfig.MemoryLimit> memoryLimit, String pulsarServiceUrl, AuthenticationConfig authConfig, String storageServiceUrl,
+ private void initialize(String threadGroupName, Optional<ThreadRuntimeFactoryConfig.MemoryLimit> memoryLimit,
+ String pulsarServiceUrl, AuthenticationConfig authConfig, String stateStorageImplClass,
+ String storageServiceUrl,
SecretsProviderConfigurator secretsProviderConfigurator, SecretsProvider secretsProvider,
FunctionCollectorRegistry collectorRegistry,
String narExtractionDirectory,
ClassLoader rootClassLoader, boolean exposePulsarAdminClientEnabled,
- String pulsarWebServiceUrl, Optional<ConnectorsManager> connectorsManager) throws PulsarClientException {
+ String pulsarWebServiceUrl, Optional<ConnectorsManager> connectorsManager)
+ throws PulsarClientException {
if (rootClassLoader == null) {
rootClassLoader = Thread.currentThread().getContextClassLoader();
@@ -104,6 +111,7 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
this.pulsarAdmin = exposePulsarAdminClientEnabled ? InstanceUtils.createPulsarAdminClient(pulsarWebServiceUrl, authConfig) : null;
this.clientBuilder = InstanceUtils.createPulsarClientBuilder(pulsarServiceUrl, authConfig, calculateClientMemoryLimit(memoryLimit));
this.pulsarClient = this.clientBuilder.build();
+ this.stateStorageImplClass = stateStorageImplClass;
this.storageServiceUrl = storageServiceUrl;
this.collectorRegistry = collectorRegistry;
this.narExtractionDirectory = narExtractionDirectory;
@@ -153,6 +161,7 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
initialize(factoryConfig.getThreadGroupName(),
Optional.ofNullable(factoryConfig.getPulsarClientMemoryLimit()),
workerConfig.getPulsarServiceUrl(), authenticationConfig,
+ workerConfig.getStateStorageProviderImplementation(),
workerConfig.getStateStorageServiceUrl(),
secretsProviderConfigurator, null,
null, workerConfig.getNarExtractionDirectory(), null,
workerConfig.isExposeAdminClientEnabled(),
workerConfig.getPulsarWebServiceUrl(), Optional.of(connectorsManager));
@@ -179,6 +188,7 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
pulsarClient,
clientBuilder,
pulsarAdmin,
+ stateStorageImplClass,
storageServiceUrl,
secretsProvider,
collectorRegistry,
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index 0cf6a22..eb40ae3 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -50,6 +50,7 @@ import lombok.Data;
import lombok.experimental.Accessors;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.functions.auth.KubernetesSecretsTokenAuthProvider;
+import org.apache.pulsar.functions.instance.state.BKStateStoreProviderImpl;
import org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntimeFactory;
import org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntimeFactoryConfig;
import org.apache.pulsar.functions.runtime.process.ProcessRuntimeFactoryConfig;
@@ -234,6 +235,13 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
doc = "The service URL of state storage"
)
private String stateStorageServiceUrl;
+
+ @FieldContext(
+ category = CATEGORY_STATE,
+ doc = "The implementation class for the state store"
+ )
+ private String stateStorageProviderImplementation = BKStateStoreProviderImpl.class.getName();
+
@FieldContext(
category = CATEGORY_FUNC_RUNTIME_MNG,
doc = "The Pulsar topic used for storing function assignment informations"
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
index 4bea15e..e2bb784 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
@@ -167,7 +167,7 @@ public class FunctionsImplTest {
instanceConfig.setMaxBufferedTuples(1024);
JavaInstanceRunnable javaInstanceRunnable = new JavaInstanceRunnable(
- instanceConfig, null, null, null, null, null, null, null);
+ instanceConfig, null, null, null, null, null, null, null, null);
CompletableFuture<InstanceCommunication.MetricsData> metricsDataCompletableFuture = new CompletableFuture<InstanceCommunication.MetricsData>();
metricsDataCompletableFuture.complete(javaInstanceRunnable.getMetrics());
Runtime runtime = mock(Runtime.class);
@@ -222,7 +222,7 @@ public class FunctionsImplTest {
instanceConfig.setMaxBufferedTuples(1024);
JavaInstanceRunnable javaInstanceRunnable = new JavaInstanceRunnable(
- instanceConfig, null, null, null, null, null, null, null);
+ instanceConfig, null, null, null, null, null, null, null, null);
CompletableFuture<InstanceCommunication.MetricsData> completableFuture = new CompletableFuture<InstanceCommunication.MetricsData>();
completableFuture.complete(javaInstanceRunnable.getMetrics());
Runtime runtime = mock(Runtime.class);