This is an automated email from the ASF dual-hosted git repository.

nicoloboschi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b3f3e0b5c83 [fix][admin] Keep new inputSpecs when updating sink configs (#19082)
b3f3e0b5c83 is described below

commit b3f3e0b5c83b29578f6a4b56c8efebb8ab50ae76
Author: Ayman Khalil <[email protected]>
AuthorDate: Mon Jan 2 01:18:43 2023 -0800

    [fix][admin] Keep new inputSpecs when updating sink configs (#19082)
---
 .../pulsar/functions/utils/SinkConfigUtils.java       |  2 +-
 .../pulsar/functions/utils/SinkConfigUtilsTest.java   | 19 +++++++++++++++++++
 2 files changed, 20 insertions(+), 1 deletion(-)

diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
index 8ef5665af2e..df943f93b83 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
@@ -586,7 +586,7 @@ public class SinkConfigUtils {
 
         if (newConfig.getInputs() != null) {
             newConfig.getInputs().forEach((topicName -> {
-                newConfig.getInputSpecs().put(topicName,
+                newConfig.getInputSpecs().putIfAbsent(topicName,
                         
ConsumerConfig.builder().isRegexPattern(false).build());
             }));
         }
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
index 676f452a843..62d6f68d31b 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.functions.utils;
 
 import com.google.common.collect.Lists;
 import com.google.gson.Gson;
+import java.util.ArrayList;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 import lombok.experimental.Accessors;
@@ -277,6 +278,24 @@ public class SinkConfigUtilsTest {
         
assertEquals(sinkConfig.getInputSpecs().get("test-input").getReceiverQueueSize().intValue(),
 1000);
     }
 
+    @Test
+    public void testMergeDifferentInputSpecWithInputsSet() {
+        SinkConfig sinkConfig = createSinkConfig();
+        sinkConfig.getInputSpecs().put("test-input", 
ConsumerConfig.builder().isRegexPattern(false).receiverQueueSize(1000).build());
+
+        Map<String, ConsumerConfig> inputSpecs = new HashMap<>();
+        ConsumerConfig newConsumerConfig = 
ConsumerConfig.builder().isRegexPattern(false).serdeClassName("test-serde").receiverQueueSize(58).build();
+        inputSpecs.put("test-input", newConsumerConfig);
+        SinkConfig newSinkConfig = createUpdatedSinkConfig("inputSpecs", 
inputSpecs);
+        newSinkConfig.setInputs(new ArrayList<>());
+        newSinkConfig.getInputs().add("test-input");
+        SinkConfig mergedConfig = SinkConfigUtils.validateUpdate(sinkConfig, 
newSinkConfig);
+        assertEquals(mergedConfig.getInputSpecs().get("test-input"), 
newConsumerConfig);
+
+        // make sure original sinkConfig was not modified
+        
assertEquals(sinkConfig.getInputSpecs().get("test-input").getReceiverQueueSize().intValue(),
 1000);
+    }
+
     @Test(expectedExceptions = IllegalArgumentException.class, 
expectedExceptionsMessageRegExp = "Processing Guarantees cannot be altered")
     public void testMergeDifferentProcessingGuarantees() {
         SinkConfig sinkConfig = createSinkConfig();

Reply via email to