[FLINK-9821] Forward dynamic properties to configuration in TaskManagerRunner

With this commit we can use dynamic properties to overwrite configuration
values in the TaskManagerRunner.

This closes #6318.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/740f2fbf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/740f2fbf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/740f2fbf

Branch: refs/heads/master
Commit: 740f2fbf2e65fa988c6a577989ccd8923729be45
Parents: 2fbbf8e
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Tue Jul 10 23:43:34 2018 +0200
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Fri Jul 13 18:02:06 2018 +0200

----------------------------------------------------------------------
 .../src/main/flink-bin/bin/taskmanager.sh       | 10 ++++---
 .../runtime/taskexecutor/TaskManagerRunner.java | 29 ++++++++++++++++----
 2 files changed, 29 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/740f2fbf/flink-dist/src/main/flink-bin/bin/taskmanager.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/taskmanager.sh b/flink-dist/src/main/flink-bin/bin/taskmanager.sh
index 771d53f..0d70f34 100755
--- a/flink-dist/src/main/flink-bin/bin/taskmanager.sh
+++ b/flink-dist/src/main/flink-bin/bin/taskmanager.sh
@@ -22,6 +22,8 @@ USAGE="Usage: taskmanager.sh (start|start-foreground|stop|stop-all)"
 
 STARTSTOP=$1
 
+ARGS=("${@:2}")
+
 if [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ 
$STARTSTOP != "stop" ]] && [[ $STARTSTOP != "stop-all" ]]; then
   echo $USAGE
   exit 1
@@ -72,15 +74,15 @@ if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then
     export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} 
${FLINK_ENV_JAVA_OPTS_TM}"
 
     # Startup parameters
-    args=("--configDir" "${FLINK_CONF_DIR}")
+    ARGS+=("--configDir" "${FLINK_CONF_DIR}")
 fi
 
 if [[ $STARTSTOP == "start-foreground" ]]; then
-    exec "${FLINK_BIN_DIR}"/flink-console.sh $TYPE "${args[@]}"
+    exec "${FLINK_BIN_DIR}"/flink-console.sh $TYPE "${ARGS[@]}"
 else
     if [[ $FLINK_TM_COMPUTE_NUMA == "false" ]]; then
         # Start a single TaskManager
-        "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $TYPE "${args[@]}"
+        "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $TYPE "${ARGS[@]}"
     else
         # Example output from `numactl --show` on an AWS c4.8xlarge:
         # policy: default
@@ -92,7 +94,7 @@ else
         read -ra NODE_LIST <<< $(numactl --show | grep "^nodebind: ")
         for NODE_ID in "${NODE_LIST[@]:1}"; do
             # Start a TaskManager for each NUMA node
-            numactl --membind=$NODE_ID --cpunodebind=$NODE_ID -- 
"${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $TYPE "${args[@]}"
+            numactl --membind=$NODE_ID --cpunodebind=$NODE_ID -- 
"${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $TYPE "${ARGS[@]}"
         done
     fi
 fi

http://git-wip-us.apache.org/repos/asf/flink/blob/740f2fbf/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index 23fd29e..91c5704 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -19,9 +19,9 @@
 package org.apache.flink.runtime.taskexecutor;
 
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.fs.FileSystem;
@@ -29,6 +29,10 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobCacheService;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.entrypoint.ClusterConfiguration;
+import org.apache.flink.runtime.entrypoint.ClusterConfigurationParserFactory;
+import org.apache.flink.runtime.entrypoint.FlinkParseException;
+import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
@@ -274,11 +278,7 @@ public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync
                        LOG.info("Cannot determine the maximum number of open 
file descriptors");
                }
 
-               ParameterTool parameterTool = ParameterTool.fromArgs(args);
-
-               final String configDir = parameterTool.get("configDir");
-
-               final Configuration configuration = 
GlobalConfiguration.loadConfiguration(configDir);
+               final Configuration configuration = loadConfiguration(args);
 
                try {
                        FileSystem.initialize(configuration);
@@ -303,6 +303,23 @@ public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync
                }
        }
 
+       private static Configuration loadConfiguration(String[] args) throws 
FlinkParseException {
+               final CommandLineParser<ClusterConfiguration> commandLineParser 
= new CommandLineParser<>(new ClusterConfigurationParserFactory());
+
+               final ClusterConfiguration clusterConfiguration;
+
+               try {
+                       clusterConfiguration = commandLineParser.parse(args);
+               } catch (FlinkParseException e) {
+                       LOG.error("Could not parse the command line options.", 
e);
+                       commandLineParser.printHelp();
+                       throw e;
+               }
+
+               final Configuration dynamicProperties = 
ConfigurationUtils.createConfiguration(clusterConfiguration.getDynamicProperties());
+               return 
GlobalConfiguration.loadConfiguration(clusterConfiguration.getConfigDir(), 
dynamicProperties);
+       }
+
        public static void runTaskManager(Configuration configuration, 
ResourceID resourceId) throws Exception {
                final TaskManagerRunner taskManagerRunner = new 
TaskManagerRunner(configuration, resourceId);
 

Reply via email to