rdhabalia closed pull request #2200: support subscription-type to be passed in
sink-function
URL: https://github.com/apache/incubator-pulsar/pull/2200
This is a PR merged from a forked repository.
Because this is a foreign pull request (from a fork) and GitHub hides the
original diff on merge, the diff is reproduced below for the sake of
provenance:
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
index 83a5b57837..20438770b2 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
@@ -55,6 +55,7 @@
import org.apache.pulsar.functions.proto.Function.Resources;
import org.apache.pulsar.functions.proto.Function.SinkSpec;
import org.apache.pulsar.functions.proto.Function.SourceSpec;
+import org.apache.pulsar.functions.proto.Function.SubscriptionType;
import org.apache.pulsar.functions.utils.FunctionConfig;
import org.apache.pulsar.functions.utils.SinkConfig;
import org.apache.pulsar.functions.utils.Utils;
@@ -212,6 +213,8 @@ void runCmd() throws Exception {
protected String customSerdeInputString;
@Parameter(names = "--processingGuarantees", description = "The
processing guarantees (aka delivery semantics) applied to the sink")
protected FunctionConfig.ProcessingGuarantees processingGuarantees;
+ @Parameter(names = "--retainOrdering", description = "Sink consumes
and sinks messages in order")
+ protected boolean retainOrdering;
@Parameter(names = "--parallelism", description = "The sink's
parallelism factor (i.e. the number of sink instances to run)")
protected Integer parallelism;
@Parameter(names = {"-a", "--archive"}, description = "Path to the
archive file for the sink. It also supports url-path [http/https/file (file
protocol assumes that file already exists on worker host)] from which worker
can download the package.", listConverter = StringConverter.class)
@@ -261,6 +264,8 @@ void processArguments() throws Exception {
if (null != processingGuarantees) {
sinkConfig.setProcessingGuarantees(processingGuarantees);
}
+
+ sinkConfig.setRetainOrdering(retainOrdering);
Map<String, String> topicsToSerDeClassName = new HashMap<>();
if (null != inputs) {
@@ -477,6 +482,10 @@ protected FunctionDetails createSinkConfig(SinkConfig
sinkConfig) throws IOExcep
if (isNotBlank(sinkConfig.getSourceSubscriptionName())) {
sourceSpecBuilder.setSubscriptionName(sinkConfig.getSourceSubscriptionName());
}
+
+ sourceSpecBuilder.setSubscriptionType(
+ sinkConfig.isRetainOrdering() ? SubscriptionType.FAILOVER
: SubscriptionType.SHARED);
+
functionDetailsBuilder.setAutoAck(true);
functionDetailsBuilder.setSource(sourceSpecBuilder);
diff --git
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
index 3140f9535c..42c829c673 100644
---
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
+++
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
@@ -55,7 +55,7 @@
ATMOST_ONCE,
EFFECTIVELY_ONCE
}
-
+
public enum Runtime {
JAVA,
PYTHON
diff --git
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java
index 34149735ef..7e97135131 100644
---
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java
+++
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java
@@ -32,7 +32,6 @@
import
org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isValidTopicName;
import org.apache.pulsar.functions.utils.validation.ValidatorImpls;
-import java.util.HashMap;
import java.util.Map;
@Getter
@@ -60,6 +59,7 @@
@isPositiveNumber
private int parallelism = 1;
private FunctionConfig.ProcessingGuarantees processingGuarantees;
+ private boolean retainOrdering;
@isValidResources
private Resources resources;
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services