This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b2f951cf0553eb3d50f45172e231a7929bd50022
Author: Fangbin Sun <[email protected]>
AuthorDate: Tue Jun 29 20:36:58 2021 +0800

    [Issue 10579] Fix inputs to return a list of topic (#11094)
    
    Fixes #10579
    
    (cherry picked from commit 0ee39562239d4e05c2a802afc32e0efe73574df4)
---
 .../java/org/apache/pulsar/functions/utils/SinkConfigUtils.java     | 4 ++++
 .../java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java | 1 +
 .../pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java  | 6 ++++++
 3 files changed, 11 insertions(+)

diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
index 2d6972c..fd6f35c 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
@@ -46,6 +46,7 @@ import org.apache.pulsar.functions.utils.io.ConnectorUtils;
 
 import java.io.IOException;
 import java.lang.reflect.Type;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -250,6 +251,7 @@ public class SinkConfigUtils {
         sinkConfig.setParallelism(functionDetails.getParallelism());
         
sinkConfig.setProcessingGuarantees(FunctionCommon.convertProcessingGuarantee(functionDetails.getProcessingGuarantees()));
         Map<String, ConsumerConfig> consumerConfigMap = new HashMap<>();
+        List<String> inputs = new ArrayList<>();
         for (Map.Entry<String, Function.ConsumerSpec> input : 
functionDetails.getSource().getInputSpecsMap().entrySet()) {
             ConsumerConfig consumerConfig = new ConsumerConfig();
             if (!isEmpty(input.getValue().getSerdeClassName())) {
@@ -267,7 +269,9 @@ public class SinkConfigUtils {
             
consumerConfig.setRegexPattern(input.getValue().getIsRegexPattern());
             
consumerConfig.setConsumerProperties(input.getValue().getConsumerPropertiesMap());
             consumerConfigMap.put(input.getKey(), consumerConfig);
+            inputs.add(input.getKey());
         }
+        sinkConfig.setInputs(inputs);
         sinkConfig.setInputSpecs(consumerConfigMap);
         if (!isEmpty(functionDetails.getSource().getSubscriptionName())) {
             
sinkConfig.setSourceSubscriptionName(functionDetails.getSource().getSubscriptionName());
diff --git 
a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
 
b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
index 8c5620e..a4ef586 100644
--- 
a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
+++ 
b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
@@ -83,6 +83,7 @@ public class SinkConfigUtilsTest extends PowerMockTestCase {
         sinkConfig.setSourceSubscriptionName("test-subscription");
         Map<String, ConsumerConfig> inputSpecs = new HashMap<>();
         inputSpecs.put("test-input", 
ConsumerConfig.builder().isRegexPattern(true).receiverQueueSize(532).serdeClassName("test-serde").build());
+        sinkConfig.setInputs(Collections.singleton("test-input"));
         sinkConfig.setInputSpecs(inputSpecs);
         
sinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
 
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
index b593578..40ac38e 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
@@ -33,6 +33,7 @@ import static org.powermock.api.mockito.PowerMockito.doReturn;
 import static org.powermock.api.mockito.PowerMockito.doThrow;
 import static org.powermock.api.mockito.PowerMockito.mockStatic;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
 import com.google.common.collect.Lists;
 import java.io.File;
 import java.io.FileInputStream;
@@ -41,6 +42,7 @@ import java.io.InputStream;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.nio.file.StandardCopyOption;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -1461,6 +1463,10 @@ public class SinkApiV3ResourceTest {
         when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), 
eq(sink))).thenReturn(metaData);
 
         getDefaultSinkInfo();
+
+        assertNotNull(getDefaultSinkInfo().getInputs());
+        assertEquals(getDefaultSinkInfo().getInputs(), 
Collections.singleton("input"));
+
         assertEquals(
                 SinkConfigUtils.convertFromDetails(functionDetails),
                 getDefaultSinkInfo());

Reply via email to