rdhabalia closed pull request #2259: Fix: set subscription-type based on
message ordering
URL: https://github.com/apache/incubator-pulsar/pull/2259
This is a pull request merged from a forked repository.
Because GitHub hides the original diff of a foreign (fork-based) pull
request once it is merged, the diff is reproduced below for the sake of
provenance:
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index 3092fd7ede..966ec6072f 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -86,6 +86,7 @@
import org.apache.pulsar.functions.utils.Reflections;
import org.apache.pulsar.functions.utils.Utils;
import org.apache.pulsar.functions.utils.WindowConfig;
+import org.apache.pulsar.functions.utils.FunctionConfig.ProcessingGuarantees;
import org.apache.pulsar.functions.utils.validation.ConfigValidation;
import
org.apache.pulsar.functions.utils.validation.ValidatorImpls.ImplementsClassesValidator;
import org.apache.pulsar.functions.windowing.WindowFunctionExecutor;
@@ -240,6 +241,8 @@ void processArguments() throws Exception {
protected FunctionConfig.ProcessingGuarantees processingGuarantees;
@Parameter(names = "--userConfig", description = "User-defined config
key/values")
protected String userConfigString;
+ @Parameter(names = "--retainOrdering", description = "Function
consumes and processes messages in order")
+ protected boolean retainOrdering;
@Parameter(names = "--parallelism", description = "The function's
parallelism factor (i.e. the number of function instances to run)")
protected Integer parallelism;
@Parameter(names = "--cpu", description = "The cpu in cores that need
to be allocated per function instance(applicable only to docker runtime)")
@@ -315,6 +318,9 @@ void processArguments() throws Exception {
if (null != processingGuarantees) {
functionConfig.setProcessingGuarantees(processingGuarantees);
}
+
+ functionConfig.setRetainOrdering(retainOrdering);
+
if (null != userConfigString) {
Type type = new TypeToken<Map<String, String>>(){}.getType();
Map<String, Object> userConfigMap = new
Gson().fromJson(userConfigString, type);
@@ -560,24 +566,13 @@ protected FunctionDetails convert(FunctionConfig
functionConfig)
sourceSpecBuilder.setTopicsPattern(functionConfig.getTopicsPattern());
}
- // Set subscription type based on processing semantics
- if (functionConfig.getProcessingGuarantees() != null) {
- switch (functionConfig.getProcessingGuarantees()) {
- case ATMOST_ONCE:
-
sourceSpecBuilder.setSubscriptionType(SubscriptionType.SHARED);
- break;
- case ATLEAST_ONCE:
-
sourceSpecBuilder.setSubscriptionType(SubscriptionType.SHARED);
- break;
- case EFFECTIVELY_ONCE:
-
sourceSpecBuilder.setSubscriptionType(SubscriptionType.FAILOVER);
- break;
- default:
- throw new RuntimeException("Unknown processing
guarantee: "
- +
functionConfig.getProcessingGuarantees().name());
- }
- }
-
+ // Set subscription type based on ordering and EFFECTIVELY_ONCE
semantics
+ SubscriptionType subType = (functionConfig.isRetainOrdering()
+ ||
ProcessingGuarantees.EFFECTIVELY_ONCE.equals(functionConfig.getProcessingGuarantees()))
+ ? SubscriptionType.FAILOVER
+ : SubscriptionType.SHARED;
+ sourceSpecBuilder.setSubscriptionType(subType);
+
if (typeArgs != null) {
sourceSpecBuilder.setTypeClassName(typeArgs[0].getName());
}
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
index 20438770b2..8417874f38 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
@@ -57,6 +57,7 @@
import org.apache.pulsar.functions.proto.Function.SourceSpec;
import org.apache.pulsar.functions.proto.Function.SubscriptionType;
import org.apache.pulsar.functions.utils.FunctionConfig;
+import org.apache.pulsar.functions.utils.FunctionConfig.ProcessingGuarantees;
import org.apache.pulsar.functions.utils.SinkConfig;
import org.apache.pulsar.functions.utils.Utils;
import org.apache.pulsar.functions.utils.io.ConnectorUtils;
@@ -468,7 +469,6 @@ protected FunctionDetails createSinkConfig(SinkConfig
sinkConfig) throws IOExcep
// set source spec
// source spec classname should be empty so that the default
pulsar source will be used
SourceSpec.Builder sourceSpecBuilder = SourceSpec.newBuilder();
-
sourceSpecBuilder.setSubscriptionType(Function.SubscriptionType.SHARED);
if (sinkConfig.getTopicToSerdeClassName() != null) {
sourceSpecBuilder.putAllTopicsToSerDeClassName(sinkConfig.getTopicToSerdeClassName());
}
@@ -483,9 +483,12 @@ protected FunctionDetails createSinkConfig(SinkConfig
sinkConfig) throws IOExcep
sourceSpecBuilder.setSubscriptionName(sinkConfig.getSourceSubscriptionName());
}
- sourceSpecBuilder.setSubscriptionType(
- sinkConfig.isRetainOrdering() ? SubscriptionType.FAILOVER
: SubscriptionType.SHARED);
-
+ SubscriptionType subType = (sinkConfig.isRetainOrdering()
+ ||
ProcessingGuarantees.EFFECTIVELY_ONCE.equals(sinkConfig.getProcessingGuarantees()))
+ ? SubscriptionType.FAILOVER
+ : SubscriptionType.SHARED;
+ sourceSpecBuilder.setSubscriptionType(subType);
+
functionDetailsBuilder.setAutoAck(true);
functionDetailsBuilder.setSource(sourceSpecBuilder);
diff --git
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
index 42c829c673..c30d419608 100644
---
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
+++
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
@@ -86,6 +86,7 @@
@isValidTopicName
private String logTopic;
private ProcessingGuarantees processingGuarantees;
+ private boolean retainOrdering;
private Map<String, Object> userConfig;
private Runtime runtime;
private boolean autoAck;
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services