This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new 2d00447 [branch-2.7] Support setting KEY_BASED batch builder for
Pulsar Sinks (backports #11706) (#11710)
2d00447 is described below
commit 2d00447c5773fb0aee0aff3334edf37439ab9d9f
Author: Lari Hotari <[email protected]>
AuthorDate: Thu Aug 26 18:26:16 2021 +0300
[branch-2.7] Support setting KEY_BASED batch builder for Pulsar Sinks
(backports #11706) (#11710)
- backports #11706 to branch-2.7
- include batchBuilder in ProducerSpec ->
ProducerConfig.ProducerConfigBuilder conversion
since it was missing
- support setting batch builder with "--batch-builder KEY_BASED" argument
---
.../functions/instance/JavaInstanceRunnable.java | 1 +
.../pulsar/functions/utils/SourceConfigUtils.java | 7 +++++
.../functions/utils/SourceConfigUtilsTest.java | 31 ++++++++++++++++++++++
3 files changed, 39 insertions(+)
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index e3aa2fa..ea79093 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -759,6 +759,7 @@ public class JavaInstanceRunnable implements AutoCloseable,
Runnable {
ProducerConfig.ProducerConfigBuilder builder =
ProducerConfig.builder()
.maxPendingMessages(conf.getMaxPendingMessages())
.maxPendingMessagesAcrossPartitions(conf.getMaxPendingMessagesAcrossPartitions())
+ .batchBuilder(conf.getBatchBuilder())
.useThreadLocalProducers(conf.getUseThreadLocalProducers())
.cryptoConfig(CryptoUtils.convertFromSpec(conf.getCryptoSpec()));
pulsarSinkConfig.setProducerConfig(builder.build());
diff --git
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
index 904bf15..b99337c 100644
---
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
+++
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
@@ -150,6 +150,13 @@ public class SourceConfigUtils {
sinkSpecBuilder.setProducerSpec(ProducerConfigUtils.convert(sourceConfig.getProducerConfig()));
}
+ if (sourceConfig.getBatchBuilder() != null) {
+ Function.ProducerSpec.Builder builder =
sinkSpecBuilder.getProducerSpec() != null
+ ? sinkSpecBuilder.getProducerSpec().toBuilder()
+ : Function.ProducerSpec.newBuilder();
+
sinkSpecBuilder.setProducerSpec(builder.setBatchBuilder(sourceConfig.getBatchBuilder()).build());
+ }
+
sinkSpecBuilder.setForwardSourceMessageProperty(true);
functionDetailsBuilder.setSink(sinkSpecBuilder);
diff --git
a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
index bcece84..d08e8de 100644
---
a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
+++
b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
@@ -315,6 +315,37 @@ public class SourceConfigUtilsTest extends
PowerMockTestCase {
assertTrue(e.getMessage().contains("Could not validate source config:
Field 'configParameter' cannot be null!"));
}
+ @Test
+ public void testSupportsBatchBuilderWhenProducerConfigIsNull() {
+ SourceConfig sourceConfig = createSourceConfig();
+ sourceConfig.setProducerConfig(null);
+ sourceConfig.setBatchBuilder("KEY_BASED");
+ Function.FunctionDetails functionDetails =
+ SourceConfigUtils.convert(sourceConfig, new
SourceConfigUtils.ExtractedSourceDetails(null, null));
+
assertEquals(functionDetails.getSink().getProducerSpec().getBatchBuilder(),
"KEY_BASED");
+ }
+
+ @Test
+ public void testSupportsBatchBuilderWhenProducerConfigExists() {
+ SourceConfig sourceConfig = createSourceConfig();
+ sourceConfig.setBatchBuilder("KEY_BASED");
+ sourceConfig.getProducerConfig().setMaxPendingMessages(123456);
+ Function.FunctionDetails functionDetails =
+ SourceConfigUtils.convert(sourceConfig, new
SourceConfigUtils.ExtractedSourceDetails(null, null));
+
assertEquals(functionDetails.getSink().getProducerSpec().getBatchBuilder(),
"KEY_BASED");
+
assertEquals(functionDetails.getSink().getProducerSpec().getMaxPendingMessages(),
123456);
+ }
+
+ @Test
+ public void
testSupportsBatchBuilderDefinedInProducerConfigWhenTopLevelBatchBuilderIsUndefined()
{
+ SourceConfig sourceConfig = createSourceConfig();
+ sourceConfig.setBatchBuilder(null);
+ sourceConfig.getProducerConfig().setBatchBuilder("KEY_BASED");
+ Function.FunctionDetails functionDetails =
+ SourceConfigUtils.convert(sourceConfig, new
SourceConfigUtils.ExtractedSourceDetails(null, null));
+
assertEquals(functionDetails.getSink().getProducerSpec().getBatchBuilder(),
"KEY_BASED");
+ }
+
private SourceConfig createSourceConfigWithBatch() {
SourceConfig sourceConfig = createSourceConfig();
BatchSourceConfig batchSourceConfig = createBatchSourceConfig();