rdhabalia closed pull request #2492: add sub-name option to function cli
URL: https://github.com/apache/incubator-pulsar/pull/2492
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java index 18d483c563..2390545b56 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java @@ -24,6 +24,7 @@ import static org.apache.bookkeeper.common.concurrent.FutureUtils.result; import static org.apache.commons.lang.StringUtils.isBlank; import static org.apache.commons.lang.StringUtils.isNotBlank; +import static org.apache.commons.lang3.StringUtils.isNotBlank; import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE; import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT; import static org.apache.pulsar.functions.utils.Utils.fileExists; @@ -280,6 +281,8 @@ void processArguments() throws Exception { protected Boolean DEPRECATED_retainOrdering; @Parameter(names = "--retain-ordering", description = "Function consumes and processes messages in order") protected boolean retainOrdering; + @Parameter(names = "--subs-name", description = "Pulsar source subscription name if user wants a specific subscription-name for input-topic consumer") + protected String subsName; @Parameter(names = "--parallelism", description = "The function's parallelism factor (i.e. 
the number of function instances to run)") protected Integer parallelism; @Parameter(names = "--cpu", description = "The cpu in cores that need to be allocated per function instance(applicable only to docker runtime)") @@ -402,6 +405,10 @@ void processArguments() throws Exception { } functionConfig.setRetainOrdering(retainOrdering); + + if (isNotBlank(subsName)) { + functionConfig.setSubName(subsName); + } if (null != userConfigString) { Type type = new TypeToken<Map<String, String>>(){}.getType(); @@ -671,6 +678,10 @@ protected FunctionDetails convert(FunctionConfig functionConfig) ? SubscriptionType.FAILOVER : SubscriptionType.SHARED; sourceSpecBuilder.setSubscriptionType(subType); + + if (isNotBlank(functionConfig.getSubName())) { + sourceSpecBuilder.setSubscriptionName(functionConfig.getSubName()); + } if (typeArgs != null) { sourceSpecBuilder.setTypeClassName(typeArgs[0].getName()); diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java index ac96fa90a7..4e3095652d 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java @@ -108,6 +108,7 @@ private Map<String, Object> userConfig; private Runtime runtime; private boolean autoAck; + private String subName; @isPositiveNumber private int parallelism; @isValidResources diff --git a/site2/docs/reference-pulsar-admin.md b/site2/docs/reference-pulsar-admin.md index 459afce4da..b1037f6e75 100644 --- a/site2/docs/reference-pulsar-admin.md +++ b/site2/docs/reference-pulsar-admin.md @@ -307,6 +307,7 @@ Options |`--ram`|The RAM to allocate to each function instance (in bytes)|| |`--disk`|The disk space to allocate to each function instance (in bytes)|| |`--auto-ack`|Let the functions framework manage acking|| +|`--subs-name`|Pulsar source 
subscription name if user wants a specific subscription-name for input-topic consumer|| |`--broker-service-url `|The URL of the Pulsar broker|| |`--classname`|The name of the function’s class|| |`--custom-serde-inputs`|A map of the input topic to SerDe name|| @@ -352,6 +353,7 @@ Options |`--ram`|The RAM to allocate to each function instance (in bytes)|| |`--disk`|The disk space to allocate to each function instance (in bytes)|| |`--auto-ack`|Let the functions framework manage acking|| +|`--subs-name`|Pulsar source subscription name if user wants a specific subscription-name for input-topic consumer|| |`--classname`|The name of the function’s class|| |`--custom-serde-inputs`|A map of the input topic to SerDe name|| |`--function-config-file`|The path of the YAML config file used to configure the function|| @@ -409,6 +411,7 @@ Options |`--ram`|The RAM to allocate to each function instance (in bytes)|| |`--disk`|The disk space to allocate to each function instance (in bytes)|| |`--auto-ack`|Let the functions framework manage acking|| +|`--subs-name`|Pulsar source subscription name if user wants a specific subscription-name for input-topic consumer|| |`--classname`|The name of the function’s class|| |`--custom-serde-inputs`|A map of the input topic to SerDe name|| |`--function-config-file`|The path of the YAML config file used to configure the function|| ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services