This is an automated email from the ASF dual-hosted git repository.
rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c85dc46 Fix: set subscription-type based on message ordering (#2259)
c85dc46 is described below
commit c85dc46638cbc7822872511c776777897ce1895c
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Thu Aug 2 12:36:04 2018 -0700
Fix: set subscription-type based on message ordering (#2259)
* Fix: set subscription-type based on message ordering
* set failover sub on EFFECTIVELY_ONCE processing guarantee
---
.../org/apache/pulsar/admin/cli/CmdFunctions.java | 31 +++++++++-------------
.../java/org/apache/pulsar/admin/cli/CmdSinks.java | 11 +++++---
.../pulsar/functions/utils/FunctionConfig.java | 1 +
3 files changed, 21 insertions(+), 22 deletions(-)
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index 6948cff..9f7405c 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -86,6 +86,7 @@ import org.apache.pulsar.functions.utils.FunctionConfig;
import org.apache.pulsar.functions.utils.Reflections;
import org.apache.pulsar.functions.utils.Utils;
import org.apache.pulsar.functions.utils.WindowConfig;
+import org.apache.pulsar.functions.utils.FunctionConfig.ProcessingGuarantees;
import org.apache.pulsar.functions.utils.validation.ConfigValidation;
import
org.apache.pulsar.functions.utils.validation.ValidatorImpls.ImplementsClassesValidator;
import org.apache.pulsar.functions.windowing.WindowFunctionExecutor;
@@ -240,6 +241,8 @@ public class CmdFunctions extends CmdBase {
protected FunctionConfig.ProcessingGuarantees processingGuarantees;
@Parameter(names = "--userConfig", description = "User-defined config
key/values")
protected String userConfigString;
+ @Parameter(names = "--retainOrdering", description = "Function
consumes and processes messages in order")
+ protected boolean retainOrdering;
@Parameter(names = "--parallelism", description = "The function's
parallelism factor (i.e. the number of function instances to run)")
protected Integer parallelism;
@Parameter(names = "--cpu", description = "The cpu in cores that need
to be allocated per function instance(applicable only to docker runtime)")
@@ -315,6 +318,9 @@ public class CmdFunctions extends CmdBase {
if (null != processingGuarantees) {
functionConfig.setProcessingGuarantees(processingGuarantees);
}
+
+ functionConfig.setRetainOrdering(retainOrdering);
+
if (null != userConfigString) {
Type type = new TypeToken<Map<String, String>>(){}.getType();
Map<String, Object> userConfigMap = new
Gson().fromJson(userConfigString, type);
@@ -567,24 +573,13 @@ public class CmdFunctions extends CmdBase {
sourceSpecBuilder.setTopicsPattern(functionConfig.getTopicsPattern());
}
- // Set subscription type based on processing semantics
- if (functionConfig.getProcessingGuarantees() != null) {
- switch (functionConfig.getProcessingGuarantees()) {
- case ATMOST_ONCE:
-
sourceSpecBuilder.setSubscriptionType(SubscriptionType.SHARED);
- break;
- case ATLEAST_ONCE:
-
sourceSpecBuilder.setSubscriptionType(SubscriptionType.SHARED);
- break;
- case EFFECTIVELY_ONCE:
-
sourceSpecBuilder.setSubscriptionType(SubscriptionType.FAILOVER);
- break;
- default:
- throw new RuntimeException("Unknown processing
guarantee: "
- +
functionConfig.getProcessingGuarantees().name());
- }
- }
-
+ // Set subscription type based on ordering and EFFECTIVELY_ONCE
semantics
+ SubscriptionType subType = (functionConfig.isRetainOrdering()
+ ||
ProcessingGuarantees.EFFECTIVELY_ONCE.equals(functionConfig.getProcessingGuarantees()))
+ ? SubscriptionType.FAILOVER
+ : SubscriptionType.SHARED;
+ sourceSpecBuilder.setSubscriptionType(subType);
+
if (typeArgs != null) {
sourceSpecBuilder.setTypeClassName(typeArgs[0].getName());
}
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
index 2043877..8417874 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
@@ -57,6 +57,7 @@ import org.apache.pulsar.functions.proto.Function.SinkSpec;
import org.apache.pulsar.functions.proto.Function.SourceSpec;
import org.apache.pulsar.functions.proto.Function.SubscriptionType;
import org.apache.pulsar.functions.utils.FunctionConfig;
+import org.apache.pulsar.functions.utils.FunctionConfig.ProcessingGuarantees;
import org.apache.pulsar.functions.utils.SinkConfig;
import org.apache.pulsar.functions.utils.Utils;
import org.apache.pulsar.functions.utils.io.ConnectorUtils;
@@ -468,7 +469,6 @@ public class CmdSinks extends CmdBase {
// set source spec
// source spec classname should be empty so that the default
pulsar source will be used
SourceSpec.Builder sourceSpecBuilder = SourceSpec.newBuilder();
-
sourceSpecBuilder.setSubscriptionType(Function.SubscriptionType.SHARED);
if (sinkConfig.getTopicToSerdeClassName() != null) {
sourceSpecBuilder.putAllTopicsToSerDeClassName(sinkConfig.getTopicToSerdeClassName());
}
@@ -483,9 +483,12 @@ public class CmdSinks extends CmdBase {
sourceSpecBuilder.setSubscriptionName(sinkConfig.getSourceSubscriptionName());
}
- sourceSpecBuilder.setSubscriptionType(
- sinkConfig.isRetainOrdering() ? SubscriptionType.FAILOVER
: SubscriptionType.SHARED);
-
+ SubscriptionType subType = (sinkConfig.isRetainOrdering()
+ ||
ProcessingGuarantees.EFFECTIVELY_ONCE.equals(sinkConfig.getProcessingGuarantees()))
+ ? SubscriptionType.FAILOVER
+ : SubscriptionType.SHARED;
+ sourceSpecBuilder.setSubscriptionType(subType);
+
functionDetailsBuilder.setAutoAck(true);
functionDetailsBuilder.setSource(sourceSpecBuilder);
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
index bcbdd72..8d91b8e 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
@@ -85,6 +85,7 @@ public class FunctionConfig {
@isValidTopicName
private String logTopic;
private ProcessingGuarantees processingGuarantees;
+ private boolean retainOrdering;
private Map<String, Object> userConfig;
private Runtime runtime;
private boolean autoAck;