This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit d5d40a2068485c341a72f223e7f1db76d1bedd30
Author: Rui Fu <[email protected]>
AuthorDate: Fri Sep 24 10:27:40 2021 +0800

    [pulsar-functions-go] support set subscription position (#11990)
    
    * stash
    
    * set default value
    
    (cherry picked from commit 652d1546569bd7bc2eabb0c258d7e5dc038bb334)
---
 pulsar-function-go/conf/conf.go                                  | 9 +++++----
 pulsar-function-go/conf/conf.yaml                                | 1 +
 pulsar-function-go/pf/instanceConf.go                            | 7 ++++---
 pulsar-function-go/pf/instanceConf_test.go                       | 7 ++++---
 .../apache/pulsar/functions/instance/go/GoInstanceConfig.java    | 2 ++
 .../java/org/apache/pulsar/functions/runtime/RuntimeUtils.java   | 2 ++
 6 files changed, 18 insertions(+), 10 deletions(-)

diff --git a/pulsar-function-go/conf/conf.go b/pulsar-function-go/conf/conf.go
index c0d1262..dfbf542 100644
--- a/pulsar-function-go/conf/conf.go
+++ b/pulsar-function-go/conf/conf.go
@@ -52,10 +52,11 @@ type Conf struct {
        AutoACK              bool   `json:"autoAck" yaml:"autoAck"`
        Parallelism          int32  `json:"parallelism" yaml:"parallelism"`
        //source config
-       SubscriptionType    int32  `json:"subscriptionType" 
yaml:"subscriptionType"`
-       TimeoutMs           uint64 `json:"timeoutMs" yaml:"timeoutMs"`
-       SubscriptionName    string `json:"subscriptionName" 
yaml:"subscriptionName"`
-       CleanupSubscription bool   `json:"cleanupSubscription"  
yaml:"cleanupSubscription"`
+       SubscriptionType     int32  `json:"subscriptionType" 
yaml:"subscriptionType"`
+       TimeoutMs            uint64 `json:"timeoutMs" yaml:"timeoutMs"`
+       SubscriptionName     string `json:"subscriptionName" 
yaml:"subscriptionName"`
+       CleanupSubscription  bool   `json:"cleanupSubscription"  
yaml:"cleanupSubscription"`
+       SubscriptionPosition int32  `json:"subscriptionPosition" 
yaml:"subscriptionPosition"`
        //source input specs
        SourceSpecTopic            string `json:"sourceSpecsTopic" 
yaml:"sourceSpecsTopic"`
        SourceSchemaType           string `json:"sourceSchemaType" 
yaml:"sourceSchemaType"`
diff --git a/pulsar-function-go/conf/conf.yaml 
b/pulsar-function-go/conf/conf.yaml
index 7809ab5..59ac9bb 100644
--- a/pulsar-function-go/conf/conf.yaml
+++ b/pulsar-function-go/conf/conf.yaml
@@ -41,6 +41,7 @@ subscriptionType: 0
 timeoutMs: 0
 subscriptionName: ""
 cleanupSubscription: false
+subscriptionPosition: 1
 # source input specs
 sourceSpecsTopic: persistent://public/default/topic-01
 sourceSchemaType: ""
diff --git a/pulsar-function-go/pf/instanceConf.go 
b/pulsar-function-go/pf/instanceConf.go
index 1227024..2fc8d95 100644
--- a/pulsar-function-go/pf/instanceConf.go
+++ b/pulsar-function-go/pf/instanceConf.go
@@ -82,9 +82,10 @@ func newInstanceConf() *instanceConf {
                                                },
                                        },
                                },
-                               TimeoutMs:           cfg.TimeoutMs,
-                               SubscriptionName:    cfg.SubscriptionName,
-                               CleanupSubscription: cfg.CleanupSubscription,
+                               TimeoutMs:            cfg.TimeoutMs,
+                               SubscriptionName:     cfg.SubscriptionName,
+                               CleanupSubscription:  cfg.CleanupSubscription,
+                               SubscriptionPosition: 
pb.SubscriptionPosition(cfg.SubscriptionPosition),
                        },
                        Sink: &pb.SinkSpec{
                                Topic:      cfg.SinkSpecTopic,
diff --git a/pulsar-function-go/pf/instanceConf_test.go 
b/pulsar-function-go/pf/instanceConf_test.go
index be93239..fa87002 100644
--- a/pulsar-function-go/pf/instanceConf_test.go
+++ b/pulsar-function-go/pf/instanceConf_test.go
@@ -62,9 +62,10 @@ func Test_newInstanceConf(t *testing.T) {
                                                        },
                                                },
                                        },
-                                       TimeoutMs:           0,
-                                       SubscriptionName:    "",
-                                       CleanupSubscription: false,
+                                       TimeoutMs:            0,
+                                       SubscriptionName:     "",
+                                       CleanupSubscription:  false,
+                                       SubscriptionPosition: 
pb.SubscriptionPosition_EARLIEST,
                                },
                                Sink: &pb.SinkSpec{
                                        Topic:      
"persistent://public/default/topic-02",
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/go/GoInstanceConfig.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/go/GoInstanceConfig.java
index 0cb1de2..85e73d3 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/go/GoInstanceConfig.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/go/GoInstanceConfig.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.functions.instance.go;
 
 import lombok.Getter;
 import lombok.Setter;
+import org.apache.pulsar.functions.proto.Function;
 
 @Setter
 @Getter
@@ -50,6 +51,7 @@ public class GoInstanceConfig {
     private long timeoutMs;
     private String subscriptionName = "";
     private boolean cleanupSubscription;
+    private int subscriptionPosition = 
Function.SubscriptionPosition.LATEST.getNumber();
 
     private String sourceSpecsTopic = "";
     private String sourceSchemaType = "";
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
index 592cd55..107d5cf 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
@@ -196,6 +196,8 @@ public class RuntimeUtils {
         if 
(instanceConfig.getFunctionDetails().getSource().getSubscriptionName() != null) 
{
             
goInstanceConfig.setSubscriptionName(instanceConfig.getFunctionDetails().getSource().getSubscriptionName());
         }
+        goInstanceConfig.setSubscriptionPosition(
+                
instanceConfig.getFunctionDetails().getSource().getSubscriptionPosition().getNumber());
 
         if (instanceConfig.getFunctionDetails().getSource().getInputSpecsMap() 
!= null) {
             for (String inputTopic : 
instanceConfig.getFunctionDetails().getSource().getInputSpecsMap().keySet()) {

Reply via email to