This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 2665ad011bfaa717b4fda10f3516110595d05634 Author: jiangpengcheng <scjiangpengch...@gmail.com> AuthorDate: Wed Sep 10 19:37:36 2025 +0800 [feat][fn] Fallback to using `STATE_STORAGE_SERVICE_URL` in `PulsarMetadataStateStoreProviderImpl.init` (#24721) (cherry picked from commit 415c6fa74e908bad2da1e7f986185c6ef17cb9a1) --- .../PulsarMetadataStateStoreProviderImpl.java | 4 + .../functions/PulsarBKStateStoreTest.java | 27 +++ .../functions/PulsarMetadataStateStoreTest.java | 60 ++++++ .../integration/functions/PulsarStateTest.java | 220 ++++++++++++--------- .../src/test/resources/pulsar-function.xml | 3 +- 5 files changed, 225 insertions(+), 89 deletions(-) diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/PulsarMetadataStateStoreProviderImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/PulsarMetadataStateStoreProviderImpl.java index 7b9807b2a78..5bffc5966b6 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/PulsarMetadataStateStoreProviderImpl.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/PulsarMetadataStateStoreProviderImpl.java @@ -20,6 +20,7 @@ package org.apache.pulsar.functions.instance.state; import java.util.Map; import lombok.SneakyThrows; +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.metadata.api.MetadataStore; import org.apache.pulsar.metadata.api.MetadataStoreConfig; import org.apache.pulsar.metadata.api.MetadataStoreFactory; @@ -46,6 +47,9 @@ public class PulsarMetadataStateStoreProviderImpl implements StateStoreProvider shouldCloseStore = false; } else { String metadataUrl = (String) config.get(METADATA_URL); + if (StringUtils.isEmpty(metadataUrl)) { + metadataUrl = (String) config.get(StateStoreProvider.STATE_STORAGE_SERVICE_URL); + } store = MetadataStoreFactory.create(metadataUrl, MetadataStoreConfig.builder() .metadataStoreName(MetadataStoreConfig.STATE_METADATA_STORE).build()); shouldCloseStore = true; diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarBKStateStoreTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarBKStateStoreTest.java new file mode 100644 index 00000000000..482dddb46d7 --- /dev/null +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarBKStateStoreTest.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.tests.integration.functions; + +import org.apache.pulsar.functions.instance.state.BKStateStoreProviderImpl; + +public class PulsarBKStateStoreTest extends PulsarStateTest { + protected PulsarBKStateStoreTest() { + super(BKStateStoreProviderImpl.class.getName()); + } +} diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarMetadataStateStoreTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarMetadataStateStoreTest.java new file mode 100644 index 00000000000..cb745928914 --- /dev/null +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarMetadataStateStoreTest.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.tests.integration.functions; + +import static org.testng.Assert.assertEquals; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.functions.instance.state.PulsarMetadataStateStoreProviderImpl; +import org.apache.pulsar.tests.integration.containers.PulsarContainer; +import org.apache.pulsar.tests.integration.containers.StandaloneContainer; +import org.apache.pulsar.tests.integration.docker.ContainerExecResult; +import org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase; +import org.testcontainers.containers.Network; + +@Slf4j +public class PulsarMetadataStateStoreTest extends PulsarStateTest { + protected PulsarMetadataStateStoreTest() { + super(PulsarMetadataStateStoreProviderImpl.class.getName()); + } + + public void setUpCluster() throws Exception { + incrementSetupNumber(); + network = Network.newNetwork(); + String clusterName = PulsarClusterTestBase.randomName(8); + container = new StandaloneContainer(clusterName, PulsarContainer.DEFAULT_IMAGE_NAME) + .withNetwork(network) + .withNetworkAliases(StandaloneContainer.NAME + "-" + clusterName) + .withEnv("PULSAR_STANDALONE_USE_ZOOKEEPER", "true") + .withEnv("PF_stateStorageProviderImplementation", PulsarMetadataStateStoreProviderImpl.class.getName()) + .withEnv("PF_stateStorageServiceUrl", "zk:localhost:2181"); + container.start(); + log.info("Pulsar cluster {} is up running:", clusterName); + log.info("\tBinary Service Url : {}", container.getPlainTextServiceUrl()); + log.info("\tHttp Service Url : {}", container.getHttpServiceUrl()); + + // add cluster to public tenant + ContainerExecResult result = container.execCmd( + "/pulsar/bin/pulsar-admin", "namespaces", "policies", "public/default"); + assertEquals(0, result.getExitCode()); + log.info("public/default namespace policies are {}", result.getStdout()); + } + + +} + diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarStateTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarStateTest.java index 856e4edfea0..2395c32ed39 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarStateTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarStateTest.java @@ -44,6 +44,8 @@ import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.functions.FunctionState; import org.apache.pulsar.common.policies.data.SinkStatus; import org.apache.pulsar.common.policies.data.SourceStatus; +import org.apache.pulsar.functions.api.examples.WordCountFunction; +import org.apache.pulsar.functions.instance.state.PulsarMetadataStateStoreProviderImpl; import org.apache.pulsar.tests.integration.docker.ContainerExecException; import org.apache.pulsar.tests.integration.docker.ContainerExecResult; import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator; @@ -57,37 +59,49 @@ import org.testng.annotations.Test; * State related test cases. */ @Slf4j -public class PulsarStateTest extends PulsarStandaloneTestSuite { +public abstract class PulsarStateTest extends PulsarStandaloneTestSuite { + + protected String stateStoreProvider; + + protected PulsarStateTest(String stateStoreProvider) { + super(); + this.stateStoreProvider = stateStoreProvider; + } public static final String WORDCOUNT_PYTHON_CLASS = - "wordcount_function.WordCountFunction"; + "wordcount_function.WordCountFunction"; + public static final String WORDCOUNT_JAVA_CLASS = WordCountFunction.class.getName(); public static final String WORDCOUNT_PYTHON_FILE = "wordcount_function.py"; public static final String VALUE_BASE64 = "0a8001127e0a172e6576656e74732e437573746f6d65724372656174656412630a243" - + "2336366666263652d623038342d346631352d616565342d326330643135356131666" - + "36312026e311a3700000000000000000000000000000000000000000000000000000" - + "000000000000000000000000000000000000000000000000000000000"; - - @Test(groups = {"python_state", "state", "function", "python_function"}) - public void testPythonWordCountFunction() throws Exception { - String functionName = "test-wordcount-py-fn-" + randomName(8); - doTestPythonWordCountFunction(functionName); - - // after a function is deleted, its state should be clean - // we just recreate and test the word count function again, and it should have same result - doTestPythonWordCountFunction(functionName); - } + + "2336366666263652d623038342d346631352d616565342d326330643135356131666" + + "36312026e311a3700000000000000000000000000000000000000000000000000000" + + "000000000000000000000000000000000000000000000000000000000"; private void doTestPythonWordCountFunction(String functionName) throws Exception { String inputTopicName = "test-wordcount-py-input-" + randomName(8); String outputTopicName = "test-wordcount-py-output-" + randomName(8); - final int numMessages = 10; - // submit the exclamation function - submitExclamationFunction( + // submit the word count function + submitWordCountFunction( Runtime.PYTHON, inputTopicName, outputTopicName, functionName); + // run tests + doTestFunction(functionName, inputTopicName, outputTopicName); + } + + private void doTestJavaWordCountFunction(String functionName) throws Exception { + String inputTopicName = "test-wordcount-java-input-" + randomName(8); + + // submit the word count function + submitWordCountFunction( + Runtime.JAVA, inputTopicName, null, functionName); + // run tests + doTestFunction(functionName, inputTopicName, null); + } + private void doTestFunction(String functionName, String inputTopicName, String outputTopicName) throws Exception { + final int numMessages = 10; // get function info getFunctionInfoSuccess(functionName); @@ -134,6 +148,31 @@ public class PulsarStateTest extends PulsarStandaloneTestSuite { getFunctionInfoNotFound(functionName); } + @Test(groups = {"python_state", "state", "function", "python_function"}) + public void testPythonWordCountFunction() throws Exception { + if (PulsarMetadataStateStoreProviderImpl.class.getName().equals(stateStoreProvider)) { + // python function doesn't support metadata state store yet + return; + } + String functionName = "test-wordcount-py-fn-" + randomName(8); + doTestPythonWordCountFunction(functionName); + + // after a function is deleted, its state should be clean + // we just recreate and test the word count function again, and it should have same result + doTestPythonWordCountFunction(functionName); + } + + @Test(groups = {"java_state", "state", "function", "java_function"}) + public void testJavaWordCountFunction() throws Exception { + String functionName = "test-wordcount-java-fn-" + randomName(8); + doTestJavaWordCountFunction(functionName); + + // after a function is deleted, its state should be clean + // we just recreate and test the word count function again, and it should have same result + doTestJavaWordCountFunction(functionName); + } + + @Test(groups = {"java_state", "state", "function", "java_function"}) public void testSourceState() throws Exception { String outputTopicName = "test-state-source-output-" + randomName(8); @@ -195,7 +234,7 @@ public class PulsarStateTest extends PulsarStandaloneTestSuite { String sinkName = "test-state-sink-" + randomName(8); int numMessages = 10; - submitSinkConnector(sinkName, inputTopicName, "org.apache.pulsar.tests.integration.io.TestStateSink", JAVAJAR); + submitSinkConnector(sinkName, inputTopicName, "org.apache.pulsar.tests.integration.io.TestStateSink", JAVAJAR); // get sink info getSinkInfoSuccess(sinkName); @@ -320,9 +359,9 @@ public class PulsarStateTest extends PulsarStandaloneTestSuite { } private void submitSourceConnector(String sourceName, - String outputTopicName, - String className, - String archive) throws Exception { + String outputTopicName, + String className, + String archive) throws Exception { String[] commands = { PulsarCluster.ADMIN_SCRIPT, "sources", "create", @@ -339,9 +378,9 @@ public class PulsarStateTest extends PulsarStandaloneTestSuite { } private void submitSinkConnector(String sinkName, - String inputTopicName, - String className, - String archive) throws Exception { + String inputTopicName, + String className, + String archive) throws Exception { String[] commands = { PulsarCluster.ADMIN_SCRIPT, "sinks", "create", @@ -357,33 +396,35 @@ public class PulsarStateTest extends PulsarStandaloneTestSuite { result.getStdout()); } - private void submitExclamationFunction(Runtime runtime, - String inputTopicName, - String outputTopicName, - String functionName) throws Exception { + private void submitWordCountFunction(Runtime runtime, + String inputTopicName, + String outputTopicName, + String functionName) throws Exception { submitFunction( - runtime, - inputTopicName, - outputTopicName, - functionName, - getExclamationClass(runtime), - Schema.BYTES); + runtime, + inputTopicName, + outputTopicName, + functionName, + getWordCountClass(runtime), + Schema.BYTES); } - protected static String getExclamationClass(Runtime runtime) { + protected static String getWordCountClass(Runtime runtime) { if (Runtime.PYTHON == runtime) { return WORDCOUNT_PYTHON_CLASS; + } else if (Runtime.JAVA == runtime) { + return WORDCOUNT_JAVA_CLASS; } else { throw new IllegalArgumentException("Unsupported runtime : " + runtime); } } private <T> void submitFunction(Runtime runtime, - String inputTopicName, - String outputTopicName, - String functionName, - String functionClass, - Schema<T> inputTopicSchema) throws Exception { + String inputTopicName, + String outputTopicName, + String functionName, + String functionClass, + Schema<T> inputTopicSchema) throws Exception { CommandGenerator generator; generator = CommandGenerator.createDefaultGenerator(inputTopicName, functionClass); generator.setSinkTopic(outputTopicName); @@ -398,10 +439,10 @@ public class PulsarStateTest extends PulsarStandaloneTestSuite { throw new IllegalArgumentException("Unsupported runtime : " + runtime); } String[] commands = { - "sh", "-c", command + "sh", "-c", command }; ContainerExecResult result = container.execCmd( - commands); + commands); assertTrue(result.getStdout().contains("Created successfully")); ensureSubscriptionCreated(inputTopicName, String.format("public/default/%s", functionName), inputTopicSchema); @@ -409,18 +450,18 @@ public class PulsarStateTest extends PulsarStandaloneTestSuite { @SuppressWarnings("try") private <T> void ensureSubscriptionCreated(String inputTopicName, - String subscriptionName, - Schema<T> inputTopicSchema) + String subscriptionName, + Schema<T> inputTopicSchema) throws Exception { // ensure the function subscription exists before we start producing messages try (PulsarClient client = PulsarClient.builder() - .serviceUrl(container.getPlainTextServiceUrl()) - .build()) { + .serviceUrl(container.getPlainTextServiceUrl()) + .build()) { try (Consumer<T> ignored = client.newConsumer(inputTopicSchema) - .topic(inputTopicName) - .subscriptionType(SubscriptionType.Shared) - .subscriptionName(subscriptionName) - .subscribe()) { + .topic(inputTopicName) + .subscriptionType(SubscriptionType.Shared) + .subscriptionName(subscriptionName) + .subscribe()) { } } } @@ -451,12 +492,12 @@ public class PulsarStateTest extends PulsarStandaloneTestSuite { private void getFunctionInfoSuccess(String functionName) throws Exception { ContainerExecResult result = container.execCmd( - PulsarCluster.ADMIN_SCRIPT, - "functions", - "get", - "--tenant", "public", - "--namespace", "default", - "--name", functionName + PulsarCluster.ADMIN_SCRIPT, + "functions", + "get", + "--tenant", "public", + "--namespace", "default", + "--name", functionName ); assertTrue(result.getStdout().contains("\"name\": \"" + functionName + "\"")); } @@ -502,27 +543,27 @@ public class PulsarStateTest extends PulsarStandaloneTestSuite { private void getFunctionStatus(String functionName, int numMessages) throws Exception { ContainerExecResult result = container.execCmd( - PulsarCluster.ADMIN_SCRIPT, - "functions", - "getstatus", - "--tenant", "public", - "--namespace", "default", - "--name", functionName + PulsarCluster.ADMIN_SCRIPT, + "functions", + "getstatus", + "--tenant", "public", + "--namespace", "default", + "--name", functionName ); assertTrue(result.getStdout().contains("\"running\" : true")); assertTrue(result.getStdout().contains("\"numSuccessfullyProcessed\" : " + numMessages)); } private void queryState(String functionName, String key, int amount, long version) - throws Exception { + throws Exception { ContainerExecResult result = container.execCmd( - PulsarCluster.ADMIN_SCRIPT, - "functions", - "querystate", - "--tenant", "public", - "--namespace", "default", - "--name", functionName, - "--key", key + PulsarCluster.ADMIN_SCRIPT, + "functions", + "querystate", + "--tenant", "public", + "--namespace", "default", + "--name", functionName, + "--key", key ); assertTrue(result.getStdout().contains("\"numberValue\": " + amount)); assertTrue(result.getStdout().contains("\"version\": " + version)); @@ -582,24 +623,27 @@ public class PulsarStateTest extends PulsarStandaloneTestSuite { } private void publishAndConsumeMessages(String inputTopic, - String outputTopic, - int numMessages) throws Exception { + String outputTopic, + int numMessages) throws Exception { @Cleanup PulsarClient client = PulsarClient.builder() - .serviceUrl(container.getPlainTextServiceUrl()) - .build(); - @Cleanup Consumer<byte[]> consumer = client.newConsumer(Schema.BYTES) - .topic(outputTopic) - .subscriptionType(SubscriptionType.Exclusive) - .subscriptionName("test-sub") - .subscribe(); + .serviceUrl(container.getPlainTextServiceUrl()) + .build(); @Cleanup Producer<byte[]> producer = client.newProducer(Schema.BYTES) - .topic(inputTopic) - .create(); + .topic(inputTopic) + .create(); for (int i = 0; i < numMessages; i++) { producer.send(("hello test message-" + i).getBytes(UTF_8)); } + if (outputTopic == null) { + return; + } + @Cleanup Consumer<byte[]> consumer = client.newConsumer(Schema.BYTES) + .topic(outputTopic) + .subscriptionType(SubscriptionType.Exclusive) + .subscriptionName("test-sub") + .subscribe(); for (int i = 0; i < numMessages; i++) { Message<byte[]> msg = consumer.receive(); assertEquals("hello test message-" + i + "!", new String(msg.getValue(), UTF_8)); @@ -608,12 +652,12 @@ public class PulsarStateTest extends PulsarStandaloneTestSuite { private void deleteFunction(String functionName) throws Exception { ContainerExecResult result = container.execCmd( - PulsarCluster.ADMIN_SCRIPT, - "functions", - "delete", - "--tenant", "public", - "--namespace", "default", - "--name", functionName + PulsarCluster.ADMIN_SCRIPT, + "functions", + "delete", + "--tenant", "public", + "--namespace", "default", + "--name", functionName ); assertTrue(result.getStdout().contains("Deleted successfully")); result.assertNoStderr(); diff --git a/tests/integration/src/test/resources/pulsar-function.xml b/tests/integration/src/test/resources/pulsar-function.xml index a18a65ff2e1..61c7b5b4698 100644 --- a/tests/integration/src/test/resources/pulsar-function.xml +++ b/tests/integration/src/test/resources/pulsar-function.xml @@ -22,7 +22,8 @@ <suite name="Pulsar Function Integration Tests" verbose="2" annotations="JDK"> <test name="pulsar-function-test-suite" preserve-order="true" > <classes> - <class name="org.apache.pulsar.tests.integration.functions.PulsarStateTest" /> + <class name="org.apache.pulsar.tests.integration.functions.PulsarMetadataStateStoreTest" /> + <class name="org.apache.pulsar.tests.integration.functions.PulsarBKStateStoreTest" /> <class name="org.apache.pulsar.tests.integration.io.PulsarGenericObjectSinkTest"/> <class name="org.apache.pulsar.tests.integration.io.sources.GenericRecordSourceTest" /> <class name="org.apache.pulsar.tests.integration.io.sources.PulsarSourcePropertyTest"/>