This is an automated email from the ASF dual-hosted git repository. nlu90 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new fad925f7c52 [improve][fn] Implement pip 401: Support set batching configurations for Pulsar Functions&Sources (#23860) fad925f7c52 is described below commit fad925f7c522a30ba9fa005e5788162dd17b21ab Author: jiangpengcheng <scjiangpengch...@gmail.com> AuthorDate: Wed Mar 12 13:56:42 2025 +0800 [improve][fn] Implement pip 401: Support set batching configurations for Pulsar Functions&Sources (#23860) --- .../{ProducerConfig.java => BatchingConfig.java} | 18 +- .../pulsar/common/functions/ProducerConfig.java | 1 + .../functions/instance/JavaInstanceRunnable.java | 2 + .../functions/instance/ProducerBuilderFactory.java | 29 +++ .../instance/ProducerBuilderFactoryTest.java | 60 +++++ .../proto/src/main/proto/Function.proto | 10 + .../pulsar/functions/utils/BatchingUtils.java | 86 +++++++ .../functions/utils/FunctionConfigUtils.java | 6 + .../pulsar/functions/utils/BatchingUtilsTest.java | 70 ++++++ .../integration/functions/PulsarFunctionsTest.java | 64 +++++- .../functions/java/PulsarFunctionsJavaTest.java | 31 +++ .../python/PulsarFunctionsPythonTest.java | 4 +- .../functions/utils/CommandGenerator.java | 5 + .../io/sources/DataGeneratorSourceTest.java | 248 +++++++++++++++++++++ .../integration/topologies/PulsarCluster.java | 15 ++ .../topologies/PulsarStandaloneTestBase.java | 12 +- .../src/test/resources/pulsar-io-sources.xml | 1 + 17 files changed, 641 insertions(+), 21 deletions(-) diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/functions/ProducerConfig.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/functions/BatchingConfig.java similarity index 74% copy from pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/functions/ProducerConfig.java copy to pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/functions/BatchingConfig.java index 25ca2ad79c8..1133e8e7e94 100644 --- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/functions/ProducerConfig.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/functions/BatchingConfig.java @@ -23,21 +23,19 @@ import lombok.Builder; import lombok.Data; import lombok.EqualsAndHashCode; import lombok.NoArgsConstructor; -import org.apache.pulsar.client.api.CompressionType; -/** - * Configuration of the producer inside the function. - */ @Data @Builder @NoArgsConstructor @AllArgsConstructor @EqualsAndHashCode -public class ProducerConfig { - private Integer maxPendingMessages; - private Integer maxPendingMessagesAcrossPartitions; - private Boolean useThreadLocalProducers; - private CryptoConfig cryptoConfig; +public class BatchingConfig { + @Builder.Default + private boolean enabled = true; + @Builder.Default + private Integer batchingMaxPublishDelayMs = 10; + private Integer roundRobinRouterBatchingPartitionSwitchFrequency; + private Integer batchingMaxMessages; + private Integer batchingMaxBytes; private String batchBuilder; - private CompressionType compressionType; } diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/functions/ProducerConfig.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/functions/ProducerConfig.java index 25ca2ad79c8..483707da2dc 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/functions/ProducerConfig.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/functions/ProducerConfig.java @@ -40,4 +40,5 @@ public class ProducerConfig { private CryptoConfig cryptoConfig; private String batchBuilder; private CompressionType compressionType; + private BatchingConfig batchingConfig; } diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index cfb7e9536a3..61254a1748b 100644 --- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -101,6 +101,7 @@ import org.apache.pulsar.functions.source.PulsarSourceConfig; import org.apache.pulsar.functions.source.SingleConsumerPulsarSource; import org.apache.pulsar.functions.source.SingleConsumerPulsarSourceConfig; import org.apache.pulsar.functions.source.batch.BatchSourceExecutor; +import org.apache.pulsar.functions.utils.BatchingUtils; import org.apache.pulsar.functions.utils.CryptoUtils; import org.apache.pulsar.functions.utils.FunctionCommon; import org.apache.pulsar.functions.utils.io.ConnectorUtils; @@ -1050,6 +1051,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { .batchBuilder(conf.getBatchBuilder()) .useThreadLocalProducers(conf.getUseThreadLocalProducers()) .cryptoConfig(CryptoUtils.convertFromSpec(conf.getCryptoSpec())) + .batchingConfig(BatchingUtils.convertFromSpec(conf.getBatchingSpec())) .compressionType(FunctionCommon.convertFromFunctionDetailsCompressionType( conf.getCompressionType())); pulsarSinkConfig.setProducerConfig(builder.build()); diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ProducerBuilderFactory.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ProducerBuilderFactory.java index b08f7f3f2cb..cec0ee91e60 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ProducerBuilderFactory.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ProducerBuilderFactory.java @@ -116,6 +116,35 @@ public class ProducerBuilderFactory { builder.batcherBuilder(BatcherBuilder.DEFAULT); } } + if (producerConfig.getBatchingConfig() != null) { + builder.enableBatching(producerConfig.getBatchingConfig().isEnabled()); + if 
(producerConfig.getBatchingConfig().getBatchingMaxPublishDelayMs() != null + && producerConfig.getBatchingConfig().getBatchingMaxPublishDelayMs() > 0) { + builder.batchingMaxPublishDelay(producerConfig.getBatchingConfig().getBatchingMaxPublishDelayMs(), + TimeUnit.MILLISECONDS); + } + if (producerConfig.getBatchingConfig().getRoundRobinRouterBatchingPartitionSwitchFrequency() != null + && producerConfig.getBatchingConfig().getRoundRobinRouterBatchingPartitionSwitchFrequency() + > 0) { + builder.roundRobinRouterBatchingPartitionSwitchFrequency( + producerConfig.getBatchingConfig().getRoundRobinRouterBatchingPartitionSwitchFrequency()); + } + if (producerConfig.getBatchingConfig().getBatchingMaxMessages() != null + && producerConfig.getBatchingConfig().getBatchingMaxMessages() > 0) { + builder.batchingMaxMessages(producerConfig.getBatchingConfig().getBatchingMaxMessages()); + } + if (producerConfig.getBatchingConfig().getBatchingMaxBytes() != null + && producerConfig.getBatchingConfig().getBatchingMaxBytes() > 0) { + builder.batchingMaxBytes(producerConfig.getBatchingConfig().getBatchingMaxBytes()); + } + if (producerConfig.getBatchingConfig().getBatchBuilder() != null) { + if (producerConfig.getBatchingConfig().getBatchBuilder().equals("KEY_BASED")) { + builder.batcherBuilder(BatcherBuilder.KEY_BASED); + } else { + builder.batcherBuilder(BatcherBuilder.DEFAULT); + } + } + } } return builder; } diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ProducerBuilderFactoryTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ProducerBuilderFactoryTest.java index 42940f7e2da..a9bb0b21185 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ProducerBuilderFactoryTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ProducerBuilderFactoryTest.java @@ -25,6 +25,7 @@ import static org.mockito.ArgumentMatchers.anyLong; import 
static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.testng.Assert.assertEquals; @@ -41,6 +42,7 @@ import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.ProducerCryptoFailureAction; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.common.functions.BatchingConfig; import org.apache.pulsar.common.functions.CryptoConfig; import org.apache.pulsar.common.functions.ProducerConfig; import org.mockito.internal.util.MockUtil; @@ -139,6 +141,62 @@ public class ProducerBuilderFactoryTest { cryptoConfig.setCryptoKeyReaderConfig(Map.of("key", "value")); cryptoConfig.setCryptoKeyReaderClassName(TestCryptoKeyReader.class.getName()); producerConfig.setCryptoConfig(cryptoConfig); + BatchingConfig batchingConfig = new BatchingConfig(); + batchingConfig.setEnabled(true); + batchingConfig.setBatchingMaxPublishDelayMs(20); + batchingConfig.setBatchingMaxMessages(100); + batchingConfig.setBatchingMaxBytes(-1); + producerConfig.setBatchingConfig(batchingConfig); + ProducerBuilderFactory builderFactory = new ProducerBuilderFactory(pulsarClient, producerConfig, null, null); + builderFactory.createProducerBuilder("topic", Schema.STRING, "producerName"); + + verify(pulsarClient).newProducer(Schema.STRING); + verify(producerBuilder).blockIfQueueFull(true); + // enableBatching will be called twice here: + // the first call is made by default to keep backward compatibility + // the second call is made when the producerConfig and producerConfig.batchingConfig are not null + verify(producerBuilder, times(2)).enableBatching(true); + verify(producerBuilder).batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS); +
verify(producerBuilder).hashingScheme(HashingScheme.Murmur3_32Hash); + verify(producerBuilder).messageRoutingMode(MessageRoutingMode.CustomPartition); + verify(producerBuilder).messageRouter(FunctionResultRouter.of()); + verify(producerBuilder).sendTimeout(0, TimeUnit.SECONDS); + verify(producerBuilder).topic("topic"); + verify(producerBuilder).producerName("producerName"); + + verify(producerBuilder).compressionType(CompressionType.SNAPPY); + verify(producerBuilder).batcherBuilder(BatcherBuilder.KEY_BASED); + verify(producerBuilder).maxPendingMessages(5000); + verify(producerBuilder).maxPendingMessagesAcrossPartitions(50000); + TestCryptoKeyReader lastInstance = TestCryptoKeyReader.LAST_INSTANCE; + assertNotNull(lastInstance); + assertEquals(lastInstance.configs, cryptoConfig.getCryptoKeyReaderConfig()); + verify(producerBuilder).cryptoKeyReader(lastInstance); + verify(producerBuilder).cryptoFailureAction(ProducerCryptoFailureAction.FAIL); + verify(producerBuilder).addEncryptionKey("key1"); + verify(producerBuilder).addEncryptionKey("key2"); + verify(producerBuilder).batchingMaxPublishDelay(20, TimeUnit.MILLISECONDS); + verify(producerBuilder).batchingMaxMessages(100); + verifyNoMoreInteractions(producerBuilder); + } + + @Test + public void testCreateProducerBuilderWithBatchingDisabled() { + ProducerConfig producerConfig = new ProducerConfig(); + producerConfig.setBatchBuilder("KEY_BASED"); + producerConfig.setCompressionType(CompressionType.SNAPPY); + producerConfig.setMaxPendingMessages(5000); + producerConfig.setMaxPendingMessagesAcrossPartitions(50000); + CryptoConfig cryptoConfig = new CryptoConfig(); + cryptoConfig.setProducerCryptoFailureAction(ProducerCryptoFailureAction.FAIL); + cryptoConfig.setEncryptionKeys(new String[]{"key1", "key2"}); + cryptoConfig.setCryptoKeyReaderConfig(Map.of("key", "value")); + cryptoConfig.setCryptoKeyReaderClassName(TestCryptoKeyReader.class.getName()); + producerConfig.setCryptoConfig(cryptoConfig); + BatchingConfig 
batchingConfig = new BatchingConfig(); + batchingConfig.setEnabled(false); + batchingConfig.setBatchingMaxPublishDelayMs(0); + producerConfig.setBatchingConfig(batchingConfig); ProducerBuilderFactory builderFactory = new ProducerBuilderFactory(pulsarClient, producerConfig, null, null); builderFactory.createProducerBuilder("topic", Schema.STRING, "producerName"); verifyCommon(); @@ -153,12 +211,14 @@ public class ProducerBuilderFactoryTest { verify(producerBuilder).cryptoFailureAction(ProducerCryptoFailureAction.FAIL); verify(producerBuilder).addEncryptionKey("key1"); verify(producerBuilder).addEncryptionKey("key2"); + verify(producerBuilder).enableBatching(false); verifyNoMoreInteractions(producerBuilder); } public static class TestCryptoKeyReader implements CryptoKeyReader { static TestCryptoKeyReader LAST_INSTANCE; Map<String, Object> configs; + public TestCryptoKeyReader(Map<String, Object> configs) { this.configs = configs; assert LAST_INSTANCE == null; diff --git a/pulsar-functions/proto/src/main/proto/Function.proto b/pulsar-functions/proto/src/main/proto/Function.proto index de3f03a3900..5bd1a42b5a3 100644 --- a/pulsar-functions/proto/src/main/proto/Function.proto +++ b/pulsar-functions/proto/src/main/proto/Function.proto @@ -121,6 +121,7 @@ message ProducerSpec { CryptoSpec cryptoSpec = 4; string batchBuilder = 5; CompressionType compressionType = 6; + BatchingSpec batchingSpec = 7; } message CryptoSpec { @@ -147,6 +148,15 @@ message CryptoSpec { FailureAction consumerCryptoFailureAction = 5; } +message BatchingSpec { + bool enabled = 1; + int32 batchingMaxPublishDelayMs = 2; + int32 roundRobinRouterBatchingPartitionSwitchFrequency = 3; + int32 batchingMaxMessages = 4; + int32 batchingMaxBytes = 5; + string batchBuilder = 6; +} + message SourceSpec { string className = 1; // map in json format diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/BatchingUtils.java 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/BatchingUtils.java new file mode 100644 index 00000000000..7db61f7ace8 --- /dev/null +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/BatchingUtils.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.functions.utils; + +import org.apache.pulsar.common.functions.BatchingConfig; +import org.apache.pulsar.functions.proto.Function; + +public final class BatchingUtils { + public static Function.BatchingSpec convert(BatchingConfig config) { + if (config == null) { + return null; + } + + Function.BatchingSpec.Builder builder = Function.BatchingSpec.newBuilder() + .setEnabled(config.isEnabled()); + + if (config.getBatchingMaxPublishDelayMs() != null && config.getBatchingMaxPublishDelayMs() > 0) { + builder.setBatchingMaxPublishDelayMs(config.getBatchingMaxPublishDelayMs()); + } + if (config.getRoundRobinRouterBatchingPartitionSwitchFrequency() != null + && config.getRoundRobinRouterBatchingPartitionSwitchFrequency() > 0) { + builder.setRoundRobinRouterBatchingPartitionSwitchFrequency( + config.getRoundRobinRouterBatchingPartitionSwitchFrequency()); + } + if (config.getBatchingMaxMessages() != null && config.getBatchingMaxMessages() > 0) { + builder.setBatchingMaxMessages(config.getBatchingMaxMessages()); + } + if (config.getBatchingMaxBytes() != null && config.getBatchingMaxBytes() > 0) { + builder.setBatchingMaxBytes(config.getBatchingMaxBytes()); + } + if (config.getBatchBuilder() != null && !config.getBatchBuilder().isEmpty()) { + builder.setBatchBuilder(config.getBatchBuilder()); + } + + return builder.build(); + } + + public static BatchingConfig convertFromSpec(Function.BatchingSpec spec) { + // to keep the backward compatibility, when batchingSpec is null or empty + // the batching is enabled by default, and the default max publish delay is 10ms + if (spec == null || spec.toString().equals("")) { + return BatchingConfig.builder() + .enabled(true) + .batchingMaxPublishDelayMs(10) + .build(); + } + + BatchingConfig.BatchingConfigBuilder builder = BatchingConfig.builder() + .enabled(spec.getEnabled()); + + if (spec.getBatchingMaxPublishDelayMs() > 0) { + builder.batchingMaxPublishDelayMs(spec.getBatchingMaxPublishDelayMs()); + } + if 
(spec.getRoundRobinRouterBatchingPartitionSwitchFrequency() > 0) { + builder.roundRobinRouterBatchingPartitionSwitchFrequency( + spec.getRoundRobinRouterBatchingPartitionSwitchFrequency()); + } + if (spec.getBatchingMaxMessages() > 0) { + builder.batchingMaxMessages(spec.getBatchingMaxMessages()); + } + if (spec.getBatchingMaxBytes() > 0) { + builder.batchingMaxBytes(spec.getBatchingMaxBytes()); + } + if (spec.getBatchBuilder() != null && !spec.getBatchBuilder().isEmpty()) { + builder.batchBuilder(spec.getBatchBuilder()); + } + + return builder.build(); + } +} diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java index 45fb4c1cb1e..d19a77e9dea 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java @@ -524,6 +524,9 @@ public class FunctionConfigUtils { if (producerConf.getBatchBuilder() != null) { builder.setBatchBuilder(producerConf.getBatchBuilder()); } + if (producerConf.getBatchingConfig() != null) { + builder.setBatchingSpec(BatchingUtils.convert(producerConf.getBatchingConfig())); + } if (producerConf.getCompressionType() != null) { builder.setCompressionType(convertFromCompressionType(producerConf.getCompressionType())); } else { @@ -546,6 +549,9 @@ public class FunctionConfigUtils { if (spec.getBatchBuilder() != null) { producerConfig.setBatchBuilder(spec.getBatchBuilder()); } + if (spec.hasBatchingSpec()) { + producerConfig.setBatchingConfig(BatchingUtils.convertFromSpec(spec.getBatchingSpec())); + } producerConfig.setUseThreadLocalProducers(spec.getUseThreadLocalProducers()); producerConfig.setCompressionType(convertFromFunctionDetailsCompressionType(spec.getCompressionType())); return producerConfig; diff --git 
a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/BatchingUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/BatchingUtilsTest.java new file mode 100644 index 00000000000..655c0d3837e --- /dev/null +++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/BatchingUtilsTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.functions.utils; + +import static org.testng.Assert.*; +import org.apache.pulsar.common.functions.BatchingConfig; +import org.apache.pulsar.functions.proto.Function; +import org.testng.annotations.Test; + +public class BatchingUtilsTest { + + @Test + public void testConvert() { + BatchingConfig config = BatchingConfig.builder() + .enabled(true) + .batchingMaxPublishDelayMs(30) + .roundRobinRouterBatchingPartitionSwitchFrequency(10) + .batchingMaxMessages(1000) + .batchBuilder("DEFAULT") + .build(); + Function.BatchingSpec spec = BatchingUtils.convert(config); + assertEquals(spec.getEnabled(), true); + assertEquals(spec.getBatchingMaxPublishDelayMs(), 30); + assertEquals(spec.getRoundRobinRouterBatchingPartitionSwitchFrequency(), 10); + assertEquals(spec.getBatchingMaxMessages(), 1000); + assertEquals(spec.getBatchingMaxBytes(), 0); + assertEquals(spec.getBatchBuilder(), "DEFAULT"); + } + + @Test + public void testConvertFromSpec() { + Function.BatchingSpec spec = Function.BatchingSpec.newBuilder() + .setEnabled(true) + .setBatchingMaxPublishDelayMs(30) + .setRoundRobinRouterBatchingPartitionSwitchFrequency(10) + .setBatchingMaxMessages(1000) + .setBatchBuilder("DEFAULT") + .build(); + BatchingConfig config = BatchingUtils.convertFromSpec(spec); + assertEquals(config.isEnabled(), true); + assertEquals(config.getBatchingMaxPublishDelayMs().intValue(), 30); + assertEquals(config.getRoundRobinRouterBatchingPartitionSwitchFrequency().intValue(), 10); + assertEquals(config.getBatchingMaxMessages().intValue(), 1000); + assertEquals(config.getBatchingMaxBytes(), null); + assertEquals(config.getBatchBuilder(), "DEFAULT"); + } + + @Test + public void testConvertFromSpecFromNull() { + BatchingConfig config = BatchingUtils.convertFromSpec(null); + assertTrue(config.isEnabled()); + assertEquals(config.getBatchingMaxPublishDelayMs().intValue(), 10); + } +} \ No newline at end of file diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java index 694dcba5eaf..ae94ddd1be5 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java @@ -62,8 +62,10 @@ import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.api.schema.GenericRecord; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord; +import org.apache.pulsar.common.functions.BatchingConfig; import org.apache.pulsar.common.functions.ConsumerConfig; import org.apache.pulsar.common.functions.FunctionConfig; +import org.apache.pulsar.common.functions.ProducerConfig; import org.apache.pulsar.common.policies.data.FunctionStatsImpl; import org.apache.pulsar.common.policies.data.FunctionStatus; import org.apache.pulsar.common.policies.data.FunctionStatusUtil; @@ -695,7 +697,18 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase { boolean pyZip, boolean multipleInput, boolean withExtraDeps, - java.util.function.Consumer<CommandGenerator> commandGeneratorConsumer) throws Exception { + ProducerConfig producerConfig) throws Exception { + testExclamationFunction(runtime, isTopicPattern, pyZip, multipleInput, withExtraDeps, producerConfig, null); + } + + protected void testExclamationFunction(Runtime runtime, + boolean isTopicPattern, + boolean pyZip, + boolean multipleInput, + boolean withExtraDeps, + ProducerConfig producerConfig, + java.util.function.Consumer<CommandGenerator> commandGeneratorConsumer) + throws Exception { if (functionRuntimeType == FunctionRuntimeType.THREAD && (runtime == Runtime.PYTHON || runtime == Runtime.GO)) { // python&go can only run on process 
mode return; @@ -725,10 +738,21 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase { // submit the exclamation function submitExclamationFunction( - runtime, inputTopicName, outputTopicName, functionName, pyZip, withExtraDeps, schema, commandGeneratorConsumer); + runtime, inputTopicName, outputTopicName, functionName, pyZip, withExtraDeps, schema, + commandGeneratorConsumer); // get function info final String info = getFunctionInfoSuccess(functionName); + FunctionConfig config = ObjectMapperFactory.getMapper().getObjectMapper().readValue(info, FunctionConfig.class); + + // check batching config + if (runtime == Runtime.JAVA) { + BatchingConfig batchingConfig = null; + if (producerConfig != null && producerConfig.getBatchingConfig() != null) { + batchingConfig = producerConfig.getBatchingConfig(); + } + checkBatchingConfig(functionName, batchingConfig, config); + } // get function stats getFunctionStatsEmpty(functionName); @@ -770,8 +794,8 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase { break; } - checkSubscriptionType(inputTopicName, - ObjectMapperFactory.getMapper().getObjectMapper().readValue(info, FunctionConfig.class)); + // check subscription type + checkSubscriptionType(inputTopicName, config); // delete function deleteFunction(functionName); @@ -819,6 +843,26 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase { }); } + // checking batching config, we can only check this by checking the logs for now + private void checkBatchingConfig(String functionName, BatchingConfig config, FunctionConfig functionConfig) { + if (config != null) { + assertNotNull(functionConfig.getProducerConfig()); + assertNotNull(functionConfig.getProducerConfig().getBatchingConfig()); + assertEquals(config.toString(), functionConfig.getProducerConfig().getBatchingConfig().toString()); + } + + String functionLogs = pulsarCluster.getFunctionLogs(functionName); + if (config == null || config.isEnabled()) { + 
BatchingConfig finalConfig = config; + if (finalConfig == null) { + finalConfig = BatchingConfig.builder().build(); + } + assertTrue(functionLogs.contains(finalConfig.toString())); + } else { + assertTrue(functionLogs.contains("BatchingConfig(enabled=false")); + } + } + private void submitExclamationFunction(Runtime runtime, String inputTopicName, String outputTopicName, @@ -837,7 +881,8 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase { boolean pyZip, boolean withExtraDeps, Schema<?> schema, - java.util.function.Consumer<CommandGenerator> commandGeneratorConsumer) throws Exception { + java.util.function.Consumer<CommandGenerator> commandGeneratorConsumer) + throws Exception { submitFunction( runtime, inputTopicName, @@ -860,7 +905,8 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase { boolean isPublishFunction, String functionClass, Schema<T> inputTopicSchema, - java.util.function.Consumer<CommandGenerator> commandGeneratorConsumer) throws Exception { + java.util.function.Consumer<CommandGenerator> commandGeneratorConsumer) + throws Exception { String file = null; if (Runtime.JAVA == runtime) { @@ -894,7 +940,8 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase { String functionFile, String functionClass, Schema<T> inputTopicSchema, - java.util.function.Consumer<CommandGenerator> commandGeneratorConsumer) throws Exception { + java.util.function.Consumer<CommandGenerator> commandGeneratorConsumer) + throws Exception { submitFunction(runtime, inputTopicName, outputTopicName, functionName, functionFile, functionClass, inputTopicSchema, null, null, null, null, null, null, commandGeneratorConsumer); @@ -913,7 +960,8 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase { SubscriptionInitialPosition subscriptionInitialPosition, String inputTypeClassName, String outputTypeClassName, - java.util.function.Consumer<CommandGenerator> commandGeneratorConsumer) throws 
Exception { + java.util.function.Consumer<CommandGenerator> commandGeneratorConsumer) + throws Exception { if (StringUtils.isNotEmpty(inputTopicName)) { ensureSubscriptionCreated( diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarFunctionsJavaTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarFunctionsJavaTest.java index 939d6e19d1f..fc9ffe9da0d 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarFunctionsJavaTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarFunctionsJavaTest.java @@ -23,6 +23,8 @@ import java.util.Collections; import java.util.Map; import org.apache.commons.collections4.map.HashedMap; import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.common.functions.BatchingConfig; +import org.apache.pulsar.common.functions.ProducerConfig; import org.apache.pulsar.common.policies.data.FunctionStatus; import org.apache.pulsar.common.policies.data.FunctionStatusUtil; import org.apache.pulsar.tests.integration.docker.ContainerExecResult; @@ -109,6 +111,35 @@ public abstract class PulsarFunctionsJavaTest extends PulsarFunctionsTest { testExclamationFunction(Runtime.JAVA, true, false, false, false); } + @Test(groups = {"java_function", "function"}) + public void testJavaExclamationCustomBatchingFunction() throws Exception { + ProducerConfig producerConfig = new ProducerConfig(); + producerConfig.setBatchingConfig(BatchingConfig.builder() + .enabled(true) + .batchingMaxPublishDelayMs(5) + .batchingMaxMessages(100) + .batchingMaxBytes(64 * 1024) + .roundRobinRouterBatchingPartitionSwitchFrequency(5) + .batchBuilder("KEY_BASED") + .build()); + testExclamationFunction(Runtime.JAVA, false, false, false, false, + producerConfig, commandGenerator -> { + commandGenerator.setProducerConfig(producerConfig); + }); + } + + @Test(groups = 
{"java_function", "function"}) + public void testJavaExclamationDiableBatchingFunction() throws Exception { + ProducerConfig producerConfig = new ProducerConfig(); + producerConfig.setBatchingConfig(BatchingConfig.builder() + .enabled(false) + .build()); + testExclamationFunction(Runtime.JAVA, false, false, false, false, + producerConfig, commandGenerator -> { + commandGenerator.setProducerConfig(producerConfig); + }); + } + @Test(groups = {"java_function", "function"}) public void testJavaLoggingFunction() throws Exception { testLoggingFunction(Runtime.JAVA); diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/python/PulsarFunctionsPythonTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/python/PulsarFunctionsPythonTest.java index 9ba210b9988..05441f0111b 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/python/PulsarFunctionsPythonTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/python/PulsarFunctionsPythonTest.java @@ -72,7 +72,7 @@ public abstract class PulsarFunctionsPythonTest extends PulsarFunctionsTest { @Test(groups = {"python_function", "function"}) public void testRetainOrderingTest() throws Exception { testExclamationFunction(Runtime.PYTHON, false, false, false, - false, generator -> { + false, null, generator -> { generator.setRetainOrdering(true); }); } @@ -80,7 +80,7 @@ public abstract class PulsarFunctionsPythonTest extends PulsarFunctionsTest { @Test(groups = {"python_function", "function"}) public void testRetainKeyOrderingTest() throws Exception { testExclamationFunction(Runtime.PYTHON, false, false, false, - false, generator -> { + false, null, generator -> { System.out.println("calling generator.setRetainKeyOrdering(true);"); generator.setRetainKeyOrdering(true); }); diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java index e0fbd604007..08765b5e535 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java @@ -26,6 +26,7 @@ import lombok.Setter; import lombok.ToString; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.apache.pulsar.common.functions.ProducerConfig; import org.apache.pulsar.tests.integration.topologies.PulsarCluster; @Getter @@ -66,6 +67,7 @@ public class CommandGenerator { private SubscriptionInitialPosition subscriptionInitialPosition; private Boolean retainOrdering; private Boolean retainKeyOrdering; + private ProducerConfig producerConfig; private Map<String, String> userConfig = new HashMap<>(); public static final String JAVAJAR = "/pulsar/examples/java-test-functions.jar"; @@ -256,6 +258,9 @@ public class CommandGenerator { } break; } + if (producerConfig != null) { + commandBuilder.append(" --producer-config \'" + new Gson().toJson(producerConfig) + "\'"); + } return commandBuilder.toString(); } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/DataGeneratorSourceTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/DataGeneratorSourceTest.java new file mode 100644 index 00000000000..79981fce2b4 --- /dev/null +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/DataGeneratorSourceTest.java @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.tests.integration.io.sources; + +import com.google.gson.Gson; +import java.util.ArrayList; +import java.util.List; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.common.functions.BatchingConfig; +import org.apache.pulsar.common.functions.ProducerConfig; +import org.apache.pulsar.common.io.SourceConfig; +import org.apache.pulsar.common.policies.data.SourceStatus; +import org.apache.pulsar.common.util.ObjectMapperFactory; +import org.apache.pulsar.tests.integration.containers.StandaloneContainer; +import org.apache.pulsar.tests.integration.docker.ContainerExecException; +import org.apache.pulsar.tests.integration.docker.ContainerExecResult; +import org.apache.pulsar.tests.integration.suites.PulsarStandaloneTestSuite; +import org.apache.pulsar.tests.integration.topologies.PulsarCluster; +import org.testng.Assert; +import org.testng.annotations.Test; + + +import static org.apache.pulsar.tests.integration.suites.PulsarTestSuite.retryStrategically; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; + +/** + * This test verifies that a batch source can be
successfully submitted and run via the pulsar-admin CLI + */ +@Slf4j +public class DataGeneratorSourceTest extends PulsarStandaloneTestSuite { + + @Test(groups = {"source"}) + public void testSource() throws Exception { + testGenericRecordSource(null); + } + + @Test(groups = {"source"}) + public void testSourceCustomBatching() throws Exception { + BatchingConfig config = BatchingConfig.builder() + .enabled(true) + .batchingMaxPublishDelayMs(5) + .roundRobinRouterBatchingPartitionSwitchFrequency(10) + .batchingMaxMessages(10) + .batchingMaxBytes(32 * 1024) + .batchBuilder("KEY_BASED") + .build(); + testGenericRecordSource(config); + } + + @Test(groups = {"source"}) + public void testSourceDisableBatching() throws Exception { + BatchingConfig config = BatchingConfig.builder() + .enabled(false) + .build(); + testGenericRecordSource(config); + } + + public void testGenericRecordSource(BatchingConfig config) throws Exception { + String outputTopicName = "test-state-source-output-" + randomName(8); + String sourceName = "test-state-source-" + randomName(8); + int numMessages = 10; + try { + ProducerConfig producerConfig = null; + if (config != null) { + producerConfig = ProducerConfig.builder() + .batchingConfig(config) + .build(); + } + submitSourceConnector( + sourceName, + outputTopicName, + "builtin://data-generator", + producerConfig); + + // get source info + String info = getSourceInfoSuccess(container, sourceName); + SourceConfig sourceConfig = + ObjectMapperFactory.getMapper().getObjectMapper().readValue(info, SourceConfig.class); + // checking batching config is applied + checkBatchingConfig(sourceName, config, sourceConfig); + + // get source status + getSourceStatus(container, sourceName); + + try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(container.getHttpServiceUrl()).build()) { + + retryStrategically((test) -> { + try { + SourceStatus status = admin.sources().getSourceStatus("public", "default", sourceName); + return 
status.getInstances().size() > 0 + && status.getInstances().get(0).getStatus().numWritten >= 10; + } catch (PulsarAdminException e) { + return false; + } + }, 10, 200); + + SourceStatus status = admin.sources().getSourceStatus("public", "default", sourceName); + assertEquals(status.getInstances().size(), 1); + assertTrue(status.getInstances().get(0).getStatus().numWritten >= 10); + } + + // delete source + deleteSource(container, sourceName); + + getSourceInfoNotFound(container, sourceName); + } finally { + dumpFunctionLogs(sourceName); + } + } + + private void submitSourceConnector(String sourceName, + String outputTopicName, + String archive, + ProducerConfig producerConfig) throws Exception { + List<String> commands = new ArrayList<>(List.of( + PulsarCluster.ADMIN_SCRIPT, + "sources", "create", + "--name", sourceName, + "--destinationTopicName", outputTopicName, + "--archive", archive + )); + if (producerConfig != null) { + commands.add("--producer-config"); + commands.add(new Gson().toJson(producerConfig)); + } + log.info("Run command : {}", StringUtils.join(commands, ' ')); + ContainerExecResult result = container.execCmd(commands.toArray(new String[0])); + assertTrue( + result.getStdout().contains("Created successfully"), + result.getStdout()); + } + + private static String getSourceInfoSuccess(StandaloneContainer container, String sourceName) throws Exception { + ContainerExecResult result = container.execCmd( + PulsarCluster.ADMIN_SCRIPT, + "sources", + "get", + "--tenant", "public", + "--namespace", "default", + "--name", sourceName + ); + assertTrue(result.getStdout().contains("\"name\": \"" + sourceName + "\"")); + return result.getStdout(); + } + + private static void getSourceStatus(StandaloneContainer container, String sourceName) throws Exception { + retryStrategically((test) -> { + try { + ContainerExecResult result = container.execCmd( + PulsarCluster.ADMIN_SCRIPT, + "sources", + "status", + "--tenant", "public", + "--namespace", "default", + 
"--name", sourceName); + + if (result.getStdout().contains("\"running\" : true")) { + return true; + } + return false; + } catch (Exception e) { + log.error("Encountered error when getting source status", e); + return false; + } + }, 10, 200); + + ContainerExecResult result = container.execCmd( + PulsarCluster.ADMIN_SCRIPT, + "sources", + "status", + "--tenant", "public", + "--namespace", "default", + "--name", sourceName); + + Assert.assertTrue(result.getStdout().contains("\"running\" : true")); + } + + // checking batching config, we can only check this by checking the logs for now + private void checkBatchingConfig(String functionName, BatchingConfig config, SourceConfig sourceConfig) { + if (config != null) { + assertNotNull(sourceConfig.getProducerConfig()); + assertNotNull(sourceConfig.getProducerConfig().getBatchingConfig()); + assertEquals(config.toString(), sourceConfig.getProducerConfig().getBatchingConfig().toString()); + } + + String functionLogs = getFunctionLogs(functionName); + if (config == null || config.isEnabled()) { + BatchingConfig finalConfig = config; + if (finalConfig == null) { + finalConfig = BatchingConfig.builder().build(); + } + assertTrue(functionLogs.contains(finalConfig.toString())); + } else { + assertTrue(functionLogs.contains("BatchingConfig(enabled=false")); + } + } + + private static void deleteSource(StandaloneContainer container, String sourceName) throws Exception { + ContainerExecResult result = container.execCmd( + PulsarCluster.ADMIN_SCRIPT, + "sources", + "delete", + "--tenant", "public", + "--namespace", "default", + "--name", sourceName + ); + assertTrue(result.getStdout().contains("Delete source successfully")); + assertTrue(result.getStderr().isEmpty()); + } + + private static void getSourceInfoNotFound(StandaloneContainer container, String sourceName) throws Exception { + try { + container.execCmd( + PulsarCluster.ADMIN_SCRIPT, + "sources", + "get", + "--tenant", "public", + "--namespace", "default", + "--name", 
sourceName); + fail("Command should have exited with non-zero"); + } catch (ContainerExecException e) { + assertTrue(e.getResult().getStderr().contains("Reason: Source " + sourceName + " doesn't exist")); + } + } + +} diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java index 35fb453c4bb..6e3c4530f5d 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java @@ -720,6 +720,21 @@ public class PulsarCluster { enabled ? "--enable" : "--disable"); } + public String getFunctionLogs(String name) { + StringBuilder logs = new StringBuilder(); + for (WorkerContainer container : getAlWorkers()) { + try { + String logFile = "/pulsar/logs/functions/public/default/" + name + "/" + name + "-0.log"; + logs.append(container.<String>copyFileFromContainer(logFile, (inputStream) -> { + return IOUtils.toString(inputStream, "utf-8"); + })); + } catch (Exception e) { + log.error("Failed to get function logs from container {}", container.getContainerName(), e); + } + } + return logs.toString(); + } + public void dumpFunctionLogs(String name) { for (WorkerContainer container : getAlWorkers()) { log.info("Trying to get function {} logs from container {}", name, container.getContainerName()); diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarStandaloneTestBase.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarStandaloneTestBase.java index 411b5217501..23a17a1b896 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarStandaloneTestBase.java +++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarStandaloneTestBase.java @@ -107,7 +107,17 @@ public abstract class PulsarStandaloneTestBase extends PulsarTestBase { } } - + protected String getFunctionLogs(String name) { + try { + String logFile = "/pulsar/logs/functions/public/default/" + name + "/" + name + "-0.log"; + return container.<String>copyFileFromContainer(logFile, (inputStream) -> { + return IOUtils.toString(inputStream, "utf-8"); + }); + } catch (Throwable err) { + log.info("Cannot get {} logs", name, err); + return ""; + } + } protected void dumpFunctionLogs(String name) { try { diff --git a/tests/integration/src/test/resources/pulsar-io-sources.xml b/tests/integration/src/test/resources/pulsar-io-sources.xml index 636b3e47919..3d3b150f664 100644 --- a/tests/integration/src/test/resources/pulsar-io-sources.xml +++ b/tests/integration/src/test/resources/pulsar-io-sources.xml @@ -24,6 +24,7 @@ <classes> <class name="org.apache.pulsar.tests.integration.io.sources.debezium.PulsarDebeziumSourcesTest" /> <class name="org.apache.pulsar.tests.integration.io.sources.BatchSourceTest" /> + <class name="org.apache.pulsar.tests.integration.io.sources.DataGeneratorSourceTest" /> <class name="org.apache.pulsar.tests.integration.io.sources.AvroKafkaSourceTest" /> </classes> </test>