rdhabalia closed pull request #2200: support subscription-type to be passed in 
sink-function
URL: https://github.com/apache/incubator-pulsar/pull/2200
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, the diff is reproduced
below (GitHub does not otherwise display it after the merge):

diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
index 83a5b57837..20438770b2 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
@@ -55,6 +55,7 @@
 import org.apache.pulsar.functions.proto.Function.Resources;
 import org.apache.pulsar.functions.proto.Function.SinkSpec;
 import org.apache.pulsar.functions.proto.Function.SourceSpec;
+import org.apache.pulsar.functions.proto.Function.SubscriptionType;
 import org.apache.pulsar.functions.utils.FunctionConfig;
 import org.apache.pulsar.functions.utils.SinkConfig;
 import org.apache.pulsar.functions.utils.Utils;
@@ -212,6 +213,8 @@ void runCmd() throws Exception {
         protected String customSerdeInputString;
         @Parameter(names = "--processingGuarantees", description = "The 
processing guarantees (aka delivery semantics) applied to the sink")
         protected FunctionConfig.ProcessingGuarantees processingGuarantees;
+        @Parameter(names = "--retainOrdering", description = "Sink consumes 
and sinks messages in order")
+        protected boolean retainOrdering;
         @Parameter(names = "--parallelism", description = "The sink's 
parallelism factor (i.e. the number of sink instances to run)")
         protected Integer parallelism;
         @Parameter(names = {"-a", "--archive"}, description = "Path to the 
archive file for the sink. It also supports url-path [http/https/file (file 
protocol assumes that file already exists on worker host)] from which worker 
can download the package.", listConverter = StringConverter.class)
@@ -261,6 +264,8 @@ void processArguments() throws Exception {
             if (null != processingGuarantees) {
                 sinkConfig.setProcessingGuarantees(processingGuarantees);
             }
+            
+            sinkConfig.setRetainOrdering(retainOrdering);
 
             Map<String, String> topicsToSerDeClassName = new HashMap<>();
             if (null != inputs) {
@@ -477,6 +482,10 @@ protected FunctionDetails createSinkConfig(SinkConfig 
sinkConfig) throws IOExcep
             if (isNotBlank(sinkConfig.getSourceSubscriptionName())) {
                 
sourceSpecBuilder.setSubscriptionName(sinkConfig.getSourceSubscriptionName());
             }
+            
+            sourceSpecBuilder.setSubscriptionType(
+                    sinkConfig.isRetainOrdering() ? SubscriptionType.FAILOVER 
: SubscriptionType.SHARED);
+            
             functionDetailsBuilder.setAutoAck(true);
             functionDetailsBuilder.setSource(sourceSpecBuilder);
 
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
index 3140f9535c..42c829c673 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
@@ -55,7 +55,7 @@
         ATMOST_ONCE,
         EFFECTIVELY_ONCE
     }
-
+    
     public enum Runtime {
         JAVA,
         PYTHON
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java
index 34149735ef..7e97135131 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java
@@ -32,7 +32,6 @@
 import 
org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isValidTopicName;
 import org.apache.pulsar.functions.utils.validation.ValidatorImpls;
 
-import java.util.HashMap;
 import java.util.Map;
 
 @Getter
@@ -60,6 +59,7 @@
     @isPositiveNumber
     private int parallelism = 1;
     private FunctionConfig.ProcessingGuarantees processingGuarantees;
+    private boolean retainOrdering;
     @isValidResources
     private Resources resources;
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to