This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 630fc0d add sub-name option to function cli (#2492) 630fc0d is described below commit 630fc0db6d4fb0af43675c8d9f6f8c93fb209552 Author: Rajan Dhabalia <rdhaba...@apache.org> AuthorDate: Fri Aug 31 15:09:18 2018 -0700 add sub-name option to function cli (#2492) * add sub-name option to function cli * add docs --- .../main/java/org/apache/pulsar/admin/cli/CmdFunctions.java | 11 +++++++++++ .../org/apache/pulsar/functions/utils/FunctionConfig.java | 1 + site2/docs/reference-pulsar-admin.md | 3 +++ 3 files changed, 15 insertions(+) diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java index e567814..7f2f521 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java @@ -24,6 +24,7 @@ import static java.util.Objects.isNull; import static org.apache.bookkeeper.common.concurrent.FutureUtils.result; import static org.apache.commons.lang.StringUtils.isBlank; import static org.apache.commons.lang.StringUtils.isNotBlank; +import static org.apache.commons.lang3.StringUtils.isNotBlank; import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE; import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT; import static org.apache.pulsar.functions.utils.Utils.fileExists; @@ -280,6 +281,8 @@ public class CmdFunctions extends CmdBase { protected Boolean DEPRECATED_retainOrdering; @Parameter(names = "--retain-ordering", description = "Function consumes and processes messages in order") protected boolean retainOrdering; + @Parameter(names = "--subs-name", description = "Pulsar source subscription name if user wants a specific subscription-name for input-topic consumer") + protected String subsName; @Parameter(names = "--parallelism", description = "The 
function's parallelism factor (i.e. the number of function instances to run)") protected Integer parallelism; @Parameter(names = "--cpu", description = "The cpu in cores that need to be allocated per function instance(applicable only to docker runtime)") @@ -406,6 +409,10 @@ public class CmdFunctions extends CmdBase { } functionConfig.setRetainOrdering(retainOrdering); + + if (isNotBlank(subsName)) { + functionConfig.setSubName(subsName); + } if (null != userConfigString) { Type type = new TypeToken<Map<String, String>>(){}.getType(); @@ -666,6 +673,10 @@ public class CmdFunctions extends CmdBase { ? SubscriptionType.FAILOVER : SubscriptionType.SHARED; sourceSpecBuilder.setSubscriptionType(subType); + + if (isNotBlank(functionConfig.getSubName())) { + sourceSpecBuilder.setSubscriptionName(functionConfig.getSubName()); + } if (typeArgs != null) { sourceSpecBuilder.setTypeClassName(typeArgs[0].getName()); diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java index 21e6112..1335f8c 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java @@ -107,6 +107,7 @@ public class FunctionConfig { private Map<String, Object> userConfig; private Runtime runtime; private boolean autoAck; + private String subName; @isPositiveNumber private int parallelism; @isValidResources diff --git a/site2/docs/reference-pulsar-admin.md b/site2/docs/reference-pulsar-admin.md index eddee17..03e1472 100644 --- a/site2/docs/reference-pulsar-admin.md +++ b/site2/docs/reference-pulsar-admin.md @@ -307,6 +307,7 @@ Options |`--ram`|The RAM to allocate to each function instance (in bytes)|| |`--disk`|The disk space to allocate to each function instance (in bytes)|| |`--auto-ack`|Let the functions framework manage acking|| 
+|`--subs-name`|Pulsar source subscription name if user wants a specific subscription-name for input-topic consumer||
 |`--broker-service-url `|The URL of the Pulsar broker||
 |`--classname`|The name of the function’s class||
 |`--custom-serde-inputs`|A map of the input topic to SerDe name||
@@ -352,6 +353,7 @@ Options
 |`--ram`|The RAM to allocate to each function instance (in bytes)||
 |`--disk`|The disk space to allocate to each function instance (in bytes)||
 |`--auto-ack`|Let the functions framework manage acking||
+|`--subs-name`|Pulsar source subscription name if user wants a specific subscription-name for input-topic consumer||
 |`--classname`|The name of the function’s class||
 |`--custom-serde-inputs`|A map of the input topic to SerDe name||
 |`--custom-schema-inputs`|A map of the input topic to Schema class name||
@@ -409,6 +411,7 @@ Options
 |`--ram`|The RAM to allocate to each function instance (in bytes)||
 |`--disk`|The disk space to allocate to each function instance (in bytes)||
 |`--auto-ack`|Let the functions framework manage acking||
+|`--subs-name`|Pulsar source subscription name if user wants a specific subscription-name for input-topic consumer||
 |`--classname`|The name of the function’s class||
 |`--custom-serde-inputs`|A map of the input topic to SerDe name||
 |`--custom-schema-inputs`|A map of the input topic to Schema class name||