This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 95a7a09da2e [fix][fn] Fix ProducerConfig cannot update error (#21037)
95a7a09da2e is described below
commit 95a7a09da2eac56408488c89b9ec76634a19033f
Author: jiangpengcheng <[email protected]>
AuthorDate: Thu Aug 31 17:13:09 2023 +0800
[fix][fn] Fix ProducerConfig cannot update error (#21037)
---
.../functions/utils/FunctionConfigUtils.java | 3 +++
.../pulsar/functions/utils/SourceConfigUtils.java | 3 +++
.../functions/utils/FunctionConfigUtilsTest.java | 24 ++++++++++++++++++++++
.../functions/utils/SourceConfigUtilsTest.java | 24 ++++++++++++++++++++++
4 files changed, 54 insertions(+)
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
index 8e95e0fecf9..769621da402 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
@@ -1088,6 +1088,9 @@ public class FunctionConfigUtils {
if (!StringUtils.isEmpty(newConfig.getCustomRuntimeOptions())) {
mergedConfig.setCustomRuntimeOptions(newConfig.getCustomRuntimeOptions());
}
+ if (newConfig.getProducerConfig() != null) {
+ mergedConfig.setProducerConfig(newConfig.getProducerConfig());
+ }
return mergedConfig;
}
}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
index ec0c5c444cc..f3be015d737 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
@@ -448,6 +448,9 @@ public class SourceConfigUtils {
validateBatchSourceConfigUpdate(existingConfig.getBatchSourceConfig(),
newConfig.getBatchSourceConfig());
mergedConfig.setBatchSourceConfig(newConfig.getBatchSourceConfig());
}
+ if (newConfig.getProducerConfig() != null) {
+ mergedConfig.setProducerConfig(newConfig.getProducerConfig());
+ }
return mergedConfig;
}
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
index 8b4470d8c76..ef4e72dc8d0 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
@@ -429,6 +429,30 @@ public class FunctionConfigUtilsTest {
);
}
+ @Test
+ public void testMergeDifferentProducerConfig() {
+ FunctionConfig functionConfig = createFunctionConfig();
+
+ ProducerConfig producerConfig = new ProducerConfig();
+ producerConfig.setMaxPendingMessages(100);
+ producerConfig.setMaxPendingMessagesAcrossPartitions(1000);
+ producerConfig.setUseThreadLocalProducers(true);
+ producerConfig.setBatchBuilder("DEFAULT");
+ producerConfig.setCompressionType(CompressionType.ZLIB);
+ FunctionConfig newFunctionConfig = createUpdatedFunctionConfig("producerConfig", producerConfig);
+
+ FunctionConfig mergedConfig = FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig);
+ assertEquals(
+ mergedConfig.getProducerConfig(),
+ producerConfig
+ );
+ mergedConfig.setProducerConfig(functionConfig.getProducerConfig());
+ assertEquals(
+ new Gson().toJson(functionConfig),
+ new Gson().toJson(mergedConfig)
+ );
+ }
+
@Test
public void testMergeDifferentTimeout() {
FunctionConfig functionConfig = createFunctionConfig();
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
index 49313dbf02c..a4da4203d96 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
@@ -285,6 +285,30 @@ public class SourceConfigUtilsTest {
);
}
+ @Test
+ public void testMergeDifferentProducerConfig() {
+ SourceConfig sourceConfig = createSourceConfig();
+
+ ProducerConfig producerConfig = new ProducerConfig();
+ producerConfig.setMaxPendingMessages(100);
+ producerConfig.setMaxPendingMessagesAcrossPartitions(1000);
+ producerConfig.setUseThreadLocalProducers(true);
+ producerConfig.setBatchBuilder("DEFAULT");
+ producerConfig.setCompressionType(CompressionType.ZLIB);
+ SourceConfig newSourceConfig = createUpdatedSourceConfig("producerConfig", producerConfig);
+
+ SourceConfig mergedConfig = SourceConfigUtils.validateUpdate(sourceConfig, newSourceConfig);
+ assertEquals(
+ mergedConfig.getProducerConfig(),
+ producerConfig
+ );
+ mergedConfig.setProducerConfig(sourceConfig.getProducerConfig());
+ assertEquals(
+ new Gson().toJson(sourceConfig),
+ new Gson().toJson(mergedConfig)
+ );
+ }
+
@Test
public void testValidateConfig() {
SourceConfig sourceConfig = createSourceConfig();