This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b923af1 [Functions] Support KEY_BASED batch builder for Java based functions and sources (#11706)
b923af1 is described below
commit b923af16f629bf298ee2e8fec44864a2c8a2615b
Author: Lari Hotari <[email protected]>
AuthorDate: Wed Aug 25 20:33:50 2021 +0300
[Functions] Support KEY_BASED batch builder for Java based functions and sources (#11706)
* [Functions] Support KEY_BASED batch builder for Java based functions and sources
* Include batchBuilder in ProducerSpec -> ProducerConfig.ProducerConfigBuilder conversion
* Support setting batch builder for sources with "--batch-builder KEY_BASED" argument
---
.../pulsar/functions/instance/ContextImpl.java | 21 ++++++++++-----
.../functions/instance/JavaInstanceRunnable.java | 1 +
.../pulsar/functions/utils/SourceConfigUtils.java | 7 +++++
.../functions/utils/SourceConfigUtilsTest.java | 31 ++++++++++++++++++++++
4 files changed, 54 insertions(+), 6 deletions(-)
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index 957cc44..a57d53b 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -39,6 +39,7 @@ import java.util.stream.Stream;
import lombok.ToString;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Consumer;
@@ -146,14 +147,22 @@ class ContextImpl implements Context, SinkContext,
SourceContext, AutoCloseable
this.producerBuilder = (ProducerBuilderImpl<?>)
client.newProducer().blockIfQueueFull(true).enableBatching(true)
.batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS);
boolean useThreadLocalProducers = false;
- if (config.getFunctionDetails().getSink().getProducerSpec() != null) {
- if
(config.getFunctionDetails().getSink().getProducerSpec().getMaxPendingMessages()
!= 0) {
-
this.producerBuilder.maxPendingMessages(config.getFunctionDetails().getSink().getProducerSpec().getMaxPendingMessages());
+ Function.ProducerSpec producerSpec =
config.getFunctionDetails().getSink().getProducerSpec();
+ if (producerSpec != null) {
+ if (producerSpec.getMaxPendingMessages() != 0) {
+
this.producerBuilder.maxPendingMessages(producerSpec.getMaxPendingMessages());
}
- if
(config.getFunctionDetails().getSink().getProducerSpec().getMaxPendingMessagesAcrossPartitions()
!= 0) {
-
this.producerBuilder.maxPendingMessagesAcrossPartitions(config.getFunctionDetails().getSink().getProducerSpec().getMaxPendingMessagesAcrossPartitions());
+ if (producerSpec.getMaxPendingMessagesAcrossPartitions() != 0) {
+
this.producerBuilder.maxPendingMessagesAcrossPartitions(producerSpec.getMaxPendingMessagesAcrossPartitions());
}
- useThreadLocalProducers =
config.getFunctionDetails().getSink().getProducerSpec().getUseThreadLocalProducers();
+ if (producerSpec.getBatchBuilder() != null) {
+ if (producerSpec.getBatchBuilder().equals("KEY_BASED")) {
+
this.producerBuilder.batcherBuilder(BatcherBuilder.KEY_BASED);
+ } else {
+
this.producerBuilder.batcherBuilder(BatcherBuilder.DEFAULT);
+ }
+ }
+ useThreadLocalProducers =
producerSpec.getUseThreadLocalProducers();
}
if (useThreadLocalProducers) {
tlPublishProducers = new ThreadLocal<>();
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 705b920..614b7a4 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -782,6 +782,7 @@ public class JavaInstanceRunnable implements AutoCloseable,
Runnable {
ProducerConfig.ProducerConfigBuilder builder =
ProducerConfig.builder()
.maxPendingMessages(conf.getMaxPendingMessages())
.maxPendingMessagesAcrossPartitions(conf.getMaxPendingMessagesAcrossPartitions())
+ .batchBuilder(conf.getBatchBuilder())
.useThreadLocalProducers(conf.getUseThreadLocalProducers())
.cryptoConfig(CryptoUtils.convertFromSpec(conf.getCryptoSpec()));
pulsarSinkConfig.setProducerConfig(builder.build());
diff --git
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
index 9049eb6..6450d6e 100644
---
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
+++
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
@@ -167,6 +167,13 @@ public class SourceConfigUtils {
sinkSpecBuilder.setProducerSpec(pbldr.build());
}
+ if (sourceConfig.getBatchBuilder() != null) {
+ Function.ProducerSpec.Builder builder =
sinkSpecBuilder.getProducerSpec() != null
+ ? sinkSpecBuilder.getProducerSpec().toBuilder()
+ : Function.ProducerSpec.newBuilder();
+
sinkSpecBuilder.setProducerSpec(builder.setBatchBuilder(sourceConfig.getBatchBuilder()).build());
+ }
+
sinkSpecBuilder.setForwardSourceMessageProperty(true);
functionDetailsBuilder.setSink(sinkSpecBuilder);
diff --git
a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
index 20a64f8..22b5afa 100644
---
a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
+++
b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
@@ -331,6 +331,37 @@ public class SourceConfigUtilsTest extends
PowerMockTestCase {
assertTrue(e.getMessage().contains("Could not validate source config:
Field 'configParameter' cannot be null!"));
}
+ @Test
+ public void testSupportsBatchBuilderWhenProducerConfigIsNull() {
+ SourceConfig sourceConfig = createSourceConfig();
+ sourceConfig.setProducerConfig(null);
+ sourceConfig.setBatchBuilder("KEY_BASED");
+ Function.FunctionDetails functionDetails =
+ SourceConfigUtils.convert(sourceConfig, new
SourceConfigUtils.ExtractedSourceDetails(null, null));
+
assertEquals(functionDetails.getSink().getProducerSpec().getBatchBuilder(),
"KEY_BASED");
+ }
+
+ @Test
+ public void testSupportsBatchBuilderWhenProducerConfigExists() {
+ SourceConfig sourceConfig = createSourceConfig();
+ sourceConfig.setBatchBuilder("KEY_BASED");
+ sourceConfig.getProducerConfig().setMaxPendingMessages(123456);
+ Function.FunctionDetails functionDetails =
+ SourceConfigUtils.convert(sourceConfig, new
SourceConfigUtils.ExtractedSourceDetails(null, null));
+
assertEquals(functionDetails.getSink().getProducerSpec().getBatchBuilder(),
"KEY_BASED");
+
assertEquals(functionDetails.getSink().getProducerSpec().getMaxPendingMessages(),
123456);
+ }
+
+ @Test
+ public void
testSupportsBatchBuilderDefinedInProducerConfigWhenTopLevelBatchBuilderIsUndefined()
{
+ SourceConfig sourceConfig = createSourceConfig();
+ sourceConfig.setBatchBuilder(null);
+ sourceConfig.getProducerConfig().setBatchBuilder("KEY_BASED");
+ Function.FunctionDetails functionDetails =
+ SourceConfigUtils.convert(sourceConfig, new
SourceConfigUtils.ExtractedSourceDetails(null, null));
+
assertEquals(functionDetails.getSink().getProducerSpec().getBatchBuilder(),
"KEY_BASED");
+ }
+
private SourceConfig createSourceConfigWithBatch() {
SourceConfig sourceConfig = createSourceConfig();
BatchSourceConfig batchSourceConfig = createBatchSourceConfig();