rdhabalia closed pull request #2198: Add support to configure subscription name for sink-function
URL: https://github.com/apache/incubator-pulsar/pull/2198
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java index 3ece2694fa..78fd2e2e40 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java @@ -249,6 +249,7 @@ public void testE2EPulsarSink() throws Exception { final String propertyKey = "key"; final String propertyValue = "value"; final String functionName = "PulsarSink-test"; + final String subscriptionName = "test-sub"; admin.namespaces().createNamespace(replNamespace); Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use")); admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters); @@ -260,7 +261,7 @@ public void testE2EPulsarSink() throws Exception { String jarFilePathUrl = Utils.FILE + ":" + PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath(); FunctionDetails functionDetails = createSinkConfig(jarFilePathUrl, tenant, namespacePortion, functionName, - sinkTopic); + sinkTopic, subscriptionName); admin.functions().createFunctionWithUrl(functionDetails, jarFilePathUrl); // try to update function to test: update-function functionality @@ -283,8 +284,7 @@ public void testE2EPulsarSink() throws Exception { } retryStrategically((test) -> { try { - SubscriptionStats subStats = admin.topics().getStats(sourceTopic).subscriptions.values().iterator() - .next(); + SubscriptionStats subStats = admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName); return subStats.unackedMessages == 0; } catch (PulsarAdminException e) { return false; @@ -314,6 +314,7 @@ public void 
testPulsarSinkStats() throws Exception { final String propertyKey = "key"; final String propertyValue = "value"; final String functionName = "PulsarSink-test"; + final String subscriptionName = "test-sub"; admin.namespaces().createNamespace(replNamespace); Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use")); admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters); @@ -324,7 +325,7 @@ public void testPulsarSinkStats() throws Exception { String jarFilePathUrl = Utils.FILE + ":" + PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath(); FunctionDetails functionDetails = createSinkConfig(jarFilePathUrl, tenant, namespacePortion, functionName, - sinkTopic); + sinkTopic, subscriptionName); admin.functions().createFunctionWithUrl(functionDetails, jarFilePathUrl); // try to update function to test: update-function functionality @@ -347,8 +348,7 @@ public void testPulsarSinkStats() throws Exception { } retryStrategically((test) -> { try { - SubscriptionStats subStats = admin.topics().getStats(sourceTopic).subscriptions.values().iterator() - .next(); + SubscriptionStats subStats = admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName); return subStats.unackedMessages == 0; } catch (PulsarAdminException e) { return false; @@ -372,7 +372,7 @@ public void testPulsarSinkStats() throws Exception { Assert.assertEquals((int) success, totalMsgs); } - protected FunctionDetails createSinkConfig(String jarFile, String tenant, String namespace, String functionName, String sinkTopic) { + protected FunctionDetails createSinkConfig(String jarFile, String tenant, String namespace, String functionName, String sinkTopic, String subscriptionName) { File file = new File(jarFile); try { @@ -397,6 +397,7 @@ protected FunctionDetails createSinkConfig(String jarFile, String tenant, String sourceSpecBuilder.setSubscriptionType(Function.SubscriptionType.SHARED); sourceSpecBuilder.setTypeClassName(byte[].class.getName()); 
sourceSpecBuilder.setTopicsPattern(sourceTopicPattern); + sourceSpecBuilder.setSubscriptionName(subscriptionName); sourceSpecBuilder.putTopicsToSerDeClassName(sourceTopicPattern, DefaultSerDe.class.getName()); functionDetailsBuilder.setAutoAck(true); functionDetailsBuilder.setSource(sourceSpecBuilder); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java index 31f3e0a1b1..83a5b57837 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java @@ -39,7 +39,8 @@ import lombok.Getter; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.StringUtils; +import static org.apache.commons.lang3.StringUtils.isBlank; +import static org.apache.commons.lang3.StringUtils.isNotBlank; import org.apache.commons.lang3.text.WordUtils; import org.apache.pulsar.admin.cli.utils.CmdUtils; import org.apache.pulsar.client.admin.PulsarAdmin; @@ -61,17 +62,6 @@ import org.apache.pulsar.functions.utils.io.Connectors; import org.apache.pulsar.functions.utils.validation.ConfigValidation; -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.lang.reflect.Type; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static org.apache.commons.lang3.StringUtils.isBlank; import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE; import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT; import static org.apache.pulsar.functions.utils.Utils.convertProcessingGuarantee; @@ -216,6 +206,8 @@ void runCmd() throws Exception { protected String inputs; @Parameter(names = "--topicsPattern", description = "TopicsPattern to consume from list of topics under a namespace that match the pattern. 
[--input] and [--topicsPattern] are mutually exclusive. Add SerDe class name for a pattern in --customSerdeInputs (supported for java fun only)") protected String topicsPattern; + @Parameter(names = "--subsName", description = "Pulsar source subscription name if user wants a specific subscription-name for input-topic consumer") + protected String subsName; @Parameter(names = "--customSerdeInputs", description = "The map of input topics to SerDe class names (as a JSON string)") protected String customSerdeInputString; @Parameter(names = "--processingGuarantees", description = "The processing guarantees (aka delivery semantics) applied to the sink") @@ -282,6 +274,10 @@ void processArguments() throws Exception { sinkConfig.setTopicToSerdeClassName(topicsToSerDeClassName); } + if (isNotBlank(subsName)) { + sinkConfig.setSourceSubscriptionName(subsName); + } + if (null != topicsPattern) { sinkConfig.setTopicsPattern(topicsPattern); } @@ -434,7 +430,7 @@ protected FunctionDetails createSinkConfig(SinkConfig sinkConfig) throws IOExcep if (!isBuiltin) { if (sinkConfig.getArchive().startsWith(Utils.FILE)) { - if (StringUtils.isBlank(sinkConfig.getClassName())) { + if (isBlank(sinkConfig.getClassName())) { throw new ParameterException("Class-name must be present for archive with file-url"); } sinkClassName = sinkConfig.getClassName(); // server derives the arg-type by loading a class @@ -478,6 +474,9 @@ protected FunctionDetails createSinkConfig(SinkConfig sinkConfig) throws IOExcep if (typeArg != null) { sourceSpecBuilder.setTypeClassName(typeArg); } + if (isNotBlank(sinkConfig.getSourceSubscriptionName())) { + sourceSpecBuilder.setSubscriptionName(sinkConfig.getSourceSubscriptionName()); + } functionDetailsBuilder.setAutoAck(true); functionDetailsBuilder.setSource(sourceSpecBuilder); @@ -572,7 +571,7 @@ void runCmd() throws Exception { public class ListSinks extends BaseCommand { @Override void runCmd() throws Exception { - 
admin.functions().getConnectorsList().stream().filter(x -> !StringUtils.isEmpty(x.getSinkClass())) + admin.functions().getConnectorsList().stream().filter(x -> isNotBlank(x.getSinkClass())) .forEach(connector -> { System.out.println(connector.getName()); System.out.println(WordUtils.wrap(connector.getDescription(), 80)); diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index b479852b8c..b64224fdb5 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -43,6 +43,7 @@ import org.apache.bookkeeper.clients.exceptions.NamespaceNotFoundException; import org.apache.bookkeeper.clients.exceptions.StreamNotFoundException; import org.apache.bookkeeper.stream.proto.NamespaceConfiguration; +import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.ThreadContext; import org.apache.logging.log4j.core.LoggerContext; import org.apache.logging.log4j.core.config.Configuration; @@ -506,7 +507,8 @@ public void setupInput(ContextImpl contextImpl) throws Exception { pulsarSourceConfig.setTopicSerdeClassNameMap(sourceSpec.getTopicsToSerDeClassNameMap()); pulsarSourceConfig.setTopicsPattern(sourceSpec.getTopicsPattern()); pulsarSourceConfig.setSubscriptionName( - FunctionDetailsUtils.getFullyQualifiedName(this.instanceConfig.getFunctionDetails())); + StringUtils.isNotBlank(sourceSpec.getSubscriptionName()) ? 
sourceSpec.getSubscriptionName() + : FunctionDetailsUtils.getFullyQualifiedName(this.instanceConfig.getFunctionDetails())); pulsarSourceConfig.setProcessingGuarantees( FunctionConfig.ProcessingGuarantees.valueOf( this.instanceConfig.getFunctionDetails().getProcessingGuarantees().name())); diff --git a/pulsar-functions/proto/src/main/proto/Function.proto b/pulsar-functions/proto/src/main/proto/Function.proto index 01b3660b6e..92f14f6f19 100644 --- a/pulsar-functions/proto/src/main/proto/Function.proto +++ b/pulsar-functions/proto/src/main/proto/Function.proto @@ -75,6 +75,7 @@ message SourceSpec { /* If specified, this will refer to an archive that is * already present in the server */ string builtin = 8; + string subscriptionName = 9; } message SinkSpec { diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java index d97aab2331..34149735ef 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java @@ -49,6 +49,7 @@ @NotNull private String name; private String className; + private String sourceSubscriptionName; @isMapEntryCustom(keyValidatorClasses = { ValidatorImpls.TopicNameValidator.class }, valueValidatorClasses = { ValidatorImpls.SerdeValidator.class }) ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services