This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8da9422  Support key_based batch builder for functions and sources 
(#8523)
8da9422 is described below

commit 8da9422e822888cb198f62b4c7cd6ab3d400a87e
Author: xiaolong ran <[email protected]>
AuthorDate: Tue Nov 17 02:20:06 2020 +0800

    Support key_based batch builder for functions and sources (#8523)
    
    ### Motivation
    
    Pulsar Functions currently support the Key_Shared subscription mode. To 
ensure that messages are still distributed to different consumers in the 
correct order when batching is enabled, Pulsar Functions also need to support 
the `KEY_BASED` batch builder.
    
    ### Modifications
    
    - Add `--batch-builder` for Pulsar Functions
    - Add `--batch-builder` for Pulsar Sources
    - Add test case
---
 .../src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java   | 6 ++++++
 .../src/main/java/org/apache/pulsar/admin/cli/CmdSources.java     | 7 +++++++
 .../java/org/apache/pulsar/common/functions/FunctionConfig.java   | 2 ++
 .../java/org/apache/pulsar/common/functions/ProducerConfig.java   | 1 +
 .../src/main/java/org/apache/pulsar/common/io/SourceConfig.java   | 2 ++
 .../main/java/org/apache/pulsar/functions/sink/PulsarSink.java    | 8 ++++++++
 pulsar-functions/proto/src/main/proto/Function.proto              | 1 +
 .../org/apache/pulsar/functions/utils/FunctionConfigUtils.java    | 6 ++++++
 .../java/org/apache/pulsar/functions/utils/SourceConfigUtils.java | 6 ++++++
 .../apache/pulsar/functions/utils/FunctionConfigUtilsTest.java    | 3 +++
 .../org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java  | 1 +
 .../tests/integration/functions/utils/CommandGenerator.java       | 7 +++++++
 12 files changed, 50 insertions(+)

diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index dacc839..f9670e0 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -266,6 +266,8 @@ public class CmdFunctions extends CmdBase {
         protected Boolean DEPRECATED_retainOrdering;
         @Parameter(names = "--retain-ordering", description = "Function 
consumes and processes messages in order")
         protected Boolean retainOrdering;
+        @Parameter(names = "--batch-builder", description = "BatcherBuilder 
provides two types of batch construction methods, DEFAULT and KEY_BASED. The 
default value is: DEFAULT")
+        protected String batchBuilder;
         @Parameter(names = "--forward-source-message-property", description = 
"Forwarding input message's properties to output topic when processing")
         protected Boolean forwardSourceMessageProperty = true;
         @Parameter(names = "--subs-name", description = "Pulsar source 
subscription name if user wants a specific subscription-name for input-topic 
consumer")
@@ -419,6 +421,10 @@ public class CmdFunctions extends CmdBase {
                 functionConfig.setRetainOrdering(retainOrdering);
             }
 
+            if (isNotBlank(batchBuilder)) {
+                functionConfig.setBatchBuilder(batchBuilder);
+            }
+
             if (null != forwardSourceMessageProperty) {
                 
functionConfig.setForwardSourceMessageProperty(forwardSourceMessageProperty);
             }
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
index fcc580a..521924b 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
@@ -274,6 +274,9 @@ public class CmdSources extends CmdBase {
         @Parameter(names = "--producer-config", description = "The custom 
producer configuration (as a JSON string)")
         protected String producerConfig;
 
+        @Parameter(names = "--batch-builder", description = "BatchBuilder 
provides two types of batch construction methods, DEFAULT and KEY_BASED. The 
default value is: DEFAULT")
+        protected String batchBuilder;
+
         @Parameter(names = "--deserializationClassName", description = "The 
SerDe classname for the source", hidden = true)
         protected String DEPRECATED_deserializationClassName;
         @Parameter(names = "--deserialization-classname", description = "The 
SerDe classname for the source")
@@ -360,6 +363,10 @@ public class CmdSources extends CmdBase {
                 sourceConfig.setSchemaType(schemaType);
             }
 
+            if (null != batchBuilder) {
+                sourceConfig.setBatchBuilder(batchBuilder);
+            }
+
             if (null != processingGuarantees) {
                 sourceConfig.setProcessingGuarantees(processingGuarantees);
             }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
index 9cd8ef6..c9f4645 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
@@ -95,6 +95,8 @@ public class FunctionConfig {
     private Boolean retainOrdering;
     // Do we want the same function instance to process all data keyed by the 
input topic's message key
     private Boolean retainKeyOrdering;
+    // batchBuilder provides two types of batch construction methods, DEFAULT 
and KEY_BASED
+    private String batchBuilder;
     private Boolean forwardSourceMessageProperty;
     private Map<String, Object> userConfig;
     // This is a map of secretName(aka how the secret is going to be
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/ProducerConfig.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/ProducerConfig.java
index 5b68663..0d2f4b4 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/ProducerConfig.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/ProducerConfig.java
@@ -37,4 +37,5 @@ public class ProducerConfig {
     private Integer maxPendingMessagesAcrossPartitions;
     private Boolean useThreadLocalProducers;
     private CryptoConfig cryptoConfig;
+    private String batchBuilder;
 }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/io/SourceConfig.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/io/SourceConfig.java
index 31a8634..79a5f81 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/io/SourceConfig.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/io/SourceConfig.java
@@ -69,4 +69,6 @@ public class SourceConfig {
 
     // If this is a BatchSource, its batch related configs are stored here
     private BatchSourceConfig batchSourceConfig;
+    // batchBuilder provides two types of batch construction methods, DEFAULT 
and KEY_BASED
+    private String batchBuilder;
 }
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
index f121950..9ba4059 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
@@ -24,6 +24,7 @@ import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.reflect.ConstructorUtils;
+import org.apache.pulsar.client.api.BatcherBuilder;
 import org.apache.pulsar.client.api.CompressionType;
 import org.apache.pulsar.client.api.CryptoKeyReader;
 import org.apache.pulsar.client.api.HashingScheme;
@@ -133,6 +134,13 @@ public class PulsarSink<T> implements Sink<T> {
                         builder.addEncryptionKey(encryptionKeyName);
                     }
                 }
+                if (producerConfig.getBatchBuilder() != null) {
+                    if (producerConfig.getBatchBuilder().equals("KEY_BASED")) {
+                        builder.batcherBuilder(BatcherBuilder.KEY_BASED);
+                    } else {
+                        builder.batcherBuilder(BatcherBuilder.DEFAULT);
+                    }
+                }
             }
             return builder.properties(properties).create();
         }
diff --git a/pulsar-functions/proto/src/main/proto/Function.proto 
b/pulsar-functions/proto/src/main/proto/Function.proto
index 4243fb6..4b840a3 100644
--- a/pulsar-functions/proto/src/main/proto/Function.proto
+++ b/pulsar-functions/proto/src/main/proto/Function.proto
@@ -108,6 +108,7 @@ message ProducerSpec {
     int32 maxPendingMessagesAcrossPartitions = 2;
     bool useThreadLocalProducers = 3;
     CryptoSpec cryptoSpec = 4;
+    string batchBuilder = 5;
 }
 
 message CryptoSpec {
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
index bdf250a..b42292f 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
@@ -226,6 +226,9 @@ public class FunctionConfigUtils {
             if (producerConf.getCryptoConfig() != null) {
                 
pbldr.setCryptoSpec(CryptoUtils.convert(producerConf.getCryptoConfig()));
             }
+            if (producerConf.getBatchBuilder() != null) {
+                pbldr.setBatchBuilder(producerConf.getBatchBuilder());
+            }
             sinkSpecBuilder.setProducerSpec(pbldr.build());
         }
         functionDetailsBuilder.setSink(sinkSpecBuilder);
@@ -388,6 +391,9 @@ public class FunctionConfigUtils {
             if (spec.hasCryptoSpec()) {
                 
producerConfig.setCryptoConfig(CryptoUtils.convertFromSpec(spec.getCryptoSpec()));
             }
+            if (spec.getBatchBuilder() != null) {
+                producerConfig.setBatchBuilder(spec.getBatchBuilder());
+            }
             
producerConfig.setUseThreadLocalProducers(spec.getUseThreadLocalProducers());
             functionConfig.setProducerConfig(producerConfig);
         }
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
index aec11fe..04f524a 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
@@ -163,6 +163,9 @@ public class SourceConfigUtils {
             if (conf.getCryptoConfig() != null) {
                 
pbldr.setCryptoSpec(CryptoUtils.convert(conf.getCryptoConfig()));
             }
+            if (conf.getBatchBuilder() != null) {
+                pbldr.setBatchBuilder(conf.getBatchBuilder());
+            }
             sinkSpecBuilder.setProducerSpec(pbldr.build());
         }
 
@@ -247,6 +250,9 @@ public class SourceConfigUtils {
             if (spec.hasCryptoSpec()) {
                 
producerConfig.setCryptoConfig(CryptoUtils.convertFromSpec(spec.getCryptoSpec()));
             }
+            if (spec.getBatchBuilder() != null) {
+                producerConfig.setBatchBuilder(spec.getBatchBuilder());
+            }
             
producerConfig.setUseThreadLocalProducers(spec.getUseThreadLocalProducers());
             sourceConfig.setProducerConfig(producerConfig);
         }
diff --git 
a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
 
b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
index fcafd49..03c45bd 100644
--- 
a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
+++ 
b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
@@ -75,6 +75,7 @@ public class FunctionConfigUtilsTest {
         producerConfig.setMaxPendingMessages(100);
         producerConfig.setMaxPendingMessagesAcrossPartitions(1000);
         producerConfig.setUseThreadLocalProducers(true);
+        producerConfig.setBatchBuilder("DEFAULT");
         functionConfig.setProducerConfig(producerConfig);
         Function.FunctionDetails functionDetails = 
FunctionConfigUtils.convert(functionConfig, null);
         FunctionConfig convertedConfig = 
FunctionConfigUtils.convertFromDetails(functionDetails);
@@ -115,6 +116,7 @@ public class FunctionConfigUtilsTest {
         producerConfig.setMaxPendingMessages(100);
         producerConfig.setMaxPendingMessagesAcrossPartitions(1000);
         producerConfig.setUseThreadLocalProducers(true);
+        producerConfig.setBatchBuilder("KEY_BASED");
         functionConfig.setProducerConfig(producerConfig);
         Function.FunctionDetails functionDetails = 
FunctionConfigUtils.convert(functionConfig, null);
         FunctionConfig convertedConfig = 
FunctionConfigUtils.convertFromDetails(functionDetails);
@@ -449,6 +451,7 @@ public class FunctionConfigUtilsTest {
         functionConfig.setRetainOrdering(false);
         functionConfig.setRetainKeyOrdering(false);
         
functionConfig.setSubscriptionPosition(SubscriptionInitialPosition.Earliest);
+        functionConfig.setBatchBuilder("DEFAULT");
         functionConfig.setForwardSourceMessageProperty(false);
         functionConfig.setUserConfig(new HashMap<>());
         functionConfig.setAutoAck(true);
diff --git 
a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
 
b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
index 520b416..bcece84 100644
--- 
a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
+++ 
b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
@@ -354,6 +354,7 @@ public class SourceConfigUtilsTest extends 
PowerMockTestCase {
         producerConfig.setMaxPendingMessages(100);
         producerConfig.setMaxPendingMessagesAcrossPartitions(1000);
         producerConfig.setUseThreadLocalProducers(true);
+        producerConfig.setBatchBuilder("DEFAULT");
         sourceConfig.setProducerConfig(producerConfig);
 
         sourceConfig.setConfigs(configs);
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java
index f0b8494..102ded5 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java
@@ -49,6 +49,7 @@ public class CommandGenerator {
     private Runtime runtime;
     private Integer parallelism;
     private String adminUrl;
+    private String batchBuilder;
     private Integer windowLengthCount;
     private Long windowLengthDurationMs;
     private Integer slidingIntervalCount;
@@ -154,6 +155,9 @@ public class CommandGenerator {
         if (logTopic != null) {
             commandBuilder.append(" --logTopic " + logTopic);
         }
+        if (batchBuilder != null) {
+            commandBuilder.append("--batch-builder" + batchBuilder);
+        }
         if (customSereSourceTopics != null && 
!customSereSourceTopics.isEmpty()) {
             commandBuilder.append(" --customSerdeInputs \'" + new 
Gson().toJson(customSereSourceTopics) + "\'");
         }
@@ -239,6 +243,9 @@ public class CommandGenerator {
         if (customSereSourceTopics != null && 
!customSereSourceTopics.isEmpty()) {
             commandBuilder.append(" --customSerdeInputs \'" + new 
Gson().toJson(customSereSourceTopics) + "\'");
         }
+        if (batchBuilder != null) {
+            commandBuilder.append("--batch-builder" + batchBuilder);
+        }
         if (sinkTopic != null) {
             commandBuilder.append(" --output " + sinkTopic);
         }

Reply via email to