This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit dbddfe5282fa5c0d0ff5ec85db624672e4c9a603
Author: Rui Fu <[email protected]>
AuthorDate: Wed Sep 15 08:53:56 2021 +0800

    [pulsar-functions] Pass `SubscriptionPosition` from `FunctionDetails` to `FunctionConfig` / `SinkConfig` (#11831)
    
    * pass SubscriptionPosition from FunctionDetails to config
    
    * address comment
    
    * reduce code duplication
    
    * set subscriptionPosition in FunctionConfig, SinkConfig with init value
    
    * fix default values
    
    * fix CI
    
    * revert init data
    
    * fix CI
    
    * fix CI
    
    * fix CI
    
    (cherry picked from commit bfd6542ac17c24ff38134a843b3f372cad9246e8)
---
 .../pulsar/common/functions/FunctionConfig.java     |  3 ++-
 .../org/apache/pulsar/common/io/SinkConfig.java     |  3 ++-
 .../functions/instance/JavaInstanceRunnable.java    | 12 ++++--------
 .../pulsar/functions/utils/FunctionCommon.java      | 10 ++++++++++
 .../pulsar/functions/utils/FunctionConfigUtils.java | 21 ++++++++++++++-------
 .../pulsar/functions/utils/SinkConfigUtils.java     |  6 ++++++
 6 files changed, 38 insertions(+), 17 deletions(-)

diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
index 527f9b8..0fe539f 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
@@ -126,5 +126,6 @@ public class FunctionConfig {
     // Whether the pulsar admin client exposed to function context, default is disabled.
     private Boolean exposePulsarAdminClientEnabled;
 
-    private SubscriptionInitialPosition subscriptionPosition;
+    @Builder.Default
+    private SubscriptionInitialPosition subscriptionPosition = SubscriptionInitialPosition.Latest;
 }
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/io/SinkConfig.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/io/SinkConfig.java
index 4a0d093..9bf171a 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/io/SinkConfig.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/io/SinkConfig.java
@@ -43,7 +43,8 @@ public class SinkConfig {
     private String name;
     private String className;
     private String sourceSubscriptionName;
-    private SubscriptionInitialPosition sourceSubscriptionPosition;
+    @Builder.Default
+    private SubscriptionInitialPosition sourceSubscriptionPosition = SubscriptionInitialPosition.Latest;
 
     private Collection<String> inputs;
 
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 692902e..f6ca84d 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -19,6 +19,7 @@
 
 package org.apache.pulsar.functions.instance;
 
+import static org.apache.pulsar.functions.utils.FunctionCommon.convertFromFunctionDetailsSubscriptionPosition;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.common.base.Preconditions;
 
@@ -669,14 +670,9 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
                     FunctionConfig.ProcessingGuarantees.valueOf(
                             this.instanceConfig.getFunctionDetails().getProcessingGuarantees().name()));
 
-            switch (sourceSpec.getSubscriptionPosition()) {
-                case EARLIEST:
-                    pulsarSourceConfig.setSubscriptionPosition(SubscriptionInitialPosition.Earliest);
-                    break;
-                default:
-                    pulsarSourceConfig.setSubscriptionPosition(SubscriptionInitialPosition.Latest);
-                    break;
-            }
+            pulsarSourceConfig.setSubscriptionPosition(
+                    convertFromFunctionDetailsSubscriptionPosition(sourceSpec.getSubscriptionPosition())
+            );
 
             Preconditions.checkNotNull(contextImpl.getSubscriptionType());
             pulsarSourceConfig.setSubscriptionType(contextImpl.getSubscriptionType());
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
index 2e433ad..a13695e 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
@@ -46,6 +46,7 @@ import lombok.extern.slf4j.Slf4j;
 import net.jodah.typetools.TypeResolver;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.TopicMessageIdImpl;
 import org.apache.pulsar.common.functions.FunctionConfig;
@@ -501,4 +502,13 @@ public class FunctionCommon {
 
         return false;
     }
+
+    public static SubscriptionInitialPosition convertFromFunctionDetailsSubscriptionPosition(
+            org.apache.pulsar.functions.proto.Function.SubscriptionPosition subscriptionPosition) {
+        if (org.apache.pulsar.functions.proto.Function.SubscriptionPosition.EARLIEST.equals(subscriptionPosition)) {
+            return SubscriptionInitialPosition.Earliest;
+        } else {
+            return SubscriptionInitialPosition.Latest;
+        }
+    }
 }
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
index 13089e4..a2579b7 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
@@ -47,6 +47,7 @@ import static org.apache.commons.lang.StringUtils.isNotEmpty;
 import static org.apache.commons.lang3.StringUtils.isEmpty;
 import static org.apache.pulsar.common.functions.Utils.BUILTIN;
 import static org.apache.pulsar.common.util.ClassLoaderUtils.loadJar;
+import static org.apache.pulsar.functions.utils.FunctionCommon.convertFromFunctionDetailsSubscriptionPosition;
 
 @Slf4j
 public class FunctionConfigUtils {
@@ -158,15 +159,16 @@ public class FunctionConfigUtils {
         }
 
         // Set subscription position
-        Function.SubscriptionPosition subPosition;
-        if (functionConfig.getSubscriptionPosition() == SubscriptionInitialPosition.Earliest) {
-            subPosition = Function.SubscriptionPosition.EARLIEST;
-        } else {
-            subPosition = Function.SubscriptionPosition.LATEST;
+        if (functionConfig.getSubscriptionPosition() != null) {
+            Function.SubscriptionPosition subPosition = null;
+            if (SubscriptionInitialPosition.Earliest == functionConfig.getSubscriptionPosition()) {
+                subPosition = Function.SubscriptionPosition.EARLIEST;
+            } else {
+                subPosition = Function.SubscriptionPosition.LATEST;
+            }
+            sourceSpecBuilder.setSubscriptionPosition(subPosition);
         }
 
-        sourceSpecBuilder.setSubscriptionPosition(subPosition);
-
         if (typeArgs != null) {
             sourceSpecBuilder.setTypeClassName(typeArgs[0].getName());
         }
@@ -367,6 +369,11 @@ public class FunctionConfigUtils {
 
         functionConfig.setCleanupSubscription(functionDetails.getSource().getCleanupSubscription());
         functionConfig.setAutoAck(functionDetails.getAutoAck());
+
+        // Set subscription position
+        functionConfig.setSubscriptionPosition(
+                convertFromFunctionDetailsSubscriptionPosition(functionDetails.getSource().getSubscriptionPosition()));
+
         if (functionDetails.getSource().getTimeoutMs() != 0) {
             functionConfig.setTimeoutMs(functionDetails.getSource().getTimeoutMs());
         }
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
index fd6f35c..a8180ab 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
@@ -55,6 +55,7 @@ import java.util.Map;
 
 import static org.apache.commons.lang3.StringUtils.isEmpty;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static org.apache.pulsar.functions.utils.FunctionCommon.convertFromFunctionDetailsSubscriptionPosition;
 import static org.apache.pulsar.functions.utils.FunctionCommon.convertProcessingGuarantee;
 import static org.apache.pulsar.functions.utils.FunctionCommon.getSinkType;
 
@@ -284,6 +285,11 @@ public class SinkConfigUtils {
             sinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
         }
         sinkConfig.setAutoAck(functionDetails.getAutoAck());
+
+        // Set subscription position
+        sinkConfig.setSourceSubscriptionPosition(
+                convertFromFunctionDetailsSubscriptionPosition(functionDetails.getSource().getSubscriptionPosition()));
+
         if (functionDetails.getSource().getTimeoutMs() != 0) {
             sinkConfig.setTimeoutMs(functionDetails.getSource().getTimeoutMs());
         }

Reply via email to