rdhabalia closed pull request #2198: Add support to configure subscription name 
for sink-function
URL: https://github.com/apache/incubator-pulsar/pull/2198
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
index 3ece2694fa..78fd2e2e40 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
@@ -249,6 +249,7 @@ public void testE2EPulsarSink() throws Exception {
         final String propertyKey = "key";
         final String propertyValue = "value";
         final String functionName = "PulsarSink-test";
+        final String subscriptionName = "test-sub";
         admin.namespaces().createNamespace(replNamespace);
         Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
         admin.namespaces().setNamespaceReplicationClusters(replNamespace, 
clusters);
@@ -260,7 +261,7 @@ public void testE2EPulsarSink() throws Exception {
         String jarFilePathUrl = Utils.FILE + ":"
                 + 
PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath();
         FunctionDetails functionDetails = createSinkConfig(jarFilePathUrl, 
tenant, namespacePortion, functionName,
-                sinkTopic);
+                sinkTopic, subscriptionName);
         admin.functions().createFunctionWithUrl(functionDetails, 
jarFilePathUrl);
 
         // try to update function to test: update-function functionality
@@ -283,8 +284,7 @@ public void testE2EPulsarSink() throws Exception {
         }
         retryStrategically((test) -> {
             try {
-                SubscriptionStats subStats = 
admin.topics().getStats(sourceTopic).subscriptions.values().iterator()
-                        .next();
+                SubscriptionStats subStats = 
admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName);
                 return subStats.unackedMessages == 0;
             } catch (PulsarAdminException e) {
                 return false;
@@ -314,6 +314,7 @@ public void testPulsarSinkStats() throws Exception {
         final String propertyKey = "key";
         final String propertyValue = "value";
         final String functionName = "PulsarSink-test";
+        final String subscriptionName = "test-sub";
         admin.namespaces().createNamespace(replNamespace);
         Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
         admin.namespaces().setNamespaceReplicationClusters(replNamespace, 
clusters);
@@ -324,7 +325,7 @@ public void testPulsarSinkStats() throws Exception {
         String jarFilePathUrl = Utils.FILE + ":"
                 + 
PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath();
         FunctionDetails functionDetails = createSinkConfig(jarFilePathUrl, 
tenant, namespacePortion, functionName,
-                sinkTopic);
+                sinkTopic, subscriptionName);
         admin.functions().createFunctionWithUrl(functionDetails, 
jarFilePathUrl);
 
         // try to update function to test: update-function functionality
@@ -347,8 +348,7 @@ public void testPulsarSinkStats() throws Exception {
         }
         retryStrategically((test) -> {
             try {
-                SubscriptionStats subStats = 
admin.topics().getStats(sourceTopic).subscriptions.values().iterator()
-                        .next();
+                SubscriptionStats subStats = 
admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName);
                 return subStats.unackedMessages == 0;
             } catch (PulsarAdminException e) {
                 return false;
@@ -372,7 +372,7 @@ public void testPulsarSinkStats() throws Exception {
         Assert.assertEquals((int) success, totalMsgs);
     }
 
-    protected FunctionDetails createSinkConfig(String jarFile, String tenant, 
String namespace, String functionName, String sinkTopic) {
+    protected FunctionDetails createSinkConfig(String jarFile, String tenant, 
String namespace, String functionName, String sinkTopic, String 
subscriptionName) {
 
         File file = new File(jarFile);
         try {
@@ -397,6 +397,7 @@ protected FunctionDetails createSinkConfig(String jarFile, 
String tenant, String
         
sourceSpecBuilder.setSubscriptionType(Function.SubscriptionType.SHARED);
         sourceSpecBuilder.setTypeClassName(byte[].class.getName());
         sourceSpecBuilder.setTopicsPattern(sourceTopicPattern);
+        sourceSpecBuilder.setSubscriptionName(subscriptionName);
         sourceSpecBuilder.putTopicsToSerDeClassName(sourceTopicPattern, 
DefaultSerDe.class.getName());
         functionDetailsBuilder.setAutoAck(true);
         functionDetailsBuilder.setSource(sourceSpecBuilder);
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
index 31f3e0a1b1..83a5b57837 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
@@ -39,7 +39,8 @@
 
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
+import static org.apache.commons.lang3.StringUtils.isBlank;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import org.apache.commons.lang3.text.WordUtils;
 import org.apache.pulsar.admin.cli.utils.CmdUtils;
 import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -61,17 +62,6 @@
 import org.apache.pulsar.functions.utils.io.Connectors;
 import org.apache.pulsar.functions.utils.validation.ConfigValidation;
 
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.lang.reflect.Type;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.commons.lang3.StringUtils.isBlank;
 import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE;
 import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
 import static 
org.apache.pulsar.functions.utils.Utils.convertProcessingGuarantee;
@@ -216,6 +206,8 @@ void runCmd() throws Exception {
         protected String inputs;
         @Parameter(names = "--topicsPattern", description = "TopicsPattern to 
consume from list of topics under a namespace that match the pattern. [--input] 
and [--topicsPattern] are mutually exclusive. Add SerDe class name for a 
pattern in --customSerdeInputs  (supported for java fun only)")
         protected String topicsPattern;
+        @Parameter(names = "--subsName", description = "Pulsar source 
subscription name if user wants a specific subscription-name for input-topic 
consumer")
+        protected String subsName;
         @Parameter(names = "--customSerdeInputs", description = "The map of 
input topics to SerDe class names (as a JSON string)")
         protected String customSerdeInputString;
         @Parameter(names = "--processingGuarantees", description = "The 
processing guarantees (aka delivery semantics) applied to the sink")
@@ -282,6 +274,10 @@ void processArguments() throws Exception {
                 sinkConfig.setTopicToSerdeClassName(topicsToSerDeClassName);
             }
             
+            if (isNotBlank(subsName)) {
+                sinkConfig.setSourceSubscriptionName(subsName);
+            }
+            
             if (null != topicsPattern) {
                 sinkConfig.setTopicsPattern(topicsPattern);
             }
@@ -434,7 +430,7 @@ protected FunctionDetails createSinkConfig(SinkConfig 
sinkConfig) throws IOExcep
 
             if (!isBuiltin) {
                 if (sinkConfig.getArchive().startsWith(Utils.FILE)) {
-                    if (StringUtils.isBlank(sinkConfig.getClassName())) {
+                    if (isBlank(sinkConfig.getClassName())) {
                         throw new ParameterException("Class-name must be 
present for archive with file-url");
                     }
                     sinkClassName = sinkConfig.getClassName(); // server 
derives the arg-type by loading a class
@@ -478,6 +474,9 @@ protected FunctionDetails createSinkConfig(SinkConfig 
sinkConfig) throws IOExcep
             if (typeArg != null) {
                 sourceSpecBuilder.setTypeClassName(typeArg);
             }
+            if (isNotBlank(sinkConfig.getSourceSubscriptionName())) {
+                
sourceSpecBuilder.setSubscriptionName(sinkConfig.getSourceSubscriptionName());
+            }
             functionDetailsBuilder.setAutoAck(true);
             functionDetailsBuilder.setSource(sourceSpecBuilder);
 
@@ -572,7 +571,7 @@ void runCmd() throws Exception {
     public class ListSinks extends BaseCommand {
         @Override
         void runCmd() throws Exception {
-            admin.functions().getConnectorsList().stream().filter(x -> 
!StringUtils.isEmpty(x.getSinkClass()))
+            admin.functions().getConnectorsList().stream().filter(x -> 
isNotBlank(x.getSinkClass()))
                     .forEach(connector -> {
                         System.out.println(connector.getName());
                         
System.out.println(WordUtils.wrap(connector.getDescription(), 80));
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index b479852b8c..b64224fdb5 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -43,6 +43,7 @@
 import org.apache.bookkeeper.clients.exceptions.NamespaceNotFoundException;
 import org.apache.bookkeeper.clients.exceptions.StreamNotFoundException;
 import org.apache.bookkeeper.stream.proto.NamespaceConfiguration;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.logging.log4j.ThreadContext;
 import org.apache.logging.log4j.core.LoggerContext;
 import org.apache.logging.log4j.core.config.Configuration;
@@ -506,7 +507,8 @@ public void setupInput(ContextImpl contextImpl) throws 
Exception {
             
pulsarSourceConfig.setTopicSerdeClassNameMap(sourceSpec.getTopicsToSerDeClassNameMap());
             pulsarSourceConfig.setTopicsPattern(sourceSpec.getTopicsPattern());
             pulsarSourceConfig.setSubscriptionName(
-                    
FunctionDetailsUtils.getFullyQualifiedName(this.instanceConfig.getFunctionDetails()));
+                    StringUtils.isNotBlank(sourceSpec.getSubscriptionName()) ? 
sourceSpec.getSubscriptionName()
+                            : 
FunctionDetailsUtils.getFullyQualifiedName(this.instanceConfig.getFunctionDetails()));
             pulsarSourceConfig.setProcessingGuarantees(
                     FunctionConfig.ProcessingGuarantees.valueOf(
                             
this.instanceConfig.getFunctionDetails().getProcessingGuarantees().name()));
diff --git a/pulsar-functions/proto/src/main/proto/Function.proto 
b/pulsar-functions/proto/src/main/proto/Function.proto
index 01b3660b6e..92f14f6f19 100644
--- a/pulsar-functions/proto/src/main/proto/Function.proto
+++ b/pulsar-functions/proto/src/main/proto/Function.proto
@@ -75,6 +75,7 @@ message SourceSpec {
     /* If specified, this will refer to an archive that is
      * already present in the server */
     string builtin = 8;
+    string subscriptionName = 9;
 }
 
 message SinkSpec {
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java
index d97aab2331..34149735ef 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java
@@ -49,6 +49,7 @@
     @NotNull
     private String name;
     private String className;
+    private String sourceSubscriptionName;
 
     @isMapEntryCustom(keyValidatorClasses = { 
ValidatorImpls.TopicNameValidator.class },
             valueValidatorClasses = { ValidatorImpls.SerdeValidator.class })


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to