This is an automated email from the ASF dual-hosted git repository.
rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e28ef1b Add support to configure subscription name for sink-function (#2198)
e28ef1b is described below
commit e28ef1b4f7b6cb44cb424b7f121085fd5c1a87d9
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Wed Jul 18 17:41:30 2018 -0700
Add support to configure subscription name for sink-function (#2198)
---
.../org/apache/pulsar/io/PulsarSinkE2ETest.java | 15 ++++++------
.../java/org/apache/pulsar/admin/cli/CmdSinks.java | 27 +++++++++++-----------
.../functions/instance/JavaInstanceRunnable.java | 4 +++-
.../proto/src/main/proto/Function.proto | 1 +
.../apache/pulsar/functions/utils/SinkConfig.java | 1 +
5 files changed, 26 insertions(+), 22 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
index 3ece269..78fd2e2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
@@ -249,6 +249,7 @@ public class PulsarSinkE2ETest {
final String propertyKey = "key";
final String propertyValue = "value";
final String functionName = "PulsarSink-test";
+ final String subscriptionName = "test-sub";
admin.namespaces().createNamespace(replNamespace);
Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
admin.namespaces().setNamespaceReplicationClusters(replNamespace,
clusters);
@@ -260,7 +261,7 @@ public class PulsarSinkE2ETest {
String jarFilePathUrl = Utils.FILE + ":"
+
PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath();
FunctionDetails functionDetails = createSinkConfig(jarFilePathUrl,
tenant, namespacePortion, functionName,
- sinkTopic);
+ sinkTopic, subscriptionName);
admin.functions().createFunctionWithUrl(functionDetails,
jarFilePathUrl);
// try to update function to test: update-function functionality
@@ -283,8 +284,7 @@ public class PulsarSinkE2ETest {
}
retryStrategically((test) -> {
try {
- SubscriptionStats subStats =
admin.topics().getStats(sourceTopic).subscriptions.values().iterator()
- .next();
+ SubscriptionStats subStats =
admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName);
return subStats.unackedMessages == 0;
} catch (PulsarAdminException e) {
return false;
@@ -314,6 +314,7 @@ public class PulsarSinkE2ETest {
final String propertyKey = "key";
final String propertyValue = "value";
final String functionName = "PulsarSink-test";
+ final String subscriptionName = "test-sub";
admin.namespaces().createNamespace(replNamespace);
Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
admin.namespaces().setNamespaceReplicationClusters(replNamespace,
clusters);
@@ -324,7 +325,7 @@ public class PulsarSinkE2ETest {
String jarFilePathUrl = Utils.FILE + ":"
+
PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath();
FunctionDetails functionDetails = createSinkConfig(jarFilePathUrl,
tenant, namespacePortion, functionName,
- sinkTopic);
+ sinkTopic, subscriptionName);
admin.functions().createFunctionWithUrl(functionDetails,
jarFilePathUrl);
// try to update function to test: update-function functionality
@@ -347,8 +348,7 @@ public class PulsarSinkE2ETest {
}
retryStrategically((test) -> {
try {
- SubscriptionStats subStats =
admin.topics().getStats(sourceTopic).subscriptions.values().iterator()
- .next();
+ SubscriptionStats subStats =
admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName);
return subStats.unackedMessages == 0;
} catch (PulsarAdminException e) {
return false;
@@ -372,7 +372,7 @@ public class PulsarSinkE2ETest {
Assert.assertEquals((int) success, totalMsgs);
}
- protected FunctionDetails createSinkConfig(String jarFile, String tenant,
String namespace, String functionName, String sinkTopic) {
+ protected FunctionDetails createSinkConfig(String jarFile, String tenant,
String namespace, String functionName, String sinkTopic, String
subscriptionName) {
File file = new File(jarFile);
try {
@@ -397,6 +397,7 @@ public class PulsarSinkE2ETest {
sourceSpecBuilder.setSubscriptionType(Function.SubscriptionType.SHARED);
sourceSpecBuilder.setTypeClassName(byte[].class.getName());
sourceSpecBuilder.setTopicsPattern(sourceTopicPattern);
+ sourceSpecBuilder.setSubscriptionName(subscriptionName);
sourceSpecBuilder.putTopicsToSerDeClassName(sourceTopicPattern,
DefaultSerDe.class.getName());
functionDetailsBuilder.setAutoAck(true);
functionDetailsBuilder.setSource(sourceSpecBuilder);
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
index 31f3e0a..83a5b57 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
@@ -39,7 +39,8 @@ import java.util.Set;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
+import static org.apache.commons.lang3.StringUtils.isBlank;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
import org.apache.commons.lang3.text.WordUtils;
import org.apache.pulsar.admin.cli.utils.CmdUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -61,17 +62,6 @@ import org.apache.pulsar.functions.utils.io.ConnectorUtils;
import org.apache.pulsar.functions.utils.io.Connectors;
import org.apache.pulsar.functions.utils.validation.ConfigValidation;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.lang.reflect.Type;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE;
import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
import static
org.apache.pulsar.functions.utils.Utils.convertProcessingGuarantee;
@@ -216,6 +206,8 @@ public class CmdSinks extends CmdBase {
protected String inputs;
@Parameter(names = "--topicsPattern", description = "TopicsPattern to
consume from list of topics under a namespace that match the pattern. [--input]
and [--topicsPattern] are mutually exclusive. Add SerDe class name for a
pattern in --customSerdeInputs (supported for java fun only)")
protected String topicsPattern;
+ @Parameter(names = "--subsName", description = "Pulsar source
subscription name if user wants a specific subscription-name for input-topic
consumer")
+ protected String subsName;
@Parameter(names = "--customSerdeInputs", description = "The map of
input topics to SerDe class names (as a JSON string)")
protected String customSerdeInputString;
@Parameter(names = "--processingGuarantees", description = "The
processing guarantees (aka delivery semantics) applied to the sink")
@@ -282,6 +274,10 @@ public class CmdSinks extends CmdBase {
sinkConfig.setTopicToSerdeClassName(topicsToSerDeClassName);
}
+ if (isNotBlank(subsName)) {
+ sinkConfig.setSourceSubscriptionName(subsName);
+ }
+
if (null != topicsPattern) {
sinkConfig.setTopicsPattern(topicsPattern);
}
@@ -434,7 +430,7 @@ public class CmdSinks extends CmdBase {
if (!isBuiltin) {
if (sinkConfig.getArchive().startsWith(Utils.FILE)) {
- if (StringUtils.isBlank(sinkConfig.getClassName())) {
+ if (isBlank(sinkConfig.getClassName())) {
throw new ParameterException("Class-name must be
present for archive with file-url");
}
sinkClassName = sinkConfig.getClassName(); // server
derives the arg-type by loading a class
@@ -478,6 +474,9 @@ public class CmdSinks extends CmdBase {
if (typeArg != null) {
sourceSpecBuilder.setTypeClassName(typeArg);
}
+ if (isNotBlank(sinkConfig.getSourceSubscriptionName())) {
+
sourceSpecBuilder.setSubscriptionName(sinkConfig.getSourceSubscriptionName());
+ }
functionDetailsBuilder.setAutoAck(true);
functionDetailsBuilder.setSource(sourceSpecBuilder);
@@ -572,7 +571,7 @@ public class CmdSinks extends CmdBase {
public class ListSinks extends BaseCommand {
@Override
void runCmd() throws Exception {
- admin.functions().getConnectorsList().stream().filter(x ->
!StringUtils.isEmpty(x.getSinkClass()))
+ admin.functions().getConnectorsList().stream().filter(x ->
isNotBlank(x.getSinkClass()))
.forEach(connector -> {
System.out.println(connector.getName());
System.out.println(WordUtils.wrap(connector.getDescription(), 80));
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index b479852..b64224f 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -43,6 +43,7 @@ import
org.apache.bookkeeper.clients.config.StorageClientSettings;
import org.apache.bookkeeper.clients.exceptions.NamespaceNotFoundException;
import org.apache.bookkeeper.clients.exceptions.StreamNotFoundException;
import org.apache.bookkeeper.stream.proto.NamespaceConfiguration;
+import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.ThreadContext;
import org.apache.logging.log4j.core.LoggerContext;
import org.apache.logging.log4j.core.config.Configuration;
@@ -506,7 +507,8 @@ public class JavaInstanceRunnable implements AutoCloseable,
Runnable {
pulsarSourceConfig.setTopicSerdeClassNameMap(sourceSpec.getTopicsToSerDeClassNameMap());
pulsarSourceConfig.setTopicsPattern(sourceSpec.getTopicsPattern());
pulsarSourceConfig.setSubscriptionName(
-
FunctionDetailsUtils.getFullyQualifiedName(this.instanceConfig.getFunctionDetails()));
+ StringUtils.isNotBlank(sourceSpec.getSubscriptionName()) ?
sourceSpec.getSubscriptionName()
+ :
FunctionDetailsUtils.getFullyQualifiedName(this.instanceConfig.getFunctionDetails()));
pulsarSourceConfig.setProcessingGuarantees(
FunctionConfig.ProcessingGuarantees.valueOf(
this.instanceConfig.getFunctionDetails().getProcessingGuarantees().name()));
diff --git a/pulsar-functions/proto/src/main/proto/Function.proto b/pulsar-functions/proto/src/main/proto/Function.proto
index 01b3660..92f14f6 100644
--- a/pulsar-functions/proto/src/main/proto/Function.proto
+++ b/pulsar-functions/proto/src/main/proto/Function.proto
@@ -75,6 +75,7 @@ message SourceSpec {
/* If specified, this will refer to an archive that is
* already present in the server */
string builtin = 8;
+ string subscriptionName = 9;
}
message SinkSpec {
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java
index d97aab2..3414973 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java
@@ -49,6 +49,7 @@ public class SinkConfig {
@NotNull
private String name;
private String className;
+ private String sourceSubscriptionName;
@isMapEntryCustom(keyValidatorClasses = {
ValidatorImpls.TopicNameValidator.class },
valueValidatorClasses = { ValidatorImpls.SerdeValidator.class })