This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit b2f951cf0553eb3d50f45172e231a7929bd50022 Author: Fangbin Sun <[email protected]> AuthorDate: Tue Jun 29 20:36:58 2021 +0800 [Issue 10579] Fix inputs to return a list of topic (#11094) Fixes #10579 (cherry picked from commit 0ee39562239d4e05c2a802afc32e0efe73574df4) --- .../java/org/apache/pulsar/functions/utils/SinkConfigUtils.java | 4 ++++ .../java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java | 1 + .../pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java | 6 ++++++ 3 files changed, 11 insertions(+) diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java index 2d6972c..fd6f35c 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java @@ -46,6 +46,7 @@ import org.apache.pulsar.functions.utils.io.ConnectorUtils; import java.io.IOException; import java.lang.reflect.Type; +import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.LinkedList; @@ -250,6 +251,7 @@ public class SinkConfigUtils { sinkConfig.setParallelism(functionDetails.getParallelism()); sinkConfig.setProcessingGuarantees(FunctionCommon.convertProcessingGuarantee(functionDetails.getProcessingGuarantees())); Map<String, ConsumerConfig> consumerConfigMap = new HashMap<>(); + List<String> inputs = new ArrayList<>(); for (Map.Entry<String, Function.ConsumerSpec> input : functionDetails.getSource().getInputSpecsMap().entrySet()) { ConsumerConfig consumerConfig = new ConsumerConfig(); if (!isEmpty(input.getValue().getSerdeClassName())) { @@ -267,7 +269,9 @@ public class SinkConfigUtils { consumerConfig.setRegexPattern(input.getValue().getIsRegexPattern()); consumerConfig.setConsumerProperties(input.getValue().getConsumerPropertiesMap()); consumerConfigMap.put(input.getKey(), consumerConfig); + inputs.add(input.getKey()); } + sinkConfig.setInputs(inputs); sinkConfig.setInputSpecs(consumerConfigMap); if (!isEmpty(functionDetails.getSource().getSubscriptionName())) { sinkConfig.setSourceSubscriptionName(functionDetails.getSource().getSubscriptionName()); diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java index 8c5620e..a4ef586 100644 --- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java +++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java @@ -83,6 +83,7 @@ public class SinkConfigUtilsTest extends PowerMockTestCase { sinkConfig.setSourceSubscriptionName("test-subscription"); Map<String, ConsumerConfig> inputSpecs = new HashMap<>(); inputSpecs.put("test-input", ConsumerConfig.builder().isRegexPattern(true).receiverQueueSize(532).serdeClassName("test-serde").build()); + sinkConfig.setInputs(Collections.singleton("test-input")); sinkConfig.setInputSpecs(inputSpecs); sinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE); diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java index b593578..40ac38e 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java @@ -33,6 +33,7 @@ import static org.powermock.api.mockito.PowerMockito.doReturn; import static org.powermock.api.mockito.PowerMockito.doThrow; import static org.powermock.api.mockito.PowerMockito.mockStatic; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; import com.google.common.collect.Lists; import java.io.File; import java.io.FileInputStream; @@ -41,6 +42,7 @@ import java.io.InputStream; import java.nio.file.Files; import java.nio.file.Paths; import java.nio.file.StandardCopyOption; +import java.util.Collections; import java.util.HashMap; import java.util.LinkedList; import java.util.List; @@ -1461,6 +1463,10 @@ public class SinkApiV3ResourceTest { when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(sink))).thenReturn(metaData); getDefaultSinkInfo(); + + assertNotNull(getDefaultSinkInfo().getInputs()); + assertEquals(getDefaultSinkInfo().getInputs(), Collections.singleton("input")); + assertEquals( SinkConfigUtils.convertFromDetails(functionDetails), getDefaultSinkInfo());
