This is an automated email from the ASF dual-hosted git repository.
jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a8cd98c Fix update cli source sink (#4061)
a8cd98c is described below
commit a8cd98cd87dde7fa17ea38ff212eebd14fc95835
Author: Boyang Jerry Peng <[email protected]>
AuthorDate: Wed Apr 17 17:42:38 2019 -0700
Fix update cli source sink (#4061)
* fix bug in source and sink cli update
* fix import
* fix logic
* fix tests
---
.../java/org/apache/pulsar/admin/cli/CmdSinks.java | 7 +++-
.../org/apache/pulsar/admin/cli/CmdSources.java | 8 ++++-
.../org/apache/pulsar/admin/cli/TestCmdSinks.java | 39 ++++++++++++++++++++--
.../apache/pulsar/admin/cli/TestCmdSources.java | 36 ++++++++++++++++++++
.../pulsar/common/functions/FunctionConfig.java | 2 +-
.../org/apache/pulsar/common/io/SinkConfig.java | 2 +-
.../functions/utils/FunctionConfigUtils.java | 9 +++++
.../pulsar/functions/utils/SinkConfigUtils.java | 9 +++++
8 files changed, 105 insertions(+), 7 deletions(-)
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
index 711ed88..18ffb53 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
@@ -222,7 +222,12 @@ public class CmdSinks extends CmdBase {
}
protected void validateSinkConfigs(SinkConfig sinkConfig) {
-
org.apache.pulsar.common.functions.Utils.inferMissingArguments(sinkConfig);
+ if (sinkConfig.getTenant() == null) {
+ sinkConfig.setTenant(PUBLIC_TENANT);
+ }
+ if (sinkConfig.getNamespace() == null) {
+ sinkConfig.setNamespace(DEFAULT_NAMESPACE);
+ }
}
}
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
index a200c5e..07071a7 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
@@ -215,6 +215,7 @@ public class CmdSources extends CmdBase {
@Parameters(commandDescription = "Update a Pulsar IO source connector")
protected class UpdateSource extends SourceDetailsCommand {
+
@Override
void runCmd() throws Exception {
if
(Utils.isFunctionPackageUrlSupported(sourceConfig.getArchive())) {
@@ -226,7 +227,12 @@ public class CmdSources extends CmdBase {
}
protected void validateSourceConfigs(SourceConfig sourceConfig) {
-
org.apache.pulsar.common.functions.Utils.inferMissingArguments(sourceConfig);
+ if (sourceConfig.getTenant() == null) {
+ sourceConfig.setTenant(PUBLIC_TENANT);
+ }
+ if (sourceConfig.getNamespace() == null) {
+ sourceConfig.setNamespace(DEFAULT_NAMESPACE);
+ }
}
}
diff --git
a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
index 5d8a303..98d31a4 100644
---
a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
+++
b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
@@ -165,7 +165,6 @@ public class TestCmdSinks {
@Test
public void testMissingInput() throws Exception {
SinkConfig sinkConfig = getSinkConfig();
- sinkConfig.setInputSpecs(new HashMap<>());
sinkConfig.setInputs(null);
testCmdSinkCliMissingArgs(
TENANT,
@@ -190,7 +189,6 @@ public class TestCmdSinks {
SinkConfig sinkConfig = getSinkConfig();
sinkConfig.setTopicToSerdeClassName(null);
sinkConfig.setTopicToSchemaType(null);
- sinkConfig.setInputSpecs(new HashMap<>());
testCmdSinkCliMissingArgs(
TENANT,
NAMESPACE,
@@ -212,7 +210,6 @@ public class TestCmdSinks {
@Test
public void testMissingTopicPattern() throws Exception {
SinkConfig sinkConfig = getSinkConfig();
- sinkConfig.getInputSpecs().clear();
sinkConfig.setTopicsPattern(null);
testCmdSinkCliMissingArgs(
TENANT,
@@ -677,4 +674,40 @@ public class TestCmdSinks {
verify(sink).deleteSink(eq(TENANT), eq(NAMESPACE), null);
}
+
+ @Test
+ public void testUpdateSink() throws Exception {
+
+ updateSink.name = "my-sink";
+
+ updateSink.archive = "new-archive";
+
+ updateSink.processArguments();
+
+ updateSink.runCmd();
+
+ verify(sink).updateSink(eq(SinkConfig.builder()
+ .tenant(PUBLIC_TENANT)
+ .namespace(DEFAULT_NAMESPACE)
+ .name(updateSink.name)
+ .archive(updateSink.archive)
+ .build()), eq(updateSink.archive));
+
+
+ updateSink.archive = null;
+
+ updateSink.parallelism = 2;
+
+ updateSink.processArguments();
+
+ updateSink.runCmd();
+
+ verify(sink).updateSink(eq(SinkConfig.builder()
+ .tenant(PUBLIC_TENANT)
+ .namespace(DEFAULT_NAMESPACE)
+ .name(updateSink.name)
+ .parallelism(2)
+ .build()), eq(null));
+
+ }
}
diff --git
a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
index ac2dab9..04fbf15 100644
---
a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
+++
b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
@@ -569,4 +569,40 @@ public class TestCmdSources {
verify(source).deleteSource(eq(TENANT), eq(NAMESPACE), null);
}
+
+ @Test
+ public void testUpdateSource() throws Exception {
+
+ updateSource.name = "my-source";
+
+ updateSource.archive = "new-archive";
+
+ updateSource.processArguments();
+
+ updateSource.runCmd();
+
+ verify(source).updateSource(eq(SourceConfig.builder()
+ .tenant(PUBLIC_TENANT)
+ .namespace(DEFAULT_NAMESPACE)
+ .name(updateSource.name)
+ .archive(updateSource.archive)
+ .build()), eq(updateSource.archive));
+
+
+ updateSource.archive = null;
+
+ updateSource.parallelism = 2;
+
+ updateSource.processArguments();
+
+ updateSource.runCmd();
+
+ verify(source).updateSource(eq(SourceConfig.builder()
+ .tenant(PUBLIC_TENANT)
+ .namespace(DEFAULT_NAMESPACE)
+ .name(updateSource.name)
+ .parallelism(2)
+ .build()), eq(null));
+
+ }
}
\ No newline at end of file
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
index dc9023a..2b8883d 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
@@ -64,7 +64,7 @@ public class FunctionConfig {
/**
* A generalized way of specifying inputs
*/
- private Map<String, ConsumerConfig> inputSpecs = new TreeMap<>();
+ private Map<String, ConsumerConfig> inputSpecs;
private String output;
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/io/SinkConfig.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/io/SinkConfig.java
index d6dd92c..6e51cbb 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/io/SinkConfig.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/io/SinkConfig.java
@@ -51,7 +51,7 @@ public class SinkConfig {
private Map<String, String> topicToSchemaType;
- private Map<String, ConsumerConfig> inputSpecs = new TreeMap<>();
+ private Map<String, ConsumerConfig> inputSpecs;
private Map<String, Object> configs;
// This is a map of secretName(aka how the secret is going to be
diff --git
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
index 6f9ab27..e245216 100644
---
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
+++
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
@@ -607,6 +607,15 @@ public class FunctionConfigUtils {
if (!StringUtils.isEmpty(newConfig.getClassName())) {
mergedConfig.setClassName(newConfig.getClassName());
}
+
+ if (newConfig.getInputSpecs() == null) {
+ newConfig.setInputSpecs(new HashMap<>());
+ }
+
+ if (mergedConfig.getInputSpecs() == null) {
+ mergedConfig.setInputSpecs(new HashMap<>());
+ }
+
if (newConfig.getInputs() != null) {
newConfig.getInputs().forEach((topicName -> {
newConfig.getInputSpecs().put(topicName,
diff --git
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
index d0e71c8..9159f4f 100644
---
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
+++
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
@@ -422,6 +422,15 @@ public class SinkConfigUtils {
if (!StringUtils.isEmpty(newConfig.getSourceSubscriptionName()) &&
!newConfig.getSourceSubscriptionName().equals(existingConfig.getSourceSubscriptionName()))
{
throw new IllegalArgumentException("Subscription Name cannot be
altered");
}
+
+ if (newConfig.getInputSpecs() == null) {
+ newConfig.setInputSpecs(new HashMap<>());
+ }
+
+ if (mergedConfig.getInputSpecs() == null) {
+ mergedConfig.setInputSpecs(new HashMap<>());
+ }
+
if (newConfig.getInputs() != null) {
newConfig.getInputs().forEach((topicName -> {
newConfig.getInputSpecs().put(topicName,