This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f1c8684fe52 [fix][io] Allow setting sourceType in config file (#19836)
f1c8684fe52 is described below

commit f1c8684fe52d9f75189649eecb34c35e59441590
Author: Alexander Preuß <[email protected]>
AuthorDate: Thu Aug 31 07:54:17 2023 +0200

    [fix][io] Allow setting sourceType in config file (#19836)
    
    Signed-off-by: tison <[email protected]>
    Co-authored-by: Alexander Preuß <[email protected]>
    Co-authored-by: tison <[email protected]>
---
 .../org/apache/pulsar/common/io/SourceConfig.java  |  1 +
 .../java/org/apache/pulsar/admin/cli/CmdSinks.java |  2 +-
 .../org/apache/pulsar/admin/cli/CmdSources.java    |  4 ++-
 .../apache/pulsar/admin/cli/TestCmdSources.java    | 33 +++++++++++++---------
 4 files changed, 24 insertions(+), 16 deletions(-)

diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/io/SourceConfig.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/io/SourceConfig.java
index 17b37008127..251e0bf810b 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/io/SourceConfig.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/io/SourceConfig.java
@@ -59,6 +59,7 @@ public class SourceConfig {
     private FunctionConfig.ProcessingGuarantees processingGuarantees;
     private Resources resources;
 
+    private String sourceType;
     private String archive;
     // Any flags that you want to pass to the runtime.
     private String runtimeFlags;
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
index 6d9619244a3..35dec576541 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
@@ -524,7 +524,7 @@ public class CmdSinks extends CmdBase {
                 sinkConfig.setParallelism(parallelism);
             }
 
-            if (archive != null && sinkType != null) {
+            if (archive != null && (sinkType != null || sinkConfig.getSinkType() != null)) {
                 throw new ParameterException("Cannot specify both archive and sink-type");
             }
 
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
index be2e0320602..ac6ff5e6845 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
@@ -437,7 +437,7 @@ public class CmdSources extends CmdBase {
                 sourceConfig.setParallelism(parallelism);
             }
 
-            if (archive != null && sourceType != null) {
+            if (archive != null && (sourceType != null || sourceConfig.getSourceType() != null)) {
                 throw new ParameterException("Cannot specify both archive and source-type");
             }
 
@@ -447,6 +447,8 @@ public class CmdSources extends CmdBase {
 
             if (sourceType != null) {
                 sourceConfig.setArchive(validateSourceType(sourceType));
+            } else if (sourceConfig.getSourceType() != null) {
+                sourceConfig.setArchive(validateSourceType(sourceConfig.getSourceType()));
             }
 
             Resources resources = sourceConfig.getResources();
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
index 7d9a0a1f50c..c888ae6c608 100644
--- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
+++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
@@ -18,15 +18,6 @@
  */
 package org.apache.pulsar.admin.cli;
 
-import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE;
-import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-import static org.testng.Assert.assertTrue;
-
 import com.beust.jcommander.ParameterException;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.dataformat.yaml.YAMLMapper;
@@ -35,7 +26,6 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.util.Map;
-
 import java.util.UUID;
 import org.apache.pulsar.admin.cli.utils.CmdUtils;
 import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -51,10 +41,18 @@ import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
+import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE;
+import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertTrue;
 
 public class TestCmdSources {
-
-            private static final String TENANT = "test-tenant";
+    private static final String TENANT = "test-tenant";
     private static final String NAMESPACE = "test-namespace";
     private static final String NAME = "test";
     private static final String TOPIC_NAME = "src_topic_1";
@@ -84,7 +82,6 @@ public class TestCmdSources {
 
     @BeforeMethod
     public void setup() throws Exception {
-
         pulsarAdmin = mock(PulsarAdmin.class);
         source = mock(Sources.class);
         when(pulsarAdmin.sources()).thenReturn(source);
@@ -414,7 +411,15 @@ public class TestCmdSources {
         expectedSourceConfig.setBatchSourceConfig(batchSourceConfig);
         testCmdSourceConfigFile(testSourceConfig, expectedSourceConfig);
     }
-    
+
+    @Test
+    public void testCmdSourceConfigFileInvalidSourceType() throws Exception {
+        SourceConfig sourceConfig = getSourceConfig();
+        sourceConfig.setSourceType("foo");
+        assertThatThrownBy(() -> testCmdSourceConfigFile(sourceConfig, null))
+                .hasMessageContaining("Invalid source type 'foo'");
+    }
+
     public void testCmdSourceConfigFile(SourceConfig testSourceConfig, SourceConfig expectedSourceConfig) throws Exception {
 
         File file = Files.createTempFile("", "").toFile();

Reply via email to