[FLINK-9821] Forward dynamic properties to configuration in TaskManagerRunner
With this commit we can use dynamic properties to overwrite configuration values in the TaskManagerRunner. This closes #6318. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/740f2fbf Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/740f2fbf Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/740f2fbf Branch: refs/heads/master Commit: 740f2fbf2e65fa988c6a577989ccd8923729be45 Parents: 2fbbf8e Author: Till Rohrmann <trohrm...@apache.org> Authored: Tue Jul 10 23:43:34 2018 +0200 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Fri Jul 13 18:02:06 2018 +0200 ---------------------------------------------------------------------- .../src/main/flink-bin/bin/taskmanager.sh | 10 ++++--- .../runtime/taskexecutor/TaskManagerRunner.java | 29 ++++++++++++++++---- 2 files changed, 29 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/740f2fbf/flink-dist/src/main/flink-bin/bin/taskmanager.sh ---------------------------------------------------------------------- diff --git a/flink-dist/src/main/flink-bin/bin/taskmanager.sh b/flink-dist/src/main/flink-bin/bin/taskmanager.sh index 771d53f..0d70f34 100755 --- a/flink-dist/src/main/flink-bin/bin/taskmanager.sh +++ b/flink-dist/src/main/flink-bin/bin/taskmanager.sh @@ -22,6 +22,8 @@ USAGE="Usage: taskmanager.sh (start|start-foreground|stop|stop-all)" STARTSTOP=$1 +ARGS=("${@:2}") + if [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ $STARTSTOP != "stop" ]] && [[ $STARTSTOP != "stop-all" ]]; then echo $USAGE exit 1 @@ -72,15 +74,15 @@ if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_TM}" # Startup parameters - args=("--configDir" "${FLINK_CONF_DIR}") + ARGS+=("--configDir" "${FLINK_CONF_DIR}") fi if [[ $STARTSTOP == "start-foreground" ]]; then - exec "${FLINK_BIN_DIR}"/flink-console.sh $TYPE "${args[@]}" + exec "${FLINK_BIN_DIR}"/flink-console.sh $TYPE "${ARGS[@]}" else if [[ $FLINK_TM_COMPUTE_NUMA == "false" ]]; then # Start a single TaskManager - "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $TYPE "${args[@]}" + "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $TYPE "${ARGS[@]}" else # Example output from `numactl --show` on an AWS c4.8xlarge: # policy: default @@ -92,7 +94,7 @@ else read -ra NODE_LIST <<< $(numactl --show | grep "^nodebind: ") for NODE_ID in "${NODE_LIST[@]:1}"; do # Start a TaskManager for each NUMA node - numactl --membind=$NODE_ID --cpunodebind=$NODE_ID -- "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $TYPE "${args[@]}" + numactl --membind=$NODE_ID --cpunodebind=$NODE_ID -- "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $TYPE "${ARGS[@]}" done fi fi http://git-wip-us.apache.org/repos/asf/flink/blob/740f2fbf/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java index 23fd29e..91c5704 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java @@ -19,9 +19,9 @@ package org.apache.flink.runtime.taskexecutor; import org.apache.flink.api.common.time.Time; -import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ConfigurationUtils; import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.core.fs.FileSystem; @@ -29,6 +29,10 @@ import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.blob.BlobCacheService; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.entrypoint.ClusterConfiguration; +import org.apache.flink.runtime.entrypoint.ClusterConfigurationParserFactory; +import org.apache.flink.runtime.entrypoint.FlinkParseException; +import org.apache.flink.runtime.entrypoint.parser.CommandLineParser; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; @@ -274,11 +278,7 @@ public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync LOG.info("Cannot determine the maximum number of open file descriptors"); } - ParameterTool parameterTool = ParameterTool.fromArgs(args); - - final String configDir = parameterTool.get("configDir"); - - final Configuration configuration = GlobalConfiguration.loadConfiguration(configDir); + final Configuration configuration = loadConfiguration(args); try { FileSystem.initialize(configuration); @@ -303,6 +303,23 @@ public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync } } + private static Configuration loadConfiguration(String[] args) throws FlinkParseException { + final CommandLineParser<ClusterConfiguration> commandLineParser = new CommandLineParser<>(new ClusterConfigurationParserFactory()); + + final ClusterConfiguration clusterConfiguration; + + try { + clusterConfiguration = commandLineParser.parse(args); + } catch (FlinkParseException e) { + LOG.error("Could not parse the command line options.", e); + commandLineParser.printHelp(); + throw e; + } + + final Configuration dynamicProperties = ConfigurationUtils.createConfiguration(clusterConfiguration.getDynamicProperties()); + return GlobalConfiguration.loadConfiguration(clusterConfiguration.getConfigDir(), dynamicProperties); + } + public static void runTaskManager(Configuration configuration, ResourceID resourceId) throws Exception { final TaskManagerRunner taskManagerRunner = new TaskManagerRunner(configuration, resourceId);