rdhabalia closed pull request #2259: Fix: set subscription-type based on 
message ordering
URL: https://github.com/apache/incubator-pulsar/pull/2259
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index 3092fd7ede..966ec6072f 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -86,6 +86,7 @@
 import org.apache.pulsar.functions.utils.Reflections;
 import org.apache.pulsar.functions.utils.Utils;
 import org.apache.pulsar.functions.utils.WindowConfig;
+import org.apache.pulsar.functions.utils.FunctionConfig.ProcessingGuarantees;
 import org.apache.pulsar.functions.utils.validation.ConfigValidation;
 import 
org.apache.pulsar.functions.utils.validation.ValidatorImpls.ImplementsClassesValidator;
 import org.apache.pulsar.functions.windowing.WindowFunctionExecutor;
@@ -240,6 +241,8 @@ void processArguments() throws Exception {
         protected FunctionConfig.ProcessingGuarantees processingGuarantees;
         @Parameter(names = "--userConfig", description = "User-defined config 
key/values")
         protected String userConfigString;
+        @Parameter(names = "--retainOrdering", description = "Function 
consumes and processes messages in order")
+        protected boolean retainOrdering;
         @Parameter(names = "--parallelism", description = "The function's 
parallelism factor (i.e. the number of function instances to run)")
         protected Integer parallelism;
         @Parameter(names = "--cpu", description = "The cpu in cores that need 
to be allocated per function instance(applicable only to docker runtime)")
@@ -315,6 +318,9 @@ void processArguments() throws Exception {
             if (null != processingGuarantees) {
                 functionConfig.setProcessingGuarantees(processingGuarantees);
             }
+            
+            functionConfig.setRetainOrdering(retainOrdering);
+            
             if (null != userConfigString) {
                 Type type = new TypeToken<Map<String, String>>(){}.getType();
                 Map<String, Object> userConfigMap = new 
Gson().fromJson(userConfigString, type);
@@ -560,24 +566,13 @@ protected FunctionDetails convert(FunctionConfig 
functionConfig)
                 
sourceSpecBuilder.setTopicsPattern(functionConfig.getTopicsPattern());
             }
 
-            // Set subscription type based on processing semantics
-            if (functionConfig.getProcessingGuarantees() != null) {
-                switch (functionConfig.getProcessingGuarantees()) {
-                    case ATMOST_ONCE:
-                        
sourceSpecBuilder.setSubscriptionType(SubscriptionType.SHARED);
-                        break;
-                    case ATLEAST_ONCE:
-                        
sourceSpecBuilder.setSubscriptionType(SubscriptionType.SHARED);
-                        break;
-                    case EFFECTIVELY_ONCE:
-                        
sourceSpecBuilder.setSubscriptionType(SubscriptionType.FAILOVER);
-                        break;
-                    default:
-                        throw new RuntimeException("Unknown processing 
guarantee: "
-                                + 
functionConfig.getProcessingGuarantees().name());
-                }
-            }
-
+            // Set subscription type based on ordering and EFFECTIVELY_ONCE 
semantics
+            SubscriptionType subType = (functionConfig.isRetainOrdering()
+                    || 
ProcessingGuarantees.EFFECTIVELY_ONCE.equals(functionConfig.getProcessingGuarantees()))
+                            ? SubscriptionType.FAILOVER
+                            : SubscriptionType.SHARED;
+            sourceSpecBuilder.setSubscriptionType(subType);
+            
             if (typeArgs != null) {
                 sourceSpecBuilder.setTypeClassName(typeArgs[0].getName());
             }
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
index 20438770b2..8417874f38 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
@@ -57,6 +57,7 @@
 import org.apache.pulsar.functions.proto.Function.SourceSpec;
 import org.apache.pulsar.functions.proto.Function.SubscriptionType;
 import org.apache.pulsar.functions.utils.FunctionConfig;
+import org.apache.pulsar.functions.utils.FunctionConfig.ProcessingGuarantees;
 import org.apache.pulsar.functions.utils.SinkConfig;
 import org.apache.pulsar.functions.utils.Utils;
 import org.apache.pulsar.functions.utils.io.ConnectorUtils;
@@ -468,7 +469,6 @@ protected FunctionDetails createSinkConfig(SinkConfig 
sinkConfig) throws IOExcep
             // set source spec
             // source spec classname should be empty so that the default 
pulsar source will be used
             SourceSpec.Builder sourceSpecBuilder = SourceSpec.newBuilder();
-            
sourceSpecBuilder.setSubscriptionType(Function.SubscriptionType.SHARED);
             if (sinkConfig.getTopicToSerdeClassName() != null) {
                 
sourceSpecBuilder.putAllTopicsToSerDeClassName(sinkConfig.getTopicToSerdeClassName());
             }
@@ -483,9 +483,12 @@ protected FunctionDetails createSinkConfig(SinkConfig 
sinkConfig) throws IOExcep
                 
sourceSpecBuilder.setSubscriptionName(sinkConfig.getSourceSubscriptionName());
             }
             
-            sourceSpecBuilder.setSubscriptionType(
-                    sinkConfig.isRetainOrdering() ? SubscriptionType.FAILOVER 
: SubscriptionType.SHARED);
-            
+            SubscriptionType subType = (sinkConfig.isRetainOrdering()
+                    || 
ProcessingGuarantees.EFFECTIVELY_ONCE.equals(sinkConfig.getProcessingGuarantees()))
+                            ? SubscriptionType.FAILOVER
+                            : SubscriptionType.SHARED;
+            sourceSpecBuilder.setSubscriptionType(subType);
+
             functionDetailsBuilder.setAutoAck(true);
             functionDetailsBuilder.setSource(sourceSpecBuilder);
 
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
index 42c829c673..c30d419608 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
@@ -86,6 +86,7 @@
     @isValidTopicName
     private String logTopic;
     private ProcessingGuarantees processingGuarantees;
+    private boolean retainOrdering;
     private Map<String, Object> userConfig;
     private Runtime runtime;
     private boolean autoAck;


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to