This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e28ef1b  Add support to configure subscription name for sink-function (#2198)
e28ef1b is described below

commit e28ef1b4f7b6cb44cb424b7f121085fd5c1a87d9
Author: Rajan Dhabalia <rdhaba...@apache.org>
AuthorDate: Wed Jul 18 17:41:30 2018 -0700

    Add support to configure subscription name for sink-function (#2198)
---
 .../org/apache/pulsar/io/PulsarSinkE2ETest.java    | 15 ++++++------
 .../java/org/apache/pulsar/admin/cli/CmdSinks.java | 27 +++++++++++-----------
 .../functions/instance/JavaInstanceRunnable.java   |  4 +++-
 .../proto/src/main/proto/Function.proto            |  1 +
 .../apache/pulsar/functions/utils/SinkConfig.java  |  1 +
 5 files changed, 26 insertions(+), 22 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
index 3ece269..78fd2e2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
@@ -249,6 +249,7 @@ public class PulsarSinkE2ETest {
         final String propertyKey = "key";
         final String propertyValue = "value";
         final String functionName = "PulsarSink-test";
+        final String subscriptionName = "test-sub";
         admin.namespaces().createNamespace(replNamespace);
         Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
         admin.namespaces().setNamespaceReplicationClusters(replNamespace, 
clusters);
@@ -260,7 +261,7 @@ public class PulsarSinkE2ETest {
         String jarFilePathUrl = Utils.FILE + ":"
                 + 
PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath();
         FunctionDetails functionDetails = createSinkConfig(jarFilePathUrl, 
tenant, namespacePortion, functionName,
-                sinkTopic);
+                sinkTopic, subscriptionName);
         admin.functions().createFunctionWithUrl(functionDetails, 
jarFilePathUrl);
 
         // try to update function to test: update-function functionality
@@ -283,8 +284,7 @@ public class PulsarSinkE2ETest {
         }
         retryStrategically((test) -> {
             try {
-                SubscriptionStats subStats = 
admin.topics().getStats(sourceTopic).subscriptions.values().iterator()
-                        .next();
+                SubscriptionStats subStats = 
admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName);
                 return subStats.unackedMessages == 0;
             } catch (PulsarAdminException e) {
                 return false;
@@ -314,6 +314,7 @@ public class PulsarSinkE2ETest {
         final String propertyKey = "key";
         final String propertyValue = "value";
         final String functionName = "PulsarSink-test";
+        final String subscriptionName = "test-sub";
         admin.namespaces().createNamespace(replNamespace);
         Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
         admin.namespaces().setNamespaceReplicationClusters(replNamespace, 
clusters);
@@ -324,7 +325,7 @@ public class PulsarSinkE2ETest {
         String jarFilePathUrl = Utils.FILE + ":"
                 + 
PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath();
         FunctionDetails functionDetails = createSinkConfig(jarFilePathUrl, 
tenant, namespacePortion, functionName,
-                sinkTopic);
+                sinkTopic, subscriptionName);
         admin.functions().createFunctionWithUrl(functionDetails, 
jarFilePathUrl);
 
         // try to update function to test: update-function functionality
@@ -347,8 +348,7 @@ public class PulsarSinkE2ETest {
         }
         retryStrategically((test) -> {
             try {
-                SubscriptionStats subStats = 
admin.topics().getStats(sourceTopic).subscriptions.values().iterator()
-                        .next();
+                SubscriptionStats subStats = 
admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName);
                 return subStats.unackedMessages == 0;
             } catch (PulsarAdminException e) {
                 return false;
@@ -372,7 +372,7 @@ public class PulsarSinkE2ETest {
         Assert.assertEquals((int) success, totalMsgs);
     }
 
-    protected FunctionDetails createSinkConfig(String jarFile, String tenant, 
String namespace, String functionName, String sinkTopic) {
+    protected FunctionDetails createSinkConfig(String jarFile, String tenant, 
String namespace, String functionName, String sinkTopic, String 
subscriptionName) {
 
         File file = new File(jarFile);
         try {
@@ -397,6 +397,7 @@ public class PulsarSinkE2ETest {
         
sourceSpecBuilder.setSubscriptionType(Function.SubscriptionType.SHARED);
         sourceSpecBuilder.setTypeClassName(byte[].class.getName());
         sourceSpecBuilder.setTopicsPattern(sourceTopicPattern);
+        sourceSpecBuilder.setSubscriptionName(subscriptionName);
         sourceSpecBuilder.putTopicsToSerDeClassName(sourceTopicPattern, 
DefaultSerDe.class.getName());
         functionDetailsBuilder.setAutoAck(true);
         functionDetailsBuilder.setSource(sourceSpecBuilder);
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
index 31f3e0a..83a5b57 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
@@ -39,7 +39,8 @@ import java.util.Set;
 
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
+import static org.apache.commons.lang3.StringUtils.isBlank;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import org.apache.commons.lang3.text.WordUtils;
 import org.apache.pulsar.admin.cli.utils.CmdUtils;
 import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -61,17 +62,6 @@ import org.apache.pulsar.functions.utils.io.ConnectorUtils;
 import org.apache.pulsar.functions.utils.io.Connectors;
 import org.apache.pulsar.functions.utils.validation.ConfigValidation;
 
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.lang.reflect.Type;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.commons.lang3.StringUtils.isBlank;
 import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE;
 import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
 import static 
org.apache.pulsar.functions.utils.Utils.convertProcessingGuarantee;
@@ -216,6 +206,8 @@ public class CmdSinks extends CmdBase {
         protected String inputs;
         @Parameter(names = "--topicsPattern", description = "TopicsPattern to 
consume from list of topics under a namespace that match the pattern. [--input] 
and [--topicsPattern] are mutually exclusive. Add SerDe class name for a 
pattern in --customSerdeInputs  (supported for java fun only)")
         protected String topicsPattern;
+        @Parameter(names = "--subsName", description = "Pulsar source 
subscription name if user wants a specific subscription-name for input-topic 
consumer")
+        protected String subsName;
         @Parameter(names = "--customSerdeInputs", description = "The map of 
input topics to SerDe class names (as a JSON string)")
         protected String customSerdeInputString;
         @Parameter(names = "--processingGuarantees", description = "The 
processing guarantees (aka delivery semantics) applied to the sink")
@@ -282,6 +274,10 @@ public class CmdSinks extends CmdBase {
                 sinkConfig.setTopicToSerdeClassName(topicsToSerDeClassName);
             }
             
+            if (isNotBlank(subsName)) {
+                sinkConfig.setSourceSubscriptionName(subsName);
+            }
+            
             if (null != topicsPattern) {
                 sinkConfig.setTopicsPattern(topicsPattern);
             }
@@ -434,7 +430,7 @@ public class CmdSinks extends CmdBase {
 
             if (!isBuiltin) {
                 if (sinkConfig.getArchive().startsWith(Utils.FILE)) {
-                    if (StringUtils.isBlank(sinkConfig.getClassName())) {
+                    if (isBlank(sinkConfig.getClassName())) {
                         throw new ParameterException("Class-name must be 
present for archive with file-url");
                     }
                     sinkClassName = sinkConfig.getClassName(); // server 
derives the arg-type by loading a class
@@ -478,6 +474,9 @@ public class CmdSinks extends CmdBase {
             if (typeArg != null) {
                 sourceSpecBuilder.setTypeClassName(typeArg);
             }
+            if (isNotBlank(sinkConfig.getSourceSubscriptionName())) {
+                
sourceSpecBuilder.setSubscriptionName(sinkConfig.getSourceSubscriptionName());
+            }
             functionDetailsBuilder.setAutoAck(true);
             functionDetailsBuilder.setSource(sourceSpecBuilder);
 
@@ -572,7 +571,7 @@ public class CmdSinks extends CmdBase {
     public class ListSinks extends BaseCommand {
         @Override
         void runCmd() throws Exception {
-            admin.functions().getConnectorsList().stream().filter(x -> 
!StringUtils.isEmpty(x.getSinkClass()))
+            admin.functions().getConnectorsList().stream().filter(x -> 
isNotBlank(x.getSinkClass()))
                     .forEach(connector -> {
                         System.out.println(connector.getName());
                         
System.out.println(WordUtils.wrap(connector.getDescription(), 80));
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index b479852..b64224f 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -43,6 +43,7 @@ import 
org.apache.bookkeeper.clients.config.StorageClientSettings;
 import org.apache.bookkeeper.clients.exceptions.NamespaceNotFoundException;
 import org.apache.bookkeeper.clients.exceptions.StreamNotFoundException;
 import org.apache.bookkeeper.stream.proto.NamespaceConfiguration;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.logging.log4j.ThreadContext;
 import org.apache.logging.log4j.core.LoggerContext;
 import org.apache.logging.log4j.core.config.Configuration;
@@ -506,7 +507,8 @@ public class JavaInstanceRunnable implements AutoCloseable, 
Runnable {
             
pulsarSourceConfig.setTopicSerdeClassNameMap(sourceSpec.getTopicsToSerDeClassNameMap());
             pulsarSourceConfig.setTopicsPattern(sourceSpec.getTopicsPattern());
             pulsarSourceConfig.setSubscriptionName(
-                    
FunctionDetailsUtils.getFullyQualifiedName(this.instanceConfig.getFunctionDetails()));
+                    StringUtils.isNotBlank(sourceSpec.getSubscriptionName()) ? 
sourceSpec.getSubscriptionName()
+                            : 
FunctionDetailsUtils.getFullyQualifiedName(this.instanceConfig.getFunctionDetails()));
             pulsarSourceConfig.setProcessingGuarantees(
                     FunctionConfig.ProcessingGuarantees.valueOf(
                             
this.instanceConfig.getFunctionDetails().getProcessingGuarantees().name()));
diff --git a/pulsar-functions/proto/src/main/proto/Function.proto b/pulsar-functions/proto/src/main/proto/Function.proto
index 01b3660..92f14f6 100644
--- a/pulsar-functions/proto/src/main/proto/Function.proto
+++ b/pulsar-functions/proto/src/main/proto/Function.proto
@@ -75,6 +75,7 @@ message SourceSpec {
     /* If specified, this will refer to an archive that is
      * already present in the server */
     string builtin = 8;
+    string subscriptionName = 9;
 }
 
 message SinkSpec {
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java
index d97aab2..3414973 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java
@@ -49,6 +49,7 @@ public class SinkConfig {
     @NotNull
     private String name;
     private String className;
+    private String sourceSubscriptionName;
 
     @isMapEntryCustom(keyValidatorClasses = { 
ValidatorImpls.TopicNameValidator.class },
             valueValidatorClasses = { ValidatorImpls.SerdeValidator.class })

Reply via email to