This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new 2d00447  [branch-2.7] Support setting KEY_BASED batch builder for 
Pulsar Sinks (backports #11706) (#11710)
2d00447 is described below

commit 2d00447c5773fb0aee0aff3334edf37439ab9d9f
Author: Lari Hotari <[email protected]>
AuthorDate: Thu Aug 26 18:26:16 2021 +0300

    [branch-2.7] Support setting KEY_BASED batch builder for Pulsar Sinks 
(backports #11706) (#11710)
    
    - backports #11706 to branch-2.7
    
    - include batchBuilder in ProducerSpec -> 
ProducerConfig.ProducerConfigBuilder conversion
      since it was missing
    
    - support setting batch builder with "--batch-builder KEY_BASED" argument
---
 .../functions/instance/JavaInstanceRunnable.java   |  1 +
 .../pulsar/functions/utils/SourceConfigUtils.java  |  7 +++++
 .../functions/utils/SourceConfigUtilsTest.java     | 31 ++++++++++++++++++++++
 3 files changed, 39 insertions(+)

diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index e3aa2fa..ea79093 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -759,6 +759,7 @@ public class JavaInstanceRunnable implements AutoCloseable, 
Runnable {
                     ProducerConfig.ProducerConfigBuilder builder = 
ProducerConfig.builder()
                             .maxPendingMessages(conf.getMaxPendingMessages())
                             
.maxPendingMessagesAcrossPartitions(conf.getMaxPendingMessagesAcrossPartitions())
+                            .batchBuilder(conf.getBatchBuilder())
                             
.useThreadLocalProducers(conf.getUseThreadLocalProducers())
                             
.cryptoConfig(CryptoUtils.convertFromSpec(conf.getCryptoSpec()));
                     pulsarSinkConfig.setProducerConfig(builder.build());
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
index 904bf15..b99337c 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
@@ -150,6 +150,13 @@ public class SourceConfigUtils {
             
sinkSpecBuilder.setProducerSpec(ProducerConfigUtils.convert(sourceConfig.getProducerConfig()));
         }
 
+        if (sourceConfig.getBatchBuilder() != null) {
+            Function.ProducerSpec.Builder builder = 
sinkSpecBuilder.getProducerSpec() != null
+                    ? sinkSpecBuilder.getProducerSpec().toBuilder()
+                    : Function.ProducerSpec.newBuilder();
+            
sinkSpecBuilder.setProducerSpec(builder.setBatchBuilder(sourceConfig.getBatchBuilder()).build());
+        }
+
         sinkSpecBuilder.setForwardSourceMessageProperty(true);
 
         functionDetailsBuilder.setSink(sinkSpecBuilder);
diff --git 
a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
 
b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
index bcece84..d08e8de 100644
--- 
a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
+++ 
b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
@@ -315,6 +315,37 @@ public class SourceConfigUtilsTest extends 
PowerMockTestCase {
         assertTrue(e.getMessage().contains("Could not validate source config: 
Field 'configParameter' cannot be null!"));
     }
 
+    @Test
+    public void testSupportsBatchBuilderWhenProducerConfigIsNull() {
+        SourceConfig sourceConfig = createSourceConfig();
+        sourceConfig.setProducerConfig(null);
+        sourceConfig.setBatchBuilder("KEY_BASED");
+        Function.FunctionDetails functionDetails =
+                SourceConfigUtils.convert(sourceConfig, new 
SourceConfigUtils.ExtractedSourceDetails(null, null));
+        
assertEquals(functionDetails.getSink().getProducerSpec().getBatchBuilder(), 
"KEY_BASED");
+    }
+
+    @Test
+    public void testSupportsBatchBuilderWhenProducerConfigExists() {
+        SourceConfig sourceConfig = createSourceConfig();
+        sourceConfig.setBatchBuilder("KEY_BASED");
+        sourceConfig.getProducerConfig().setMaxPendingMessages(123456);
+        Function.FunctionDetails functionDetails =
+                SourceConfigUtils.convert(sourceConfig, new 
SourceConfigUtils.ExtractedSourceDetails(null, null));
+        
assertEquals(functionDetails.getSink().getProducerSpec().getBatchBuilder(), 
"KEY_BASED");
+        
assertEquals(functionDetails.getSink().getProducerSpec().getMaxPendingMessages(),
 123456);
+    }
+
+    @Test
+    public void 
testSupportsBatchBuilderDefinedInProducerConfigWhenTopLevelBatchBuilderIsUndefined()
 {
+        SourceConfig sourceConfig = createSourceConfig();
+        sourceConfig.setBatchBuilder(null);
+        sourceConfig.getProducerConfig().setBatchBuilder("KEY_BASED");
+        Function.FunctionDetails functionDetails =
+                SourceConfigUtils.convert(sourceConfig, new 
SourceConfigUtils.ExtractedSourceDetails(null, null));
+        
assertEquals(functionDetails.getSink().getProducerSpec().getBatchBuilder(), 
"KEY_BASED");
+    }
+
     private SourceConfig createSourceConfigWithBatch() {
         SourceConfig sourceConfig = createSourceConfig();
         BatchSourceConfig batchSourceConfig = createBatchSourceConfig();

Reply via email to