This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new be47c27638c [fix][fn] Support multiple input topics for Go runtime (#20000)
be47c27638c is described below
commit be47c27638c842a0370623a1c827816ce5242319
Author: jiangpengcheng <[email protected]>
AuthorDate: Wed May 10 21:15:37 2023 +0800
[fix][fn] Support multiple input topics for Go runtime (#20000)
(cherry picked from commit f04252b84a8b594ea371645e367c491aa8e2dd80)
---
pulsar-function-go/conf/conf.go | 18 ++-
pulsar-function-go/conf/conf.yaml | 6 +-
pulsar-function-go/pf/instance.go | 4 +-
pulsar-function-go/pf/instanceConf.go | 31 ++--
.../functions/instance/go/GoInstanceConfig.java | 5 +
.../src/main/resources/findbugsExclude.xml | 10 ++
.../pulsar/functions/runtime/RuntimeUtils.java | 13 +-
.../integration/functions/PulsarFunctionsTest.java | 160 +++++++++++++--------
.../functions/PulsarFunctionsTestBase.java | 2 +
.../functions/go/PulsarFunctionsGoTest.java | 5 +
.../functions/java/PulsarFunctionsJavaTest.java | 4 +-
.../python/PulsarFunctionsPythonTest.java | 8 +-
12 files changed, 179 insertions(+), 87 deletions(-)
diff --git a/pulsar-function-go/conf/conf.go b/pulsar-function-go/conf/conf.go
index c2ff443fcc4..d52b886b540 100644
--- a/pulsar-function-go/conf/conf.go
+++ b/pulsar-function-go/conf/conf.go
@@ -50,8 +50,8 @@ type Conf struct {
SecretsMap string `json:"secretsMap" yaml:"secretsMap"`
Runtime int32 `json:"runtime" yaml:"runtime"`
//Deprecated
- AutoACK bool `json:"autoAck" yaml:"autoAck"`
- Parallelism int32 `json:"parallelism" yaml:"parallelism"`
+ AutoACK bool `json:"autoAck" yaml:"autoAck"`
+ Parallelism int32 `json:"parallelism" yaml:"parallelism"`
//source config
SubscriptionType int32 `json:"subscriptionType"
yaml:"subscriptionType"`
TimeoutMs uint64 `json:"timeoutMs" yaml:"timeoutMs"`
@@ -59,10 +59,16 @@ type Conf struct {
CleanupSubscription bool `json:"cleanupSubscription"
yaml:"cleanupSubscription"`
SubscriptionPosition int32 `json:"subscriptionPosition"
yaml:"subscriptionPosition"`
//source input specs
- SourceSpecTopic string `json:"sourceSpecsTopic"
yaml:"sourceSpecsTopic"`
- SourceSchemaType string `json:"sourceSchemaType"
yaml:"sourceSchemaType"`
- IsRegexPatternSubscription bool `json:"isRegexPatternSubscription"
yaml:"isRegexPatternSubscription"`
- ReceiverQueueSize int32 `json:"receiverQueueSize"
yaml:"receiverQueueSize"`
+ SourceInputSpecs map[string]string `json:"sourceInputSpecs"
yaml:"sourceInputSpecs"`
+ // for backward compatibility
+ // Deprecated
+ SourceSpecTopic string `json:"sourceSpecsTopic" yaml:"sourceSpecsTopic"`
+ // Deprecated
+ SourceSchemaType string `json:"sourceSchemaType"
yaml:"sourceSchemaType"`
+ // Deprecated
+ IsRegexPatternSubscription bool `json:"isRegexPatternSubscription"
yaml:"isRegexPatternSubscription"`
+ // Deprecated
+ ReceiverQueueSize int32 `json:"receiverQueueSize"
yaml:"receiverQueueSize"`
//sink spec config
SinkSpecTopic string `json:"sinkSpecsTopic" yaml:"sinkSpecsTopic"`
SinkSchemaType string `json:"sinkSchemaType" yaml:"sinkSchemaType"`
diff --git a/pulsar-function-go/conf/conf.yaml
b/pulsar-function-go/conf/conf.yaml
index 59ac9bbd513..098e33cda1f 100644
--- a/pulsar-function-go/conf/conf.yaml
+++ b/pulsar-function-go/conf/conf.yaml
@@ -43,10 +43,8 @@ subscriptionName: ""
cleanupSubscription: false
subscriptionPosition: 1
# source input specs
-sourceSpecsTopic: persistent://public/default/topic-01
-sourceSchemaType: ""
-isRegexPatternSubscription: false
-receiverQueueSize: 10
+sourceInputSpecs:
+ persistent://public/default/topic-01: "{\"schemaType\": \"\",
\"isRegexPattern\": false, \"receiverQueueSize\": {\"value\": 10}}"
# sink specs config
sinkSpecsTopic: persistent://public/default/topic-02
sinkSchemaType: ""
diff --git a/pulsar-function-go/pf/instance.go
b/pulsar-function-go/pf/instance.go
index a82273031ec..138489444d1 100644
--- a/pulsar-function-go/pf/instance.go
+++ b/pulsar-function-go/pf/instance.go
@@ -439,7 +439,6 @@ func (gi *goInstance) addLogTopicHandler() {
}()
if gi.context.logAppender == nil {
- log.Error("the logAppender is nil, if you want to use it,
please specify `--log-topic` at startup.")
return
}
@@ -571,6 +570,9 @@ func (gi *goInstance) getMatchingMetricFunc() func(lbl
*prometheus_client.LabelP
func (gi *goInstance) getMatchingMetricFromRegistry(metricName string)
prometheus_client.Metric {
filteredMetricFamilies := gi.getFilteredMetricFamilies(metricName)
+ if len(filteredMetricFamilies) == 0 {
+ return prometheus_client.Metric{}
+ }
metricFunc := gi.getMatchingMetricFunc()
matchingMetric := getFirstMatch(filteredMetricFamilies[0].Metric,
metricFunc)
return *matchingMetric
diff --git a/pulsar-function-go/pf/instanceConf.go
b/pulsar-function-go/pf/instanceConf.go
index d60beef29e8..9d4cabfae5a 100644
--- a/pulsar-function-go/pf/instanceConf.go
+++ b/pulsar-function-go/pf/instanceConf.go
@@ -20,6 +20,7 @@
package pf
import (
+ "encoding/json"
"fmt"
"time"
@@ -44,6 +45,24 @@ type instanceConf struct {
}
func newInstanceConfWithConf(cfg *conf.Conf) *instanceConf {
+ inputSpecs := make(map[string]*pb.ConsumerSpec)
+ // for backward compatibility
+ if cfg.SourceSpecTopic != "" {
+ inputSpecs[cfg.SourceSpecTopic] = &pb.ConsumerSpec{
+ SchemaType: cfg.SourceSchemaType,
+ IsRegexPattern: cfg.IsRegexPatternSubscription,
+ ReceiverQueueSize: &pb.ConsumerSpec_ReceiverQueueSize{
+ Value: cfg.ReceiverQueueSize,
+ },
+ }
+ }
+ for topic, value := range cfg.SourceInputSpecs {
+ spec := &pb.ConsumerSpec{}
+ if err := json.Unmarshal([]byte(value), spec); err != nil {
+ panic(fmt.Sprintf("Failed to unmarshal consume specs:
%v", err))
+ }
+ inputSpecs[topic] = spec
+ }
instanceConf := &instanceConf{
instanceID: cfg.InstanceID,
funcID: cfg.FuncID,
@@ -66,16 +85,8 @@ func newInstanceConfWithConf(cfg *conf.Conf) *instanceConf {
AutoAck: cfg.AutoACK,
Parallelism: cfg.Parallelism,
Source: &pb.SourceSpec{
- SubscriptionType:
pb.SubscriptionType(cfg.SubscriptionType),
- InputSpecs: map[string]*pb.ConsumerSpec{
- cfg.SourceSpecTopic: {
- SchemaType:
cfg.SourceSchemaType,
- IsRegexPattern:
cfg.IsRegexPatternSubscription,
- ReceiverQueueSize:
&pb.ConsumerSpec_ReceiverQueueSize{
- Value:
cfg.ReceiverQueueSize,
- },
- },
- },
+ SubscriptionType:
pb.SubscriptionType(cfg.SubscriptionType),
+ InputSpecs: inputSpecs,
TimeoutMs: cfg.TimeoutMs,
SubscriptionName: cfg.SubscriptionName,
CleanupSubscription: cfg.CleanupSubscription,
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/go/GoInstanceConfig.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/go/GoInstanceConfig.java
index abf17bdb165..67fe2a41d55 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/go/GoInstanceConfig.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/go/GoInstanceConfig.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.functions.instance.go;
+import java.util.Map;
import lombok.Getter;
import lombok.Setter;
import org.apache.pulsar.functions.proto.Function;
@@ -53,6 +54,10 @@ public class GoInstanceConfig {
private boolean cleanupSubscription;
private int subscriptionPosition =
Function.SubscriptionPosition.LATEST.getNumber();
+ // value is the json string of ConsumerSpec
+ private Map<String, String> sourceInputSpecs;
+
+ // for backward compatibility
private String sourceSpecsTopic = "";
private String sourceSchemaType = "";
private boolean isRegexPatternSubscription;
diff --git a/pulsar-functions/instance/src/main/resources/findbugsExclude.xml
b/pulsar-functions/instance/src/main/resources/findbugsExclude.xml
index 027affbfeb2..7fe247d2ab2 100644
--- a/pulsar-functions/instance/src/main/resources/findbugsExclude.xml
+++ b/pulsar-functions/instance/src/main/resources/findbugsExclude.xml
@@ -542,4 +542,14 @@
<Method name="setEvictionPolicy"/>
<Bug pattern="EI_EXPOSE_REP2"/>
</Match>
+ <Match>
+ <Class name="org.apache.pulsar.functions.instance.go.GoInstanceConfig"/>
+ <Method name="getSourceInputSpecs"/>
+ <Bug pattern="EI_EXPOSE_REP"/>
+ </Match>
+ <Match>
+ <Class name="org.apache.pulsar.functions.instance.go.GoInstanceConfig"/>
+ <Method name="setSourceInputSpecs"/>
+ <Bug pattern="EI_EXPOSE_REP2"/>
+ </Match>
</FindBugsFilter>
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
index 53ebfcbfaf0..1a014b6f5c5 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
@@ -38,6 +38,7 @@ import java.net.HttpURLConnection;
import java.net.InetAddress;
import java.net.URL;
import java.util.Collections;
+import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -204,9 +205,15 @@ public class RuntimeUtils {
instanceConfig.getFunctionDetails().getSource().getSubscriptionPosition().getNumber());
if (instanceConfig.getFunctionDetails().getSource().getInputSpecsMap()
!= null) {
- for (String inputTopic :
instanceConfig.getFunctionDetails().getSource().getInputSpecsMap().keySet()) {
- goInstanceConfig.setSourceSpecsTopic(inputTopic);
+ Map<String, String> sourceInputSpecs = new HashMap<>();
+ for (Map.Entry<String, Function.ConsumerSpec> entry :
+
instanceConfig.getFunctionDetails().getSource().getInputSpecsMap().entrySet()) {
+ String topic = entry.getKey();
+ Function.ConsumerSpec spec = entry.getValue();
+ sourceInputSpecs.put(topic,
JsonFormat.printer().omittingInsignificantWhitespace().print(spec));
+ goInstanceConfig.setSourceSpecsTopic(topic);
}
+ goInstanceConfig.setSourceInputSpecs(sourceInputSpecs);
}
if (instanceConfig.getFunctionDetails().getSource().getTimeoutMs() !=
0) {
@@ -304,7 +311,7 @@ public class RuntimeUtils {
}
if (StringUtils.isNotEmpty(functionInstanceClassPath)) {
- args.add(String.format("-D%s=%s", FUNCTIONS_INSTANCE_CLASSPATH,
functionInstanceClassPath));
+ args.add(String.format("-D%s=%s",
FUNCTIONS_INSTANCE_CLASSPATH, functionInstanceClassPath));
} else {
// add complete classpath for broker/worker so that the
function instance can load
// the functions instance dependencies separately from user
code dependencies
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index eaf66974b98..6088628aac5 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -29,10 +29,13 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@@ -126,13 +129,14 @@ public abstract class PulsarFunctionsTest extends
PulsarFunctionsTestBase {
return kvs;
}
- protected void testFunctionLocalRun(Runtime runtime) throws Exception {
+ protected void testFunctionLocalRun(Runtime runtime) throws Exception {
if (functionRuntimeType == FunctionRuntimeType.THREAD) {
return;
}
- String inputTopicName =
"persistent://public/default/test-function-local-run-" + runtime + "-input-" +
randomName(8);
+ String inputTopicName =
+ "persistent://public/default/test-function-local-run-" +
runtime + "-input-" + randomName(8);
String outputTopicName = "test-function-local-run-" + runtime +
"-output-" + randomName(8);
final int numMessages = 10;
@@ -377,7 +381,8 @@ public abstract class PulsarFunctionsTest extends
PulsarFunctionsTestBase {
if (runtime == Runtime.PYTHON) {
submitFunction(
- runtime, inputTopicName, outputTopicName, functionName,
EXCEPTION_FUNCTION_PYTHON_FILE, EXCEPTION_PYTHON_CLASS, schema);
+ runtime, inputTopicName, outputTopicName, functionName,
EXCEPTION_FUNCTION_PYTHON_FILE,
+ EXCEPTION_PYTHON_CLASS, schema);
} else {
submitFunction(
runtime, inputTopicName, outputTopicName, functionName,
null, EXCEPTION_JAVA_CLASS, schema);
@@ -550,7 +555,7 @@ public abstract class PulsarFunctionsTest extends
PulsarFunctionsTestBase {
final int numMessages = 10;
// submit the exclamation function
- switch (runtime){
+ switch (runtime) {
case JAVA:
submitFunction(
runtime,
@@ -622,7 +627,8 @@ public abstract class PulsarFunctionsTest extends
PulsarFunctionsTestBase {
.create();
for (int i = 0; i < numMessages; i++) {
- producer.newMessage().key(String.valueOf(i)).property("count",
String.valueOf(i)).value(("message-" + i).getBytes(UTF_8)).send();
+ producer.newMessage().key(String.valueOf(i)).property("count",
String.valueOf(i))
+ .value(("message-" + i).getBytes(UTF_8)).send();
}
Set<String> expectedMessages = new HashSet<>();
@@ -662,9 +668,10 @@ public abstract class PulsarFunctionsTest extends
PulsarFunctionsTestBase {
protected void testExclamationFunction(Runtime runtime,
boolean isTopicPattern,
boolean pyZip,
+ boolean multipleInput,
boolean withExtraDeps) throws
Exception {
- if (functionRuntimeType == FunctionRuntimeType.THREAD && runtime ==
Runtime.PYTHON) {
- // python can only run on process mode
+ if (functionRuntimeType == FunctionRuntimeType.THREAD && (runtime ==
Runtime.PYTHON || runtime == Runtime.GO)) {
+ // python&go can only run on process mode
return;
}
@@ -683,22 +690,9 @@ public abstract class PulsarFunctionsTest extends
PulsarFunctionsTestBase {
admin.topics().createNonPartitionedTopic(outputTopicName);
}
if (isTopicPattern) {
- @Cleanup PulsarClient client = PulsarClient.builder()
- .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
- .build();
-
- @Cleanup Consumer<?> consumer1 = client.newConsumer(schema)
- .topic(inputTopicName + "1")
- .subscriptionType(SubscriptionType.Exclusive)
- .subscriptionName("test-sub")
- .subscribe();
-
- @Cleanup Consumer<?> consumer2 = client.newConsumer(schema)
- .topic(inputTopicName + "2")
- .subscriptionType(SubscriptionType.Exclusive)
- .subscriptionName("test-sub")
- .subscribe();
inputTopicName = inputTopicName + ".*";
+ } else if (multipleInput) {
+ inputTopicName = inputTopicName + "1," + inputTopicName + "2";
}
String functionName = "test-exclamation-fn-" + randomName(8);
final int numMessages = 10;
@@ -725,8 +719,11 @@ public abstract class PulsarFunctionsTest extends
PulsarFunctionsTestBase {
// get function status
getFunctionStatus(functionName, numMessages, true);
- // get function stats
- getFunctionStats(functionName, numMessages);
+ if (Runtime.GO != runtime) {
+ // TODO: Go runtime doesn't collect `process_latency_ms_1min`
metric
+ // get function stats
+ getFunctionStats(functionName, numMessages);
+ }
// update parallelism
updateFunctionParallelism(functionName, 2);
@@ -787,6 +784,12 @@ public abstract class PulsarFunctionsTest extends
PulsarFunctionsTestBase {
} else {
file = EXCLAMATION_PYTHON_FILE;
}
+ } else if (Runtime.GO == runtime) {
+ if (isPublishFunction) {
+ file = PUBLISH_FUNCTION_GO_FILE;
+ } else {
+ file = EXCLAMATION_GO_FILE;
+ }
}
submitFunction(runtime, inputTopicName, outputTopicName, functionName,
file, functionClass, inputTopicSchema);
@@ -819,7 +822,7 @@ public abstract class PulsarFunctionsTest extends
PulsarFunctionsTestBase {
if (StringUtils.isNotEmpty(inputTopicName)) {
ensureSubscriptionCreated(
- inputTopicName, String.format("public/default/%s",
functionName), inputTopicSchema);
+ inputTopicName, String.format("public/default/%s",
functionName), inputTopicSchema);
}
CommandGenerator generator;
@@ -853,7 +856,7 @@ public abstract class PulsarFunctionsTest extends
PulsarFunctionsTestBase {
}
String command = "";
- switch (runtime){
+ switch (runtime) {
case JAVA:
command = generator.generateCreateFunctionCommand();
break;
@@ -945,8 +948,17 @@ public abstract class PulsarFunctionsTest extends
PulsarFunctionsTestBase {
try (PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsarCluster.getPlainTextServiceUrl())
.build()) {
+ List<String> topics = new ArrayList<>();
+ if (inputTopicName.endsWith(".*")) {
+ topics.add(inputTopicName.substring(0, inputTopicName.length()
- 2) + "1");
+ topics.add(inputTopicName.substring(0, inputTopicName.length()
- 2) + "2");
+ } else if (inputTopicName.contains(",")) {
+ topics.addAll(Arrays.asList(inputTopicName.split(",")));
+ } else {
+ topics.add(inputTopicName);
+ }
try (Consumer<T> ignored = client.newConsumer(inputTopicSchema)
- .topic(inputTopicName)
+ .topic(topics.toArray(new String[0]))
.subscriptionType(SubscriptionType.Shared)
.subscriptionName(subscriptionName)
.subscribe()) {
@@ -1042,7 +1054,8 @@ public abstract class PulsarFunctionsTest extends
PulsarFunctionsTestBase {
assertEquals(functionStats.instances.get(0).getMetrics().getUserExceptionsTotal(),
0);
assertTrue(functionStats.instances.get(0).getMetrics().getAvgProcessLatency() >
0);
assertEquals(functionStats.instances.get(0).getMetrics().getOneMin().getReceivedTotal(),
numMessages);
-
assertEquals(functionStats.instances.get(0).getMetrics().getOneMin().getProcessedSuccessfullyTotal(),
numMessages);
+
assertEquals(functionStats.instances.get(0).getMetrics().getOneMin().getProcessedSuccessfullyTotal(),
+ numMessages);
assertEquals(functionStats.instances.get(0).getMetrics().getOneMin().getSystemExceptionsTotal(),
0);
assertEquals(functionStats.instances.get(0).getMetrics().getOneMin().getUserExceptionsTotal(),
0);
assertTrue(functionStats.instances.get(0).getMetrics().getOneMin().getAvgProcessLatency()
> 0);
@@ -1071,19 +1084,29 @@ public abstract class PulsarFunctionsTest extends
PulsarFunctionsTestBase {
}
private void checkSubscriptionsCleanup(String topic) throws Exception {
- try {
- ContainerExecResult result = pulsarCluster.getAnyBroker().execCmd(
- PulsarCluster.ADMIN_SCRIPT,
- "topics",
- "stats",
- topic);
- TopicStats topicStats = ObjectMapperFactory.getMapper().reader()
- .readValue(result.getStdout(), TopicStats.class);
- assertEquals(topicStats.getSubscriptions().size(), 0);
-
- } catch (ContainerExecException e) {
- fail("Command should have exited with non-zero");
+ List<String> topics = new ArrayList<>();
+ if (topic.endsWith(".*")) {
+ topics.add(topic.substring(0, topic.length() - 2) + "1");
+ topics.add(topic.substring(0, topic.length() - 2) + "2");
+ } else if (topic.contains(",")) {
+ topics.addAll(Arrays.asList(topic.split(",")));
+ } else {
+ topics.add(topic);
}
+ topics.stream().forEach(t -> {
+ try {
+ ContainerExecResult result =
pulsarCluster.getAnyBroker().execCmd(
+ PulsarCluster.ADMIN_SCRIPT,
+ "topics",
+ "stats",
+ t);
+ TopicStats topicStats =
ObjectMapperFactory.getMapper().reader()
+ .readValue(result.getStdout(), TopicStats.class);
+ assertEquals(topicStats.getSubscriptions().size(), 0);
+ } catch (Exception e) {
+ fail("Command should have exited with non-zero");
+ }
+ });
}
private void checkPublisherCleanup(String topic) throws Exception {
@@ -1145,7 +1168,8 @@ public abstract class PulsarFunctionsTest extends
PulsarFunctionsTestBase {
lastInvocationTimeGreaterThanZero =
lastInvocationTimeGreaterThanZero
||
functionStatus.getInstances().get(i).getStatus().getLastInvocationTime() > 0;
totalMessagesProcessed +=
functionStatus.getInstances().get(i).getStatus().getNumReceived();
- totalMessagesSuccessfullyProcessed +=
functionStatus.getInstances().get(i).getStatus().getNumSuccessfullyProcessed();
+ totalMessagesSuccessfullyProcessed +=
+
functionStatus.getInstances().get(i).getStatus().getNumSuccessfullyProcessed();
if (checkRestarts) {
assertEquals(functionStatus.getInstances().get(i).getStatus().getNumRestarts(),
0);
}
@@ -1238,6 +1262,23 @@ public abstract class PulsarFunctionsTest extends
PulsarFunctionsTestBase {
producer1.send(("message-" + i).getBytes(UTF_8));
}
+ for (int i = numMessages / 2; i < numMessages; i++) {
+ producer2.send(("message-" + i).getBytes(UTF_8));
+ }
+ } else if (inputTopic.contains(",")) {
+ String[] topics = inputTopic.split(",");
+ @Cleanup Producer<byte[]> producer1 =
client.newProducer(Schema.BYTES)
+ .topic(topics[0])
+ .create();
+
+ @Cleanup Producer<byte[]> producer2 =
client.newProducer(Schema.BYTES)
+ .topic(topics[1])
+ .create();
+
+ for (int i = 0; i < numMessages / 2; i++) {
+ producer1.send(("message-" + i).getBytes(UTF_8));
+ }
+
for (int i = numMessages / 2; i < numMessages; i++) {
producer2.send(("message-" + i).getBytes(UTF_8));
}
@@ -1420,7 +1461,8 @@ public abstract class PulsarFunctionsTest extends
PulsarFunctionsTestBase {
AVRO_SCHEMA_FUNCTION_PYTHON_FILE,
AVRO_SCHEMA_PYTHON_CLASS,
Schema.AVRO(AvroTestObject.class),
- null, objectMapper.writeValueAsString(inputSpecs), "avro",
null, "avro_schema_test_function.AvroTestObject",
"avro_schema_test_function.AvroTestObject");
+ null, objectMapper.writeValueAsString(inputSpecs), "avro",
null,
+ "avro_schema_test_function.AvroTestObject",
"avro_schema_test_function.AvroTestObject");
}
log.info("pulsar submitFunction");
@@ -1430,7 +1472,7 @@ public abstract class PulsarFunctionsTest extends
PulsarFunctionsTestBase {
Set<Object> expectedSet = new HashSet<>();
log.info("test-avro-schema producer connected: " +
producer.isConnected());
- for (int i = 0 ; i < numMessages ; i++) {
+ for (int i = 0; i < numMessages; i++) {
AvroTestObject inputObject = new AvroTestObject();
inputObject.setBaseValue(i);
MessageId messageId = producer.send(inputObject);
@@ -1457,7 +1499,7 @@ public abstract class PulsarFunctionsTest extends
PulsarFunctionsTestBase {
});
log.info("test-avro-schema consumer connected: " +
consumer.isConnected());
- for (int i = 0 ; i < numMessages ; i++) {
+ for (int i = 0; i < numMessages; i++) {
log.info("test-avro-schema consumer receive [{}] start", i);
Message<AvroTestObject> message = consumer.receive();
log.info("test-avro-schema consumer receive [{}] over", i);
@@ -1495,7 +1537,8 @@ public abstract class PulsarFunctionsTest extends
PulsarFunctionsTestBase {
final int numMessages = 10;
// submit the exclamation function
- submitFunction(runtime, inputTopicName, outputTopicName, functionName,
null, InitializableFunction.class.getName(), schema,
+ submitFunction(runtime, inputTopicName, outputTopicName, functionName,
null,
+ InitializableFunction.class.getName(), schema,
Collections.singletonMap("publish-topic", outputTopicName),
null, null, null, null, null);
// publish and consume result
@@ -1650,7 +1693,8 @@ public abstract class PulsarFunctionsTest extends
PulsarFunctionsTestBase {
}
- protected void testGenericObjectFunction(String function, boolean
removeAgeField, boolean keyValue) throws Exception {
+ protected void testGenericObjectFunction(String function, boolean
removeAgeField, boolean keyValue)
+ throws Exception {
log.info("start {} function test ...", function);
String ns = "public/ns-genericobject-" + randomName(8);
@@ -1721,13 +1765,14 @@ public abstract class PulsarFunctionsTest extends
PulsarFunctionsTestBase {
GenericRecord genericRecord = message.getValue();
if (keyValue) {
@SuppressWarnings("unchecked")
- KeyValue<GenericRecord, GenericRecord> keyValueObject
= (KeyValue<GenericRecord, GenericRecord>) genericRecord.getNativeObject();
+ KeyValue<GenericRecord, GenericRecord> keyValueObject =
+ (KeyValue<GenericRecord, GenericRecord>)
genericRecord.getNativeObject();
GenericRecord key = keyValueObject.getKey();
GenericRecord value = keyValueObject.getValue();
- key.getFields().forEach(f-> {
+ key.getFields().forEach(f -> {
log.info("key field {} value {}", f.getName(),
key.getField(f.getName()));
});
- value.getFields().forEach(f-> {
+ value.getFields().forEach(f -> {
log.info("value field {} value {}", f.getName(),
value.getField(f.getName()));
});
assertEquals(i, key.getField("age"));
@@ -1743,7 +1788,7 @@ public abstract class PulsarFunctionsTest extends
PulsarFunctionsTestBase {
} else {
GenericRecord value = genericRecord;
log.info("received value {}", value);
- value.getFields().forEach(f-> {
+ value.getFields().forEach(f -> {
log.info("value field {} value {}", f.getName(),
value.getField(f.getName()));
});
@@ -1939,13 +1984,14 @@ public abstract class PulsarFunctionsTest extends
PulsarFunctionsTestBase {
}
private <T> void generateDataByDifferentSchema(String ns,
- String baseTopic,
- PulsarClient pulsarClient,
- Schema<T> schema,
- T data,
- int messageCnt,
- ObjectNode inputSpecNode,
- Map<String, AtomicInteger>
topicMsgCntMap) throws PulsarClientException {
+ String baseTopic,
+ PulsarClient pulsarClient,
+ Schema<T> schema,
+ T data,
+ int messageCnt,
+ ObjectNode inputSpecNode,
+ Map<String, AtomicInteger>
topicMsgCntMap)
+ throws PulsarClientException {
String topic = ns + "/" + baseTopic;
Producer<T> producer = pulsarClient.newProducer(schema)
.topic(topic)
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
index 033e590d6dd..62aa36da1b6 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
@@ -151,6 +151,8 @@ public abstract class PulsarFunctionsTestBase extends
PulsarTestSuite {
} else {
return EXCLAMATION_PYTHON_CLASS;
}
+ } else if (Runtime.GO == runtime) {
+ return null;
} else {
throw new IllegalArgumentException("Unsupported runtime : " +
runtime);
}
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/go/PulsarFunctionsGoTest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/go/PulsarFunctionsGoTest.java
index 9ef2398c55c..ad8dc475aac 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/go/PulsarFunctionsGoTest.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/go/PulsarFunctionsGoTest.java
@@ -34,4 +34,9 @@ public abstract class PulsarFunctionsGoTest extends
PulsarFunctionsTest {
testFunctionLocalRun(Runtime.GO);
}
+ @Test(groups = {"go_function", "function"})
+ public void testGoExclamationMultiInputsFunction() throws Exception {
+ testExclamationFunction(Runtime.GO, false, false, true, false);
+ }
+
}
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarFunctionsJavaTest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarFunctionsJavaTest.java
index e6f3c67ebcd..097a452937d 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarFunctionsJavaTest.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarFunctionsJavaTest.java
@@ -98,12 +98,12 @@ public abstract class PulsarFunctionsJavaTest extends
PulsarFunctionsTest {
@Test(groups = {"java_function", "function"})
public void testJavaExclamationFunction() throws Exception {
- testExclamationFunction(Runtime.JAVA, false, false, false);
+ testExclamationFunction(Runtime.JAVA, false, false, false, false);
}
@Test(groups = {"java_function", "function"})
public void testJavaExclamationTopicPatternFunction() throws Exception {
- testExclamationFunction(Runtime.JAVA, true, false, false);
+ testExclamationFunction(Runtime.JAVA, true, false, false, false);
}
@Test(groups = {"java_function", "function"})
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/python/PulsarFunctionsPythonTest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/python/PulsarFunctionsPythonTest.java
index 1c75d704986..87a52d27e89 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/python/PulsarFunctionsPythonTest.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/python/PulsarFunctionsPythonTest.java
@@ -46,22 +46,22 @@ public abstract class PulsarFunctionsPythonTest extends
PulsarFunctionsTest {
@Test(groups = {"python_function", "function"})
public void testPythonExclamationFunction() throws Exception {
- testExclamationFunction(Runtime.PYTHON, false, false, false);
+ testExclamationFunction(Runtime.PYTHON, false, false, false, false);
}
@Test(groups = {"python_function", "function"})
public void testPythonExclamationFunctionWithExtraDeps() throws Exception {
- testExclamationFunction(Runtime.PYTHON, false, false, true);
+ testExclamationFunction(Runtime.PYTHON, false, false, false, true);
}
@Test(groups = {"python_function", "function"})
public void testPythonExclamationZipFunction() throws Exception {
- testExclamationFunction(Runtime.PYTHON, false, true, false);
+ testExclamationFunction(Runtime.PYTHON, false, true, false, false);
}
@Test(groups = {"python_function", "function"})
public void testPythonExclamationTopicPatternFunction() throws Exception {
- testExclamationFunction(Runtime.PYTHON, true, false, false);
+ testExclamationFunction(Runtime.PYTHON, true, false, false, false);
}
@Test(groups = {"python_function", "function"})