MOBIN-F commented on code in PR #3918:
URL: https://github.com/apache/flink-cdc/pull/3918#discussion_r2020827483
##########
flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontend.java:
##########
@@ -132,103 +106,6 @@ static CliExecutor createExecutor(CommandLine
commandLine) throws Exception {
flinkHome);
}
- private static void overrideFlinkConfiguration(
- Configuration flinkConfig, CommandLine commandLine) {
-
- String target =
- commandLine.hasOption(USE_MINI_CLUSTER)
- ? LOCAL.getName()
- : commandLine.getOptionValue(TARGET, REMOTE.getName());
- flinkConfig.set(
-
ConfigOptions.key(DeploymentOptions.TARGET.key()).stringType().defaultValue(target),
- target);
-
- Properties properties =
commandLine.getOptionProperties(FLINK_CONFIG.getOpt());
- LOG.info("Dynamic flink config items found: {}", properties);
- for (String key : properties.stringPropertyNames()) {
- String value = properties.getProperty(key);
- if (StringUtils.isNullOrWhitespaceOnly(key)
- || StringUtils.isNullOrWhitespaceOnly(value)) {
- throw new IllegalArgumentException(
- String.format(
- "null or white space argument for key or
value: %s=%s",
- key, value));
- }
- ConfigOption<String> configOption =
-
ConfigOptions.key(key.trim()).stringType().defaultValue(value.trim());
- flinkConfig.set(configOption, value.trim());
- }
- }
-
- private static SavepointRestoreSettings createSavepointRestoreSettings(
- CommandLine commandLine) {
- if (commandLine.hasOption(SAVEPOINT_PATH_OPTION.getOpt())) {
- String savepointPath =
commandLine.getOptionValue(SAVEPOINT_PATH_OPTION.getOpt());
- boolean allowNonRestoredState =
-
commandLine.hasOption(SAVEPOINT_ALLOW_NON_RESTORED_OPTION.getOpt());
- final Object restoreMode;
- if (commandLine.hasOption(SAVEPOINT_CLAIM_MODE)) {
- restoreMode =
-
org.apache.flink.configuration.ConfigurationUtils.convertValue(
-
commandLine.getOptionValue(SAVEPOINT_CLAIM_MODE),
- ConfigurationUtils.getClaimModeClass());
- } else {
- restoreMode =
SavepointConfigOptions.RESTORE_MODE.defaultValue();
- }
- // allowNonRestoredState is always false because all operators are
predefined.
-
- return (SavepointRestoreSettings)
- Arrays.stream(SavepointRestoreSettings.class.getMethods())
- .filter(
- method ->
- method.getName().equals("forPath")
- &&
method.getParameterCount() == 3)
- .findFirst()
- .map(
- method -> {
- try {
- return method.invoke(
- null,
- savepointPath,
- allowNonRestoredState,
- restoreMode);
- } catch (IllegalAccessException
- | InvocationTargetException e)
{
- throw new RuntimeException(
- "Failed to invoke
SavepointRestoreSettings#forPath nethod.",
- e);
- }
- })
- .orElseThrow(
- () ->
- new RuntimeException(
- "Failed to resolve
SavepointRestoreSettings#forPath method."));
- } else {
- return SavepointRestoreSettings.none();
- }
- }
-
- private static Path getFlinkHome(CommandLine commandLine) {
- // Check command line arguments first
- String flinkHomeFromArgs =
commandLine.getOptionValue(CliFrontendOptions.FLINK_HOME);
- if (flinkHomeFromArgs != null) {
- LOG.debug("Flink home is loaded by command-line argument: {}",
flinkHomeFromArgs);
- return new Path(flinkHomeFromArgs);
- }
-
- // Fallback to environment variable
- String flinkHomeFromEnvVar = System.getenv(FLINK_HOME_ENV_VAR);
- if (flinkHomeFromEnvVar != null) {
- LOG.debug("Flink home is loaded by environment variable: {}",
flinkHomeFromEnvVar);
- return new Path(flinkHomeFromEnvVar);
- }
-
- throw new IllegalArgumentException(
- "Cannot find Flink home from either command line arguments
\"--flink-home\" "
- + "or the environment variable \"FLINK_HOME\". "
- + "Please make sure Flink home is properly set. ");
- }
-
Review Comment:
Move the methods for loading and merging flink config to the
FlinkEnvironmentUtils class
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]