rdhabalia closed pull request #2492: add sub-name option to function cli
URL: https://github.com/apache/incubator-pulsar/pull/2492
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index 18d483c563..2390545b56 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -24,6 +24,7 @@
 import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
 import static org.apache.commons.lang.StringUtils.isBlank;
 import static org.apache.commons.lang.StringUtils.isNotBlank;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE;
 import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
 import static org.apache.pulsar.functions.utils.Utils.fileExists;
@@ -280,6 +281,8 @@ void processArguments() throws Exception {
         protected Boolean DEPRECATED_retainOrdering;
         @Parameter(names = "--retain-ordering", description = "Function 
consumes and processes messages in order")
         protected boolean retainOrdering;
+        @Parameter(names = "--subs-name", description = "Pulsar source 
subscription name if user wants a specific subscription-name for input-topic 
consumer")
+        protected String subsName;
         @Parameter(names = "--parallelism", description = "The function's 
parallelism factor (i.e. the number of function instances to run)")
         protected Integer parallelism;
         @Parameter(names = "--cpu", description = "The cpu in cores that need 
to be allocated per function instance(applicable only to docker runtime)")
@@ -402,6 +405,10 @@ void processArguments() throws Exception {
             }
 
             functionConfig.setRetainOrdering(retainOrdering);
+            
+            if (isNotBlank(subsName)) {
+                functionConfig.setSubName(subsName);
+            }
 
             if (null != userConfigString) {
                 Type type = new TypeToken<Map<String, String>>(){}.getType();
@@ -671,6 +678,10 @@ protected FunctionDetails convert(FunctionConfig 
functionConfig)
                             ? SubscriptionType.FAILOVER
                             : SubscriptionType.SHARED;
             sourceSpecBuilder.setSubscriptionType(subType);
+            
+            if (isNotBlank(functionConfig.getSubName())) {
+                
sourceSpecBuilder.setSubscriptionName(functionConfig.getSubName());
+            }
 
             if (typeArgs != null) {
                 sourceSpecBuilder.setTypeClassName(typeArgs[0].getName());
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
index ac96fa90a7..4e3095652d 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
@@ -108,6 +108,7 @@
     private Map<String, Object> userConfig;
     private Runtime runtime;
     private boolean autoAck;
+    private String subName;
     @isPositiveNumber
     private int parallelism;
     @isValidResources
diff --git a/site2/docs/reference-pulsar-admin.md 
b/site2/docs/reference-pulsar-admin.md
index 459afce4da..b1037f6e75 100644
--- a/site2/docs/reference-pulsar-admin.md
+++ b/site2/docs/reference-pulsar-admin.md
@@ -307,6 +307,7 @@ Options
 |`--ram`|The RAM to allocate to each function instance (in bytes)||
 |`--disk`|The disk space to allocate to each function instance (in bytes)||
 |`--auto-ack`|Let the functions framework manage acking||
+|`--subs-name`|Pulsar source subscription name if user wants a specific 
subscription-name for input-topic consumer||
 |`--broker-service-url `|The URL of the Pulsar broker||
 |`--classname`|The name of the function’s class||
 |`--custom-serde-inputs`|A map of the input topic to SerDe name||
@@ -352,6 +353,7 @@ Options
 |`--ram`|The RAM to allocate to each function instance (in bytes)||
 |`--disk`|The disk space to allocate to each function instance (in bytes)||
 |`--auto-ack`|Let the functions framework manage acking||
+|`--subs-name`|Pulsar source subscription name if user wants a specific 
subscription-name for input-topic consumer||
 |`--classname`|The name of the function’s class||
 |`--custom-serde-inputs`|A map of the input topic to SerDe name||
 |`--function-config-file`|The path of the YAML config file used to configure 
the function||
@@ -409,6 +411,7 @@ Options
 |`--ram`|The RAM to allocate to each function instance (in bytes)||
 |`--disk`|The disk space to allocate to each function instance (in bytes)||
 |`--auto-ack`|Let the functions framework manage acking||
+|`--subs-name`|Pulsar source subscription name if user wants a specific 
subscription-name for input-topic consumer||
 |`--classname`|The name of the function’s class||
 |`--custom-serde-inputs`|A map of the input topic to SerDe name||
 |`--function-config-file`|The path of the YAML config file used to configure 
the function||


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to