This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 2665ad011bfaa717b4fda10f3516110595d05634
Author: jiangpengcheng <scjiangpengch...@gmail.com>
AuthorDate: Wed Sep 10 19:37:36 2025 +0800

    [feat][fn] Fallback to using `STATE_STORAGE_SERVICE_URL` in `PulsarMetadataStateStoreProviderImpl.init` (#24721)
    
    (cherry picked from commit 415c6fa74e908bad2da1e7f986185c6ef17cb9a1)
---
 .../PulsarMetadataStateStoreProviderImpl.java      |   4 +
 .../functions/PulsarBKStateStoreTest.java          |  27 +++
 .../functions/PulsarMetadataStateStoreTest.java    |  60 ++++++
 .../integration/functions/PulsarStateTest.java     | 220 ++++++++++++---------
 .../src/test/resources/pulsar-function.xml         |   3 +-
 5 files changed, 225 insertions(+), 89 deletions(-)

diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/PulsarMetadataStateStoreProviderImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/PulsarMetadataStateStoreProviderImpl.java
index 7b9807b2a78..5bffc5966b6 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/PulsarMetadataStateStoreProviderImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/PulsarMetadataStateStoreProviderImpl.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.functions.instance.state;
 
 import java.util.Map;
 import lombok.SneakyThrows;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.metadata.api.MetadataStore;
 import org.apache.pulsar.metadata.api.MetadataStoreConfig;
 import org.apache.pulsar.metadata.api.MetadataStoreFactory;
@@ -46,6 +47,9 @@ public class PulsarMetadataStateStoreProviderImpl implements 
StateStoreProvider
             shouldCloseStore = false;
         } else {
             String metadataUrl = (String) config.get(METADATA_URL);
+            if (StringUtils.isEmpty(metadataUrl)) {
+                metadataUrl = (String) 
config.get(StateStoreProvider.STATE_STORAGE_SERVICE_URL);
+            }
             store = MetadataStoreFactory.create(metadataUrl, 
MetadataStoreConfig.builder()
                     
.metadataStoreName(MetadataStoreConfig.STATE_METADATA_STORE).build());
             shouldCloseStore = true;
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarBKStateStoreTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarBKStateStoreTest.java
new file mode 100644
index 00000000000..482dddb46d7
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarBKStateStoreTest.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.functions;
+
+import org.apache.pulsar.functions.instance.state.BKStateStoreProviderImpl;
+
+public class PulsarBKStateStoreTest extends PulsarStateTest {
+    protected PulsarBKStateStoreTest() {
+        super(BKStateStoreProviderImpl.class.getName());
+    }
+}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarMetadataStateStoreTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarMetadataStateStoreTest.java
new file mode 100644
index 00000000000..cb745928914
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarMetadataStateStoreTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.functions;
+
+import static org.testng.Assert.assertEquals;
+import lombok.extern.slf4j.Slf4j;
+import 
org.apache.pulsar.functions.instance.state.PulsarMetadataStateStoreProviderImpl;
+import org.apache.pulsar.tests.integration.containers.PulsarContainer;
+import org.apache.pulsar.tests.integration.containers.StandaloneContainer;
+import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
+import org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase;
+import org.testcontainers.containers.Network;
+
+@Slf4j
+public class PulsarMetadataStateStoreTest extends PulsarStateTest {
+    protected PulsarMetadataStateStoreTest() {
+        super(PulsarMetadataStateStoreProviderImpl.class.getName());
+    }
+
+    public void setUpCluster() throws Exception {
+        incrementSetupNumber();
+        network = Network.newNetwork();
+        String clusterName = PulsarClusterTestBase.randomName(8);
+        container = new StandaloneContainer(clusterName, 
PulsarContainer.DEFAULT_IMAGE_NAME)
+                .withNetwork(network)
+                .withNetworkAliases(StandaloneContainer.NAME + "-" + 
clusterName)
+                .withEnv("PULSAR_STANDALONE_USE_ZOOKEEPER", "true")
+                .withEnv("PF_stateStorageProviderImplementation", 
PulsarMetadataStateStoreProviderImpl.class.getName())
+                .withEnv("PF_stateStorageServiceUrl", "zk:localhost:2181");
+        container.start();
+        log.info("Pulsar cluster {} is up running:", clusterName);
+        log.info("\tBinary Service Url : {}", 
container.getPlainTextServiceUrl());
+        log.info("\tHttp Service Url : {}", container.getHttpServiceUrl());
+
+        // add cluster to public tenant
+        ContainerExecResult result = container.execCmd(
+                "/pulsar/bin/pulsar-admin", "namespaces", "policies", 
"public/default");
+        assertEquals(0, result.getExitCode());
+        log.info("public/default namespace policies are {}", 
result.getStdout());
+    }
+
+
+}
+
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarStateTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarStateTest.java
index 856e4edfea0..2395c32ed39 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarStateTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarStateTest.java
@@ -44,6 +44,8 @@ import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.functions.FunctionState;
 import org.apache.pulsar.common.policies.data.SinkStatus;
 import org.apache.pulsar.common.policies.data.SourceStatus;
+import org.apache.pulsar.functions.api.examples.WordCountFunction;
+import 
org.apache.pulsar.functions.instance.state.PulsarMetadataStateStoreProviderImpl;
 import org.apache.pulsar.tests.integration.docker.ContainerExecException;
 import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
 import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator;
@@ -57,37 +59,49 @@ import org.testng.annotations.Test;
  * State related test cases.
  */
 @Slf4j
-public class PulsarStateTest extends PulsarStandaloneTestSuite {
+public abstract class PulsarStateTest extends PulsarStandaloneTestSuite {
+
+    protected String stateStoreProvider;
+
+    protected PulsarStateTest(String stateStoreProvider) {
+        super();
+        this.stateStoreProvider = stateStoreProvider;
+    }
 
     public static final String WORDCOUNT_PYTHON_CLASS =
-        "wordcount_function.WordCountFunction";
+            "wordcount_function.WordCountFunction";
+    public static final String WORDCOUNT_JAVA_CLASS = 
WordCountFunction.class.getName();
 
     public static final String WORDCOUNT_PYTHON_FILE = "wordcount_function.py";
 
     public static final String VALUE_BASE64 = 
"0a8001127e0a172e6576656e74732e437573746f6d65724372656174656412630a243"
-                                              + 
"2336366666263652d623038342d346631352d616565342d326330643135356131666"
-                                              + 
"36312026e311a3700000000000000000000000000000000000000000000000000000"
-                                              + 
"000000000000000000000000000000000000000000000000000000000";
-
-    @Test(groups = {"python_state", "state", "function", "python_function"})
-    public void testPythonWordCountFunction() throws Exception {
-        String functionName = "test-wordcount-py-fn-" + randomName(8);
-        doTestPythonWordCountFunction(functionName);
-
-        // after a function is deleted, its state should be clean
-        // we just recreate and test the word count function again, and it 
should have same result
-        doTestPythonWordCountFunction(functionName);
-    }
+            + 
"2336366666263652d623038342d346631352d616565342d326330643135356131666"
+            + 
"36312026e311a3700000000000000000000000000000000000000000000000000000"
+            + "000000000000000000000000000000000000000000000000000000000";
 
     private void doTestPythonWordCountFunction(String functionName) throws 
Exception {
         String inputTopicName = "test-wordcount-py-input-" + randomName(8);
         String outputTopicName = "test-wordcount-py-output-" + randomName(8);
 
-        final int numMessages = 10;
-        // submit the exclamation function
-        submitExclamationFunction(
+        // submit the word count function
+        submitWordCountFunction(
                 Runtime.PYTHON, inputTopicName, outputTopicName, functionName);
+        // run tests
+        doTestFunction(functionName, inputTopicName, outputTopicName);
+    }
+
+    private void doTestJavaWordCountFunction(String functionName) throws 
Exception {
+        String inputTopicName = "test-wordcount-java-input-" + randomName(8);
+
+        // submit the word count function
+        submitWordCountFunction(
+                Runtime.JAVA, inputTopicName, null, functionName);
+        // run tests
+        doTestFunction(functionName, inputTopicName, null);
+    }
 
+    private void doTestFunction(String functionName, String inputTopicName, 
String outputTopicName) throws Exception {
+        final int numMessages = 10;
         // get function info
         getFunctionInfoSuccess(functionName);
 
@@ -134,6 +148,31 @@ public class PulsarStateTest extends 
PulsarStandaloneTestSuite {
         getFunctionInfoNotFound(functionName);
     }
 
+    @Test(groups = {"python_state", "state", "function", "python_function"})
+    public void testPythonWordCountFunction() throws Exception {
+        if 
(PulsarMetadataStateStoreProviderImpl.class.getName().equals(stateStoreProvider))
 {
+            // python function doesn't support metadata state store yet
+            return;
+        }
+        String functionName = "test-wordcount-py-fn-" + randomName(8);
+        doTestPythonWordCountFunction(functionName);
+
+        // after a function is deleted, its state should be clean
+        // we just recreate and test the word count function again, and it 
should have same result
+        doTestPythonWordCountFunction(functionName);
+    }
+
+    @Test(groups = {"java_state", "state", "function", "java_function"})
+    public void testJavaWordCountFunction() throws Exception {
+        String functionName = "test-wordcount-java-fn-" + randomName(8);
+        doTestJavaWordCountFunction(functionName);
+
+        // after a function is deleted, its state should be clean
+        // we just recreate and test the word count function again, and it 
should have same result
+        doTestJavaWordCountFunction(functionName);
+    }
+
+
     @Test(groups = {"java_state", "state", "function", "java_function"})
     public void testSourceState() throws Exception {
         String outputTopicName = "test-state-source-output-" + randomName(8);
@@ -195,7 +234,7 @@ public class PulsarStateTest extends 
PulsarStandaloneTestSuite {
         String sinkName = "test-state-sink-" + randomName(8);
         int numMessages = 10;
 
-        submitSinkConnector(sinkName, inputTopicName, 
"org.apache.pulsar.tests.integration.io.TestStateSink",  JAVAJAR);
+        submitSinkConnector(sinkName, inputTopicName, 
"org.apache.pulsar.tests.integration.io.TestStateSink", JAVAJAR);
 
         // get sink info
         getSinkInfoSuccess(sinkName);
@@ -320,9 +359,9 @@ public class PulsarStateTest extends 
PulsarStandaloneTestSuite {
     }
 
     private void submitSourceConnector(String sourceName,
-                                         String outputTopicName,
-                                         String className,
-                                         String archive) throws Exception {
+                                       String outputTopicName,
+                                       String className,
+                                       String archive) throws Exception {
         String[] commands = {
                 PulsarCluster.ADMIN_SCRIPT,
                 "sources", "create",
@@ -339,9 +378,9 @@ public class PulsarStateTest extends 
PulsarStandaloneTestSuite {
     }
 
     private void submitSinkConnector(String sinkName,
-                                         String inputTopicName,
-                                         String className,
-                                         String archive) throws Exception {
+                                     String inputTopicName,
+                                     String className,
+                                     String archive) throws Exception {
         String[] commands = {
                 PulsarCluster.ADMIN_SCRIPT,
                 "sinks", "create",
@@ -357,33 +396,35 @@ public class PulsarStateTest extends 
PulsarStandaloneTestSuite {
                 result.getStdout());
     }
 
-    private void submitExclamationFunction(Runtime runtime,
-                                                  String inputTopicName,
-                                                  String outputTopicName,
-                                                  String functionName) throws 
Exception {
+    private void submitWordCountFunction(Runtime runtime,
+                                         String inputTopicName,
+                                         String outputTopicName,
+                                         String functionName) throws Exception 
{
         submitFunction(
-            runtime,
-            inputTopicName,
-            outputTopicName,
-            functionName,
-            getExclamationClass(runtime),
-            Schema.BYTES);
+                runtime,
+                inputTopicName,
+                outputTopicName,
+                functionName,
+                getWordCountClass(runtime),
+                Schema.BYTES);
     }
 
-    protected static String getExclamationClass(Runtime runtime) {
+    protected static String getWordCountClass(Runtime runtime) {
         if (Runtime.PYTHON == runtime) {
             return WORDCOUNT_PYTHON_CLASS;
+        } else if (Runtime.JAVA == runtime) {
+            return WORDCOUNT_JAVA_CLASS;
         } else {
             throw new IllegalArgumentException("Unsupported runtime : " + 
runtime);
         }
     }
 
     private <T> void submitFunction(Runtime runtime,
-                                           String inputTopicName,
-                                           String outputTopicName,
-                                           String functionName,
-                                           String functionClass,
-                                           Schema<T> inputTopicSchema) throws 
Exception {
+                                    String inputTopicName,
+                                    String outputTopicName,
+                                    String functionName,
+                                    String functionClass,
+                                    Schema<T> inputTopicSchema) throws 
Exception {
         CommandGenerator generator;
         generator = CommandGenerator.createDefaultGenerator(inputTopicName, 
functionClass);
         generator.setSinkTopic(outputTopicName);
@@ -398,10 +439,10 @@ public class PulsarStateTest extends 
PulsarStandaloneTestSuite {
             throw new IllegalArgumentException("Unsupported runtime : " + 
runtime);
         }
         String[] commands = {
-            "sh", "-c", command
+                "sh", "-c", command
         };
         ContainerExecResult result = container.execCmd(
-            commands);
+                commands);
         assertTrue(result.getStdout().contains("Created successfully"));
 
         ensureSubscriptionCreated(inputTopicName, 
String.format("public/default/%s", functionName), inputTopicSchema);
@@ -409,18 +450,18 @@ public class PulsarStateTest extends 
PulsarStandaloneTestSuite {
 
     @SuppressWarnings("try")
     private <T> void ensureSubscriptionCreated(String inputTopicName,
-                                                      String subscriptionName,
-                                                      Schema<T> 
inputTopicSchema)
+                                               String subscriptionName,
+                                               Schema<T> inputTopicSchema)
             throws Exception {
         // ensure the function subscription exists before we start producing 
messages
         try (PulsarClient client = PulsarClient.builder()
-            .serviceUrl(container.getPlainTextServiceUrl())
-            .build()) {
+                .serviceUrl(container.getPlainTextServiceUrl())
+                .build()) {
             try (Consumer<T> ignored = client.newConsumer(inputTopicSchema)
-                .topic(inputTopicName)
-                .subscriptionType(SubscriptionType.Shared)
-                .subscriptionName(subscriptionName)
-                .subscribe()) {
+                    .topic(inputTopicName)
+                    .subscriptionType(SubscriptionType.Shared)
+                    .subscriptionName(subscriptionName)
+                    .subscribe()) {
             }
         }
     }
@@ -451,12 +492,12 @@ public class PulsarStateTest extends 
PulsarStandaloneTestSuite {
 
     private void getFunctionInfoSuccess(String functionName) throws Exception {
         ContainerExecResult result = container.execCmd(
-            PulsarCluster.ADMIN_SCRIPT,
-            "functions",
-            "get",
-            "--tenant", "public",
-            "--namespace", "default",
-            "--name", functionName
+                PulsarCluster.ADMIN_SCRIPT,
+                "functions",
+                "get",
+                "--tenant", "public",
+                "--namespace", "default",
+                "--name", functionName
         );
         assertTrue(result.getStdout().contains("\"name\": \"" + functionName + 
"\""));
     }
@@ -502,27 +543,27 @@ public class PulsarStateTest extends 
PulsarStandaloneTestSuite {
 
     private void getFunctionStatus(String functionName, int numMessages) 
throws Exception {
         ContainerExecResult result = container.execCmd(
-            PulsarCluster.ADMIN_SCRIPT,
-            "functions",
-            "getstatus",
-            "--tenant", "public",
-            "--namespace", "default",
-            "--name", functionName
+                PulsarCluster.ADMIN_SCRIPT,
+                "functions",
+                "getstatus",
+                "--tenant", "public",
+                "--namespace", "default",
+                "--name", functionName
         );
         assertTrue(result.getStdout().contains("\"running\" : true"));
         assertTrue(result.getStdout().contains("\"numSuccessfullyProcessed\" : 
" + numMessages));
     }
 
     private void queryState(String functionName, String key, int amount, long 
version)
-        throws Exception {
+            throws Exception {
         ContainerExecResult result = container.execCmd(
-            PulsarCluster.ADMIN_SCRIPT,
-            "functions",
-            "querystate",
-            "--tenant", "public",
-            "--namespace", "default",
-            "--name", functionName,
-            "--key", key
+                PulsarCluster.ADMIN_SCRIPT,
+                "functions",
+                "querystate",
+                "--tenant", "public",
+                "--namespace", "default",
+                "--name", functionName,
+                "--key", key
         );
         assertTrue(result.getStdout().contains("\"numberValue\": " + amount));
         assertTrue(result.getStdout().contains("\"version\": " + version));
@@ -582,24 +623,27 @@ public class PulsarStateTest extends 
PulsarStandaloneTestSuite {
     }
 
     private void publishAndConsumeMessages(String inputTopic,
-                                                  String outputTopic,
-                                                  int numMessages) throws 
Exception {
+                                           String outputTopic,
+                                           int numMessages) throws Exception {
         @Cleanup PulsarClient client = PulsarClient.builder()
-            .serviceUrl(container.getPlainTextServiceUrl())
-            .build();
-        @Cleanup Consumer<byte[]> consumer = client.newConsumer(Schema.BYTES)
-            .topic(outputTopic)
-            .subscriptionType(SubscriptionType.Exclusive)
-            .subscriptionName("test-sub")
-            .subscribe();
+                .serviceUrl(container.getPlainTextServiceUrl())
+                .build();
         @Cleanup Producer<byte[]> producer = client.newProducer(Schema.BYTES)
-            .topic(inputTopic)
-            .create();
+                .topic(inputTopic)
+                .create();
 
         for (int i = 0; i < numMessages; i++) {
             producer.send(("hello test message-" + i).getBytes(UTF_8));
         }
 
+        if (outputTopic == null) {
+            return;
+        }
+        @Cleanup Consumer<byte[]> consumer = client.newConsumer(Schema.BYTES)
+                .topic(outputTopic)
+                .subscriptionType(SubscriptionType.Exclusive)
+                .subscriptionName("test-sub")
+                .subscribe();
         for (int i = 0; i < numMessages; i++) {
             Message<byte[]> msg = consumer.receive();
             assertEquals("hello test message-" + i + "!", new 
String(msg.getValue(), UTF_8));
@@ -608,12 +652,12 @@ public class PulsarStateTest extends 
PulsarStandaloneTestSuite {
 
     private void deleteFunction(String functionName) throws Exception {
         ContainerExecResult result = container.execCmd(
-            PulsarCluster.ADMIN_SCRIPT,
-            "functions",
-            "delete",
-            "--tenant", "public",
-            "--namespace", "default",
-            "--name", functionName
+                PulsarCluster.ADMIN_SCRIPT,
+                "functions",
+                "delete",
+                "--tenant", "public",
+                "--namespace", "default",
+                "--name", functionName
         );
         assertTrue(result.getStdout().contains("Deleted successfully"));
         result.assertNoStderr();
diff --git a/tests/integration/src/test/resources/pulsar-function.xml b/tests/integration/src/test/resources/pulsar-function.xml
index a18a65ff2e1..61c7b5b4698 100644
--- a/tests/integration/src/test/resources/pulsar-function.xml
+++ b/tests/integration/src/test/resources/pulsar-function.xml
@@ -22,7 +22,8 @@
 <suite name="Pulsar Function Integration Tests" verbose="2" annotations="JDK">
     <test name="pulsar-function-test-suite" preserve-order="true" >
         <classes>
-            <class 
name="org.apache.pulsar.tests.integration.functions.PulsarStateTest" />
+            <class 
name="org.apache.pulsar.tests.integration.functions.PulsarMetadataStateStoreTest"
 />
+            <class 
name="org.apache.pulsar.tests.integration.functions.PulsarBKStateStoreTest" />
             <class 
name="org.apache.pulsar.tests.integration.io.PulsarGenericObjectSinkTest"/>
             <class 
name="org.apache.pulsar.tests.integration.io.sources.GenericRecordSourceTest" />
             <class 
name="org.apache.pulsar.tests.integration.io.sources.PulsarSourcePropertyTest"/>

Reply via email to