This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new c85dc46  Fix: set subscription-type based on message ordering (#2259)
c85dc46 is described below

commit c85dc46638cbc7822872511c776777897ce1895c
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Thu Aug 2 12:36:04 2018 -0700

    Fix: set subscription-type based on message ordering (#2259)
    
    * Fix: set subscription-type based on message ordering
    
    * set failover sub on EFFECTIVELY_ONCE processing guarantee
---
 .../org/apache/pulsar/admin/cli/CmdFunctions.java  | 31 +++++++++-------------
 .../java/org/apache/pulsar/admin/cli/CmdSinks.java | 11 +++++---
 .../pulsar/functions/utils/FunctionConfig.java     |  1 +
 3 files changed, 21 insertions(+), 22 deletions(-)

diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index 6948cff..9f7405c 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -86,6 +86,7 @@ import org.apache.pulsar.functions.utils.FunctionConfig;
 import org.apache.pulsar.functions.utils.Reflections;
 import org.apache.pulsar.functions.utils.Utils;
 import org.apache.pulsar.functions.utils.WindowConfig;
+import org.apache.pulsar.functions.utils.FunctionConfig.ProcessingGuarantees;
 import org.apache.pulsar.functions.utils.validation.ConfigValidation;
 import 
org.apache.pulsar.functions.utils.validation.ValidatorImpls.ImplementsClassesValidator;
 import org.apache.pulsar.functions.windowing.WindowFunctionExecutor;
@@ -240,6 +241,8 @@ public class CmdFunctions extends CmdBase {
         protected FunctionConfig.ProcessingGuarantees processingGuarantees;
         @Parameter(names = "--userConfig", description = "User-defined config 
key/values")
         protected String userConfigString;
+        @Parameter(names = "--retainOrdering", description = "Function 
consumes and processes messages in order")
+        protected boolean retainOrdering;
         @Parameter(names = "--parallelism", description = "The function's 
parallelism factor (i.e. the number of function instances to run)")
         protected Integer parallelism;
         @Parameter(names = "--cpu", description = "The cpu in cores that need 
to be allocated per function instance(applicable only to docker runtime)")
@@ -315,6 +318,9 @@ public class CmdFunctions extends CmdBase {
             if (null != processingGuarantees) {
                 functionConfig.setProcessingGuarantees(processingGuarantees);
             }
+            
+            functionConfig.setRetainOrdering(retainOrdering);
+            
             if (null != userConfigString) {
                 Type type = new TypeToken<Map<String, String>>(){}.getType();
                 Map<String, Object> userConfigMap = new 
Gson().fromJson(userConfigString, type);
@@ -567,24 +573,13 @@ public class CmdFunctions extends CmdBase {
                 
sourceSpecBuilder.setTopicsPattern(functionConfig.getTopicsPattern());
             }
 
-            // Set subscription type based on processing semantics
-            if (functionConfig.getProcessingGuarantees() != null) {
-                switch (functionConfig.getProcessingGuarantees()) {
-                    case ATMOST_ONCE:
-                        
sourceSpecBuilder.setSubscriptionType(SubscriptionType.SHARED);
-                        break;
-                    case ATLEAST_ONCE:
-                        
sourceSpecBuilder.setSubscriptionType(SubscriptionType.SHARED);
-                        break;
-                    case EFFECTIVELY_ONCE:
-                        
sourceSpecBuilder.setSubscriptionType(SubscriptionType.FAILOVER);
-                        break;
-                    default:
-                        throw new RuntimeException("Unknown processing 
guarantee: "
-                                + 
functionConfig.getProcessingGuarantees().name());
-                }
-            }
-
+            // Set subscription type based on ordering and EFFECTIVELY_ONCE 
semantics
+            SubscriptionType subType = (functionConfig.isRetainOrdering()
+                    || 
ProcessingGuarantees.EFFECTIVELY_ONCE.equals(functionConfig.getProcessingGuarantees()))
+                            ? SubscriptionType.FAILOVER
+                            : SubscriptionType.SHARED;
+            sourceSpecBuilder.setSubscriptionType(subType);
+            
             if (typeArgs != null) {
                 sourceSpecBuilder.setTypeClassName(typeArgs[0].getName());
             }
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
index 2043877..8417874 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
@@ -57,6 +57,7 @@ import org.apache.pulsar.functions.proto.Function.SinkSpec;
 import org.apache.pulsar.functions.proto.Function.SourceSpec;
 import org.apache.pulsar.functions.proto.Function.SubscriptionType;
 import org.apache.pulsar.functions.utils.FunctionConfig;
+import org.apache.pulsar.functions.utils.FunctionConfig.ProcessingGuarantees;
 import org.apache.pulsar.functions.utils.SinkConfig;
 import org.apache.pulsar.functions.utils.Utils;
 import org.apache.pulsar.functions.utils.io.ConnectorUtils;
@@ -468,7 +469,6 @@ public class CmdSinks extends CmdBase {
             // set source spec
             // source spec classname should be empty so that the default 
pulsar source will be used
             SourceSpec.Builder sourceSpecBuilder = SourceSpec.newBuilder();
-            
sourceSpecBuilder.setSubscriptionType(Function.SubscriptionType.SHARED);
             if (sinkConfig.getTopicToSerdeClassName() != null) {
                 
sourceSpecBuilder.putAllTopicsToSerDeClassName(sinkConfig.getTopicToSerdeClassName());
             }
@@ -483,9 +483,12 @@ public class CmdSinks extends CmdBase {
                 
sourceSpecBuilder.setSubscriptionName(sinkConfig.getSourceSubscriptionName());
             }
             
-            sourceSpecBuilder.setSubscriptionType(
-                    sinkConfig.isRetainOrdering() ? SubscriptionType.FAILOVER 
: SubscriptionType.SHARED);
-            
+            SubscriptionType subType = (sinkConfig.isRetainOrdering()
+                    || 
ProcessingGuarantees.EFFECTIVELY_ONCE.equals(sinkConfig.getProcessingGuarantees()))
+                            ? SubscriptionType.FAILOVER
+                            : SubscriptionType.SHARED;
+            sourceSpecBuilder.setSubscriptionType(subType);
+
             functionDetailsBuilder.setAutoAck(true);
             functionDetailsBuilder.setSource(sourceSpecBuilder);
 
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
index bcbdd72..8d91b8e 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
@@ -85,6 +85,7 @@ public class FunctionConfig {
     @isValidTopicName
     private String logTopic;
     private ProcessingGuarantees processingGuarantees;
+    private boolean retainOrdering;
     private Map<String, Object> userConfig;
     private Runtime runtime;
     private boolean autoAck;

Reply via email to