This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit d5d40a2068485c341a72f223e7f1db76d1bedd30 Author: Rui Fu <[email protected]> AuthorDate: Fri Sep 24 10:27:40 2021 +0800 [pulsar-functions-go] support set subscription position (#11990) * stash * set default value (cherry picked from commit 652d1546569bd7bc2eabb0c258d7e5dc038bb334) --- pulsar-function-go/conf/conf.go | 9 +++++---- pulsar-function-go/conf/conf.yaml | 1 + pulsar-function-go/pf/instanceConf.go | 7 ++++--- pulsar-function-go/pf/instanceConf_test.go | 7 ++++--- .../apache/pulsar/functions/instance/go/GoInstanceConfig.java | 2 ++ .../java/org/apache/pulsar/functions/runtime/RuntimeUtils.java | 2 ++ 6 files changed, 18 insertions(+), 10 deletions(-) diff --git a/pulsar-function-go/conf/conf.go b/pulsar-function-go/conf/conf.go index c0d1262..dfbf542 100644 --- a/pulsar-function-go/conf/conf.go +++ b/pulsar-function-go/conf/conf.go @@ -52,10 +52,11 @@ type Conf struct { AutoACK bool `json:"autoAck" yaml:"autoAck"` Parallelism int32 `json:"parallelism" yaml:"parallelism"` //source config - SubscriptionType int32 `json:"subscriptionType" yaml:"subscriptionType"` - TimeoutMs uint64 `json:"timeoutMs" yaml:"timeoutMs"` - SubscriptionName string `json:"subscriptionName" yaml:"subscriptionName"` - CleanupSubscription bool `json:"cleanupSubscription" yaml:"cleanupSubscription"` + SubscriptionType int32 `json:"subscriptionType" yaml:"subscriptionType"` + TimeoutMs uint64 `json:"timeoutMs" yaml:"timeoutMs"` + SubscriptionName string `json:"subscriptionName" yaml:"subscriptionName"` + CleanupSubscription bool `json:"cleanupSubscription" yaml:"cleanupSubscription"` + SubscriptionPosition int32 `json:"subscriptionPosition" yaml:"subscriptionPosition"` //source input specs SourceSpecTopic string `json:"sourceSpecsTopic" yaml:"sourceSpecsTopic"` SourceSchemaType string `json:"sourceSchemaType" yaml:"sourceSchemaType"` diff --git a/pulsar-function-go/conf/conf.yaml b/pulsar-function-go/conf/conf.yaml index 7809ab5..59ac9bb 100644 --- a/pulsar-function-go/conf/conf.yaml +++ b/pulsar-function-go/conf/conf.yaml @@ -41,6 +41,7 @@ subscriptionType: 0 timeoutMs: 0 subscriptionName: "" cleanupSubscription: false +subscriptionPosition: 1 # source input specs sourceSpecsTopic: persistent://public/default/topic-01 sourceSchemaType: "" diff --git a/pulsar-function-go/pf/instanceConf.go b/pulsar-function-go/pf/instanceConf.go index 1227024..2fc8d95 100644 --- a/pulsar-function-go/pf/instanceConf.go +++ b/pulsar-function-go/pf/instanceConf.go @@ -82,9 +82,10 @@ func newInstanceConf() *instanceConf { }, }, }, - TimeoutMs: cfg.TimeoutMs, - SubscriptionName: cfg.SubscriptionName, - CleanupSubscription: cfg.CleanupSubscription, + TimeoutMs: cfg.TimeoutMs, + SubscriptionName: cfg.SubscriptionName, + CleanupSubscription: cfg.CleanupSubscription, + SubscriptionPosition: pb.SubscriptionPosition(cfg.SubscriptionPosition), }, Sink: &pb.SinkSpec{ Topic: cfg.SinkSpecTopic, diff --git a/pulsar-function-go/pf/instanceConf_test.go b/pulsar-function-go/pf/instanceConf_test.go index be93239..fa87002 100644 --- a/pulsar-function-go/pf/instanceConf_test.go +++ b/pulsar-function-go/pf/instanceConf_test.go @@ -62,9 +62,10 @@ func Test_newInstanceConf(t *testing.T) { }, }, }, - TimeoutMs: 0, - SubscriptionName: "", - CleanupSubscription: false, + TimeoutMs: 0, + SubscriptionName: "", + CleanupSubscription: false, + SubscriptionPosition: pb.SubscriptionPosition_EARLIEST, }, Sink: &pb.SinkSpec{ Topic: "persistent://public/default/topic-02", diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/go/GoInstanceConfig.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/go/GoInstanceConfig.java index 0cb1de2..85e73d3 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/go/GoInstanceConfig.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/go/GoInstanceConfig.java @@ -20,6 +20,7 @@ package org.apache.pulsar.functions.instance.go; import lombok.Getter; import lombok.Setter; +import org.apache.pulsar.functions.proto.Function; @Setter @Getter @@ -50,6 +51,7 @@ public class GoInstanceConfig { private long timeoutMs; private String subscriptionName = ""; private boolean cleanupSubscription; + private int subscriptionPosition = Function.SubscriptionPosition.LATEST.getNumber(); private String sourceSpecsTopic = ""; private String sourceSchemaType = ""; diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java index 592cd55..107d5cf 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java @@ -196,6 +196,8 @@ public class RuntimeUtils { if (instanceConfig.getFunctionDetails().getSource().getSubscriptionName() != null) { goInstanceConfig.setSubscriptionName(instanceConfig.getFunctionDetails().getSource().getSubscriptionName()); } + goInstanceConfig.setSubscriptionPosition( + instanceConfig.getFunctionDetails().getSource().getSubscriptionPosition().getNumber()); if (instanceConfig.getFunctionDetails().getSource().getInputSpecsMap() != null) { for (String inputTopic : instanceConfig.getFunctionDetails().getSource().getInputSpecsMap().keySet()) {
