This is an automated email from the ASF dual-hosted git repository.

nlu90 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new fad925f7c52 [improve][fn] Implement pip 401: Support set batching 
configurations for Pulsar Functions&Sources (#23860)
fad925f7c52 is described below

commit fad925f7c522a30ba9fa005e5788162dd17b21ab
Author: jiangpengcheng <scjiangpengch...@gmail.com>
AuthorDate: Wed Mar 12 13:56:42 2025 +0800

    [improve][fn] Implement pip 401: Support set batching configurations for 
Pulsar Functions&Sources (#23860)
---
 .../{ProducerConfig.java => BatchingConfig.java}   |  18 +-
 .../pulsar/common/functions/ProducerConfig.java    |   1 +
 .../functions/instance/JavaInstanceRunnable.java   |   2 +
 .../functions/instance/ProducerBuilderFactory.java |  29 +++
 .../instance/ProducerBuilderFactoryTest.java       |  60 +++++
 .../proto/src/main/proto/Function.proto            |  10 +
 .../pulsar/functions/utils/BatchingUtils.java      |  86 +++++++
 .../functions/utils/FunctionConfigUtils.java       |   6 +
 .../pulsar/functions/utils/BatchingUtilsTest.java  |  70 ++++++
 .../integration/functions/PulsarFunctionsTest.java |  64 +++++-
 .../functions/java/PulsarFunctionsJavaTest.java    |  31 +++
 .../python/PulsarFunctionsPythonTest.java          |   4 +-
 .../functions/utils/CommandGenerator.java          |   5 +
 .../io/sources/DataGeneratorSourceTest.java        | 248 +++++++++++++++++++++
 .../integration/topologies/PulsarCluster.java      |  15 ++
 .../topologies/PulsarStandaloneTestBase.java       |  12 +-
 .../src/test/resources/pulsar-io-sources.xml       |   1 +
 17 files changed, 641 insertions(+), 21 deletions(-)

diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/functions/ProducerConfig.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/functions/BatchingConfig.java
similarity index 74%
copy from 
pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/functions/ProducerConfig.java
copy to 
pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/functions/BatchingConfig.java
index 25ca2ad79c8..1133e8e7e94 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/functions/ProducerConfig.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/functions/BatchingConfig.java
@@ -23,21 +23,19 @@ import lombok.Builder;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.NoArgsConstructor;
-import org.apache.pulsar.client.api.CompressionType;
 
-/**
- * Configuration of the producer inside the function.
- */
 @Data
 @Builder
 @NoArgsConstructor
 @AllArgsConstructor
 @EqualsAndHashCode
-public class ProducerConfig {
-    private Integer maxPendingMessages;
-    private Integer maxPendingMessagesAcrossPartitions;
-    private Boolean useThreadLocalProducers;
-    private CryptoConfig cryptoConfig;
+public class BatchingConfig {
+    @Builder.Default
+    private boolean enabled = true;
+    @Builder.Default
+    private Integer batchingMaxPublishDelayMs = 10;
+    private Integer roundRobinRouterBatchingPartitionSwitchFrequency;
+    private Integer batchingMaxMessages;
+    private Integer batchingMaxBytes;
     private String batchBuilder;
-    private CompressionType compressionType;
 }
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/functions/ProducerConfig.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/functions/ProducerConfig.java
index 25ca2ad79c8..483707da2dc 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/functions/ProducerConfig.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/functions/ProducerConfig.java
@@ -40,4 +40,5 @@ public class ProducerConfig {
     private CryptoConfig cryptoConfig;
     private String batchBuilder;
     private CompressionType compressionType;
+    private BatchingConfig batchingConfig;
 }
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index cfb7e9536a3..61254a1748b 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -101,6 +101,7 @@ import 
org.apache.pulsar.functions.source.PulsarSourceConfig;
 import org.apache.pulsar.functions.source.SingleConsumerPulsarSource;
 import org.apache.pulsar.functions.source.SingleConsumerPulsarSourceConfig;
 import org.apache.pulsar.functions.source.batch.BatchSourceExecutor;
+import org.apache.pulsar.functions.utils.BatchingUtils;
 import org.apache.pulsar.functions.utils.CryptoUtils;
 import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.apache.pulsar.functions.utils.io.ConnectorUtils;
@@ -1050,6 +1051,7 @@ public class JavaInstanceRunnable implements 
AutoCloseable, Runnable {
                             .batchBuilder(conf.getBatchBuilder())
                             
.useThreadLocalProducers(conf.getUseThreadLocalProducers())
                             
.cryptoConfig(CryptoUtils.convertFromSpec(conf.getCryptoSpec()))
+                            
.batchingConfig(BatchingUtils.convertFromSpec(conf.getBatchingSpec()))
                             
.compressionType(FunctionCommon.convertFromFunctionDetailsCompressionType(
                                     conf.getCompressionType()));
                     pulsarSinkConfig.setProducerConfig(builder.build());
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ProducerBuilderFactory.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ProducerBuilderFactory.java
index b08f7f3f2cb..cec0ee91e60 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ProducerBuilderFactory.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ProducerBuilderFactory.java
@@ -116,6 +116,35 @@ public class ProducerBuilderFactory {
                     builder.batcherBuilder(BatcherBuilder.DEFAULT);
                 }
             }
+            if (producerConfig.getBatchingConfig() != null) {
+                
builder.enableBatching(producerConfig.getBatchingConfig().isEnabled());
+                if 
(producerConfig.getBatchingConfig().getBatchingMaxPublishDelayMs() != null
+                        && 
producerConfig.getBatchingConfig().getBatchingMaxPublishDelayMs() > 0) {
+                    
builder.batchingMaxPublishDelay(producerConfig.getBatchingConfig().getBatchingMaxPublishDelayMs(),
+                            TimeUnit.MILLISECONDS);
+                }
+                if 
(producerConfig.getBatchingConfig().getRoundRobinRouterBatchingPartitionSwitchFrequency()
 != null
+                        && 
producerConfig.getBatchingConfig().getRoundRobinRouterBatchingPartitionSwitchFrequency()
+                        > 0) {
+                    builder.roundRobinRouterBatchingPartitionSwitchFrequency(
+                            
producerConfig.getBatchingConfig().getRoundRobinRouterBatchingPartitionSwitchFrequency());
+                }
+                if 
(producerConfig.getBatchingConfig().getBatchingMaxMessages() != null
+                        && 
producerConfig.getBatchingConfig().getBatchingMaxMessages() > 0) {
+                    
builder.batchingMaxMessages(producerConfig.getBatchingConfig().getBatchingMaxMessages());
+                }
+                if (producerConfig.getBatchingConfig().getBatchingMaxBytes() 
!= null
+                        && 
producerConfig.getBatchingConfig().getBatchingMaxBytes() > 0) {
+                    
builder.batchingMaxBytes(producerConfig.getBatchingConfig().getBatchingMaxBytes());
+                }
+                if (producerConfig.getBatchingConfig().getBatchBuilder() != 
null) {
+                    if 
(producerConfig.getBatchingConfig().getBatchBuilder().equals("KEY_BASED")) {
+                        builder.batcherBuilder(BatcherBuilder.KEY_BASED);
+                    } else {
+                        builder.batcherBuilder(BatcherBuilder.DEFAULT);
+                    }
+                }
+            }
         }
         return builder;
     }
diff --git 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ProducerBuilderFactoryTest.java
 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ProducerBuilderFactoryTest.java
index 42940f7e2da..a9bb0b21185 100644
--- 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ProducerBuilderFactoryTest.java
+++ 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ProducerBuilderFactoryTest.java
@@ -25,6 +25,7 @@ import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.testng.Assert.assertEquals;
@@ -41,6 +42,7 @@ import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.ProducerCryptoFailureAction;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.functions.BatchingConfig;
 import org.apache.pulsar.common.functions.CryptoConfig;
 import org.apache.pulsar.common.functions.ProducerConfig;
 import org.mockito.internal.util.MockUtil;
@@ -139,6 +141,62 @@ public class ProducerBuilderFactoryTest {
         cryptoConfig.setCryptoKeyReaderConfig(Map.of("key", "value"));
         
cryptoConfig.setCryptoKeyReaderClassName(TestCryptoKeyReader.class.getName());
         producerConfig.setCryptoConfig(cryptoConfig);
+        BatchingConfig batchingConfig = new BatchingConfig();
+        batchingConfig.setEnabled(true);
+        batchingConfig.setBatchingMaxPublishDelayMs(20);
+        batchingConfig.setBatchingMaxMessages(100);
+        batchingConfig.setBatchingMaxBytes(-1);
+        producerConfig.setBatchingConfig(batchingConfig);
+        ProducerBuilderFactory builderFactory = new 
ProducerBuilderFactory(pulsarClient, producerConfig, null, null);
+        builderFactory.createProducerBuilder("topic", Schema.STRING, 
"producerName");
+
+        verify(pulsarClient).newProducer(Schema.STRING);
+        verify(producerBuilder).blockIfQueueFull(true);
+        // enableBatching will be called twice here:
+        // the first time is called by default to keep the backward compability
+        // the second call is called when the producerConfig and 
producerConfig.batchingConfig are not null
+        verify(producerBuilder, times(2)).enableBatching(true);
+        verify(producerBuilder).batchingMaxPublishDelay(10, 
TimeUnit.MILLISECONDS);
+        verify(producerBuilder).hashingScheme(HashingScheme.Murmur3_32Hash);
+        
verify(producerBuilder).messageRoutingMode(MessageRoutingMode.CustomPartition);
+        verify(producerBuilder).messageRouter(FunctionResultRouter.of());
+        verify(producerBuilder).sendTimeout(0, TimeUnit.SECONDS);
+        verify(producerBuilder).topic("topic");
+        verify(producerBuilder).producerName("producerName");
+
+        verify(producerBuilder).compressionType(CompressionType.SNAPPY);
+        verify(producerBuilder).batcherBuilder(BatcherBuilder.KEY_BASED);
+        verify(producerBuilder).maxPendingMessages(5000);
+        verify(producerBuilder).maxPendingMessagesAcrossPartitions(50000);
+        TestCryptoKeyReader lastInstance = TestCryptoKeyReader.LAST_INSTANCE;
+        assertNotNull(lastInstance);
+        assertEquals(lastInstance.configs, 
cryptoConfig.getCryptoKeyReaderConfig());
+        verify(producerBuilder).cryptoKeyReader(lastInstance);
+        
verify(producerBuilder).cryptoFailureAction(ProducerCryptoFailureAction.FAIL);
+        verify(producerBuilder).addEncryptionKey("key1");
+        verify(producerBuilder).addEncryptionKey("key2");
+        verify(producerBuilder).batchingMaxPublishDelay(20, 
TimeUnit.MILLISECONDS);
+        verify(producerBuilder).batchingMaxMessages(100);
+        verifyNoMoreInteractions(producerBuilder);
+    }
+
+    @Test
+    public void testCreateProducerBuilderWithBatchingDisabled() {
+        ProducerConfig producerConfig = new ProducerConfig();
+        producerConfig.setBatchBuilder("KEY_BASED");
+        producerConfig.setCompressionType(CompressionType.SNAPPY);
+        producerConfig.setMaxPendingMessages(5000);
+        producerConfig.setMaxPendingMessagesAcrossPartitions(50000);
+        CryptoConfig cryptoConfig = new CryptoConfig();
+        
cryptoConfig.setProducerCryptoFailureAction(ProducerCryptoFailureAction.FAIL);
+        cryptoConfig.setEncryptionKeys(new String[]{"key1", "key2"});
+        cryptoConfig.setCryptoKeyReaderConfig(Map.of("key", "value"));
+        
cryptoConfig.setCryptoKeyReaderClassName(TestCryptoKeyReader.class.getName());
+        producerConfig.setCryptoConfig(cryptoConfig);
+        BatchingConfig batchingConfig = new BatchingConfig();
+        batchingConfig.setEnabled(false);
+        batchingConfig.setBatchingMaxPublishDelayMs(0);
+        producerConfig.setBatchingConfig(batchingConfig);
         ProducerBuilderFactory builderFactory = new 
ProducerBuilderFactory(pulsarClient, producerConfig, null, null);
         builderFactory.createProducerBuilder("topic", Schema.STRING, 
"producerName");
         verifyCommon();
@@ -153,12 +211,14 @@ public class ProducerBuilderFactoryTest {
         
verify(producerBuilder).cryptoFailureAction(ProducerCryptoFailureAction.FAIL);
         verify(producerBuilder).addEncryptionKey("key1");
         verify(producerBuilder).addEncryptionKey("key2");
+        verify(producerBuilder).enableBatching(false);
         verifyNoMoreInteractions(producerBuilder);
     }
 
     public static class TestCryptoKeyReader implements CryptoKeyReader {
         static TestCryptoKeyReader LAST_INSTANCE;
         Map<String, Object> configs;
+
         public TestCryptoKeyReader(Map<String, Object> configs) {
             this.configs = configs;
             assert LAST_INSTANCE == null;
diff --git a/pulsar-functions/proto/src/main/proto/Function.proto 
b/pulsar-functions/proto/src/main/proto/Function.proto
index de3f03a3900..5bd1a42b5a3 100644
--- a/pulsar-functions/proto/src/main/proto/Function.proto
+++ b/pulsar-functions/proto/src/main/proto/Function.proto
@@ -121,6 +121,7 @@ message ProducerSpec {
     CryptoSpec cryptoSpec = 4;
     string batchBuilder = 5;
     CompressionType compressionType = 6;
+    BatchingSpec batchingSpec = 7;
 }
 
 message CryptoSpec {
@@ -147,6 +148,15 @@ message CryptoSpec {
     FailureAction consumerCryptoFailureAction = 5;
 }
 
+message BatchingSpec {
+    bool enabled = 1;
+    int32 batchingMaxPublishDelayMs = 2;
+    int32 roundRobinRouterBatchingPartitionSwitchFrequency = 3;
+    int32 batchingMaxMessages = 4;
+    int32 batchingMaxBytes = 5;
+    string batchBuilder = 6;
+}
+
 message SourceSpec {
     string className = 1;
     // map in json format
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/BatchingUtils.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/BatchingUtils.java
new file mode 100644
index 00000000000..7db61f7ace8
--- /dev/null
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/BatchingUtils.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.utils;
+
+import org.apache.pulsar.common.functions.BatchingConfig;
+import org.apache.pulsar.functions.proto.Function;
+
+public final class BatchingUtils {
+    public static Function.BatchingSpec convert(BatchingConfig config) {
+        if (config == null) {
+            return null;
+        }
+
+        Function.BatchingSpec.Builder builder = 
Function.BatchingSpec.newBuilder()
+                .setEnabled(config.isEnabled());
+
+        if (config.getBatchingMaxPublishDelayMs() != null && 
config.getBatchingMaxPublishDelayMs() > 0) {
+            
builder.setBatchingMaxPublishDelayMs(config.getBatchingMaxPublishDelayMs());
+        }
+        if (config.getRoundRobinRouterBatchingPartitionSwitchFrequency() != 
null
+                && 
config.getRoundRobinRouterBatchingPartitionSwitchFrequency() > 0) {
+            builder.setRoundRobinRouterBatchingPartitionSwitchFrequency(
+                    
config.getRoundRobinRouterBatchingPartitionSwitchFrequency());
+        }
+        if (config.getBatchingMaxMessages() != null && 
config.getBatchingMaxMessages() > 0) {
+            builder.setBatchingMaxMessages(config.getBatchingMaxMessages());
+        }
+        if (config.getBatchingMaxBytes() != null && 
config.getBatchingMaxBytes() > 0) {
+            builder.setBatchingMaxBytes(config.getBatchingMaxBytes());
+        }
+        if (config.getBatchBuilder() != null && 
!config.getBatchBuilder().isEmpty()) {
+            builder.setBatchBuilder(config.getBatchBuilder());
+        }
+
+        return builder.build();
+    }
+
+    public static BatchingConfig convertFromSpec(Function.BatchingSpec spec) {
+        // to keep the backward compatibility, when batchingSpec is null or 
empty
+        // the batching is enabled by default, and the default max publish 
delay is 10ms
+        if (spec == null || spec.toString().equals("")) {
+            return BatchingConfig.builder()
+                    .enabled(true)
+                    .batchingMaxPublishDelayMs(10)
+                    .build();
+        }
+
+        BatchingConfig.BatchingConfigBuilder builder = BatchingConfig.builder()
+                .enabled(spec.getEnabled());
+
+        if (spec.getBatchingMaxPublishDelayMs() > 0) {
+            
builder.batchingMaxPublishDelayMs(spec.getBatchingMaxPublishDelayMs());
+        }
+        if (spec.getRoundRobinRouterBatchingPartitionSwitchFrequency() > 0) {
+            builder.roundRobinRouterBatchingPartitionSwitchFrequency(
+                    
spec.getRoundRobinRouterBatchingPartitionSwitchFrequency());
+        }
+        if (spec.getBatchingMaxMessages() > 0) {
+            builder.batchingMaxMessages(spec.getBatchingMaxMessages());
+        }
+        if (spec.getBatchingMaxBytes() > 0) {
+            builder.batchingMaxBytes(spec.getBatchingMaxBytes());
+        }
+        if (spec.getBatchBuilder() != null && 
!spec.getBatchBuilder().isEmpty()) {
+            builder.batchBuilder(spec.getBatchBuilder());
+        }
+
+        return builder.build();
+    }
+}
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
index 45fb4c1cb1e..d19a77e9dea 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
@@ -524,6 +524,9 @@ public class FunctionConfigUtils {
         if (producerConf.getBatchBuilder() != null) {
             builder.setBatchBuilder(producerConf.getBatchBuilder());
         }
+        if (producerConf.getBatchingConfig() != null) {
+            
builder.setBatchingSpec(BatchingUtils.convert(producerConf.getBatchingConfig()));
+        }
         if (producerConf.getCompressionType() != null) {
             
builder.setCompressionType(convertFromCompressionType(producerConf.getCompressionType()));
         } else {
@@ -546,6 +549,9 @@ public class FunctionConfigUtils {
         if (spec.getBatchBuilder() != null) {
             producerConfig.setBatchBuilder(spec.getBatchBuilder());
         }
+        if (spec.hasBatchingSpec()) {
+            
producerConfig.setBatchingConfig(BatchingUtils.convertFromSpec(spec.getBatchingSpec()));
+        }
         
producerConfig.setUseThreadLocalProducers(spec.getUseThreadLocalProducers());
         
producerConfig.setCompressionType(convertFromFunctionDetailsCompressionType(spec.getCompressionType()));
         return producerConfig;
diff --git 
a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/BatchingUtilsTest.java
 
b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/BatchingUtilsTest.java
new file mode 100644
index 00000000000..655c0d3837e
--- /dev/null
+++ 
b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/BatchingUtilsTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.utils;
+
+import static org.testng.Assert.*;
+import org.apache.pulsar.common.functions.BatchingConfig;
+import org.apache.pulsar.functions.proto.Function;
+import org.testng.annotations.Test;
+
+public class BatchingUtilsTest {
+
+    @Test
+    public void testConvert() {
+        BatchingConfig config = BatchingConfig.builder()
+                .enabled(true)
+                .batchingMaxPublishDelayMs(30)
+                .roundRobinRouterBatchingPartitionSwitchFrequency(10)
+                .batchingMaxMessages(1000)
+                .batchBuilder("DEFAULT")
+                .build();
+        Function.BatchingSpec spec = BatchingUtils.convert(config);
+        assertEquals(spec.getEnabled(), true);
+        assertEquals(spec.getBatchingMaxPublishDelayMs(), 30);
+        
assertEquals(spec.getRoundRobinRouterBatchingPartitionSwitchFrequency(), 10);
+        assertEquals(spec.getBatchingMaxMessages(), 1000);
+        assertEquals(spec.getBatchingMaxBytes(), 0);
+        assertEquals(spec.getBatchBuilder(), "DEFAULT");
+    }
+
+    @Test
+    public void testConvertFromSpec() {
+        Function.BatchingSpec spec = Function.BatchingSpec.newBuilder()
+                .setEnabled(true)
+                .setBatchingMaxPublishDelayMs(30)
+                .setRoundRobinRouterBatchingPartitionSwitchFrequency(10)
+                .setBatchingMaxMessages(1000)
+                .setBatchBuilder("DEFAULT")
+                .build();
+        BatchingConfig config = BatchingUtils.convertFromSpec(spec);
+        assertEquals(config.isEnabled(), true);
+        assertEquals(config.getBatchingMaxPublishDelayMs().intValue(), 30);
+        
assertEquals(config.getRoundRobinRouterBatchingPartitionSwitchFrequency().intValue(),
 10);
+        assertEquals(config.getBatchingMaxMessages().intValue(), 1000);
+        assertEquals(config.getBatchingMaxBytes(), null);
+        assertEquals(config.getBatchBuilder(), "DEFAULT");
+    }
+
+    @Test
+    public void testConvertFromSpecFromNull() {
+        BatchingConfig config = BatchingUtils.convertFromSpec(null);
+        assertTrue(config.isEnabled());
+        assertEquals(config.getBatchingMaxPublishDelayMs().intValue(), 10);
+    }
+}
\ No newline at end of file
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index 694dcba5eaf..ae94ddd1be5 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -62,8 +62,10 @@ import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord;
+import org.apache.pulsar.common.functions.BatchingConfig;
 import org.apache.pulsar.common.functions.ConsumerConfig;
 import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.common.functions.ProducerConfig;
 import org.apache.pulsar.common.policies.data.FunctionStatsImpl;
 import org.apache.pulsar.common.policies.data.FunctionStatus;
 import org.apache.pulsar.common.policies.data.FunctionStatusUtil;
@@ -695,7 +697,18 @@ public abstract class PulsarFunctionsTest extends 
PulsarFunctionsTestBase {
                                            boolean pyZip,
                                            boolean multipleInput,
                                            boolean withExtraDeps,
-                                           
java.util.function.Consumer<CommandGenerator> commandGeneratorConsumer) throws 
Exception {
+                                           ProducerConfig producerConfig) 
throws Exception {
+        testExclamationFunction(runtime, isTopicPattern, pyZip, multipleInput, 
withExtraDeps, producerConfig, null);
+    }
+
+    protected void testExclamationFunction(Runtime runtime,
+                                           boolean isTopicPattern,
+                                           boolean pyZip,
+                                           boolean multipleInput,
+                                           boolean withExtraDeps,
+                                           ProducerConfig producerConfig,
+                                           
java.util.function.Consumer<CommandGenerator> commandGeneratorConsumer)
+            throws Exception {
         if (functionRuntimeType == FunctionRuntimeType.THREAD && (runtime == 
Runtime.PYTHON || runtime == Runtime.GO)) {
             // python&go can only run on process mode
             return;
@@ -725,10 +738,21 @@ public abstract class PulsarFunctionsTest extends 
PulsarFunctionsTestBase {
 
         // submit the exclamation function
         submitExclamationFunction(
-                runtime, inputTopicName, outputTopicName, functionName, pyZip, 
withExtraDeps, schema, commandGeneratorConsumer);
+                runtime, inputTopicName, outputTopicName, functionName, pyZip, 
withExtraDeps, schema,
+                commandGeneratorConsumer);
 
         // get function info
         final String info = getFunctionInfoSuccess(functionName);
+        FunctionConfig config = 
ObjectMapperFactory.getMapper().getObjectMapper().readValue(info, 
FunctionConfig.class);
+
+        // check batching config
+        if (runtime == Runtime.JAVA) {
+            BatchingConfig batchingConfig = null;
+            if (producerConfig != null && producerConfig.getBatchingConfig() 
!= null) {
+                batchingConfig = producerConfig.getBatchingConfig();
+            }
+            checkBatchingConfig(functionName, batchingConfig, config);
+        }
 
         // get function stats
         getFunctionStatsEmpty(functionName);
@@ -770,8 +794,8 @@ public abstract class PulsarFunctionsTest extends 
PulsarFunctionsTestBase {
                 break;
         }
 
-        checkSubscriptionType(inputTopicName,
-                
ObjectMapperFactory.getMapper().getObjectMapper().readValue(info, 
FunctionConfig.class));
+        // check subscription type
+        checkSubscriptionType(inputTopicName, config);
 
         // delete function
         deleteFunction(functionName);
@@ -819,6 +843,26 @@ public abstract class PulsarFunctionsTest extends 
PulsarFunctionsTestBase {
         });
     }
 
+    // checking batching config, we can only check this by checking the logs 
for now
+    private void checkBatchingConfig(String functionName, BatchingConfig 
config, FunctionConfig functionConfig) {
+        if (config != null) {
+            assertNotNull(functionConfig.getProducerConfig());
+            
assertNotNull(functionConfig.getProducerConfig().getBatchingConfig());
+            assertEquals(config.toString(), 
functionConfig.getProducerConfig().getBatchingConfig().toString());
+        }
+
+        String functionLogs = pulsarCluster.getFunctionLogs(functionName);
+        if (config == null || config.isEnabled()) {
+            BatchingConfig finalConfig = config;
+            if (finalConfig == null) {
+                finalConfig = BatchingConfig.builder().build();
+            }
+            assertTrue(functionLogs.contains(finalConfig.toString()));
+        } else {
+            assertTrue(functionLogs.contains("BatchingConfig(enabled=false"));
+        }
+    }
+
     private void submitExclamationFunction(Runtime runtime,
                                            String inputTopicName,
                                            String outputTopicName,
@@ -837,7 +881,8 @@ public abstract class PulsarFunctionsTest extends 
PulsarFunctionsTestBase {
                                            boolean pyZip,
                                            boolean withExtraDeps,
                                            Schema<?> schema,
-                                           
java.util.function.Consumer<CommandGenerator> commandGeneratorConsumer) throws 
Exception {
+                                           
java.util.function.Consumer<CommandGenerator> commandGeneratorConsumer)
+            throws Exception {
         submitFunction(
                 runtime,
                 inputTopicName,
@@ -860,7 +905,8 @@ public abstract class PulsarFunctionsTest extends 
PulsarFunctionsTestBase {
                                     boolean isPublishFunction,
                                     String functionClass,
                                     Schema<T> inputTopicSchema,
-                                    
java.util.function.Consumer<CommandGenerator> commandGeneratorConsumer) throws 
Exception {
+                                    
java.util.function.Consumer<CommandGenerator> commandGeneratorConsumer)
+            throws Exception {
 
         String file = null;
         if (Runtime.JAVA == runtime) {
@@ -894,7 +940,8 @@ public abstract class PulsarFunctionsTest extends 
PulsarFunctionsTestBase {
                                     String functionFile,
                                     String functionClass,
                                     Schema<T> inputTopicSchema,
-                                    
java.util.function.Consumer<CommandGenerator> commandGeneratorConsumer) throws 
Exception {
+                                    
java.util.function.Consumer<CommandGenerator> commandGeneratorConsumer)
+            throws Exception {
         submitFunction(runtime, inputTopicName, outputTopicName, functionName, 
functionFile, functionClass,
                 inputTopicSchema, null, null, null, null, null, null,
                 commandGeneratorConsumer);
@@ -913,7 +960,8 @@ public abstract class PulsarFunctionsTest extends 
PulsarFunctionsTestBase {
                                     SubscriptionInitialPosition 
subscriptionInitialPosition,
                                     String inputTypeClassName,
                                     String outputTypeClassName,
-                                    
java.util.function.Consumer<CommandGenerator> commandGeneratorConsumer) throws 
Exception {
+                                    
java.util.function.Consumer<CommandGenerator> commandGeneratorConsumer)
+            throws Exception {
 
         if (StringUtils.isNotEmpty(inputTopicName)) {
             ensureSubscriptionCreated(
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarFunctionsJavaTest.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarFunctionsJavaTest.java
index 939d6e19d1f..fc9ffe9da0d 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarFunctionsJavaTest.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarFunctionsJavaTest.java
@@ -23,6 +23,8 @@ import java.util.Collections;
 import java.util.Map;
 import org.apache.commons.collections4.map.HashedMap;
 import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.common.functions.BatchingConfig;
+import org.apache.pulsar.common.functions.ProducerConfig;
 import org.apache.pulsar.common.policies.data.FunctionStatus;
 import org.apache.pulsar.common.policies.data.FunctionStatusUtil;
 import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
@@ -109,6 +111,35 @@ public abstract class PulsarFunctionsJavaTest extends 
PulsarFunctionsTest {
         testExclamationFunction(Runtime.JAVA, true, false, false, false);
     }
 
+    @Test(groups = {"java_function", "function"})
+    public void testJavaExclamationCustomBatchingFunction() throws Exception {
+        ProducerConfig producerConfig = new ProducerConfig();
+        producerConfig.setBatchingConfig(BatchingConfig.builder()
+                .enabled(true)
+                .batchingMaxPublishDelayMs(5)
+                .batchingMaxMessages(100)
+                .batchingMaxBytes(64 * 1024)
+                .roundRobinRouterBatchingPartitionSwitchFrequency(5)
+                .batchBuilder("KEY_BASED")
+                .build());
+        testExclamationFunction(Runtime.JAVA, false, false, false, false,
+                producerConfig, commandGenerator -> {
+                    commandGenerator.setProducerConfig(producerConfig);
+                });
+    }
+
+    @Test(groups = {"java_function", "function"})
+    public void testJavaExclamationDiableBatchingFunction() throws Exception {
+        ProducerConfig producerConfig = new ProducerConfig();
+        producerConfig.setBatchingConfig(BatchingConfig.builder()
+                .enabled(false)
+                .build());
+        testExclamationFunction(Runtime.JAVA, false, false, false, false,
+                producerConfig, commandGenerator -> {
+                    commandGenerator.setProducerConfig(producerConfig);
+                });
+    }
+
     @Test(groups = {"java_function", "function"})
     public void testJavaLoggingFunction() throws Exception {
         testLoggingFunction(Runtime.JAVA);
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/python/PulsarFunctionsPythonTest.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/python/PulsarFunctionsPythonTest.java
index 9ba210b9988..05441f0111b 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/python/PulsarFunctionsPythonTest.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/python/PulsarFunctionsPythonTest.java
@@ -72,7 +72,7 @@ public abstract class PulsarFunctionsPythonTest extends 
PulsarFunctionsTest {
     @Test(groups = {"python_function", "function"})
     public void testRetainOrderingTest() throws Exception {
         testExclamationFunction(Runtime.PYTHON, false, false, false,
-                false, generator -> {
+                false, null, generator -> {
                     generator.setRetainOrdering(true);
                 });
     }
@@ -80,7 +80,7 @@ public abstract class PulsarFunctionsPythonTest extends 
PulsarFunctionsTest {
     @Test(groups = {"python_function", "function"})
     public void testRetainKeyOrderingTest() throws Exception {
         testExclamationFunction(Runtime.PYTHON, false, false, false,
-                false, generator -> {
+                false, null, generator -> {
                     System.out.println("calling 
generator.setRetainKeyOrdering(true);");
                     generator.setRetainKeyOrdering(true);
                 });
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java
index e0fbd604007..08765b5e535 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java
@@ -26,6 +26,7 @@ import lombok.Setter;
 import lombok.ToString;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.common.functions.ProducerConfig;
 import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
 
 @Getter
@@ -66,6 +67,7 @@ public class CommandGenerator {
     private SubscriptionInitialPosition subscriptionInitialPosition;
     private Boolean retainOrdering;
     private Boolean retainKeyOrdering;
+    private ProducerConfig producerConfig;
 
     private Map<String, String> userConfig = new HashMap<>();
     public static final String JAVAJAR = 
"/pulsar/examples/java-test-functions.jar";
@@ -256,6 +258,9 @@ public class CommandGenerator {
                 }
                 break;
         }
+        if (producerConfig != null) {
+            commandBuilder.append(" --producer-config \'" + new 
Gson().toJson(producerConfig) + "\'");
+        }
         return commandBuilder.toString();
     }
 
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/DataGeneratorSourceTest.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/DataGeneratorSourceTest.java
new file mode 100644
index 00000000000..79981fce2b4
--- /dev/null
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/DataGeneratorSourceTest.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.io.sources;
+
+import com.google.gson.Gson;
+import java.util.ArrayList;
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.functions.BatchingConfig;
+import org.apache.pulsar.common.functions.ProducerConfig;
+import org.apache.pulsar.common.io.SourceConfig;
+import org.apache.pulsar.common.policies.data.SourceStatus;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.tests.integration.containers.StandaloneContainer;
+import org.apache.pulsar.tests.integration.docker.ContainerExecException;
+import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
+import org.apache.pulsar.tests.integration.suites.PulsarStandaloneTestSuite;
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+import static 
org.apache.pulsar.tests.integration.suites.PulsarTestSuite.retryStrategically;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+/**
+ * This tests verifies that a batch source can be successfully submitted and 
run via the pulsar-admin CLI
+ */
+@Slf4j
+public class DataGeneratorSourceTest extends PulsarStandaloneTestSuite {
+
+    @Test(groups = {"source"})
+    public void testSource() throws Exception {
+        testGenericRecordSource(null);
+    }
+
+    @Test(groups = {"source"})
+    public void testSourceCustomBatching() throws Exception {
+        BatchingConfig config = BatchingConfig.builder()
+                .enabled(true)
+                .batchingMaxPublishDelayMs(5)
+                .roundRobinRouterBatchingPartitionSwitchFrequency(10)
+                .batchingMaxMessages(10)
+                .batchingMaxBytes(32 * 1024)
+                .batchBuilder("KEY_BASED")
+                .build();
+        testGenericRecordSource(config);
+    }
+
+    @Test(groups = {"source"})
+    public void testSourceDisableBatching() throws Exception {
+        BatchingConfig config = BatchingConfig.builder()
+                .enabled(false)
+                .build();
+        testGenericRecordSource(config);
+    }
+
+    public void testGenericRecordSource(BatchingConfig config) throws 
Exception {
+        String outputTopicName = "test-state-source-output-" + randomName(8);
+        String sourceName = "test-state-source-" + randomName(8);
+        int numMessages = 10;
+        try {
+            ProducerConfig producerConfig = null;
+            if (config != null) {
+                producerConfig = ProducerConfig.builder()
+                        .batchingConfig(config)
+                        .build();
+            }
+            submitSourceConnector(
+                    sourceName,
+                    outputTopicName,
+                    "builtin://data-generator",
+                    producerConfig);
+
+            // get source info
+            String info = getSourceInfoSuccess(container, sourceName);
+            SourceConfig sourceConfig =
+                    
ObjectMapperFactory.getMapper().getObjectMapper().readValue(info, 
SourceConfig.class);
+            // checking batching config is applied
+            checkBatchingConfig(sourceName, config, sourceConfig);
+
+            // get source status
+            getSourceStatus(container, sourceName);
+
+            try (PulsarAdmin admin = 
PulsarAdmin.builder().serviceHttpUrl(container.getHttpServiceUrl()).build()) {
+
+                retryStrategically((test) -> {
+                    try {
+                        SourceStatus status = 
admin.sources().getSourceStatus("public", "default", sourceName);
+                        return status.getInstances().size() > 0
+                                && 
status.getInstances().get(0).getStatus().numWritten >= 10;
+                    } catch (PulsarAdminException e) {
+                        return false;
+                    }
+                }, 10, 200);
+
+                SourceStatus status = 
admin.sources().getSourceStatus("public", "default", sourceName);
+                assertEquals(status.getInstances().size(), 1);
+                assertTrue(status.getInstances().get(0).getStatus().numWritten 
>= 10);
+            }
+
+            // delete source
+            deleteSource(container, sourceName);
+
+            getSourceInfoNotFound(container, sourceName);
+        } finally {
+            dumpFunctionLogs(sourceName);
+        }
+    }
+
+    private void submitSourceConnector(String sourceName,
+                                       String outputTopicName,
+                                       String archive,
+                                       ProducerConfig producerConfig) throws 
Exception {
+        List<String> commands = new ArrayList<>(List.of(
+                PulsarCluster.ADMIN_SCRIPT,
+                "sources", "create",
+                "--name", sourceName,
+                "--destinationTopicName", outputTopicName,
+                "--archive", archive
+        ));
+        if (producerConfig != null) {
+            commands.add("--producer-config");
+            commands.add(new Gson().toJson(producerConfig));
+        }
+        log.info("Run command : {}", StringUtils.join(commands, ' '));
+        ContainerExecResult result = container.execCmd(commands.toArray(new 
String[0]));
+        assertTrue(
+                result.getStdout().contains("Created successfully"),
+                result.getStdout());
+    }
+
+    private static String getSourceInfoSuccess(StandaloneContainer container, 
String sourceName) throws Exception {
+        ContainerExecResult result = container.execCmd(
+                PulsarCluster.ADMIN_SCRIPT,
+                "sources",
+                "get",
+                "--tenant", "public",
+                "--namespace", "default",
+                "--name", sourceName
+        );
+        assertTrue(result.getStdout().contains("\"name\": \"" + sourceName + 
"\""));
+        return result.getStdout();
+    }
+
+    private static void getSourceStatus(StandaloneContainer container, String 
sourceName) throws Exception {
+        retryStrategically((test) -> {
+            try {
+                ContainerExecResult result = container.execCmd(
+                        PulsarCluster.ADMIN_SCRIPT,
+                        "sources",
+                        "status",
+                        "--tenant", "public",
+                        "--namespace", "default",
+                        "--name", sourceName);
+
+                if (result.getStdout().contains("\"running\" : true")) {
+                    return true;
+                }
+                return false;
+            } catch (Exception e) {
+                log.error("Encountered error when getting source status", e);
+                return false;
+            }
+        }, 10, 200);
+
+        ContainerExecResult result = container.execCmd(
+                PulsarCluster.ADMIN_SCRIPT,
+                "sources",
+                "status",
+                "--tenant", "public",
+                "--namespace", "default",
+                "--name", sourceName);
+
+        Assert.assertTrue(result.getStdout().contains("\"running\" : true"));
+    }
+
+    // checking batching config, we can only check this by checking the logs 
for now
+    private void checkBatchingConfig(String functionName, BatchingConfig 
config, SourceConfig sourceConfig) {
+        if (config != null) {
+            assertNotNull(sourceConfig.getProducerConfig());
+            
assertNotNull(sourceConfig.getProducerConfig().getBatchingConfig());
+            assertEquals(config.toString(), 
sourceConfig.getProducerConfig().getBatchingConfig().toString());
+        }
+
+        String functionLogs = getFunctionLogs(functionName);
+        if (config == null || config.isEnabled()) {
+            BatchingConfig finalConfig = config;
+            if (finalConfig == null) {
+                finalConfig = BatchingConfig.builder().build();
+            }
+            assertTrue(functionLogs.contains(finalConfig.toString()));
+        } else {
+            assertTrue(functionLogs.contains("BatchingConfig(enabled=false"));
+        }
+    }
+
+    private static void deleteSource(StandaloneContainer container, String 
sourceName) throws Exception {
+        ContainerExecResult result = container.execCmd(
+                PulsarCluster.ADMIN_SCRIPT,
+                "sources",
+                "delete",
+                "--tenant", "public",
+                "--namespace", "default",
+                "--name", sourceName
+        );
+        assertTrue(result.getStdout().contains("Delete source successfully"));
+        assertTrue(result.getStderr().isEmpty());
+    }
+
+    private static void getSourceInfoNotFound(StandaloneContainer container, 
String sourceName) throws Exception {
+        try {
+            container.execCmd(
+                    PulsarCluster.ADMIN_SCRIPT,
+                    "sources",
+                    "get",
+                    "--tenant", "public",
+                    "--namespace", "default",
+                    "--name", sourceName);
+            fail("Command should have exited with non-zero");
+        } catch (ContainerExecException e) {
+            assertTrue(e.getResult().getStderr().contains("Reason: Source " + 
sourceName + " doesn't exist"));
+        }
+    }
+
+}
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
index 35fb453c4bb..6e3c4530f5d 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
@@ -720,6 +720,21 @@ public class PulsarCluster {
                 enabled ? "--enable" : "--disable");
     }
 
+    public String getFunctionLogs(String name) {
+        StringBuilder logs = new StringBuilder();
+        for (WorkerContainer container : getAlWorkers()) {
+            try {
+                String logFile = "/pulsar/logs/functions/public/default/" + 
name + "/" + name + "-0.log";
+                logs.append(container.<String>copyFileFromContainer(logFile, 
(inputStream) -> {
+                    return IOUtils.toString(inputStream, "utf-8");
+                }));
+            } catch (Exception e) {
+                log.error("Failed to get function logs from container {}", 
container.getContainerName(), e);
+            }
+        }
+        return logs.toString();
+    }
+
     public void dumpFunctionLogs(String name) {
         for (WorkerContainer container : getAlWorkers()) {
             log.info("Trying to get function {} logs from container {}", name, 
container.getContainerName());
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarStandaloneTestBase.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarStandaloneTestBase.java
index 411b5217501..23a17a1b896 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarStandaloneTestBase.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarStandaloneTestBase.java
@@ -107,7 +107,17 @@ public abstract class PulsarStandaloneTestBase extends 
PulsarTestBase {
         }
     }
 
-
+    protected String getFunctionLogs(String name) {
+        try {
+            String logFile = "/pulsar/logs/functions/public/default/" + name + 
"/" + name + "-0.log";
+            return container.<String>copyFileFromContainer(logFile, 
(inputStream) -> {
+                return IOUtils.toString(inputStream, "utf-8");
+            });
+        } catch (Throwable err) {
+            log.info("Cannot get {} logs", name, err);
+            return "";
+        }
+    }
 
     protected void dumpFunctionLogs(String name) {
         try {
diff --git a/tests/integration/src/test/resources/pulsar-io-sources.xml 
b/tests/integration/src/test/resources/pulsar-io-sources.xml
index 636b3e47919..3d3b150f664 100644
--- a/tests/integration/src/test/resources/pulsar-io-sources.xml
+++ b/tests/integration/src/test/resources/pulsar-io-sources.xml
@@ -24,6 +24,7 @@
         <classes>
             <class 
name="org.apache.pulsar.tests.integration.io.sources.debezium.PulsarDebeziumSourcesTest"
 />
             <class 
name="org.apache.pulsar.tests.integration.io.sources.BatchSourceTest" />
+            <class 
name="org.apache.pulsar.tests.integration.io.sources.DataGeneratorSourceTest" />
             <class 
name="org.apache.pulsar.tests.integration.io.sources.AvroKafkaSourceTest" />
         </classes>
     </test>

Reply via email to