This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new be47c27638c [fix][fn] Support multiple input topics for Go runtime (#20000)
be47c27638c is described below

commit be47c27638c842a0370623a1c827816ce5242319
Author: jiangpengcheng <[email protected]>
AuthorDate: Wed May 10 21:15:37 2023 +0800

    [fix][fn] Support multiple input topics for Go runtime (#20000)
    
    (cherry picked from commit f04252b84a8b594ea371645e367c491aa8e2dd80)
---
 pulsar-function-go/conf/conf.go                    |  18 ++-
 pulsar-function-go/conf/conf.yaml                  |   6 +-
 pulsar-function-go/pf/instance.go                  |   4 +-
 pulsar-function-go/pf/instanceConf.go              |  31 ++--
 .../functions/instance/go/GoInstanceConfig.java    |   5 +
 .../src/main/resources/findbugsExclude.xml         |  10 ++
 .../pulsar/functions/runtime/RuntimeUtils.java     |  13 +-
 .../integration/functions/PulsarFunctionsTest.java | 160 +++++++++++++--------
 .../functions/PulsarFunctionsTestBase.java         |   2 +
 .../functions/go/PulsarFunctionsGoTest.java        |   5 +
 .../functions/java/PulsarFunctionsJavaTest.java    |   4 +-
 .../python/PulsarFunctionsPythonTest.java          |   8 +-
 12 files changed, 179 insertions(+), 87 deletions(-)

diff --git a/pulsar-function-go/conf/conf.go b/pulsar-function-go/conf/conf.go
index c2ff443fcc4..d52b886b540 100644
--- a/pulsar-function-go/conf/conf.go
+++ b/pulsar-function-go/conf/conf.go
@@ -50,8 +50,8 @@ type Conf struct {
        SecretsMap           string `json:"secretsMap" yaml:"secretsMap"`
        Runtime              int32  `json:"runtime" yaml:"runtime"`
        //Deprecated
-       AutoACK              bool   `json:"autoAck" yaml:"autoAck"`
-       Parallelism          int32  `json:"parallelism" yaml:"parallelism"`
+       AutoACK     bool  `json:"autoAck" yaml:"autoAck"`
+       Parallelism int32 `json:"parallelism" yaml:"parallelism"`
        //source config
        SubscriptionType     int32  `json:"subscriptionType" 
yaml:"subscriptionType"`
        TimeoutMs            uint64 `json:"timeoutMs" yaml:"timeoutMs"`
@@ -59,10 +59,16 @@ type Conf struct {
        CleanupSubscription  bool   `json:"cleanupSubscription"  
yaml:"cleanupSubscription"`
        SubscriptionPosition int32  `json:"subscriptionPosition" 
yaml:"subscriptionPosition"`
        //source input specs
-       SourceSpecTopic            string `json:"sourceSpecsTopic" 
yaml:"sourceSpecsTopic"`
-       SourceSchemaType           string `json:"sourceSchemaType" 
yaml:"sourceSchemaType"`
-       IsRegexPatternSubscription bool   `json:"isRegexPatternSubscription" 
yaml:"isRegexPatternSubscription"`
-       ReceiverQueueSize          int32  `json:"receiverQueueSize" 
yaml:"receiverQueueSize"`
+       SourceInputSpecs map[string]string `json:"sourceInputSpecs" 
yaml:"sourceInputSpecs"`
+       // for backward compatibility
+       // Deprecated
+       SourceSpecTopic string `json:"sourceSpecsTopic" yaml:"sourceSpecsTopic"`
+       // Deprecated
+       SourceSchemaType string `json:"sourceSchemaType" 
yaml:"sourceSchemaType"`
+       // Deprecated
+       IsRegexPatternSubscription bool `json:"isRegexPatternSubscription" 
yaml:"isRegexPatternSubscription"`
+       // Deprecated
+       ReceiverQueueSize int32 `json:"receiverQueueSize" 
yaml:"receiverQueueSize"`
        //sink spec config
        SinkSpecTopic  string `json:"sinkSpecsTopic" yaml:"sinkSpecsTopic"`
        SinkSchemaType string `json:"sinkSchemaType" yaml:"sinkSchemaType"`
diff --git a/pulsar-function-go/conf/conf.yaml 
b/pulsar-function-go/conf/conf.yaml
index 59ac9bbd513..098e33cda1f 100644
--- a/pulsar-function-go/conf/conf.yaml
+++ b/pulsar-function-go/conf/conf.yaml
@@ -43,10 +43,8 @@ subscriptionName: ""
 cleanupSubscription: false
 subscriptionPosition: 1
 # source input specs
-sourceSpecsTopic: persistent://public/default/topic-01
-sourceSchemaType: ""
-isRegexPatternSubscription: false
-receiverQueueSize: 10
+sourceInputSpecs:
+  persistent://public/default/topic-01: "{\"schemaType\": \"\", 
\"isRegexPattern\": false, \"receiverQueueSize\": {\"value\": 10}}"
 # sink specs config
 sinkSpecsTopic: persistent://public/default/topic-02
 sinkSchemaType: ""
diff --git a/pulsar-function-go/pf/instance.go 
b/pulsar-function-go/pf/instance.go
index a82273031ec..138489444d1 100644
--- a/pulsar-function-go/pf/instance.go
+++ b/pulsar-function-go/pf/instance.go
@@ -439,7 +439,6 @@ func (gi *goInstance) addLogTopicHandler() {
        }()
 
        if gi.context.logAppender == nil {
-               log.Error("the logAppender is nil, if you want to use it, 
please specify `--log-topic` at startup.")
                return
        }
 
@@ -571,6 +570,9 @@ func (gi *goInstance) getMatchingMetricFunc() func(lbl 
*prometheus_client.LabelP
 
 func (gi *goInstance) getMatchingMetricFromRegistry(metricName string) 
prometheus_client.Metric {
        filteredMetricFamilies := gi.getFilteredMetricFamilies(metricName)
+       if len(filteredMetricFamilies) == 0 {
+               return prometheus_client.Metric{}
+       }
        metricFunc := gi.getMatchingMetricFunc()
        matchingMetric := getFirstMatch(filteredMetricFamilies[0].Metric, 
metricFunc)
        return *matchingMetric
diff --git a/pulsar-function-go/pf/instanceConf.go 
b/pulsar-function-go/pf/instanceConf.go
index d60beef29e8..9d4cabfae5a 100644
--- a/pulsar-function-go/pf/instanceConf.go
+++ b/pulsar-function-go/pf/instanceConf.go
@@ -20,6 +20,7 @@
 package pf
 
 import (
+       "encoding/json"
        "fmt"
        "time"
 
@@ -44,6 +45,24 @@ type instanceConf struct {
 }
 
 func newInstanceConfWithConf(cfg *conf.Conf) *instanceConf {
+       inputSpecs := make(map[string]*pb.ConsumerSpec)
+       // for backward compatibility
+       if cfg.SourceSpecTopic != "" {
+               inputSpecs[cfg.SourceSpecTopic] = &pb.ConsumerSpec{
+                       SchemaType:     cfg.SourceSchemaType,
+                       IsRegexPattern: cfg.IsRegexPatternSubscription,
+                       ReceiverQueueSize: &pb.ConsumerSpec_ReceiverQueueSize{
+                               Value: cfg.ReceiverQueueSize,
+                       },
+               }
+       }
+       for topic, value := range cfg.SourceInputSpecs {
+               spec := &pb.ConsumerSpec{}
+               if err := json.Unmarshal([]byte(value), spec); err != nil {
+                       panic(fmt.Sprintf("Failed to unmarshal consume specs: 
%v", err))
+               }
+               inputSpecs[topic] = spec
+       }
        instanceConf := &instanceConf{
                instanceID:                  cfg.InstanceID,
                funcID:                      cfg.FuncID,
@@ -66,16 +85,8 @@ func newInstanceConfWithConf(cfg *conf.Conf) *instanceConf {
                        AutoAck:              cfg.AutoACK,
                        Parallelism:          cfg.Parallelism,
                        Source: &pb.SourceSpec{
-                               SubscriptionType: 
pb.SubscriptionType(cfg.SubscriptionType),
-                               InputSpecs: map[string]*pb.ConsumerSpec{
-                                       cfg.SourceSpecTopic: {
-                                               SchemaType:     
cfg.SourceSchemaType,
-                                               IsRegexPattern: 
cfg.IsRegexPatternSubscription,
-                                               ReceiverQueueSize: 
&pb.ConsumerSpec_ReceiverQueueSize{
-                                                       Value: 
cfg.ReceiverQueueSize,
-                                               },
-                                       },
-                               },
+                               SubscriptionType:     
pb.SubscriptionType(cfg.SubscriptionType),
+                               InputSpecs:           inputSpecs,
                                TimeoutMs:            cfg.TimeoutMs,
                                SubscriptionName:     cfg.SubscriptionName,
                                CleanupSubscription:  cfg.CleanupSubscription,
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/go/GoInstanceConfig.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/go/GoInstanceConfig.java
index abf17bdb165..67fe2a41d55 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/go/GoInstanceConfig.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/go/GoInstanceConfig.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.functions.instance.go;
 
+import java.util.Map;
 import lombok.Getter;
 import lombok.Setter;
 import org.apache.pulsar.functions.proto.Function;
@@ -53,6 +54,10 @@ public class GoInstanceConfig {
     private boolean cleanupSubscription;
     private int subscriptionPosition = 
Function.SubscriptionPosition.LATEST.getNumber();
 
+    // value is the json string of ConsumerSpec
+    private Map<String, String> sourceInputSpecs;
+
+    // for backward compatibility
     private String sourceSpecsTopic = "";
     private String sourceSchemaType = "";
     private boolean isRegexPatternSubscription;
diff --git a/pulsar-functions/instance/src/main/resources/findbugsExclude.xml 
b/pulsar-functions/instance/src/main/resources/findbugsExclude.xml
index 027affbfeb2..7fe247d2ab2 100644
--- a/pulsar-functions/instance/src/main/resources/findbugsExclude.xml
+++ b/pulsar-functions/instance/src/main/resources/findbugsExclude.xml
@@ -542,4 +542,14 @@
     <Method name="setEvictionPolicy"/>
     <Bug pattern="EI_EXPOSE_REP2"/>
   </Match>
+  <Match>
+    <Class name="org.apache.pulsar.functions.instance.go.GoInstanceConfig"/>
+    <Method name="getSourceInputSpecs"/>
+    <Bug pattern="EI_EXPOSE_REP"/>
+  </Match>
+  <Match>
+    <Class name="org.apache.pulsar.functions.instance.go.GoInstanceConfig"/>
+    <Method name="setSourceInputSpecs"/>
+    <Bug pattern="EI_EXPOSE_REP2"/>
+  </Match>
 </FindBugsFilter>
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
index 53ebfcbfaf0..1a014b6f5c5 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
@@ -38,6 +38,7 @@ import java.net.HttpURLConnection;
 import java.net.InetAddress;
 import java.net.URL;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -204,9 +205,15 @@ public class RuntimeUtils {
                 
instanceConfig.getFunctionDetails().getSource().getSubscriptionPosition().getNumber());
 
         if (instanceConfig.getFunctionDetails().getSource().getInputSpecsMap() 
!= null) {
-            for (String inputTopic : 
instanceConfig.getFunctionDetails().getSource().getInputSpecsMap().keySet()) {
-                goInstanceConfig.setSourceSpecsTopic(inputTopic);
+            Map<String, String> sourceInputSpecs = new HashMap<>();
+            for (Map.Entry<String, Function.ConsumerSpec> entry :
+                    
instanceConfig.getFunctionDetails().getSource().getInputSpecsMap().entrySet()) {
+                String topic = entry.getKey();
+                Function.ConsumerSpec spec = entry.getValue();
+                sourceInputSpecs.put(topic, 
JsonFormat.printer().omittingInsignificantWhitespace().print(spec));
+                goInstanceConfig.setSourceSpecsTopic(topic);
             }
+            goInstanceConfig.setSourceInputSpecs(sourceInputSpecs);
         }
 
         if (instanceConfig.getFunctionDetails().getSource().getTimeoutMs() != 
0) {
@@ -304,7 +311,7 @@ public class RuntimeUtils {
             }
 
             if (StringUtils.isNotEmpty(functionInstanceClassPath)) {
-               args.add(String.format("-D%s=%s", FUNCTIONS_INSTANCE_CLASSPATH, 
functionInstanceClassPath));
+                args.add(String.format("-D%s=%s", 
FUNCTIONS_INSTANCE_CLASSPATH, functionInstanceClassPath));
             } else {
                 // add complete classpath for broker/worker so that the 
function instance can load
                 // the functions instance dependencies separately from user 
code dependencies
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index eaf66974b98..6088628aac5 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -29,10 +29,13 @@ import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -126,13 +129,14 @@ public abstract class PulsarFunctionsTest extends 
PulsarFunctionsTestBase {
         return kvs;
     }
 
-    protected void testFunctionLocalRun(Runtime runtime) throws  Exception {
+    protected void testFunctionLocalRun(Runtime runtime) throws Exception {
         if (functionRuntimeType == FunctionRuntimeType.THREAD) {
             return;
         }
 
 
-        String inputTopicName = 
"persistent://public/default/test-function-local-run-" + runtime + "-input-" + 
randomName(8);
+        String inputTopicName =
+                "persistent://public/default/test-function-local-run-" + 
runtime + "-input-" + randomName(8);
         String outputTopicName = "test-function-local-run-" + runtime + 
"-output-" + randomName(8);
 
         final int numMessages = 10;
@@ -377,7 +381,8 @@ public abstract class PulsarFunctionsTest extends 
PulsarFunctionsTestBase {
 
         if (runtime == Runtime.PYTHON) {
             submitFunction(
-                    runtime, inputTopicName, outputTopicName, functionName, 
EXCEPTION_FUNCTION_PYTHON_FILE, EXCEPTION_PYTHON_CLASS, schema);
+                    runtime, inputTopicName, outputTopicName, functionName, 
EXCEPTION_FUNCTION_PYTHON_FILE,
+                    EXCEPTION_PYTHON_CLASS, schema);
         } else {
             submitFunction(
                     runtime, inputTopicName, outputTopicName, functionName, 
null, EXCEPTION_JAVA_CLASS, schema);
@@ -550,7 +555,7 @@ public abstract class PulsarFunctionsTest extends 
PulsarFunctionsTestBase {
         final int numMessages = 10;
 
         // submit the exclamation function
-        switch (runtime){
+        switch (runtime) {
             case JAVA:
                 submitFunction(
                         runtime,
@@ -622,7 +627,8 @@ public abstract class PulsarFunctionsTest extends 
PulsarFunctionsTestBase {
                     .create();
 
             for (int i = 0; i < numMessages; i++) {
-                producer.newMessage().key(String.valueOf(i)).property("count", 
String.valueOf(i)).value(("message-" + i).getBytes(UTF_8)).send();
+                producer.newMessage().key(String.valueOf(i)).property("count", 
String.valueOf(i))
+                        .value(("message-" + i).getBytes(UTF_8)).send();
             }
 
             Set<String> expectedMessages = new HashSet<>();
@@ -662,9 +668,10 @@ public abstract class PulsarFunctionsTest extends 
PulsarFunctionsTestBase {
     protected void testExclamationFunction(Runtime runtime,
                                            boolean isTopicPattern,
                                            boolean pyZip,
+                                           boolean multipleInput,
                                            boolean withExtraDeps) throws 
Exception {
-        if (functionRuntimeType == FunctionRuntimeType.THREAD && runtime == 
Runtime.PYTHON) {
-            // python can only run on process mode
+        if (functionRuntimeType == FunctionRuntimeType.THREAD && (runtime == 
Runtime.PYTHON || runtime == Runtime.GO)) {
+            // python&go can only run on process mode
             return;
         }
 
@@ -683,22 +690,9 @@ public abstract class PulsarFunctionsTest extends 
PulsarFunctionsTestBase {
             admin.topics().createNonPartitionedTopic(outputTopicName);
         }
         if (isTopicPattern) {
-            @Cleanup PulsarClient client = PulsarClient.builder()
-                    .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
-                    .build();
-
-            @Cleanup Consumer<?> consumer1 = client.newConsumer(schema)
-                    .topic(inputTopicName + "1")
-                    .subscriptionType(SubscriptionType.Exclusive)
-                    .subscriptionName("test-sub")
-                    .subscribe();
-
-            @Cleanup Consumer<?> consumer2 = client.newConsumer(schema)
-                    .topic(inputTopicName + "2")
-                    .subscriptionType(SubscriptionType.Exclusive)
-                    .subscriptionName("test-sub")
-                    .subscribe();
             inputTopicName = inputTopicName + ".*";
+        } else if (multipleInput) {
+            inputTopicName = inputTopicName + "1," + inputTopicName + "2";
         }
         String functionName = "test-exclamation-fn-" + randomName(8);
         final int numMessages = 10;
@@ -725,8 +719,11 @@ public abstract class PulsarFunctionsTest extends 
PulsarFunctionsTestBase {
         // get function status
         getFunctionStatus(functionName, numMessages, true);
 
-        // get function stats
-        getFunctionStats(functionName, numMessages);
+        if (Runtime.GO != runtime) {
+            // TODO: Go runtime doesn't collect `process_latency_ms_1min` 
metric
+            // get function stats
+            getFunctionStats(functionName, numMessages);
+        }
 
         // update parallelism
         updateFunctionParallelism(functionName, 2);
@@ -787,6 +784,12 @@ public abstract class PulsarFunctionsTest extends 
PulsarFunctionsTestBase {
             } else {
                 file = EXCLAMATION_PYTHON_FILE;
             }
+        } else if (Runtime.GO == runtime) {
+            if (isPublishFunction) {
+                file = PUBLISH_FUNCTION_GO_FILE;
+            } else {
+                file = EXCLAMATION_GO_FILE;
+            }
         }
 
         submitFunction(runtime, inputTopicName, outputTopicName, functionName, 
file, functionClass, inputTopicSchema);
@@ -819,7 +822,7 @@ public abstract class PulsarFunctionsTest extends 
PulsarFunctionsTestBase {
 
         if (StringUtils.isNotEmpty(inputTopicName)) {
             ensureSubscriptionCreated(
-                inputTopicName, String.format("public/default/%s", 
functionName), inputTopicSchema);
+                    inputTopicName, String.format("public/default/%s", 
functionName), inputTopicSchema);
         }
 
         CommandGenerator generator;
@@ -853,7 +856,7 @@ public abstract class PulsarFunctionsTest extends 
PulsarFunctionsTestBase {
         }
         String command = "";
 
-        switch (runtime){
+        switch (runtime) {
             case JAVA:
                 command = generator.generateCreateFunctionCommand();
                 break;
@@ -945,8 +948,17 @@ public abstract class PulsarFunctionsTest extends 
PulsarFunctionsTestBase {
         try (PulsarClient client = PulsarClient.builder()
                 .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
                 .build()) {
+            List<String> topics = new ArrayList<>();
+            if (inputTopicName.endsWith(".*")) {
+                topics.add(inputTopicName.substring(0, inputTopicName.length() 
- 2) + "1");
+                topics.add(inputTopicName.substring(0, inputTopicName.length() 
- 2) + "2");
+            } else if (inputTopicName.contains(",")) {
+                topics.addAll(Arrays.asList(inputTopicName.split(",")));
+            } else {
+                topics.add(inputTopicName);
+            }
             try (Consumer<T> ignored = client.newConsumer(inputTopicSchema)
-                    .topic(inputTopicName)
+                    .topic(topics.toArray(new String[0]))
                     .subscriptionType(SubscriptionType.Shared)
                     .subscriptionName(subscriptionName)
                     .subscribe()) {
@@ -1042,7 +1054,8 @@ public abstract class PulsarFunctionsTest extends 
PulsarFunctionsTestBase {
         
assertEquals(functionStats.instances.get(0).getMetrics().getUserExceptionsTotal(),
 0);
         
assertTrue(functionStats.instances.get(0).getMetrics().getAvgProcessLatency() > 
0);
         
assertEquals(functionStats.instances.get(0).getMetrics().getOneMin().getReceivedTotal(),
 numMessages);
-        
assertEquals(functionStats.instances.get(0).getMetrics().getOneMin().getProcessedSuccessfullyTotal(),
 numMessages);
+        
assertEquals(functionStats.instances.get(0).getMetrics().getOneMin().getProcessedSuccessfullyTotal(),
+                numMessages);
         
assertEquals(functionStats.instances.get(0).getMetrics().getOneMin().getSystemExceptionsTotal(),
 0);
         
assertEquals(functionStats.instances.get(0).getMetrics().getOneMin().getUserExceptionsTotal(),
 0);
         
assertTrue(functionStats.instances.get(0).getMetrics().getOneMin().getAvgProcessLatency()
 > 0);
@@ -1071,19 +1084,29 @@ public abstract class PulsarFunctionsTest extends 
PulsarFunctionsTestBase {
     }
 
     private void checkSubscriptionsCleanup(String topic) throws Exception {
-        try {
-            ContainerExecResult result = pulsarCluster.getAnyBroker().execCmd(
-                    PulsarCluster.ADMIN_SCRIPT,
-                    "topics",
-                    "stats",
-                    topic);
-            TopicStats topicStats = ObjectMapperFactory.getMapper().reader()
-                    .readValue(result.getStdout(), TopicStats.class);
-            assertEquals(topicStats.getSubscriptions().size(), 0);
-
-        } catch (ContainerExecException e) {
-            fail("Command should have exited with non-zero");
+        List<String> topics = new ArrayList<>();
+        if (topic.endsWith(".*")) {
+            topics.add(topic.substring(0, topic.length() - 2) + "1");
+            topics.add(topic.substring(0, topic.length() - 2) + "2");
+        } else if (topic.contains(",")) {
+            topics.addAll(Arrays.asList(topic.split(",")));
+        } else {
+            topics.add(topic);
         }
+        topics.stream().forEach(t -> {
+            try {
+                ContainerExecResult result = 
pulsarCluster.getAnyBroker().execCmd(
+                        PulsarCluster.ADMIN_SCRIPT,
+                        "topics",
+                        "stats",
+                        t);
+                TopicStats topicStats = 
ObjectMapperFactory.getMapper().reader()
+                        .readValue(result.getStdout(), TopicStats.class);
+                assertEquals(topicStats.getSubscriptions().size(), 0);
+            } catch (Exception e) {
+                fail("Command should have exited with non-zero");
+            }
+        });
     }
 
     private void checkPublisherCleanup(String topic) throws Exception {
@@ -1145,7 +1168,8 @@ public abstract class PulsarFunctionsTest extends 
PulsarFunctionsTestBase {
             lastInvocationTimeGreaterThanZero = 
lastInvocationTimeGreaterThanZero
                     || 
functionStatus.getInstances().get(i).getStatus().getLastInvocationTime() > 0;
             totalMessagesProcessed += 
functionStatus.getInstances().get(i).getStatus().getNumReceived();
-            totalMessagesSuccessfullyProcessed += 
functionStatus.getInstances().get(i).getStatus().getNumSuccessfullyProcessed();
+            totalMessagesSuccessfullyProcessed +=
+                    
functionStatus.getInstances().get(i).getStatus().getNumSuccessfullyProcessed();
             if (checkRestarts) {
                 
assertEquals(functionStatus.getInstances().get(i).getStatus().getNumRestarts(), 
0);
             }
@@ -1238,6 +1262,23 @@ public abstract class PulsarFunctionsTest extends 
PulsarFunctionsTestBase {
                 producer1.send(("message-" + i).getBytes(UTF_8));
             }
 
+            for (int i = numMessages / 2; i < numMessages; i++) {
+                producer2.send(("message-" + i).getBytes(UTF_8));
+            }
+        } else if (inputTopic.contains(",")) {
+            String[] topics = inputTopic.split(",");
+            @Cleanup Producer<byte[]> producer1 = 
client.newProducer(Schema.BYTES)
+                    .topic(topics[0])
+                    .create();
+
+            @Cleanup Producer<byte[]> producer2 = 
client.newProducer(Schema.BYTES)
+                    .topic(topics[1])
+                    .create();
+
+            for (int i = 0; i < numMessages / 2; i++) {
+                producer1.send(("message-" + i).getBytes(UTF_8));
+            }
+
             for (int i = numMessages / 2; i < numMessages; i++) {
                 producer2.send(("message-" + i).getBytes(UTF_8));
             }
@@ -1420,7 +1461,8 @@ public abstract class PulsarFunctionsTest extends 
PulsarFunctionsTestBase {
                     AVRO_SCHEMA_FUNCTION_PYTHON_FILE,
                     AVRO_SCHEMA_PYTHON_CLASS,
                     Schema.AVRO(AvroTestObject.class),
-                    null, objectMapper.writeValueAsString(inputSpecs), "avro", 
null, "avro_schema_test_function.AvroTestObject", 
"avro_schema_test_function.AvroTestObject");
+                    null, objectMapper.writeValueAsString(inputSpecs), "avro", 
null,
+                    "avro_schema_test_function.AvroTestObject", 
"avro_schema_test_function.AvroTestObject");
         }
         log.info("pulsar submitFunction");
 
@@ -1430,7 +1472,7 @@ public abstract class PulsarFunctionsTest extends 
PulsarFunctionsTestBase {
         Set<Object> expectedSet = new HashSet<>();
 
         log.info("test-avro-schema producer connected: " + 
producer.isConnected());
-        for (int i = 0 ; i < numMessages ; i++) {
+        for (int i = 0; i < numMessages; i++) {
             AvroTestObject inputObject = new AvroTestObject();
             inputObject.setBaseValue(i);
             MessageId messageId = producer.send(inputObject);
@@ -1457,7 +1499,7 @@ public abstract class PulsarFunctionsTest extends 
PulsarFunctionsTestBase {
         });
 
         log.info("test-avro-schema consumer connected: " + 
consumer.isConnected());
-        for (int i = 0 ; i < numMessages ; i++) {
+        for (int i = 0; i < numMessages; i++) {
             log.info("test-avro-schema consumer receive [{}] start", i);
             Message<AvroTestObject> message = consumer.receive();
             log.info("test-avro-schema consumer receive [{}] over", i);
@@ -1495,7 +1537,8 @@ public abstract class PulsarFunctionsTest extends 
PulsarFunctionsTestBase {
         final int numMessages = 10;
 
         // submit the exclamation function
-        submitFunction(runtime, inputTopicName, outputTopicName, functionName, 
null, InitializableFunction.class.getName(), schema,
+        submitFunction(runtime, inputTopicName, outputTopicName, functionName, 
null,
+                InitializableFunction.class.getName(), schema,
                 Collections.singletonMap("publish-topic", outputTopicName), 
null, null, null, null, null);
 
         // publish and consume result
@@ -1650,7 +1693,8 @@ public abstract class PulsarFunctionsTest extends 
PulsarFunctionsTestBase {
     }
 
 
-    protected void testGenericObjectFunction(String function, boolean 
removeAgeField, boolean keyValue) throws Exception {
+    protected void testGenericObjectFunction(String function, boolean 
removeAgeField, boolean keyValue)
+            throws Exception {
         log.info("start {} function test ...", function);
 
         String ns = "public/ns-genericobject-" + randomName(8);
@@ -1721,13 +1765,14 @@ public abstract class PulsarFunctionsTest extends 
PulsarFunctionsTestBase {
                     GenericRecord genericRecord = message.getValue();
                     if (keyValue) {
                         @SuppressWarnings("unchecked")
-                        KeyValue<GenericRecord, GenericRecord> keyValueObject 
= (KeyValue<GenericRecord, GenericRecord>) genericRecord.getNativeObject();
+                        KeyValue<GenericRecord, GenericRecord> keyValueObject =
+                                (KeyValue<GenericRecord, GenericRecord>) 
genericRecord.getNativeObject();
                         GenericRecord key = keyValueObject.getKey();
                         GenericRecord value = keyValueObject.getValue();
-                        key.getFields().forEach(f-> {
+                        key.getFields().forEach(f -> {
                             log.info("key field {} value {}", f.getName(), 
key.getField(f.getName()));
                         });
-                        value.getFields().forEach(f-> {
+                        value.getFields().forEach(f -> {
                             log.info("value field {} value {}", f.getName(), 
value.getField(f.getName()));
                         });
                         assertEquals(i, key.getField("age"));
@@ -1743,7 +1788,7 @@ public abstract class PulsarFunctionsTest extends 
PulsarFunctionsTestBase {
                     } else {
                         GenericRecord value = genericRecord;
                         log.info("received value {}", value);
-                        value.getFields().forEach(f-> {
+                        value.getFields().forEach(f -> {
                             log.info("value field {} value {}", f.getName(), 
value.getField(f.getName()));
                         });
 
@@ -1939,13 +1984,14 @@ public abstract class PulsarFunctionsTest extends 
PulsarFunctionsTestBase {
     }
 
     private <T> void generateDataByDifferentSchema(String ns,
-                                               String baseTopic,
-                                               PulsarClient pulsarClient,
-                                               Schema<T> schema,
-                                               T data,
-                                               int messageCnt,
-                                               ObjectNode inputSpecNode,
-                                               Map<String, AtomicInteger> 
topicMsgCntMap) throws PulsarClientException {
+                                                   String baseTopic,
+                                                   PulsarClient pulsarClient,
+                                                   Schema<T> schema,
+                                                   T data,
+                                                   int messageCnt,
+                                                   ObjectNode inputSpecNode,
+                                                   Map<String, AtomicInteger> 
topicMsgCntMap)
+            throws PulsarClientException {
         String topic = ns + "/" + baseTopic;
         Producer<T> producer = pulsarClient.newProducer(schema)
                 .topic(topic)
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
index 033e590d6dd..62aa36da1b6 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
@@ -151,6 +151,8 @@ public abstract class PulsarFunctionsTestBase extends PulsarTestSuite {
             } else {
                 return EXCLAMATION_PYTHON_CLASS;
             }
+        } else if (Runtime.GO == runtime) {
+            return null;
         } else {
             throw new IllegalArgumentException("Unsupported runtime : " + runtime);
         }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/go/PulsarFunctionsGoTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/go/PulsarFunctionsGoTest.java
index 9ef2398c55c..ad8dc475aac 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/go/PulsarFunctionsGoTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/go/PulsarFunctionsGoTest.java
@@ -34,4 +34,9 @@ public abstract class PulsarFunctionsGoTest extends PulsarFunctionsTest {
         testFunctionLocalRun(Runtime.GO);
     }
 
+    @Test(groups = {"go_function", "function"})
+    public void testGoExclamationMultiInputsFunction() throws Exception {
+        testExclamationFunction(Runtime.GO, false, false, true, false);
+    }
+
 }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarFunctionsJavaTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarFunctionsJavaTest.java
index e6f3c67ebcd..097a452937d 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarFunctionsJavaTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarFunctionsJavaTest.java
@@ -98,12 +98,12 @@ public abstract class PulsarFunctionsJavaTest extends PulsarFunctionsTest {
 
     @Test(groups = {"java_function", "function"})
     public void testJavaExclamationFunction() throws Exception {
-        testExclamationFunction(Runtime.JAVA, false, false, false);
+        testExclamationFunction(Runtime.JAVA, false, false, false, false);
     }
 
     @Test(groups = {"java_function", "function"})
     public void testJavaExclamationTopicPatternFunction() throws Exception {
-        testExclamationFunction(Runtime.JAVA, true, false, false);
+        testExclamationFunction(Runtime.JAVA, true, false, false, false);
     }
 
     @Test(groups = {"java_function", "function"})
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/python/PulsarFunctionsPythonTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/python/PulsarFunctionsPythonTest.java
index 1c75d704986..87a52d27e89 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/python/PulsarFunctionsPythonTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/python/PulsarFunctionsPythonTest.java
@@ -46,22 +46,22 @@ public abstract class PulsarFunctionsPythonTest extends PulsarFunctionsTest {
 
     @Test(groups = {"python_function", "function"})
     public void testPythonExclamationFunction() throws Exception {
-        testExclamationFunction(Runtime.PYTHON, false, false, false);
+        testExclamationFunction(Runtime.PYTHON, false, false, false, false);
     }
 
     @Test(groups = {"python_function", "function"})
     public void testPythonExclamationFunctionWithExtraDeps() throws Exception {
-        testExclamationFunction(Runtime.PYTHON, false, false, true);
+        testExclamationFunction(Runtime.PYTHON, false, false, false, true);
     }
 
     @Test(groups = {"python_function", "function"})
     public void testPythonExclamationZipFunction() throws Exception {
-        testExclamationFunction(Runtime.PYTHON, false, true, false);
+        testExclamationFunction(Runtime.PYTHON, false, true, false, false);
     }
 
     @Test(groups = {"python_function", "function"})
     public void testPythonExclamationTopicPatternFunction() throws Exception {
-        testExclamationFunction(Runtime.PYTHON, true, false, false);
+        testExclamationFunction(Runtime.PYTHON, true, false, false, false);
     }
 
     @Test(groups = {"python_function", "function"})


Reply via email to