srkukarni closed pull request #2381: Make source and sink cli args format
consistent
URL: https://github.com/apache/incubator-pulsar/pull/2381
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one
opened from a fork) once it has been merged, the diff is reproduced
below for the sake of provenance:
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index 5f52313d57..1a7d1e96e1 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -268,7 +268,10 @@ void processArguments() throws Exception {
protected String DEPRECATED_userConfigString;
@Parameter(names = "--user-config", description = "User-defined config
key/values")
protected String userConfigString;
- @Parameter(names = "--retainOrdering", description = "Function
consumes and processes messages in order")
+ // for backwards compatibility purposes
+ @Parameter(names = "--retainOrdering", description = "Function
consumes and processes messages in order", hidden = true)
+ protected Boolean DEPRECATED_retainOrdering;
+ @Parameter(names = "--retain-ordering", description = "Function
consumes and processes messages in order")
protected boolean retainOrdering;
@Parameter(names = "--parallelism", description = "The function's
parallelism factor (i.e. the number of function instances to run)")
protected Integer parallelism;
@@ -319,8 +322,9 @@ private void mergeArgs() {
if (!StringUtils.isBlank(DEPRECATED_outputSerdeClassName))
outputSerdeClassName = DEPRECATED_outputSerdeClassName;
if (!StringUtils.isBlank(DEPRECATED_customSerdeInputString))
customSerdeInputString = DEPRECATED_customSerdeInputString;
if (!StringUtils.isBlank(DEPRECATED_fnConfigFile)) fnConfigFile =
DEPRECATED_fnConfigFile;
- if (DEPRECATED_processingGuarantees !=
FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE) processingGuarantees =
DEPRECATED_processingGuarantees;
+ if (DEPRECATED_processingGuarantees != null) processingGuarantees
= DEPRECATED_processingGuarantees;
if (!StringUtils.isBlank(DEPRECATED_userConfigString))
userConfigString = DEPRECATED_userConfigString;
+ if (DEPRECATED_retainOrdering != null) retainOrdering =
DEPRECATED_retainOrdering;
if (DEPRECATED_windowLengthCount != null) windowLengthCount =
DEPRECATED_windowLengthCount;
if (DEPRECATED_windowLengthDurationMs != null)
windowLengthDurationMs = DEPRECATED_windowLengthDurationMs;
if (DEPRECATED_slidingIntervalCount != null) slidingIntervalCount
= DEPRECATED_slidingIntervalCount;
@@ -787,6 +791,7 @@ private void mergeArgs() {
@Override
void runCmd() throws Exception {
+ // merge deprecated args with new args
mergeArgs();
CmdFunctions.startLocalRun(convertProto2(functionConfig),
functionConfig.getParallelism(),
instanceIdOffset, brokerServiceUrl, stateStorageServiceUrl,
@@ -964,6 +969,7 @@ public void mergeArgs() {
@Override
void runCmd() throws Exception {
+ // merge deprecated args with new args
mergeArgs();
if (triggerFile == null && triggerValue == null) {
throw new ParameterException("Either a trigger value or a
trigger filepath needs to be specified");
@@ -998,6 +1004,7 @@ private void mergeArgs() {
@Override
void runCmd() throws Exception {
+ // merge deprecated args with new args
mergeArgs();
if (StringUtils.isBlank(sourceFile)) {
throw new ParameterException("--source-file needs to be
specified");
@@ -1032,6 +1039,7 @@ private void mergeArgs() {
@Override
void runCmd() throws Exception {
+ // merge deprecated args with new args
mergeArgs();
if (StringUtils.isBlank(destinationFile)) {
throw new ParameterException("--destination-file needs to be
specified");
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
index 8417874f38..5ccd9b0853 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
@@ -24,23 +24,9 @@
import com.beust.jcommander.converters.StringConverter;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.lang.reflect.Type;
-import java.nio.file.Paths;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
-import static org.apache.commons.lang3.StringUtils.isBlank;
-import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.text.WordUtils;
import org.apache.pulsar.admin.cli.utils.CmdUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -50,7 +36,6 @@
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.functions.api.utils.IdentityFunction;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
-import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.proto.Function.Resources;
import org.apache.pulsar.functions.proto.Function.SinkSpec;
@@ -64,6 +49,20 @@
import org.apache.pulsar.functions.utils.io.Connectors;
import org.apache.pulsar.functions.utils.validation.ConfigValidation;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.lang.reflect.Type;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.commons.lang3.StringUtils.isBlank;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE;
import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
import static
org.apache.pulsar.functions.utils.Utils.convertProcessingGuarantee;
@@ -114,29 +113,62 @@ void processArguments() throws Exception {
@Parameters(commandDescription = "Run a Pulsar IO sink connector locally
(rather than deploying it to the Pulsar cluster)")
protected class LocalSinkRunner extends CreateSink {
- @Parameter(names = "--brokerServiceUrl", description = "The URL for
the Pulsar broker")
+ // for backwards compatibility purposes
+ @Parameter(names = "--brokerServiceUrl", description = "The URL for
the Pulsar broker", hidden = true)
+ protected String DEPRECATED_brokerServiceUrl;
+ @Parameter(names = "--broker-service-url", description = "The URL for
the Pulsar broker")
protected String brokerServiceUrl;
- @Parameter(names = "--clientAuthPlugin", description = "Client
authentication plugin using which function-process can connect to broker")
+ // for backwards compatibility purposes
+ @Parameter(names = "--clientAuthPlugin", description = "Client
authentication plugin using which function-process can connect to broker",
hidden = true)
+ protected String DEPRECATED_clientAuthPlugin;
+ @Parameter(names = "--client-auth-plugin", description = "Client
authentication plugin using which function-process can connect to broker")
protected String clientAuthPlugin;
- @Parameter(names = "--clientAuthParams", description = "Client
authentication param")
+ // for backwards compatibility purposes
+ @Parameter(names = "--clientAuthParams", description = "Client
authentication param", hidden = true)
+ protected String DEPRECATED_clientAuthParams;
+ @Parameter(names = "--client-auth-params", description = "Client
authentication param")
protected String clientAuthParams;
- @Parameter(names = "--use_tls", description = "Use tls connection\n")
+ // for backwards compatibility purposes
+ @Parameter(names = "--use_tls", description = "Use tls connection\n",
hidden = true)
+ protected Boolean DEPRECATED_useTls;
+ @Parameter(names = "--use-tls", description = "Use tls connection\n")
protected boolean useTls;
- @Parameter(names = "--tls_allow_insecure", description = "Allow
insecure tls connection\n")
+ // for backwards compatibility purposes
+ @Parameter(names = "--tls_allow_insecure", description = "Allow
insecure tls connection\n", hidden = true)
+ protected Boolean DEPRECATED_tlsAllowInsecureConnection;
+ @Parameter(names = "--tls-allow-insecure", description = "Allow
insecure tls connection\n")
protected boolean tlsAllowInsecureConnection;
- @Parameter(names = "--hostname_verification_enabled", description =
"Enable hostname verification")
+ // for backwards compatibility purposes
+ @Parameter(names = "--hostname_verification_enabled", description =
"Enable hostname verification", hidden = true)
+ protected Boolean DEPRECATED_tlsHostNameVerificationEnabled;
+ @Parameter(names = "--hostname-verification-enabled", description =
"Enable hostname verification")
protected boolean tlsHostNameVerificationEnabled;
- @Parameter(names = "--tls_trust_cert_path", description = "tls trust
cert file path")
+ // for backwards compatibility purposes
+ @Parameter(names = "--tls_trust_cert_path", description = "tls trust
cert file path", hidden = true)
+ protected String DEPRECATED_tlsTrustCertFilePath;
+ @Parameter(names = "--tls-trust-cert-path", description = "tls trust
cert file path")
protected String tlsTrustCertFilePath;
+ private void mergeArgs() {
+ if (!StringUtils.isBlank(DEPRECATED_brokerServiceUrl))
brokerServiceUrl = DEPRECATED_brokerServiceUrl;
+ if (!StringUtils.isBlank(DEPRECATED_clientAuthPlugin))
clientAuthPlugin = DEPRECATED_clientAuthPlugin;
+ if (!StringUtils.isBlank(DEPRECATED_clientAuthParams))
clientAuthParams = DEPRECATED_clientAuthParams;
+ if (DEPRECATED_useTls != null) useTls = DEPRECATED_useTls;
+ if (DEPRECATED_tlsAllowInsecureConnection != null)
tlsAllowInsecureConnection = DEPRECATED_tlsAllowInsecureConnection;
+ if (DEPRECATED_tlsHostNameVerificationEnabled != null)
tlsHostNameVerificationEnabled = DEPRECATED_tlsHostNameVerificationEnabled;
+ if (!StringUtils.isBlank(DEPRECATED_tlsTrustCertFilePath))
tlsTrustCertFilePath = DEPRECATED_tlsTrustCertFilePath;
+ }
+
@Override
void runCmd() throws Exception {
+ // merge deprecated args with new args
+ mergeArgs();
CmdFunctions.startLocalRun(createSinkConfigProto2(sinkConfig),
sinkConfig.getParallelism(),
0, brokerServiceUrl, null,
AuthenticationConfig.builder().clientAuthenticationPlugin(clientAuthPlugin)
@@ -200,46 +232,91 @@ void runCmd() throws Exception {
protected String namespace;
@Parameter(names = "--name", description = "The sink's name")
protected String name;
-
@Parameter(names = { "-t", "--sink-type" }, description = "The sinks's
connector provider")
protected String sinkType;
-
@Parameter(names = "--inputs", description = "The sink's input topic
or topics (multiple topics can be specified as a comma-separated list)")
protected String inputs;
- @Parameter(names = "--topicsPattern", description = "TopicsPattern to
consume from list of topics under a namespace that match the pattern. [--input]
and [--topicsPattern] are mutually exclusive. Add SerDe class name for a
pattern in --customSerdeInputs (supported for java fun only)")
+
+ // for backwards compatibility purposes
+ @Parameter(names = "--topicsPattern", description = "TopicsPattern to
consume from list of topics under a namespace that match the pattern. [--input]
and [--topicsPattern] are mutually exclusive. Add SerDe class name for a
pattern in --customSerdeInputs (supported for java fun only)", hidden = true)
+ protected String DEPRECATED_topicsPattern;
+ @Parameter(names = "--topics-pattern", description = "TopicsPattern to
consume from list of topics under a namespace that match the pattern. [--input]
and [--topicsPattern] are mutually exclusive. Add SerDe class name for a
pattern in --customSerdeInputs (supported for java fun only)")
protected String topicsPattern;
- @Parameter(names = "--subsName", description = "Pulsar source
subscription name if user wants a specific subscription-name for input-topic
consumer")
+
+ // for backwards compatibility purposes
+ @Parameter(names = "--subsName", description = "Pulsar source
subscription name if user wants a specific subscription-name for input-topic
consumer", hidden = true)
+ protected String DEPRECATED_subsName;
+ @Parameter(names = "--subs-name", description = "Pulsar source
subscription name if user wants a specific subscription-name for input-topic
consumer")
protected String subsName;
- @Parameter(names = "--customSerdeInputs", description = "The map of
input topics to SerDe class names (as a JSON string)")
+
+ // for backwards compatibility purposes
+ @Parameter(names = "--customSerdeInputs", description = "The map of
input topics to SerDe class names (as a JSON string)", hidden = true)
+ protected String DEPRECATED_customSerdeInputString;
+ @Parameter(names = "--custom-serde-inputs", description = "The map of
input topics to SerDe class names (as a JSON string)")
protected String customSerdeInputString;
- @Parameter(names = "--processingGuarantees", description = "The
processing guarantees (aka delivery semantics) applied to the sink")
+
+ // for backwards compatibility purposes
+ @Parameter(names = "--processingGuarantees", description = "The
processing guarantees (aka delivery semantics) applied to the sink", hidden =
true)
+ protected FunctionConfig.ProcessingGuarantees
DEPRECATED_processingGuarantees;
+ @Parameter(names = "--processing-guarantees", description = "The
processing guarantees (aka delivery semantics) applied to the sink")
protected FunctionConfig.ProcessingGuarantees processingGuarantees;
- @Parameter(names = "--retainOrdering", description = "Sink consumes
and sinks messages in order")
+
+ // for backwards compatibility purposes
+ @Parameter(names = "--retainOrdering", description = "Sink consumes
and sinks messages in order", hidden = true)
+ protected Boolean DEPRECATED_retainOrdering;
+ @Parameter(names = "--retain-ordering", description = "Sink consumes
and sinks messages in order")
protected boolean retainOrdering;
+
@Parameter(names = "--parallelism", description = "The sink's
parallelism factor (i.e. the number of sink instances to run)")
protected Integer parallelism;
@Parameter(names = {"-a", "--archive"}, description = "Path to the
archive file for the sink. It also supports url-path [http/https/file (file
protocol assumes that file already exists on worker host)] from which worker
can download the package.", listConverter = StringConverter.class)
protected String archive;
- @Parameter(names = "--className", description = "The sink's class name
if archive is file-url-path (file://)")
+
+ // for backwards compatibility purposes
+ @Parameter(names = "--className", description = "The sink's class name
if archive is file-url-path (file://)", hidden = true)
+ protected String DEPRECATED_className;
+ @Parameter(names = "--classname", description = "The sink's class name
if archive is file-url-path (file://)")
protected String className;
+ // for backwards compatibility purposes
@Parameter(names = "--sinkConfigFile", description = "The path to a
YAML config file specifying the "
+ + "sink's configuration", hidden = true)
+ protected String DEPRECATED_sinkConfigFile;
+ @Parameter(names = "--sink-config-file", description = "The path to a
YAML config file specifying the "
+ "sink's configuration")
protected String sinkConfigFile;
+
@Parameter(names = "--cpu", description = "The CPU (in cores) that
needs to be allocated per sink instance (applicable only to Docker runtime)")
protected Double cpu;
@Parameter(names = "--ram", description = "The RAM (in bytes) that
need to be allocated per sink instance (applicable only to the process and
Docker runtimes)")
protected Long ram;
@Parameter(names = "--disk", description = "The disk (in bytes) that
need to be allocated per sink instance (applicable only to Docker runtime)")
protected Long disk;
- @Parameter(names = "--sinkConfig", description = "User defined configs
key/values")
+
+ // for backwards compatibility purposes
+ @Parameter(names = "--sinkConfig", description = "User defined configs
key/values", hidden = true)
+ protected String DEPRECATED_sinkConfigString;
+ @Parameter(names = "--sink-config", description = "User defined
configs key/values")
protected String sinkConfigString;
protected SinkConfig sinkConfig;
+ private void mergeArgs() {
+ if (!StringUtils.isBlank(DEPRECATED_subsName)) subsName =
DEPRECATED_subsName;
+ if (!StringUtils.isBlank(DEPRECATED_topicsPattern)) topicsPattern
= DEPRECATED_topicsPattern;
+ if (!StringUtils.isBlank(DEPRECATED_customSerdeInputString))
customSerdeInputString = DEPRECATED_customSerdeInputString;
+ if (DEPRECATED_processingGuarantees != null) processingGuarantees
= DEPRECATED_processingGuarantees;
+ if (DEPRECATED_retainOrdering != null) retainOrdering =
DEPRECATED_retainOrdering;
+ if (!StringUtils.isBlank(DEPRECATED_className)) className =
DEPRECATED_className;
+ if (!StringUtils.isBlank(DEPRECATED_sinkConfigFile))
sinkConfigFile = DEPRECATED_sinkConfigFile;
+ if (!StringUtils.isBlank(DEPRECATED_sinkConfigString))
sinkConfigString = DEPRECATED_sinkConfigString;
+ }
+
@Override
void processArguments() throws Exception {
super.processArguments();
+ // merge deprecated args with new args
+ mergeArgs();
if (null != sinkConfigFile) {
this.sinkConfig = CmdUtils.loadConfig(sinkConfigFile,
SinkConfig.class);
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
index d050ab515c..8f72d0a69c 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
@@ -24,16 +24,6 @@
import com.beust.jcommander.converters.StringConverter;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.lang.reflect.Type;
-import java.nio.file.Paths;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Set;
-
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
@@ -61,8 +51,10 @@
import java.io.FileOutputStream;
import java.io.IOException;
import java.lang.reflect.Type;
+import java.nio.file.Paths;
import java.util.Collections;
import java.util.Map;
+import java.util.Set;
import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE;
import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
@@ -115,38 +107,72 @@ void processArguments() throws Exception {
@Parameters(commandDescription = "Run a Pulsar IO source connector locally
(rather than deploying it to the Pulsar cluster)")
protected class LocalSourceRunner extends CreateSource {
- @Parameter(names = "--brokerServiceUrl", description = "The URL for
the Pulsar broker")
+ // for backwards compatibility purposes
+ @Parameter(names = "--brokerServiceUrl", description = "The URL for
the Pulsar broker", hidden = true)
+ protected String DEPRECATED_brokerServiceUrl;
+ @Parameter(names = "--broker-service-url", description = "The URL for
the Pulsar broker")
protected String brokerServiceUrl;
- @Parameter(names = "--clientAuthPlugin", description = "Client
authentication plugin using which function-process can connect to broker")
+ // for backwards compatibility purposes
+ @Parameter(names = "--clientAuthPlugin", description = "Client
authentication plugin using which function-process can connect to broker",
hidden = true)
+ protected String DEPRECATED_clientAuthPlugin;
+ @Parameter(names = "--client-auth-plugin", description = "Client
authentication plugin using which function-process can connect to broker")
protected String clientAuthPlugin;
- @Parameter(names = "--clientAuthParams", description = "Client
authentication param")
+
+ // for backwards compatibility purposes
+ @Parameter(names = "--clientAuthParams", description = "Client
authentication param", hidden = true)
+ protected String DEPRECATED_clientAuthParams;
+ @Parameter(names = "--client-auth-params", description = "Client
authentication param")
protected String clientAuthParams;
- @Parameter(names = "--use_tls", description = "Use tls connection\n")
+ // for backwards compatibility purposes
+ @Parameter(names = "--use_tls", description = "Use tls connection\n",
hidden = true)
+ protected Boolean DEPRECATED_useTls;
+ @Parameter(names = "--use-tls", description = "Use tls connection\n")
protected boolean useTls;
- @Parameter(names = "--tls_allow_insecure", description = "Allow
insecure tls connection\n")
+ // for backwards compatibility purposes
+ @Parameter(names = "--tls_allow_insecure", description = "Allow
insecure tls connection\n", hidden = true)
+ protected Boolean DEPRECATED_tlsAllowInsecureConnection;
+ @Parameter(names = "--tls-allow-insecure", description = "Allow
insecure tls connection\n")
protected boolean tlsAllowInsecureConnection;
- @Parameter(names = "--hostname_verification_enabled", description =
"Enable hostname verification")
+ // for backwards compatibility purposes
+ @Parameter(names = "--hostname_verification_enabled", description =
"Enable hostname verification", hidden = true)
+ protected Boolean DEPRECATED_tlsHostNameVerificationEnabled;
+ @Parameter(names = "--hostname-verification-enabled", description =
"Enable hostname verification")
protected boolean tlsHostNameVerificationEnabled;
- @Parameter(names = "--tls_trust_cert_path", description = "tls trust
cert file path")
+ // for backwards compatibility purposes
+ @Parameter(names = "--tls_trust_cert_path", description = "tls trust
cert file path", hidden = true)
+ protected String DEPRECATED_tlsTrustCertFilePath;
+ @Parameter(names = "--tls-trust-cert-path", description = "tls trust
cert file path")
protected String tlsTrustCertFilePath;
+ private void mergeArgs() {
+ if
(!org.apache.commons.lang.StringUtils.isBlank(DEPRECATED_brokerServiceUrl))
brokerServiceUrl = DEPRECATED_brokerServiceUrl;
+ if
(!org.apache.commons.lang.StringUtils.isBlank(DEPRECATED_clientAuthPlugin))
clientAuthPlugin = DEPRECATED_clientAuthPlugin;
+ if
(!org.apache.commons.lang.StringUtils.isBlank(DEPRECATED_clientAuthParams))
clientAuthParams = DEPRECATED_clientAuthParams;
+ if (DEPRECATED_useTls != null) useTls = DEPRECATED_useTls;
+ if (DEPRECATED_tlsAllowInsecureConnection != null)
tlsAllowInsecureConnection = DEPRECATED_tlsAllowInsecureConnection;
+ if (DEPRECATED_tlsHostNameVerificationEnabled != null)
tlsHostNameVerificationEnabled = DEPRECATED_tlsHostNameVerificationEnabled;
+ if
(!org.apache.commons.lang.StringUtils.isBlank(DEPRECATED_tlsTrustCertFilePath))
tlsTrustCertFilePath = DEPRECATED_tlsTrustCertFilePath;
+ }
+
@Override
void runCmd() throws Exception {
-
CmdFunctions.startLocalRun(createSourceConfigProto2(sourceConfig),
sourceConfig.getParallelism(),
- 0, brokerServiceUrl, null,
-
AuthenticationConfig.builder().clientAuthenticationPlugin(clientAuthPlugin)
-
.clientAuthenticationParameters(clientAuthParams).useTls(useTls)
-
.tlsAllowInsecureConnection(tlsAllowInsecureConnection)
-
.tlsHostnameVerificationEnable(tlsHostNameVerificationEnabled)
-
.tlsTrustCertsFilePath(tlsTrustCertFilePath).build(),
- sourceConfig.getArchive(), admin);
- }
+ // merge deprecated args with new args
+ mergeArgs();
+ CmdFunctions.startLocalRun(createSourceConfigProto2(sourceConfig),
sourceConfig.getParallelism(),
+ 0, brokerServiceUrl, null,
+
AuthenticationConfig.builder().clientAuthenticationPlugin(clientAuthPlugin)
+
.clientAuthenticationParameters(clientAuthParams).useTls(useTls)
+ .tlsAllowInsecureConnection(tlsAllowInsecureConnection)
+
.tlsHostnameVerificationEnable(tlsHostNameVerificationEnabled)
+ .tlsTrustCertsFilePath(tlsTrustCertFilePath).build(),
+ sourceConfig.getArchive(), admin);
+ }
@Override
protected String validateSourceType(String sourceType) throws
IOException {
@@ -205,36 +231,73 @@ void runCmd() throws Exception {
@Parameter(names = { "-t", "--source-type" }, description = "The
source's connector provider")
protected String sourceType;
- @Parameter(names = "--processingGuarantees", description = "The
processing guarantees (aka delivery semantics) applied to the Source")
+ // for backwards compatibility purposes
+ @Parameter(names = "--processingGuarantees", description = "The
processing guarantees (aka delivery semantics) applied to the source", hidden =
true)
+ protected FunctionConfig.ProcessingGuarantees
DEPRECATED_processingGuarantees;
+ @Parameter(names = "--processing-guarantees", description = "The
processing guarantees (aka delivery semantics) applied to the source")
protected FunctionConfig.ProcessingGuarantees processingGuarantees;
- @Parameter(names = "--destinationTopicName", description = "The Pulsar
topic to which data is sent")
+
+ // for backwards compatibility purposes
+ @Parameter(names = "--destinationTopicName", description = "The Pulsar
topic to which data is sent", hidden = true)
+ protected String DEPRECATED_destinationTopicName;
+ @Parameter(names = "--destination-topic-name", description = "The
Pulsar topic to which data is sent")
protected String destinationTopicName;
- @Parameter(names = "--deserializationClassName", description = "The
SerDe classname for the source")
+
+ // for backwards compatibility purposes
+ @Parameter(names = "--deserializationClassName", description = "The
SerDe classname for the source", hidden = true)
+ protected String DEPRECATED_deserializationClassName;
+ @Parameter(names = "--deserialization-classname", description = "The
SerDe classname for the source")
protected String deserializationClassName;
+
@Parameter(names = "--parallelism", description = "The source's
parallelism factor (i.e. the number of source instances to run)")
protected Integer parallelism;
@Parameter(names = { "-a", "--archive" },
description = "The path to the NAR archive for the Source. It
also supports url-path [http/https/file (file protocol assumes that file
already exists on worker host)] from which worker can download the package.",
listConverter = StringConverter.class)
protected String archive;
- @Parameter(names = "--className", description = "The source's class
name if archive is file-url-path (file://)")
+
+ // for backwards compatibility purposes
+ @Parameter(names = "--className", description = "The source's class
name if archive is file-url-path (file://)", hidden = true)
+ protected String DEPRECATED_className;
+ @Parameter(names = "--classname", description = "The source's class
name if archive is file-url-path (file://)")
protected String className;
+
+ // for backwards compatibility purposes
@Parameter(names = "--sourceConfigFile", description = "The path to a
YAML config file specifying the "
+ + "source's configuration", hidden = true)
+ protected String DEPRECATED_sourceConfigFile;
+ @Parameter(names = "--source-config-file", description = "The path to
a YAML config file specifying the "
+ "source's configuration")
protected String sourceConfigFile;
+
@Parameter(names = "--cpu", description = "The CPU (in cores) that
needs to be allocated per source instance (applicable only to Docker runtime)")
protected Double cpu;
@Parameter(names = "--ram", description = "The RAM (in bytes) that
need to be allocated per source instance (applicable only to the process and
Docker runtimes)")
protected Long ram;
@Parameter(names = "--disk", description = "The disk (in bytes) that
need to be allocated per source instance (applicable only to Docker runtime)")
protected Long disk;
- @Parameter(names = "--sourceConfig", description = "Source config
key/values")
+
+ // for backwards compatibility purposes
+ @Parameter(names = "--sourceConfig", description = "Source config
key/values", hidden = true)
+ protected String DEPRECATED_sourceConfigString;
+ @Parameter(names = "--source-config", description = "Source config
key/values")
protected String sourceConfigString;
protected SourceConfig sourceConfig;
+ private void mergeArgs() {
+ if (DEPRECATED_processingGuarantees != null) processingGuarantees
= DEPRECATED_processingGuarantees;
+ if
(!org.apache.commons.lang.StringUtils.isBlank(DEPRECATED_destinationTopicName))
destinationTopicName = DEPRECATED_destinationTopicName;
+ if
(!org.apache.commons.lang.StringUtils.isBlank(DEPRECATED_deserializationClassName))
deserializationClassName = DEPRECATED_deserializationClassName;
+ if
(!org.apache.commons.lang.StringUtils.isBlank(DEPRECATED_className)) className
= DEPRECATED_className;
+ if
(!org.apache.commons.lang.StringUtils.isBlank(DEPRECATED_sourceConfigFile))
sourceConfigFile = DEPRECATED_sourceConfigFile;
+ if
(!org.apache.commons.lang.StringUtils.isBlank(DEPRECATED_sourceConfigString))
sourceConfigString = DEPRECATED_sourceConfigString;
+ }
+
@Override
void processArguments() throws Exception {
super.processArguments();
+ // merge deprecated args with new args
+ mergeArgs();
if (null != sourceConfigFile) {
this.sourceConfig = CmdUtils.loadConfig(sourceConfigFile,
SourceConfig.class);
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services