This is an automated email from the ASF dual-hosted git repository.
nicoloboschi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b3f3e0b5c83 [fix][admin] Keep new inputSpecs when updating sink
configs (#19082)
b3f3e0b5c83 is described below
commit b3f3e0b5c83b29578f6a4b56c8efebb8ab50ae76
Author: Ayman Khalil <[email protected]>
AuthorDate: Mon Jan 2 01:18:43 2023 -0800
[fix][admin] Keep new inputSpecs when updating sink configs (#19082)
---
.../pulsar/functions/utils/SinkConfigUtils.java | 2 +-
.../pulsar/functions/utils/SinkConfigUtilsTest.java | 19 +++++++++++++++++++
2 files changed, 20 insertions(+), 1 deletion(-)
diff --git
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
index 8ef5665af2e..df943f93b83 100644
---
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
+++
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
@@ -586,7 +586,7 @@ public class SinkConfigUtils {
if (newConfig.getInputs() != null) {
newConfig.getInputs().forEach((topicName -> {
- newConfig.getInputSpecs().put(topicName,
+ newConfig.getInputSpecs().putIfAbsent(topicName,
ConsumerConfig.builder().isRegexPattern(false).build());
}));
}
diff --git
a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
index 676f452a843..62d6f68d31b 100644
---
a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
+++
b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.functions.utils;
import com.google.common.collect.Lists;
import com.google.gson.Gson;
+import java.util.ArrayList;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;
@@ -277,6 +278,24 @@ public class SinkConfigUtilsTest {
assertEquals(sinkConfig.getInputSpecs().get("test-input").getReceiverQueueSize().intValue(),
1000);
}
+ @Test
+ public void testMergeDifferentInputSpecWithInputsSet() {
+ SinkConfig sinkConfig = createSinkConfig();
+ sinkConfig.getInputSpecs().put("test-input",
ConsumerConfig.builder().isRegexPattern(false).receiverQueueSize(1000).build());
+
+ Map<String, ConsumerConfig> inputSpecs = new HashMap<>();
+ ConsumerConfig newConsumerConfig =
ConsumerConfig.builder().isRegexPattern(false).serdeClassName("test-serde").receiverQueueSize(58).build();
+ inputSpecs.put("test-input", newConsumerConfig);
+ SinkConfig newSinkConfig = createUpdatedSinkConfig("inputSpecs",
inputSpecs);
+ newSinkConfig.setInputs(new ArrayList<>());
+ newSinkConfig.getInputs().add("test-input");
+ SinkConfig mergedConfig = SinkConfigUtils.validateUpdate(sinkConfig,
newSinkConfig);
+ assertEquals(mergedConfig.getInputSpecs().get("test-input"),
newConsumerConfig);
+
+ // make sure original sinkConfig was not modified
+
assertEquals(sinkConfig.getInputSpecs().get("test-input").getReceiverQueueSize().intValue(),
1000);
+ }
+
@Test(expectedExceptions = IllegalArgumentException.class,
expectedExceptionsMessageRegExp = "Processing Guarantees cannot be altered")
public void testMergeDifferentProcessingGuarantees() {
SinkConfig sinkConfig = createSinkConfig();