This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7ba5c2d  support subscription-type to be passed in sink-function 
(#2200)
7ba5c2d is described below

commit 7ba5c2d4f6048762413f6cc1faab3dbf3210a08f
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Thu Jul 19 23:48:26 2018 -0700

    support subscription-type to be passed in sink-function (#2200)
    
    * support subscription-type to be passed in sink-function
    
    * Fix: test
    
    * retain-ordering
---
 .../src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java      | 9 +++++++++
 .../java/org/apache/pulsar/functions/utils/FunctionConfig.java   | 2 +-
 .../main/java/org/apache/pulsar/functions/utils/SinkConfig.java  | 2 +-
 3 files changed, 11 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
index 83a5b57..2043877 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
@@ -55,6 +55,7 @@ import 
org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.Function.Resources;
 import org.apache.pulsar.functions.proto.Function.SinkSpec;
 import org.apache.pulsar.functions.proto.Function.SourceSpec;
+import org.apache.pulsar.functions.proto.Function.SubscriptionType;
 import org.apache.pulsar.functions.utils.FunctionConfig;
 import org.apache.pulsar.functions.utils.SinkConfig;
 import org.apache.pulsar.functions.utils.Utils;
@@ -212,6 +213,8 @@ public class CmdSinks extends CmdBase {
         protected String customSerdeInputString;
         @Parameter(names = "--processingGuarantees", description = "The 
processing guarantees (aka delivery semantics) applied to the sink")
         protected FunctionConfig.ProcessingGuarantees processingGuarantees;
+        @Parameter(names = "--retainOrdering", description = "Sink consumes 
and sinks messages in order")
+        protected boolean retainOrdering;
         @Parameter(names = "--parallelism", description = "The sink's 
parallelism factor (i.e. the number of sink instances to run)")
         protected Integer parallelism;
         @Parameter(names = {"-a", "--archive"}, description = "Path to the 
archive file for the sink. It also supports url-path [http/https/file (file 
protocol assumes that file already exists on worker host)] from which worker 
can download the package.", listConverter = StringConverter.class)
@@ -261,6 +264,8 @@ public class CmdSinks extends CmdBase {
             if (null != processingGuarantees) {
                 sinkConfig.setProcessingGuarantees(processingGuarantees);
             }
+            
+            sinkConfig.setRetainOrdering(retainOrdering);
 
             Map<String, String> topicsToSerDeClassName = new HashMap<>();
             if (null != inputs) {
@@ -477,6 +482,10 @@ public class CmdSinks extends CmdBase {
             if (isNotBlank(sinkConfig.getSourceSubscriptionName())) {
                 
sourceSpecBuilder.setSubscriptionName(sinkConfig.getSourceSubscriptionName());
             }
+            
+            sourceSpecBuilder.setSubscriptionType(
+                    sinkConfig.isRetainOrdering() ? SubscriptionType.FAILOVER 
: SubscriptionType.SHARED);
+            
             functionDetailsBuilder.setAutoAck(true);
             functionDetailsBuilder.setSource(sourceSpecBuilder);
 
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
index 3140f95..42c829c 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
@@ -55,7 +55,7 @@ public class FunctionConfig {
         ATMOST_ONCE,
         EFFECTIVELY_ONCE
     }
-
+    
     public enum Runtime {
         JAVA,
         PYTHON
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java
index 3414973..7e97135 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java
@@ -32,7 +32,6 @@ import 
org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.
 import 
org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isValidTopicName;
 import org.apache.pulsar.functions.utils.validation.ValidatorImpls;
 
-import java.util.HashMap;
 import java.util.Map;
 
 @Getter
@@ -60,6 +59,7 @@ public class SinkConfig {
     @isPositiveNumber
     private int parallelism = 1;
     private FunctionConfig.ProcessingGuarantees processingGuarantees;
+    private boolean retainOrdering;
     @isValidResources
     private Resources resources;
 

Reply via email to