srkukarni closed pull request #2381: Make source and sink cli args format consistent URL: https://github.com/apache/incubator-pulsar/pull/2381
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java index 5f52313d57..1a7d1e96e1 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java @@ -268,7 +268,10 @@ void processArguments() throws Exception { protected String DEPRECATED_userConfigString; @Parameter(names = "--user-config", description = "User-defined config key/values") protected String userConfigString; - @Parameter(names = "--retainOrdering", description = "Function consumes and processes messages in order") + // for backwards compatibility purposes + @Parameter(names = "--retainOrdering", description = "Function consumes and processes messages in order", hidden = true) + protected Boolean DEPRECATED_retainOrdering; + @Parameter(names = "--retain-ordering", description = "Function consumes and processes messages in order") protected boolean retainOrdering; @Parameter(names = "--parallelism", description = "The function's parallelism factor (i.e. 
the number of function instances to run)") protected Integer parallelism; @@ -319,8 +322,9 @@ private void mergeArgs() { if (!StringUtils.isBlank(DEPRECATED_outputSerdeClassName)) outputSerdeClassName = DEPRECATED_outputSerdeClassName; if (!StringUtils.isBlank(DEPRECATED_customSerdeInputString)) customSerdeInputString = DEPRECATED_customSerdeInputString; if (!StringUtils.isBlank(DEPRECATED_fnConfigFile)) fnConfigFile = DEPRECATED_fnConfigFile; - if (DEPRECATED_processingGuarantees != FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE) processingGuarantees = DEPRECATED_processingGuarantees; + if (DEPRECATED_processingGuarantees != null) processingGuarantees = DEPRECATED_processingGuarantees; if (!StringUtils.isBlank(DEPRECATED_userConfigString)) userConfigString = DEPRECATED_userConfigString; + if (DEPRECATED_retainOrdering != null) retainOrdering = DEPRECATED_retainOrdering; if (DEPRECATED_windowLengthCount != null) windowLengthCount = DEPRECATED_windowLengthCount; if (DEPRECATED_windowLengthDurationMs != null) windowLengthDurationMs = DEPRECATED_windowLengthDurationMs; if (DEPRECATED_slidingIntervalCount != null) slidingIntervalCount = DEPRECATED_slidingIntervalCount; @@ -787,6 +791,7 @@ private void mergeArgs() { @Override void runCmd() throws Exception { + // merge deprecated args with new args mergeArgs(); CmdFunctions.startLocalRun(convertProto2(functionConfig), functionConfig.getParallelism(), instanceIdOffset, brokerServiceUrl, stateStorageServiceUrl, @@ -964,6 +969,7 @@ public void mergeArgs() { @Override void runCmd() throws Exception { + // merge deprecated args with new args mergeArgs(); if (triggerFile == null && triggerValue == null) { throw new ParameterException("Either a trigger value or a trigger filepath needs to be specified"); @@ -998,6 +1004,7 @@ private void mergeArgs() { @Override void runCmd() throws Exception { + // merge deprecated args with new args mergeArgs(); if (StringUtils.isBlank(sourceFile)) { throw new 
ParameterException("--source-file needs to be specified"); @@ -1032,6 +1039,7 @@ private void mergeArgs() { @Override void runCmd() throws Exception { + // merge deprecated args with new args mergeArgs(); if (StringUtils.isBlank(destinationFile)) { throw new ParameterException("--destination-file needs to be specified"); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java index 8417874f38..5ccd9b0853 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java @@ -24,23 +24,9 @@ import com.beust.jcommander.converters.StringConverter; import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; - -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.lang.reflect.Type; -import java.nio.file.Paths; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; - import lombok.Getter; import lombok.extern.slf4j.Slf4j; -import static org.apache.commons.lang3.StringUtils.isBlank; -import static org.apache.commons.lang3.StringUtils.isNotBlank; +import org.apache.commons.lang.StringUtils; import org.apache.commons.lang3.text.WordUtils; import org.apache.pulsar.admin.cli.utils.CmdUtils; import org.apache.pulsar.client.admin.PulsarAdmin; @@ -50,7 +36,6 @@ import org.apache.pulsar.common.nar.NarClassLoader; import org.apache.pulsar.functions.api.utils.IdentityFunction; import org.apache.pulsar.functions.instance.AuthenticationConfig; -import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.proto.Function.FunctionDetails; import org.apache.pulsar.functions.proto.Function.Resources; import org.apache.pulsar.functions.proto.Function.SinkSpec; @@ -64,6 +49,20 @@ import 
org.apache.pulsar.functions.utils.io.Connectors; import org.apache.pulsar.functions.utils.validation.ConfigValidation; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.lang.reflect.Type; +import java.nio.file.Paths; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.commons.lang3.StringUtils.isBlank; +import static org.apache.commons.lang3.StringUtils.isNotBlank; import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE; import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT; import static org.apache.pulsar.functions.utils.Utils.convertProcessingGuarantee; @@ -114,29 +113,62 @@ void processArguments() throws Exception { @Parameters(commandDescription = "Run a Pulsar IO sink connector locally (rather than deploying it to the Pulsar cluster)") protected class LocalSinkRunner extends CreateSink { - @Parameter(names = "--brokerServiceUrl", description = "The URL for the Pulsar broker") + // for backwards compatibility purposes + @Parameter(names = "--brokerServiceUrl", description = "The URL for the Pulsar broker", hidden = true) + protected String DEPRECATED_brokerServiceUrl; + @Parameter(names = "--broker-service-url", description = "The URL for the Pulsar broker") protected String brokerServiceUrl; - @Parameter(names = "--clientAuthPlugin", description = "Client authentication plugin using which function-process can connect to broker") + // for backwards compatibility purposes + @Parameter(names = "--clientAuthPlugin", description = "Client authentication plugin using which function-process can connect to broker", hidden = true) + protected String DEPRECATED_clientAuthPlugin; + @Parameter(names = "--client-auth-plugin", description = "Client authentication plugin using which function-process can connect to broker") protected String clientAuthPlugin; - 
@Parameter(names = "--clientAuthParams", description = "Client authentication param") + // for backwards compatibility purposes + @Parameter(names = "--clientAuthParams", description = "Client authentication param", hidden = true) + protected String DEPRECATED_clientAuthParams; + @Parameter(names = "--client-auth-params", description = "Client authentication param") protected String clientAuthParams; - @Parameter(names = "--use_tls", description = "Use tls connection\n") + // for backwards compatibility purposes + @Parameter(names = "--use_tls", description = "Use tls connection\n", hidden = true) + protected Boolean DEPRECATED_useTls; + @Parameter(names = "--use-tls", description = "Use tls connection\n") protected boolean useTls; - @Parameter(names = "--tls_allow_insecure", description = "Allow insecure tls connection\n") + // for backwards compatibility purposes + @Parameter(names = "--tls_allow_insecure", description = "Allow insecure tls connection\n", hidden = true) + protected Boolean DEPRECATED_tlsAllowInsecureConnection; + @Parameter(names = "--tls-allow-insecure", description = "Allow insecure tls connection\n") protected boolean tlsAllowInsecureConnection; - @Parameter(names = "--hostname_verification_enabled", description = "Enable hostname verification") + // for backwards compatibility purposes + @Parameter(names = "--hostname_verification_enabled", description = "Enable hostname verification", hidden = true) + protected Boolean DEPRECATED_tlsHostNameVerificationEnabled; + @Parameter(names = "--hostname-verification-enabled", description = "Enable hostname verification") protected boolean tlsHostNameVerificationEnabled; - @Parameter(names = "--tls_trust_cert_path", description = "tls trust cert file path") + // for backwards compatibility purposes + @Parameter(names = "--tls_trust_cert_path", description = "tls trust cert file path", hidden = true) + protected String DEPRECATED_tlsTrustCertFilePath; + @Parameter(names = "--tls-trust-cert-path", 
description = "tls trust cert file path") protected String tlsTrustCertFilePath; + private void mergeArgs() { + if (!StringUtils.isBlank(DEPRECATED_brokerServiceUrl)) brokerServiceUrl = DEPRECATED_brokerServiceUrl; + if (!StringUtils.isBlank(DEPRECATED_clientAuthPlugin)) clientAuthPlugin = DEPRECATED_clientAuthPlugin; + if (!StringUtils.isBlank(DEPRECATED_clientAuthParams)) clientAuthParams = DEPRECATED_clientAuthParams; + if (DEPRECATED_useTls != null) useTls = DEPRECATED_useTls; + if (DEPRECATED_tlsAllowInsecureConnection != null) tlsAllowInsecureConnection = DEPRECATED_tlsAllowInsecureConnection; + if (DEPRECATED_tlsHostNameVerificationEnabled != null) tlsHostNameVerificationEnabled = DEPRECATED_tlsHostNameVerificationEnabled; + if (!StringUtils.isBlank(DEPRECATED_tlsTrustCertFilePath)) tlsTrustCertFilePath = DEPRECATED_tlsTrustCertFilePath; + } + @Override void runCmd() throws Exception { + // merge deprecated args with new args + mergeArgs(); CmdFunctions.startLocalRun(createSinkConfigProto2(sinkConfig), sinkConfig.getParallelism(), 0, brokerServiceUrl, null, AuthenticationConfig.builder().clientAuthenticationPlugin(clientAuthPlugin) @@ -200,46 +232,91 @@ void runCmd() throws Exception { protected String namespace; @Parameter(names = "--name", description = "The sink's name") protected String name; - @Parameter(names = { "-t", "--sink-type" }, description = "The sinks's connector provider") protected String sinkType; - @Parameter(names = "--inputs", description = "The sink's input topic or topics (multiple topics can be specified as a comma-separated list)") protected String inputs; - @Parameter(names = "--topicsPattern", description = "TopicsPattern to consume from list of topics under a namespace that match the pattern. [--input] and [--topicsPattern] are mutually exclusive. 
Add SerDe class name for a pattern in --customSerdeInputs (supported for java fun only)") + + // for backwards compatibility purposes + @Parameter(names = "--topicsPattern", description = "TopicsPattern to consume from list of topics under a namespace that match the pattern. [--input] and [--topicsPattern] are mutually exclusive. Add SerDe class name for a pattern in --customSerdeInputs (supported for java fun only)", hidden = true) + protected String DEPRECATED_topicsPattern; + @Parameter(names = "--topics-pattern", description = "TopicsPattern to consume from list of topics under a namespace that match the pattern. [--input] and [--topicsPattern] are mutually exclusive. Add SerDe class name for a pattern in --customSerdeInputs (supported for java fun only)") protected String topicsPattern; - @Parameter(names = "--subsName", description = "Pulsar source subscription name if user wants a specific subscription-name for input-topic consumer") + + // for backwards compatibility purposes + @Parameter(names = "--subsName", description = "Pulsar source subscription name if user wants a specific subscription-name for input-topic consumer", hidden = true) + protected String DEPRECATED_subsName; + @Parameter(names = "--subs-name", description = "Pulsar source subscription name if user wants a specific subscription-name for input-topic consumer") protected String subsName; - @Parameter(names = "--customSerdeInputs", description = "The map of input topics to SerDe class names (as a JSON string)") + + // for backwards compatibility purposes + @Parameter(names = "--customSerdeInputs", description = "The map of input topics to SerDe class names (as a JSON string)", hidden = true) + protected String DEPRECATED_customSerdeInputString; + @Parameter(names = "--custom-serde-inputs", description = "The map of input topics to SerDe class names (as a JSON string)") protected String customSerdeInputString; - @Parameter(names = "--processingGuarantees", description = "The processing 
guarantees (aka delivery semantics) applied to the sink") + + // for backwards compatibility purposes + @Parameter(names = "--processingGuarantees", description = "The processing guarantees (aka delivery semantics) applied to the sink", hidden = true) + protected FunctionConfig.ProcessingGuarantees DEPRECATED_processingGuarantees; + @Parameter(names = "--processing-guarantees", description = "The processing guarantees (aka delivery semantics) applied to the sink") protected FunctionConfig.ProcessingGuarantees processingGuarantees; - @Parameter(names = "--retainOrdering", description = "Sink consumes and sinks messages in order") + + // for backwards compatibility purposes + @Parameter(names = "--retainOrdering", description = "Sink consumes and sinks messages in order", hidden = true) + protected Boolean DEPRECATED_retainOrdering; + @Parameter(names = "--retain-ordering", description = "Sink consumes and sinks messages in order") protected boolean retainOrdering; + @Parameter(names = "--parallelism", description = "The sink's parallelism factor (i.e. the number of sink instances to run)") protected Integer parallelism; @Parameter(names = {"-a", "--archive"}, description = "Path to the archive file for the sink. 
It also supports url-path [http/https/file (file protocol assumes that file already exists on worker host)] from which worker can download the package.", listConverter = StringConverter.class) protected String archive; - @Parameter(names = "--className", description = "The sink's class name if archive is file-url-path (file://)") + + // for backwards compatibility purposes + @Parameter(names = "--className", description = "The sink's class name if archive is file-url-path (file://)", hidden = true) + protected String DEPRECATED_className; + @Parameter(names = "--classname", description = "The sink's class name if archive is file-url-path (file://)") protected String className; + // for backwards compatibility purposes @Parameter(names = "--sinkConfigFile", description = "The path to a YAML config file specifying the " + + "sink's configuration", hidden = true) + protected String DEPRECATED_sinkConfigFile; + @Parameter(names = "--sink-config-file", description = "The path to a YAML config file specifying the " + "sink's configuration") protected String sinkConfigFile; + @Parameter(names = "--cpu", description = "The CPU (in cores) that needs to be allocated per sink instance (applicable only to Docker runtime)") protected Double cpu; @Parameter(names = "--ram", description = "The RAM (in bytes) that need to be allocated per sink instance (applicable only to the process and Docker runtimes)") protected Long ram; @Parameter(names = "--disk", description = "The disk (in bytes) that need to be allocated per sink instance (applicable only to Docker runtime)") protected Long disk; - @Parameter(names = "--sinkConfig", description = "User defined configs key/values") + + // for backwards compatibility purposes + @Parameter(names = "--sinkConfig", description = "User defined configs key/values", hidden = true) + protected String DEPRECATED_sinkConfigString; + @Parameter(names = "--sink-config", description = "User defined configs key/values") protected String 
sinkConfigString; protected SinkConfig sinkConfig; + private void mergeArgs() { + if (!StringUtils.isBlank(DEPRECATED_subsName)) subsName = DEPRECATED_subsName; + if (!StringUtils.isBlank(DEPRECATED_topicsPattern)) topicsPattern = DEPRECATED_topicsPattern; + if (!StringUtils.isBlank(DEPRECATED_customSerdeInputString)) customSerdeInputString = DEPRECATED_customSerdeInputString; + if (DEPRECATED_processingGuarantees != null) processingGuarantees = DEPRECATED_processingGuarantees; + if (DEPRECATED_retainOrdering != null) retainOrdering = DEPRECATED_retainOrdering; + if (!StringUtils.isBlank(DEPRECATED_className)) className = DEPRECATED_className; + if (!StringUtils.isBlank(DEPRECATED_sinkConfigFile)) sinkConfigFile = DEPRECATED_sinkConfigFile; + if (!StringUtils.isBlank(DEPRECATED_sinkConfigString)) sinkConfigString = DEPRECATED_sinkConfigString; + } + @Override void processArguments() throws Exception { super.processArguments(); + // merge deprecated args with new args + mergeArgs(); if (null != sinkConfigFile) { this.sinkConfig = CmdUtils.loadConfig(sinkConfigFile, SinkConfig.class); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java index d050ab515c..8f72d0a69c 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java @@ -24,16 +24,6 @@ import com.beust.jcommander.converters.StringConverter; import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; - -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.lang.reflect.Type; -import java.nio.file.Paths; -import java.util.Collections; -import java.util.Map; -import java.util.Set; - import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; @@ -61,8 +51,10 @@ import 
java.io.FileOutputStream; import java.io.IOException; import java.lang.reflect.Type; +import java.nio.file.Paths; import java.util.Collections; import java.util.Map; +import java.util.Set; import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE; import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT; @@ -115,38 +107,72 @@ void processArguments() throws Exception { @Parameters(commandDescription = "Run a Pulsar IO source connector locally (rather than deploying it to the Pulsar cluster)") protected class LocalSourceRunner extends CreateSource { - @Parameter(names = "--brokerServiceUrl", description = "The URL for the Pulsar broker") + // for backwards compatibility purposes + @Parameter(names = "--brokerServiceUrl", description = "The URL for the Pulsar broker", hidden = true) + protected String DEPRECATED_brokerServiceUrl; + @Parameter(names = "--broker-service-url", description = "The URL for the Pulsar broker") protected String brokerServiceUrl; - @Parameter(names = "--clientAuthPlugin", description = "Client authentication plugin using which function-process can connect to broker") + // for backwards compatibility purposes + @Parameter(names = "--clientAuthPlugin", description = "Client authentication plugin using which function-process can connect to broker", hidden = true) + protected String DEPRECATED_clientAuthPlugin; + @Parameter(names = "--client-auth-plugin", description = "Client authentication plugin using which function-process can connect to broker") protected String clientAuthPlugin; - @Parameter(names = "--clientAuthParams", description = "Client authentication param") + + // for backwards compatibility purposes + @Parameter(names = "--clientAuthParams", description = "Client authentication param", hidden = true) + protected String DEPRECATED_clientAuthParams; + @Parameter(names = "--client-auth-params", description = "Client authentication param") protected String clientAuthParams; - @Parameter(names = "--use_tls", 
description = "Use tls connection\n") + // for backwards compatibility purposes + @Parameter(names = "--use_tls", description = "Use tls connection\n", hidden = true) + protected Boolean DEPRECATED_useTls; + @Parameter(names = "--use-tls", description = "Use tls connection\n") protected boolean useTls; - @Parameter(names = "--tls_allow_insecure", description = "Allow insecure tls connection\n") + // for backwards compatibility purposes + @Parameter(names = "--tls_allow_insecure", description = "Allow insecure tls connection\n", hidden = true) + protected Boolean DEPRECATED_tlsAllowInsecureConnection; + @Parameter(names = "--tls-allow-insecure", description = "Allow insecure tls connection\n") protected boolean tlsAllowInsecureConnection; - @Parameter(names = "--hostname_verification_enabled", description = "Enable hostname verification") + // for backwards compatibility purposes + @Parameter(names = "--hostname_verification_enabled", description = "Enable hostname verification", hidden = true) + protected Boolean DEPRECATED_tlsHostNameVerificationEnabled; + @Parameter(names = "--hostname-verification-enabled", description = "Enable hostname verification") protected boolean tlsHostNameVerificationEnabled; - @Parameter(names = "--tls_trust_cert_path", description = "tls trust cert file path") + // for backwards compatibility purposes + @Parameter(names = "--tls_trust_cert_path", description = "tls trust cert file path", hidden = true) + protected String DEPRECATED_tlsTrustCertFilePath; + @Parameter(names = "--tls-trust-cert-path", description = "tls trust cert file path") protected String tlsTrustCertFilePath; + private void mergeArgs() { + if (!org.apache.commons.lang.StringUtils.isBlank(DEPRECATED_brokerServiceUrl)) brokerServiceUrl = DEPRECATED_brokerServiceUrl; + if (!org.apache.commons.lang.StringUtils.isBlank(DEPRECATED_clientAuthPlugin)) clientAuthPlugin = DEPRECATED_clientAuthPlugin; + if 
(!org.apache.commons.lang.StringUtils.isBlank(DEPRECATED_clientAuthParams)) clientAuthParams = DEPRECATED_clientAuthParams; + if (DEPRECATED_useTls != null) useTls = DEPRECATED_useTls; + if (DEPRECATED_tlsAllowInsecureConnection != null) tlsAllowInsecureConnection = DEPRECATED_tlsAllowInsecureConnection; + if (DEPRECATED_tlsHostNameVerificationEnabled != null) tlsHostNameVerificationEnabled = DEPRECATED_tlsHostNameVerificationEnabled; + if (!org.apache.commons.lang.StringUtils.isBlank(DEPRECATED_tlsTrustCertFilePath)) tlsTrustCertFilePath = DEPRECATED_tlsTrustCertFilePath; + } + @Override void runCmd() throws Exception { - CmdFunctions.startLocalRun(createSourceConfigProto2(sourceConfig), sourceConfig.getParallelism(), - 0, brokerServiceUrl, null, - AuthenticationConfig.builder().clientAuthenticationPlugin(clientAuthPlugin) - .clientAuthenticationParameters(clientAuthParams).useTls(useTls) - .tlsAllowInsecureConnection(tlsAllowInsecureConnection) - .tlsHostnameVerificationEnable(tlsHostNameVerificationEnabled) - .tlsTrustCertsFilePath(tlsTrustCertFilePath).build(), - sourceConfig.getArchive(), admin); - } + // merge deprecated args with new args + mergeArgs(); + CmdFunctions.startLocalRun(createSourceConfigProto2(sourceConfig), sourceConfig.getParallelism(), + 0, brokerServiceUrl, null, + AuthenticationConfig.builder().clientAuthenticationPlugin(clientAuthPlugin) + .clientAuthenticationParameters(clientAuthParams).useTls(useTls) + .tlsAllowInsecureConnection(tlsAllowInsecureConnection) + .tlsHostnameVerificationEnable(tlsHostNameVerificationEnabled) + .tlsTrustCertsFilePath(tlsTrustCertFilePath).build(), + sourceConfig.getArchive(), admin); + } @Override protected String validateSourceType(String sourceType) throws IOException { @@ -205,36 +231,73 @@ void runCmd() throws Exception { @Parameter(names = { "-t", "--source-type" }, description = "The source's connector provider") protected String sourceType; - @Parameter(names = "--processingGuarantees", description = 
"The processing guarantees (aka delivery semantics) applied to the Source") + // for backwards compatibility purposes + @Parameter(names = "--processingGuarantees", description = "The processing guarantees (aka delivery semantics) applied to the source", hidden = true) + protected FunctionConfig.ProcessingGuarantees DEPRECATED_processingGuarantees; + @Parameter(names = "--processing-guarantees", description = "The processing guarantees (aka delivery semantics) applied to the source") protected FunctionConfig.ProcessingGuarantees processingGuarantees; - @Parameter(names = "--destinationTopicName", description = "The Pulsar topic to which data is sent") + + // for backwards compatibility purposes + @Parameter(names = "--destinationTopicName", description = "The Pulsar topic to which data is sent", hidden = true) + protected String DEPRECATED_destinationTopicName; + @Parameter(names = "--destination-topic-name", description = "The Pulsar topic to which data is sent") protected String destinationTopicName; - @Parameter(names = "--deserializationClassName", description = "The SerDe classname for the source") + + // for backwards compatibility purposes + @Parameter(names = "--deserializationClassName", description = "The SerDe classname for the source", hidden = true) + protected String DEPRECATED_deserializationClassName; + @Parameter(names = "--deserialization-classname", description = "The SerDe classname for the source") protected String deserializationClassName; + @Parameter(names = "--parallelism", description = "The source's parallelism factor (i.e. the number of source instances to run)") protected Integer parallelism; @Parameter(names = { "-a", "--archive" }, description = "The path to the NAR archive for the Source. 
It also supports url-path [http/https/file (file protocol assumes that file already exists on worker host)] from which worker can download the package.", listConverter = StringConverter.class) protected String archive; - @Parameter(names = "--className", description = "The source's class name if archive is file-url-path (file://)") + + // for backwards compatibility purposes + @Parameter(names = "--className", description = "The source's class name if archive is file-url-path (file://)", hidden = true) + protected String DEPRECATED_className; + @Parameter(names = "--classname", description = "The source's class name if archive is file-url-path (file://)") protected String className; + + // for backwards compatibility purposes @Parameter(names = "--sourceConfigFile", description = "The path to a YAML config file specifying the " + + "source's configuration", hidden = true) + protected String DEPRECATED_sourceConfigFile; + @Parameter(names = "--source-config-file", description = "The path to a YAML config file specifying the " + "source's configuration") protected String sourceConfigFile; + @Parameter(names = "--cpu", description = "The CPU (in cores) that needs to be allocated per source instance (applicable only to Docker runtime)") protected Double cpu; @Parameter(names = "--ram", description = "The RAM (in bytes) that need to be allocated per source instance (applicable only to the process and Docker runtimes)") protected Long ram; @Parameter(names = "--disk", description = "The disk (in bytes) that need to be allocated per source instance (applicable only to Docker runtime)") protected Long disk; - @Parameter(names = "--sourceConfig", description = "Source config key/values") + + // for backwards compatibility purposes + @Parameter(names = "--sourceConfig", description = "Source config key/values", hidden = true) + protected String DEPRECATED_sourceConfigString; + @Parameter(names = "--source-config", description = "Source config key/values") protected String 
sourceConfigString; protected SourceConfig sourceConfig; + private void mergeArgs() { + if (DEPRECATED_processingGuarantees != null) processingGuarantees = DEPRECATED_processingGuarantees; + if (!org.apache.commons.lang.StringUtils.isBlank(DEPRECATED_destinationTopicName)) destinationTopicName = DEPRECATED_destinationTopicName; + if (!org.apache.commons.lang.StringUtils.isBlank(DEPRECATED_deserializationClassName)) deserializationClassName = DEPRECATED_deserializationClassName; + if (!org.apache.commons.lang.StringUtils.isBlank(DEPRECATED_className)) className = DEPRECATED_className; + if (!org.apache.commons.lang.StringUtils.isBlank(DEPRECATED_sourceConfigFile)) sourceConfigFile = DEPRECATED_sourceConfigFile; + if (!org.apache.commons.lang.StringUtils.isBlank(DEPRECATED_sourceConfigString)) sourceConfigString = DEPRECATED_sourceConfigString; + } + @Override void processArguments() throws Exception { super.processArguments(); + // merge deprecated args with new args + mergeArgs(); if (null != sourceConfigFile) { this.sourceConfig = CmdUtils.loadConfig(sourceConfigFile, SourceConfig.class); ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services