This is an automated email from the ASF dual-hosted git repository.
rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7ba5c2d support subscription-type to be passed in sink-function (#2200)
7ba5c2d is described below
commit 7ba5c2d4f6048762413f6cc1faab3dbf3210a08f
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Thu Jul 19 23:48:26 2018 -0700
support subscription-type to be passed in sink-function (#2200)
* support subscription-type to be passed in sink-function
* Fix: test
* retain-ordering
---
.../src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java | 9 +++++++++
.../java/org/apache/pulsar/functions/utils/FunctionConfig.java | 2 +-
.../main/java/org/apache/pulsar/functions/utils/SinkConfig.java | 2 +-
3 files changed, 11 insertions(+), 2 deletions(-)
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
index 83a5b57..2043877 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
@@ -55,6 +55,7 @@ import
org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.proto.Function.Resources;
import org.apache.pulsar.functions.proto.Function.SinkSpec;
import org.apache.pulsar.functions.proto.Function.SourceSpec;
+import org.apache.pulsar.functions.proto.Function.SubscriptionType;
import org.apache.pulsar.functions.utils.FunctionConfig;
import org.apache.pulsar.functions.utils.SinkConfig;
import org.apache.pulsar.functions.utils.Utils;
@@ -212,6 +213,8 @@ public class CmdSinks extends CmdBase {
protected String customSerdeInputString;
@Parameter(names = "--processingGuarantees", description = "The
processing guarantees (aka delivery semantics) applied to the sink")
protected FunctionConfig.ProcessingGuarantees processingGuarantees;
+ @Parameter(names = "--retainOrdering", description = "Sink consumes
and sinks messages in order")
+ protected boolean retainOrdering;
@Parameter(names = "--parallelism", description = "The sink's
parallelism factor (i.e. the number of sink instances to run)")
protected Integer parallelism;
@Parameter(names = {"-a", "--archive"}, description = "Path to the
archive file for the sink. It also supports url-path [http/https/file (file
protocol assumes that file already exists on worker host)] from which worker
can download the package.", listConverter = StringConverter.class)
@@ -261,6 +264,8 @@ public class CmdSinks extends CmdBase {
if (null != processingGuarantees) {
sinkConfig.setProcessingGuarantees(processingGuarantees);
}
+
+ sinkConfig.setRetainOrdering(retainOrdering);
Map<String, String> topicsToSerDeClassName = new HashMap<>();
if (null != inputs) {
@@ -477,6 +482,10 @@ public class CmdSinks extends CmdBase {
if (isNotBlank(sinkConfig.getSourceSubscriptionName())) {
sourceSpecBuilder.setSubscriptionName(sinkConfig.getSourceSubscriptionName());
}
+
+ sourceSpecBuilder.setSubscriptionType(
+ sinkConfig.isRetainOrdering() ? SubscriptionType.FAILOVER
: SubscriptionType.SHARED);
+
functionDetailsBuilder.setAutoAck(true);
functionDetailsBuilder.setSource(sourceSpecBuilder);
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
index 3140f95..42c829c 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
@@ -55,7 +55,7 @@ public class FunctionConfig {
ATMOST_ONCE,
EFFECTIVELY_ONCE
}
-
+
public enum Runtime {
JAVA,
PYTHON
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java
index 3414973..7e97135 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java
@@ -32,7 +32,6 @@ import
org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.
import
org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isValidTopicName;
import org.apache.pulsar.functions.utils.validation.ValidatorImpls;
-import java.util.HashMap;
import java.util.Map;
@Getter
@@ -60,6 +59,7 @@ public class SinkConfig {
@isPositiveNumber
private int parallelism = 1;
private FunctionConfig.ProcessingGuarantees processingGuarantees;
+ private boolean retainOrdering;
@isValidResources
private Resources resources;