This is an automated email from the ASF dual-hosted git repository.
tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new f1c8684fe52 [fix][io] Allow setting sourceType in config file (#19836)
f1c8684fe52 is described below
commit f1c8684fe52d9f75189649eecb34c35e59441590
Author: Alexander Preuß <[email protected]>
AuthorDate: Thu Aug 31 07:54:17 2023 +0200
[fix][io] Allow setting sourceType in config file (#19836)
Signed-off-by: tison <[email protected]>
Co-authored-by: Alexander Preuß <[email protected]>
Co-authored-by: tison <[email protected]>
---
.../org/apache/pulsar/common/io/SourceConfig.java | 1 +
.../java/org/apache/pulsar/admin/cli/CmdSinks.java | 2 +-
.../org/apache/pulsar/admin/cli/CmdSources.java | 4 ++-
.../apache/pulsar/admin/cli/TestCmdSources.java | 33 +++++++++++++---------
4 files changed, 24 insertions(+), 16 deletions(-)
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/io/SourceConfig.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/io/SourceConfig.java
index 17b37008127..251e0bf810b 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/io/SourceConfig.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/io/SourceConfig.java
@@ -59,6 +59,7 @@ public class SourceConfig {
private FunctionConfig.ProcessingGuarantees processingGuarantees;
private Resources resources;
+ private String sourceType;
private String archive;
// Any flags that you want to pass to the runtime.
private String runtimeFlags;
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
index 6d9619244a3..35dec576541 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
@@ -524,7 +524,7 @@ public class CmdSinks extends CmdBase {
sinkConfig.setParallelism(parallelism);
}
- if (archive != null && sinkType != null) {
+ if (archive != null && (sinkType != null ||
sinkConfig.getSinkType() != null)) {
throw new ParameterException("Cannot specify both archive and
sink-type");
}
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
index be2e0320602..ac6ff5e6845 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
@@ -437,7 +437,7 @@ public class CmdSources extends CmdBase {
sourceConfig.setParallelism(parallelism);
}
- if (archive != null && sourceType != null) {
+ if (archive != null && (sourceType != null ||
sourceConfig.getSourceType() != null)) {
throw new ParameterException("Cannot specify both archive and
source-type");
}
@@ -447,6 +447,8 @@ public class CmdSources extends CmdBase {
if (sourceType != null) {
sourceConfig.setArchive(validateSourceType(sourceType));
+ } else if (sourceConfig.getSourceType() != null) {
+
sourceConfig.setArchive(validateSourceType(sourceConfig.getSourceType()));
}
Resources resources = sourceConfig.getResources();
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
index 7d9a0a1f50c..c888ae6c608 100644
--- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
+++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
@@ -18,15 +18,6 @@
*/
package org.apache.pulsar.admin.cli;
-import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE;
-import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-import static org.testng.Assert.assertTrue;
-
import com.beust.jcommander.ParameterException;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.dataformat.yaml.YAMLMapper;
@@ -35,7 +26,6 @@ import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Map;
-
import java.util.UUID;
import org.apache.pulsar.admin.cli.utils.CmdUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -51,10 +41,18 @@ import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
+import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE;
+import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertTrue;
public class TestCmdSources {
-
- private static final String TENANT = "test-tenant";
+ private static final String TENANT = "test-tenant";
private static final String NAMESPACE = "test-namespace";
private static final String NAME = "test";
private static final String TOPIC_NAME = "src_topic_1";
@@ -84,7 +82,6 @@ public class TestCmdSources {
@BeforeMethod
public void setup() throws Exception {
-
pulsarAdmin = mock(PulsarAdmin.class);
source = mock(Sources.class);
when(pulsarAdmin.sources()).thenReturn(source);
@@ -414,7 +411,15 @@ public class TestCmdSources {
expectedSourceConfig.setBatchSourceConfig(batchSourceConfig);
testCmdSourceConfigFile(testSourceConfig, expectedSourceConfig);
}
-
+
+ @Test
+ public void testCmdSourceConfigFileInvalidSourceType() throws Exception {
+ SourceConfig sourceConfig = getSourceConfig();
+ sourceConfig.setSourceType("foo");
+ assertThatThrownBy(() -> testCmdSourceConfigFile(sourceConfig, null))
+ .hasMessageContaining("Invalid source type 'foo'");
+ }
+
public void testCmdSourceConfigFile(SourceConfig testSourceConfig,
SourceConfig expectedSourceConfig) throws Exception {
File file = Files.createTempFile("", "").toFile();