This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 95a7a09da2e [fix][fn] Fix ProducerConfig cannot update error (#21037)
95a7a09da2e is described below

commit 95a7a09da2eac56408488c89b9ec76634a19033f
Author: jiangpengcheng <[email protected]>
AuthorDate: Thu Aug 31 17:13:09 2023 +0800

    [fix][fn] Fix ProducerConfig cannot update error (#21037)
---
 .../functions/utils/FunctionConfigUtils.java       |  3 +++
 .../pulsar/functions/utils/SourceConfigUtils.java  |  3 +++
 .../functions/utils/FunctionConfigUtilsTest.java   | 24 ++++++++++++++++++++++
 .../functions/utils/SourceConfigUtilsTest.java     | 24 ++++++++++++++++++++++
 4 files changed, 54 insertions(+)

diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
index 8e95e0fecf9..769621da402 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
@@ -1088,6 +1088,9 @@ public class FunctionConfigUtils {
         if (!StringUtils.isEmpty(newConfig.getCustomRuntimeOptions())) {
             mergedConfig.setCustomRuntimeOptions(newConfig.getCustomRuntimeOptions());
         }
+        if (newConfig.getProducerConfig() != null) {
+            mergedConfig.setProducerConfig(newConfig.getProducerConfig());
+        }
         return mergedConfig;
     }
 }
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
index ec0c5c444cc..f3be015d737 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
@@ -448,6 +448,9 @@ public class SourceConfigUtils {
             validateBatchSourceConfigUpdate(existingConfig.getBatchSourceConfig(), newConfig.getBatchSourceConfig());
             mergedConfig.setBatchSourceConfig(newConfig.getBatchSourceConfig());
         }
+        if (newConfig.getProducerConfig() != null) {
+            mergedConfig.setProducerConfig(newConfig.getProducerConfig());
+        }
         return mergedConfig;
     }
 
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
index 8b4470d8c76..ef4e72dc8d0 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
@@ -429,6 +429,30 @@ public class FunctionConfigUtilsTest {
         );
     }
 
+    @Test
+    public void testMergeDifferentProducerConfig() {
+        FunctionConfig functionConfig = createFunctionConfig();
+
+        ProducerConfig producerConfig = new ProducerConfig();
+        producerConfig.setMaxPendingMessages(100);
+        producerConfig.setMaxPendingMessagesAcrossPartitions(1000);
+        producerConfig.setUseThreadLocalProducers(true);
+        producerConfig.setBatchBuilder("DEFAULT");
+        producerConfig.setCompressionType(CompressionType.ZLIB);
+        FunctionConfig newFunctionConfig = createUpdatedFunctionConfig("producerConfig", producerConfig);
+
+        FunctionConfig mergedConfig = FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig);
+        assertEquals(
+                mergedConfig.getProducerConfig(),
+                producerConfig
+        );
+        mergedConfig.setProducerConfig(functionConfig.getProducerConfig());
+        assertEquals(
+                new Gson().toJson(functionConfig),
+                new Gson().toJson(mergedConfig)
+        );
+    }
+
     @Test
     public void testMergeDifferentTimeout() {
         FunctionConfig functionConfig = createFunctionConfig();
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
index 49313dbf02c..a4da4203d96 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
@@ -285,6 +285,30 @@ public class SourceConfigUtilsTest {
         );
     }
 
+    @Test
+    public void testMergeDifferentProducerConfig() {
+        SourceConfig sourceConfig = createSourceConfig();
+
+        ProducerConfig producerConfig = new ProducerConfig();
+        producerConfig.setMaxPendingMessages(100);
+        producerConfig.setMaxPendingMessagesAcrossPartitions(1000);
+        producerConfig.setUseThreadLocalProducers(true);
+        producerConfig.setBatchBuilder("DEFAULT");
+        producerConfig.setCompressionType(CompressionType.ZLIB);
+        SourceConfig newSourceConfig = createUpdatedSourceConfig("producerConfig", producerConfig);
+
+        SourceConfig mergedConfig = SourceConfigUtils.validateUpdate(sourceConfig, newSourceConfig);
+        assertEquals(
+                mergedConfig.getProducerConfig(),
+                producerConfig
+        );
+        mergedConfig.setProducerConfig(sourceConfig.getProducerConfig());
+        assertEquals(
+                new Gson().toJson(sourceConfig),
+                new Gson().toJson(mergedConfig)
+        );
+    }
+
     @Test
     public void testValidateConfig() {
         SourceConfig sourceConfig = createSourceConfig();

Reply via email to