This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new e28ef1b Add support to configure subscription name for sink-function (#2198) e28ef1b is described below commit e28ef1b4f7b6cb44cb424b7f121085fd5c1a87d9 Author: Rajan Dhabalia <rdhaba...@apache.org> AuthorDate: Wed Jul 18 17:41:30 2018 -0700 Add support to configure subscription name for sink-function (#2198) --- .../org/apache/pulsar/io/PulsarSinkE2ETest.java | 15 ++++++------ .../java/org/apache/pulsar/admin/cli/CmdSinks.java | 27 +++++++++++----------- .../functions/instance/JavaInstanceRunnable.java | 4 +++- .../proto/src/main/proto/Function.proto | 1 + .../apache/pulsar/functions/utils/SinkConfig.java | 1 + 5 files changed, 26 insertions(+), 22 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java index 3ece269..78fd2e2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java @@ -249,6 +249,7 @@ public class PulsarSinkE2ETest { final String propertyKey = "key"; final String propertyValue = "value"; final String functionName = "PulsarSink-test"; + final String subscriptionName = "test-sub"; admin.namespaces().createNamespace(replNamespace); Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use")); admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters); @@ -260,7 +261,7 @@ public class PulsarSinkE2ETest { String jarFilePathUrl = Utils.FILE + ":" + PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath(); FunctionDetails functionDetails = createSinkConfig(jarFilePathUrl, tenant, namespacePortion, functionName, - sinkTopic); + sinkTopic, subscriptionName); admin.functions().createFunctionWithUrl(functionDetails, jarFilePathUrl); // try to update function to test: update-function functionality @@ -283,8 +284,7 @@ public 
class PulsarSinkE2ETest { } retryStrategically((test) -> { try { - SubscriptionStats subStats = admin.topics().getStats(sourceTopic).subscriptions.values().iterator() - .next(); + SubscriptionStats subStats = admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName); return subStats.unackedMessages == 0; } catch (PulsarAdminException e) { return false; @@ -314,6 +314,7 @@ public class PulsarSinkE2ETest { final String propertyKey = "key"; final String propertyValue = "value"; final String functionName = "PulsarSink-test"; + final String subscriptionName = "test-sub"; admin.namespaces().createNamespace(replNamespace); Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use")); admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters); @@ -324,7 +325,7 @@ public class PulsarSinkE2ETest { String jarFilePathUrl = Utils.FILE + ":" + PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath(); FunctionDetails functionDetails = createSinkConfig(jarFilePathUrl, tenant, namespacePortion, functionName, - sinkTopic); + sinkTopic, subscriptionName); admin.functions().createFunctionWithUrl(functionDetails, jarFilePathUrl); // try to update function to test: update-function functionality @@ -347,8 +348,7 @@ public class PulsarSinkE2ETest { } retryStrategically((test) -> { try { - SubscriptionStats subStats = admin.topics().getStats(sourceTopic).subscriptions.values().iterator() - .next(); + SubscriptionStats subStats = admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName); return subStats.unackedMessages == 0; } catch (PulsarAdminException e) { return false; @@ -372,7 +372,7 @@ public class PulsarSinkE2ETest { Assert.assertEquals((int) success, totalMsgs); } - protected FunctionDetails createSinkConfig(String jarFile, String tenant, String namespace, String functionName, String sinkTopic) { + protected FunctionDetails createSinkConfig(String jarFile, String tenant, String namespace, String functionName, 
String sinkTopic, String subscriptionName) { File file = new File(jarFile); try { @@ -397,6 +397,7 @@ public class PulsarSinkE2ETest { sourceSpecBuilder.setSubscriptionType(Function.SubscriptionType.SHARED); sourceSpecBuilder.setTypeClassName(byte[].class.getName()); sourceSpecBuilder.setTopicsPattern(sourceTopicPattern); + sourceSpecBuilder.setSubscriptionName(subscriptionName); sourceSpecBuilder.putTopicsToSerDeClassName(sourceTopicPattern, DefaultSerDe.class.getName()); functionDetailsBuilder.setAutoAck(true); functionDetailsBuilder.setSource(sourceSpecBuilder); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java index 31f3e0a..83a5b57 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java @@ -39,7 +39,8 @@ import java.util.Set; import lombok.Getter; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.StringUtils; +import static org.apache.commons.lang3.StringUtils.isBlank; +import static org.apache.commons.lang3.StringUtils.isNotBlank; import org.apache.commons.lang3.text.WordUtils; import org.apache.pulsar.admin.cli.utils.CmdUtils; import org.apache.pulsar.client.admin.PulsarAdmin; @@ -61,17 +62,6 @@ import org.apache.pulsar.functions.utils.io.ConnectorUtils; import org.apache.pulsar.functions.utils.io.Connectors; import org.apache.pulsar.functions.utils.validation.ConfigValidation; -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.lang.reflect.Type; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static org.apache.commons.lang3.StringUtils.isBlank; import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE; import static 
org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT; import static org.apache.pulsar.functions.utils.Utils.convertProcessingGuarantee; @@ -216,6 +206,8 @@ public class CmdSinks extends CmdBase { protected String inputs; @Parameter(names = "--topicsPattern", description = "TopicsPattern to consume from list of topics under a namespace that match the pattern. [--input] and [--topicsPattern] are mutually exclusive. Add SerDe class name for a pattern in --customSerdeInputs (supported for java fun only)") protected String topicsPattern; + @Parameter(names = "--subsName", description = "Pulsar source subscription name if user wants a specific subscription-name for input-topic consumer") + protected String subsName; @Parameter(names = "--customSerdeInputs", description = "The map of input topics to SerDe class names (as a JSON string)") protected String customSerdeInputString; @Parameter(names = "--processingGuarantees", description = "The processing guarantees (aka delivery semantics) applied to the sink") @@ -282,6 +274,10 @@ public class CmdSinks extends CmdBase { sinkConfig.setTopicToSerdeClassName(topicsToSerDeClassName); } + if (isNotBlank(subsName)) { + sinkConfig.setSourceSubscriptionName(subsName); + } + if (null != topicsPattern) { sinkConfig.setTopicsPattern(topicsPattern); } @@ -434,7 +430,7 @@ public class CmdSinks extends CmdBase { if (!isBuiltin) { if (sinkConfig.getArchive().startsWith(Utils.FILE)) { - if (StringUtils.isBlank(sinkConfig.getClassName())) { + if (isBlank(sinkConfig.getClassName())) { throw new ParameterException("Class-name must be present for archive with file-url"); } sinkClassName = sinkConfig.getClassName(); // server derives the arg-type by loading a class @@ -478,6 +474,9 @@ public class CmdSinks extends CmdBase { if (typeArg != null) { sourceSpecBuilder.setTypeClassName(typeArg); } + if (isNotBlank(sinkConfig.getSourceSubscriptionName())) { + sourceSpecBuilder.setSubscriptionName(sinkConfig.getSourceSubscriptionName()); + } 
functionDetailsBuilder.setAutoAck(true); functionDetailsBuilder.setSource(sourceSpecBuilder); @@ -572,7 +571,7 @@ public class CmdSinks extends CmdBase { public class ListSinks extends BaseCommand { @Override void runCmd() throws Exception { - admin.functions().getConnectorsList().stream().filter(x -> !StringUtils.isEmpty(x.getSinkClass())) + admin.functions().getConnectorsList().stream().filter(x -> isNotBlank(x.getSinkClass())) .forEach(connector -> { System.out.println(connector.getName()); System.out.println(WordUtils.wrap(connector.getDescription(), 80)); diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index b479852..b64224f 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -43,6 +43,7 @@ import org.apache.bookkeeper.clients.config.StorageClientSettings; import org.apache.bookkeeper.clients.exceptions.NamespaceNotFoundException; import org.apache.bookkeeper.clients.exceptions.StreamNotFoundException; import org.apache.bookkeeper.stream.proto.NamespaceConfiguration; +import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.ThreadContext; import org.apache.logging.log4j.core.LoggerContext; import org.apache.logging.log4j.core.config.Configuration; @@ -506,7 +507,8 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { pulsarSourceConfig.setTopicSerdeClassNameMap(sourceSpec.getTopicsToSerDeClassNameMap()); pulsarSourceConfig.setTopicsPattern(sourceSpec.getTopicsPattern()); pulsarSourceConfig.setSubscriptionName( - FunctionDetailsUtils.getFullyQualifiedName(this.instanceConfig.getFunctionDetails())); + StringUtils.isNotBlank(sourceSpec.getSubscriptionName()) ? 
sourceSpec.getSubscriptionName() + : FunctionDetailsUtils.getFullyQualifiedName(this.instanceConfig.getFunctionDetails())); pulsarSourceConfig.setProcessingGuarantees( FunctionConfig.ProcessingGuarantees.valueOf( this.instanceConfig.getFunctionDetails().getProcessingGuarantees().name())); diff --git a/pulsar-functions/proto/src/main/proto/Function.proto b/pulsar-functions/proto/src/main/proto/Function.proto index 01b3660..92f14f6 100644 --- a/pulsar-functions/proto/src/main/proto/Function.proto +++ b/pulsar-functions/proto/src/main/proto/Function.proto @@ -75,6 +75,7 @@ message SourceSpec { /* If specified, this will refer to an archive that is * already present in the server */ string builtin = 8; + string subscriptionName = 9; } message SinkSpec { diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java index d97aab2..3414973 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java @@ -49,6 +49,7 @@ public class SinkConfig { @NotNull private String name; private String className; + private String sourceSubscriptionName; @isMapEntryCustom(keyValidatorClasses = { ValidatorImpls.TopicNameValidator.class }, valueValidatorClasses = { ValidatorImpls.SerdeValidator.class })