[FLINK-8475][config][docs] Integrate TM options This closes #5471.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/57ec03ed Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/57ec03ed Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/57ec03ed Branch: refs/heads/master Commit: 57ec03ed3dc6ce3fbb563d3274b032945ba53510 Parents: 4bbe2dc Author: zentol <ches...@apache.org> Authored: Mon Jan 22 18:15:34 2018 +0100 Committer: zentol <ches...@apache.org> Committed: Wed Feb 14 12:28:47 2018 +0100 ---------------------------------------------------------------------- .../generated/task_manager_configuration.html | 161 +++++++++++++++++ docs/ops/config.md | 54 +----- .../flink/configuration/ConfigConstants.java | 51 ++++++ .../flink/configuration/TaskManagerOptions.java | 174 ++++++++++++++++--- .../clusterframework/BootstrapTools.java | 3 +- .../runtime/io/network/netty/NettyConfig.java | 5 +- .../taskexecutor/TaskManagerConfiguration.java | 23 +-- .../TaskManagerServicesConfiguration.java | 5 +- .../minicluster/LocalFlinkMiniCluster.scala | 6 +- .../flink/runtime/taskmanager/TaskManager.scala | 6 +- .../TaskManagerRegistrationTest.java | 9 +- .../taskmanager/TaskManagerStartupTest.java | 2 +- .../runtime/taskmanager/TaskManagerTest.java | 4 +- .../jobmanager/JobManagerFailsITCase.scala | 6 +- .../apache/flink/yarn/YarnResourceManager.java | 4 +- 15 files changed, 395 insertions(+), 118 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/57ec03ed/docs/_includes/generated/task_manager_configuration.html ---------------------------------------------------------------------- diff --git a/docs/_includes/generated/task_manager_configuration.html b/docs/_includes/generated/task_manager_configuration.html new file mode 100644 index 0000000..080010c --- /dev/null +++ b/docs/_includes/generated/task_manager_configuration.html @@ -0,0 +1,161 @@ +<table class="table table-bordered"> + 
<thead> + <tr> + <th class="text-left" style="width: 20%">Key</th> + <th class="text-left" style="width: 15%">Default</th> + <th class="text-left" style="width: 65%">Description</th> + </tr> + </thead> + <tbody> + <tr> + <td><h5>task.cancellation.interval</h5></td> + <td>30000</td> + <td>Time interval between two successive task cancellation attempts in milliseconds.</td> + </tr> + <tr> + <td><h5>task.cancellation.timeout</h5></td> + <td>180000</td> + <td>Timeout in milliseconds after which a task cancellation times out and leads to a fatal TaskManager error. A value of 0 deactivates the watch dog.</td> + </tr> + <tr> + <td><h5>task.checkpoint.alignment.max-size</h5></td> + <td>-1</td> + <td>The maximum number of bytes that a checkpoint alignment may buffer. If the checkpoint alignment buffers more than the configured amount of data, the checkpoint is aborted (skipped). A value of -1 indicates that there is no limit.</td> + </tr> + <tr> + <td><h5>taskmanager.data.port</h5></td> + <td>0</td> + <td>The task manager’s port used for data exchange operations.</td> + </tr> + <tr> + <td><h5>taskmanager.data.ssl.enabled</h5></td> + <td>true</td> + <td>Enable SSL support for the taskmanager data transport. This is applicable only when the global ssl flag security.ssl.enabled is set to true</td> + </tr> + <tr> + <td><h5>taskmanager.debug.memory.logIntervalMs</h5></td> + <td>5000</td> + <td>The interval (in ms) for the log thread to log the current memory usage.</td> + </tr> + <tr> + <td><h5>taskmanager.debug.memory.startLogThread</h5></td> + <td>false</td> + <td>Flag indicating whether to start a thread, which repeatedly logs the memory usage of the JVM.</td> + </tr> + <tr> + <td><h5>taskmanager.exit-on-fatal-akka-error</h5></td> + <td>false</td> + <td>Whether the quarantine monitor for task managers shall be started. 
The quarantine monitor shuts down the actor system if it detects that it has quarantined another actor system or if it has been quarantined by another actor system.</td> + </tr> + <tr> + <td><h5>taskmanager.heap.mb</h5></td> + <td>1024</td> + <td>JVM heap size (in megabytes) for the TaskManagers, which are the parallel workers of the system. On YARN setups, this value is automatically configured to the size of the TaskManager's YARN container, minus a certain tolerance value.</td> + </tr> + <tr> + <td><h5>taskmanager.host</h5></td> + <td>(none)</td> + <td>The hostname of the network interface that the TaskManager binds to. By default, the TaskManager searches for network interfaces that can connect to the JobManager and other TaskManagers. This option can be used to define a hostname if that strategy fails for some reason. Because different TaskManagers need different values for this option, it usually is specified in an additional non-shared TaskManager-specific config file.</td> + </tr> + <tr> + <td><h5>taskmanager.initial-registration-pause</h5></td> + <td>"500 ms"</td> + <td>The initial registration pause between two consecutive registration attempts. The pause is doubled for each new registration attempt until it reaches the maximum registration pause.</td> + </tr> + <tr> + <td><h5>taskmanager.jvm-exit-on-oom</h5></td> + <td>false</td> + <td>Whether to kill the TaskManager when the task thread throws an OutOfMemoryError.</td> + </tr> + <tr> + <td><h5>taskmanager.max-registration-pause</h5></td> + <td>"30 s"</td> + <td>The maximum registration pause between two consecutive registration attempts. The max registration pause requires a time unit specifier (ms/s/min/h/d).</td> + </tr> + <tr> + <td><h5>taskmanager.maxRegistrationDuration</h5></td> + <td>"Inf"</td> + <td>Defines the maximum time it can take for the TaskManager registration. 
If the duration is exceeded without a successful registration, then the TaskManager terminates.</td> + </tr> + <tr> + <td><h5>taskmanager.memory.fraction</h5></td> + <td>0.7</td> + <td>The relative amount of memory (after subtracting the amount of memory used by network buffers) that the task manager reserves for sorting, hash tables, and caching of intermediate results. For example, a value of `0.8` means that a task manager reserves 80% of its memory for internal data buffers, leaving 20% of free memory for the task manager's heap for objects created by user-defined functions. This parameter is only evaluated, if taskmanager.memory.size is not set.</td> + </tr> + <tr> + <td><h5>taskmanager.memory.off-heap</h5></td> + <td>false</td> + <td>Memory allocation method (JVM heap or off-heap), used for managed memory of the TaskManager as well as the network buffers.</td> + </tr> + <tr> + <td><h5>taskmanager.memory.preallocate</h5></td> + <td>false</td> + <td>Whether TaskManager managed memory should be pre-allocated when the TaskManager is starting.</td> + </tr> + <tr> + <td><h5>taskmanager.memory.segment-size</h5></td> + <td>32768</td> + <td>Size of memory buffers used by the network stack and the memory manager (in bytes).</td> + </tr> + <tr> + <td><h5>taskmanager.memory.size</h5></td> + <td>-1</td> + <td>Amount of memory to be allocated by the task manager's memory manager (in megabytes). 
If not set, a relative fraction will be allocated.</td> + </tr> + <tr> + <td><h5>taskmanager.network.detailed-metrics</h5></td> + <td>false</td> + <td>Boolean flag to enable/disable more detailed metrics about inbound/outbound network queue lengths.</td> + </tr> + <tr> + <td><h5>taskmanager.network.memory.buffers-per-channel</h5></td> + <td>2</td> + <td>Number of network buffers to use for each outgoing/incoming channel (subpartition/input channel).</td> + </tr> + <tr> + <td><h5>taskmanager.network.memory.floating-buffers-per-gate</h5></td> + <td>8</td> + <td>Number of extra network buffers to use for each outgoing/incoming gate (result partition/input gate).</td> + </tr> + <tr> + <td><h5>taskmanager.network.memory.fraction</h5></td> + <td>0.1</td> + <td>Fraction of JVM memory to use for network buffers. This determines how many streaming data exchange channels a TaskManager can have at the same time and how well buffered the channels are. If a job is rejected or you get a warning that the system has not enough buffers available, increase this value or the min/max values below. 
Also note, that "taskmanager.network.memory.min"` and "taskmanager.network.memory.max" may override this fraction.</td> + </tr> + <tr> + <td><h5>taskmanager.network.memory.max</h5></td> + <td>1073741824</td> + <td>Maximum memory size for network buffers (in bytes).</td> + </tr> + <tr> + <td><h5>taskmanager.network.memory.min</h5></td> + <td>67108864</td> + <td>Minimum memory size for network buffers (in bytes).</td> + </tr> + <tr> + <td><h5>taskmanager.network.request-backoff.initial</h5></td> + <td>100</td> + <td>Minimum backoff for partition requests of input channels.</td> + </tr> + <tr> + <td><h5>taskmanager.network.request-backoff.max</h5></td> + <td>10000</td> + <td>Maximum backoff for partition requests of input channels.</td> + </tr> + <tr> + <td><h5>taskmanager.numberOfTaskSlots</h5></td> + <td>1</td> + <td>The number of parallel operator or user function instances that a single TaskManager can run. If this value is larger than 1, a single TaskManager takes multiple instances of a function or operator. That way, the TaskManager can utilize multiple CPU cores, but at the same time, the available memory is divided between the different operator or function instances. This value is typically proportional to the number of physical CPU cores that the TaskManager's machine has (e.g., equal to the number of cores, or half the number of cores).</td> + </tr> + <tr> + <td><h5>taskmanager.refused-registration-pause</h5></td> + <td>"10 s"</td> + <td>The pause after a registration has been refused by the job manager before retrying to connect.</td> + </tr> + <tr> + <td><h5>taskmanager.rpc.port</h5></td> + <td>"0"</td> + <td> The task manager’s IPC port. Accepts a list of ports (“50100,50101”), ranges (“50100-50200”) or a combination of both. 
It is recommended to set a range of ports to avoid collisions when multiple TaskManagers are running on the same machine.</td> + </tr> + </tbody> +</table> http://git-wip-us.apache.org/repos/asf/flink/blob/57ec03ed/docs/ops/config.md ---------------------------------------------------------------------- diff --git a/docs/ops/config.md b/docs/ops/config.md index 41d73c4..9e3b24f 100644 --- a/docs/ops/config.md +++ b/docs/ops/config.md @@ -268,59 +268,7 @@ These parameters configure the default HDFS used by Flink. Setups that do not sp ### TaskManager -The following parameters configure Flink's TaskManagers. - - -- `taskmanager.hostname`: The hostname of the network interface that the TaskManager binds to. By default, the TaskManager searches for network interfaces that can connect to the JobManager and other TaskManagers. This option can be used to define a hostname if that strategy fails for some reason. Because different TaskManagers need different values for this option, it usually is specified in an additional non-shared TaskManager-specific config file. - -- `taskmanager.rpc.port`: The task manager's IPC port (DEFAULT: **0**, which lets the OS choose a free port). Flink also accepts a list of ports ("50100,50101"), ranges ("50100-50200") or a combination of both. It is recommended to set a range of ports to avoid collisions when multiple TaskManagers are running on the same machine. - -- `taskmanager.data.port`: The task manager's port used for data exchange operations (DEFAULT: **0**, which lets the OS choose a free port). - -- `taskmanager.data.ssl.enabled`: Enable SSL support for the taskmanager data transport. This is applicable only when the global ssl flag security.ssl.enabled is set to true (DEFAULT: **true**) - -- `taskmanager.heap.mb`: JVM heap size (in megabytes) for the TaskManagers, which are the parallel workers of the system. 
In contrast to Hadoop, Flink runs operators (e.g., join, aggregate) and user-defined functions (e.g., Map, Reduce, CoGroup) inside the TaskManager (including sorting/hashing/caching), so this value should be as large as possible (DEFAULT: **512**). On YARN setups, this value is automatically configured to the size of the TaskManager's YARN container, minus a certain tolerance value. - -- `taskmanager.numberOfTaskSlots`: The number of parallel operator or user function instances that a single TaskManager can run (DEFAULT: **1**). If this value is larger than 1, a single TaskManager takes multiple instances of a function or operator. That way, the TaskManager can utilize multiple CPU cores, but at the same time, the available memory is divided between the different operator or function instances. This value is typically proportional to the number of physical CPU cores that the TaskManager's machine has (e.g., equal to the number of cores, or half the number of cores). - -- `taskmanager.tmp.dirs`: The directory for temporary files, or a list of directories separated by the system's directory delimiter (for example ':' (colon) on Linux/Unix). If multiple directories are specified, then the temporary files will be distributed across the directories in a round robin fashion. The I/O manager component will spawn one reading and one writing thread per directory. A directory may be listed multiple times to have the I/O manager use multiple threads for it (for example if it is physically stored on a very fast disc or RAID) (DEFAULT: **The system's tmp dir**). - -- `taskmanager.network.memory.fraction`: Fraction of JVM memory to use for network buffers. This determines how many streaming data exchange channels a TaskManager can have at the same time and how well buffered the channels are. If a job is rejected or you get a warning that the system has not enough buffers available, increase this value or the min/max values below. 
Also note, that `taskmanager.network.memory.min` and `taskmanager.network.memory.max` may override this fraction. (DEFAULT: **0.1**) - -- `taskmanager.network.memory.min`: Minimum memory size for network buffers in bytes (DEFAULT: **64 MB**). Previously, this was determined from `taskmanager.network.numberOfBuffers` and `taskmanager.memory.segment-size`. - -- `taskmanager.network.memory.max`: Maximum memory size for network buffers in bytes (DEFAULT: **1 GB**). Previously, this was determined from `taskmanager.network.numberOfBuffers` and `taskmanager.memory.segment-size`. - -- `taskmanager.network.numberOfBuffers` (deprecated, replaced by the three parameters above): The number of buffers available to the network stack. This number determines how many streaming data exchange channels a TaskManager can have at the same time and how well buffered the channels are. If a job is rejected or you get a warning that the system has not enough buffers available, increase this value (DEFAULT: **2048**). If set, it will be mapped to `taskmanager.network.memory.min` and `taskmanager.network.memory.max` based on `taskmanager.memory.segment-size`. - -- `taskmanager.memory.size`: The amount of memory (in megabytes) that the task manager reserves on the JVM's heap space for sorting, hash tables, and caching of intermediate results. If unspecified (-1), the memory manager will take a fixed ratio of the heap memory available to the JVM, as specified by `taskmanager.memory.fraction`. (DEFAULT: **-1**) - -- `taskmanager.memory.fraction`: The relative amount of memory (with respect to `taskmanager.heap.mb`, after subtracting the amount of memory used by network buffers) that the task manager reserves for sorting, hash tables, and caching of intermediate results. 
For example, a value of `0.8` means that a task manager reserves 80% of its memory (on-heap or off-heap depending on `taskmanager.memory.off-heap`) for internal data buffers, leaving 20% of free memory for the task manager's heap for objects created by user-defined functions. (DEFAULT: 0.7) This parameter is only evaluated, if `taskmanager.memory.size` is not set. - -- `taskmanager.debug.memory.startLogThread`: Causes the TaskManagers to periodically log memory and Garbage collection statistics. The statistics include current heap-, off-heap, and other memory pool utilization, as well as the time spent on garbage collection, by heap memory pool. - -- `taskmanager.debug.memory.logIntervalMs`: The interval (in milliseconds) in which the TaskManagers log the memory and garbage collection statistics. Only has an effect, if `taskmanager.debug.memory.startLogThread` is set to true. - -- `taskmanager.maxRegistrationDuration`: Defines the maximum time it can take for the TaskManager registration. If the duration is exceeded without a successful registration, then the TaskManager terminates. The max registration duration requires a time unit specifier (ms/s/min/h/d) (e.g. "10 min"). (DEFAULT: **Inf**) - -- `taskmanager.initial-registration-pause`: The initial registration pause between two consecutive registration attempts. The pause is doubled for each new registration attempt until it reaches the maximum registration pause. The initial registration pause requires a time unit specifier (ms/s/min/h/d) (e.g. "5 s"). (DEFAULT: **500 ms**) - -- `taskmanager.max-registration-pause`: The maximum registration pause between two consecutive registration attempts. The max registration pause requires a time unit specifier (ms/s/min/h/d) (e.g. "5 s"). (DEFAULT: **30 s**) - -- `taskmanager.refused-registration-pause`: The pause after a registration has been refused by the job manager before retrying to connect. 
The refused registration pause requires a time unit specifier (ms/s/min/h/d) (e.g. "5 s"). (DEFAULT: **10 s**) - -- `taskmanager.jvm-exit-on-oom`: Indicates that the TaskManager should immediately terminate the JVM if the task thread throws an `OutOfMemoryError` (DEFAULT: **false**). - -- `blob.fetch.retries`: The number of retries for the TaskManager to download BLOBs (such as JAR files) from the JobManager (DEFAULT: **50**). - -- `blob.fetch.num-concurrent`: The number concurrent BLOB fetches (such as JAR file downloads) that the JobManager serves (DEFAULT: **50**). - -- `blob.fetch.backlog`: The maximum number of queued BLOB fetches (such as JAR file downloads) that the JobManager allows (DEFAULT: **1000**). - -- `task.cancellation-interval`: Time interval between two successive task cancellation attempts in milliseconds (DEFAULT: **30000**). - -- `taskmanager.exit-on-fatal-akka-error`: Whether the TaskManager shall be terminated in case of a fatal Akka error (quarantining event). (DEFAULT: **false**) - +{% include generated/task_manager_configuration.html %} ### Distributed Coordination (via Akka) http://git-wip-us.apache.org/repos/asf/flink/blob/57ec03ed/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index b916048..b716d9e 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -191,7 +191,10 @@ public final class ConfigConstants { /** * The config parameter defining the task manager's hostname. 
+ * + * @deprecated use {@link TaskManagerOptions#HOST} instead */ + @Deprecated public static final String TASK_MANAGER_HOSTNAME_KEY = "taskmanager.hostname"; /** @@ -207,7 +210,10 @@ public final class ConfigConstants { /** * Config parameter to override SSL support for taskmanager's data transport. + * + * @deprecated use {@link TaskManagerOptions#DATA_SSL_ENABLED} instead */ + @Deprecated public static final String TASK_MANAGER_DATA_SSL_ENABLED = "taskmanager.data.ssl.enabled"; /** @@ -284,39 +290,60 @@ public final class ConfigConstants { /** * The config parameter defining the number of task slots of a task manager. + * + * @deprecated use {@link TaskManagerOptions#NUM_TASK_SLOTS} instead */ + @Deprecated public static final String TASK_MANAGER_NUM_TASK_SLOTS = "taskmanager.numberOfTaskSlots"; /** * Flag indicating whether to start a thread, which repeatedly logs the memory usage of the JVM. + * + * @deprecated use {@link TaskManagerOptions#DEBUG_MEMORY_USAGE_START_LOG_THREAD} instead */ + @Deprecated public static final String TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD = "taskmanager.debug.memory.startLogThread"; /** * The interval (in ms) for the log thread to log the current memory usage. + * + * @deprecated use {@link TaskManagerOptions#DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS} instead */ + @Deprecated public static final String TASK_MANAGER_DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS = "taskmanager.debug.memory.logIntervalMs"; /** * Defines the maximum time it can take for the TaskManager registration. If the duration is * exceeded without a successful registration, then the TaskManager terminates. + * + * @deprecated use {@link TaskManagerOptions#MAX_REGISTRATION_DURATION} instead */ + @Deprecated public static final String TASK_MANAGER_MAX_REGISTRATION_DURATION = "taskmanager.maxRegistrationDuration"; /** * The initial registration pause between two consecutive registration attempts. 
The pause * is doubled for each new registration attempt until it reaches the maximum registration pause. + * + * @deprecated use {@link TaskManagerOptions#INITIAL_REGISTRATION_PAUSE} instead */ + @Deprecated public static final String TASK_MANAGER_INITIAL_REGISTRATION_PAUSE = "taskmanager.initial-registration-pause"; /** * The maximum registration pause between two consecutive registration attempts. + * + * @deprecated use {@link TaskManagerOptions#MAX_REGISTRATION_PAUSE} instead */ + @Deprecated public static final String TASK_MANAGER_MAX_REGISTARTION_PAUSE = "taskmanager.max-registration-pause"; /** * The pause after a registration has been refused by the job manager before retrying to connect. + * + * @deprecated use {@link TaskManagerOptions#REFUSED_REGISTRATION_PAUSE} instead */ + @Deprecated public static final String TASK_MANAGER_REFUSED_REGISTRATION_PAUSE = "taskmanager.refused-registration-pause"; /** @@ -1359,12 +1386,18 @@ public final class ConfigConstants { /** * The default network port the task manager expects to receive transfer envelopes on. The {@code 0} means that * the TaskManager searches for a free port. + * + * @deprecated use {@link TaskManagerOptions#DATA_PORT} instead */ + @Deprecated public static final int DEFAULT_TASK_MANAGER_DATA_PORT = 0; /** * The default value to override ssl support for task manager's data transport. + * + * @deprecated use {@link TaskManagerOptions#DATA_SSL_ENABLED} instead */ + @Deprecated public static final boolean DEFAULT_TASK_MANAGER_DATA_SSL_ENABLED = true; /** @@ -1407,32 +1440,50 @@ public final class ConfigConstants { /** * Flag indicating whether to start a thread, which repeatedly logs the memory usage of the JVM. + * + * @deprecated use {@link TaskManagerOptions#DEBUG_MEMORY_USAGE_START_LOG_THREAD} instead */ + @Deprecated public static final boolean DEFAULT_TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD = false; /** * The interval (in ms) for the log thread to log the current memory usage. 
+ * + * @deprecated use {@link TaskManagerOptions#DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS} instead */ + @Deprecated public static final long DEFAULT_TASK_MANAGER_DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS = 5000L; /** * The default task manager's maximum registration duration. + * + * @deprecated use {@link TaskManagerOptions#MAX_REGISTRATION_DURATION} instead */ + @Deprecated public static final String DEFAULT_TASK_MANAGER_MAX_REGISTRATION_DURATION = "Inf"; /** * The default task manager's initial registration pause. + * + * @deprecated use {@link TaskManagerOptions#INITIAL_REGISTRATION_PAUSE} instead */ + @Deprecated public static final String DEFAULT_TASK_MANAGER_INITIAL_REGISTRATION_PAUSE = "500 ms"; /** * The default task manager's maximum registration pause. + * + * @deprecated use {@link TaskManagerOptions#MAX_REGISTRATION_PAUSE} instead */ + @Deprecated public static final String DEFAULT_TASK_MANAGER_MAX_REGISTRATION_PAUSE = "30 s"; /** * The default task manager's refused registration pause. 
+ * + * @deprecated use {@link TaskManagerOptions#REFUSED_REGISTRATION_PAUSE} instead */ + @Deprecated public static final String DEFAULT_TASK_MANAGER_REFUSED_REGISTRATION_PAUSE = "10 s"; /** http://git-wip-us.apache.org/repos/asf/flink/blob/57ec03ed/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java index f2c2289..04b49ed 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java @@ -32,21 +32,23 @@ public class TaskManagerOptions { // General TaskManager Options // ------------------------------------------------------------------------ - // @TODO Migrate 'taskmanager.*' config options from ConfigConstants - /** * JVM heap size (in megabytes) for the TaskManagers. */ public static final ConfigOption<Integer> TASK_MANAGER_HEAP_MEMORY = key("taskmanager.heap.mb") - .defaultValue(1024); + .defaultValue(1024) + .withDescription("JVM heap size (in megabytes) for the TaskManagers, which are the parallel workers of" + + " the system. On YARN setups, this value is automatically configured to the size of the TaskManager's" + + " YARN container, minus a certain tolerance value."); /** * Whether to kill the TaskManager when the task thread throws an OutOfMemoryError. */ public static final ConfigOption<Boolean> KILL_ON_OUT_OF_MEMORY = key("taskmanager.jvm-exit-on-oom") - .defaultValue(false); + .defaultValue(false) + .withDescription("Whether to kill the TaskManager when the task thread throws an OutOfMemoryError."); /** * Whether the quarantine monitor for task managers shall be started. 
The quarantine monitor @@ -55,7 +57,22 @@ public class TaskManagerOptions { */ public static final ConfigOption<Boolean> EXIT_ON_FATAL_AKKA_ERROR = key("taskmanager.exit-on-fatal-akka-error") - .defaultValue(false); + .defaultValue(false) + .withDescription("Whether the quarantine monitor for task managers shall be started. The quarantine monitor" + + " shuts down the actor system if it detects that it has quarantined another actor system" + + " or if it has been quarantined by another actor system."); + + /** + * The config parameter defining the task manager's hostname. + */ + public static final ConfigOption<String> HOST = + key("taskmanager.host") + .noDefaultValue() + .withDescription("The hostname of the network interface that the TaskManager binds to. By default, the" + + " TaskManager searches for network interfaces that can connect to the JobManager and other TaskManagers." + + " This option can be used to define a hostname if that strategy fails for some reason. Because" + + " different TaskManagers need different values for this option, it usually is specified in an" + + " additional non-shared TaskManager-specific config file."); /** * The default network port range the task manager expects incoming IPC connections. The {@code "0"} means that @@ -63,7 +80,88 @@ public class TaskManagerOptions { */ public static final ConfigOption<String> RPC_PORT = key("taskmanager.rpc.port") - .defaultValue("0"); + .defaultValue("0") + .withDescription(" The task manager’s IPC port. Accepts a list of ports (“50100,50101”), ranges" + + " (“50100-50200”) or a combination of both. It is recommended to set a range of ports to avoid" + + " collisions when multiple TaskManagers are running on the same machine."); + + /** + * The default network port the task manager expects to receive transfer envelopes on. The {@code 0} means that + * the TaskManager searches for a free port. 
+ */ + public static final ConfigOption<Integer> DATA_PORT = + key("taskmanager.data.port") + .defaultValue(0) + .withDescription("The task manager’s port used for data exchange operations."); + + /** + * Config parameter to override SSL support for taskmanager's data transport. + */ + public static final ConfigOption<Boolean> DATA_SSL_ENABLED = + key("taskmanager.data.ssl.enabled") + .defaultValue(true) + .withDescription("Enable SSL support for the taskmanager data transport. This is applicable only when the" + + " global ssl flag " + SecurityOptions.SSL_ENABLED.key() + " is set to true"); + + /** + * The initial registration pause between two consecutive registration attempts. The pause + * is doubled for each new registration attempt until it reaches the maximum registration pause. + */ + public static final ConfigOption<String> INITIAL_REGISTRATION_PAUSE = + key("taskmanager.initial-registration-pause") + .defaultValue("500 ms") + .withDescription("The initial registration pause between two consecutive registration attempts. The pause" + + " is doubled for each new registration attempt until it reaches the maximum registration pause."); + + /** + * The maximum registration pause between two consecutive registration attempts. + */ + public static final ConfigOption<String> MAX_REGISTRATION_PAUSE = + key("taskmanager.max-registration-pause") + .defaultValue("30 s") + .withDescription("The maximum registration pause between two consecutive registration attempts. The max" + + " registration pause requires a time unit specifier (ms/s/min/h/d)."); + + /** + * The pause after a registration has been refused by the job manager before retrying to connect. 
+ */ + public static final ConfigOption<String> REFUSED_REGISTRATION_PAUSE = + key("taskmanager.refused-registration-pause") + .defaultValue("10 s") + .withDescription("The pause after a registration has been refused by the job manager before retrying to connect."); + + /** + * Defines the maximum time it can take for the TaskManager registration. If the duration is + * exceeded without a successful registration, then the TaskManager terminates. + */ + public static final ConfigOption<String> MAX_REGISTRATION_DURATION = + key("taskmanager.maxRegistrationDuration") + .defaultValue("Inf") + .withDescription("Defines the maximum time it can take for the TaskManager registration. If the duration is" + + " exceeded without a successful registration, then the TaskManager terminates."); + + /** + * The config parameter defining the number of task slots of a task manager. + */ + public static final ConfigOption<Integer> NUM_TASK_SLOTS = + key("taskmanager.numberOfTaskSlots") + .defaultValue(1) + .withDescription("The number of parallel operator or user function instances that a single TaskManager can" + + " run. If this value is larger than 1, a single TaskManager takes multiple instances of a function or" + + " operator. That way, the TaskManager can utilize multiple CPU cores, but at the same time, the" + + " available memory is divided between the different operator or function instances. 
This value" + + " is typically proportional to the number of physical CPU cores that the TaskManager's machine has" + + " (e.g., equal to the number of cores, or half the number of cores)."); + + public static final ConfigOption<Boolean> DEBUG_MEMORY_USAGE_START_LOG_THREAD = + key("taskmanager.debug.memory.startLogThread") + .defaultValue(false) + .withDescription("Flag indicating whether to start a thread, which repeatedly logs the memory usage of the JVM."); + + public static final ConfigOption<Long> DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS = + key("taskmanager.debug.memory.logIntervalMs") + .defaultValue(5000L) + .withDescription("The interval (in ms) for the log thread to log the current memory usage."); // ------------------------------------------------------------------------ // Managed Memory Options @@ -74,7 +172,8 @@ public class TaskManagerOptions { */ public static final ConfigOption<Integer> MEMORY_SEGMENT_SIZE = key("taskmanager.memory.segment-size") - .defaultValue(32768); + .defaultValue(32768) + .withDescription("Size of memory buffers used by the network stack and the memory manager (in bytes)."); /** * Amount of memory to be allocated by the task manager's memory manager (in megabytes). If not @@ -82,7 +181,9 @@ public class TaskManagerOptions { */ public static final ConfigOption<Long> MANAGED_MEMORY_SIZE = key("taskmanager.memory.size") - .defaultValue(-1L); + .defaultValue(-1L) + .withDescription("Amount of memory to be allocated by the task manager's memory manager (in megabytes)." 
+ + " If not set, a relative fraction will be allocated."); /** * Fraction of free memory allocated by the memory manager if {@link #MANAGED_MEMORY_SIZE} is @@ -90,7 +191,13 @@ public class TaskManagerOptions { */ public static final ConfigOption<Float> MANAGED_MEMORY_FRACTION = key("taskmanager.memory.fraction") - .defaultValue(0.7f); + .defaultValue(0.7f) + .withDescription("The relative amount of memory (after subtracting the amount of memory used by network" + + " buffers) that the task manager reserves for sorting, hash tables, and caching of intermediate results." + + " For example, a value of `0.8` means that a task manager reserves 80% of its memory" + + " for internal data buffers, leaving 20% of free memory for the task manager's heap for objects" + + " created by user-defined functions. This parameter is only evaluated, if " + MANAGED_MEMORY_SIZE.key() + + " is not set."); /** * Memory allocation method (JVM heap or off-heap), used for managed memory of the TaskManager @@ -98,14 +205,17 @@ public class TaskManagerOptions { **/ public static final ConfigOption<Boolean> MEMORY_OFF_HEAP = key("taskmanager.memory.off-heap") - .defaultValue(false); + .defaultValue(false) + .withDescription("Memory allocation method (JVM heap or off-heap), used for managed memory of the" + + " TaskManager as well as the network buffers."); /** * Whether TaskManager managed memory should be pre-allocated when the TaskManager is starting. 
*/ public static final ConfigOption<Boolean> MANAGED_MEMORY_PRE_ALLOCATE = key("taskmanager.memory.preallocate") - .defaultValue(false); + .defaultValue(false) + .withDescription("Whether TaskManager managed memory should be pre-allocated when the TaskManager is starting."); // ------------------------------------------------------------------------ // Network Options @@ -128,21 +238,28 @@ public class TaskManagerOptions { */ public static final ConfigOption<Float> NETWORK_BUFFERS_MEMORY_FRACTION = key("taskmanager.network.memory.fraction") - .defaultValue(0.1f); + .defaultValue(0.1f) + .withDescription("Fraction of JVM memory to use for network buffers. This determines how many streaming" + + " data exchange channels a TaskManager can have at the same time and how well buffered the channels" + + " are. If a job is rejected or you get a warning that the system has not enough buffers available," + + " increase this value or the min/max values below. Also note, that \"taskmanager.network.memory.min\"" + + "` and \"taskmanager.network.memory.max\" may override this fraction."); /** * Minimum memory size for network buffers (in bytes). */ public static final ConfigOption<Long> NETWORK_BUFFERS_MEMORY_MIN = key("taskmanager.network.memory.min") - .defaultValue(64L << 20); // 64 MB + .defaultValue(64L << 20) // 64 MB + .withDescription("Minimum memory size for network buffers (in bytes)."); /** * Maximum memory size for network buffers (in bytes). */ public static final ConfigOption<Long> NETWORK_BUFFERS_MEMORY_MAX = key("taskmanager.network.memory.max") - .defaultValue(1024L << 20); // 1 GB + .defaultValue(1024L << 20) // 1 GB + .withDescription("Maximum memory size for network buffers (in bytes)."); /** * Number of network buffers to use for each outgoing/incoming channel (subpartition/input channel). 
@@ -151,14 +268,16 @@ public class TaskManagerOptions { */ public static final ConfigOption<Integer> NETWORK_BUFFERS_PER_CHANNEL = key("taskmanager.network.memory.buffers-per-channel") - .defaultValue(2); + .defaultValue(2) + .withDescription("Number of network buffers to use for each outgoing/incoming channel (subpartition/input channel)."); /** * Number of extra network buffers to use for each outgoing/incoming gate (result partition/input gate). */ public static final ConfigOption<Integer> NETWORK_EXTRA_BUFFERS_PER_GATE = key("taskmanager.network.memory.floating-buffers-per-gate") - .defaultValue(8); + .defaultValue(8) + .withDescription("Number of extra network buffers to use for each outgoing/incoming gate (result partition/input gate)."); /** * Minimum backoff for partition requests of input channels. @@ -166,7 +285,8 @@ public class TaskManagerOptions { public static final ConfigOption<Integer> NETWORK_REQUEST_BACKOFF_INITIAL = key("taskmanager.network.request-backoff.initial") .defaultValue(100) - .withDeprecatedKeys("taskmanager.net.request-backoff.initial"); + .withDeprecatedKeys("taskmanager.net.request-backoff.initial") + .withDescription("Minimum backoff for partition requests of input channels."); /** * Maximum backoff for partition requests of input channels. 
@@ -174,7 +294,8 @@ public class TaskManagerOptions { public static final ConfigOption<Integer> NETWORK_REQUEST_BACKOFF_MAX = key("taskmanager.network.request-backoff.max") .defaultValue(10000) - .withDeprecatedKeys("taskmanager.net.request-backoff.max"); + .withDeprecatedKeys("taskmanager.net.request-backoff.max") + .withDescription("Maximum backoff for partition requests of input channels."); /** * Boolean flag to enable/disable more detailed metrics about inbound/outbound network queue @@ -182,7 +303,8 @@ public class TaskManagerOptions { */ public static final ConfigOption<Boolean> NETWORK_DETAILED_METRICS = key("taskmanager.network.detailed-metrics") - .defaultValue(false); + .defaultValue(false) + .withDescription("Boolean flag to enable/disable more detailed metrics about inbound/outbound network queue lengths."); // ------------------------------------------------------------------------ // Task Options @@ -195,7 +317,8 @@ public class TaskManagerOptions { public static final ConfigOption<Long> TASK_CANCELLATION_INTERVAL = key("task.cancellation.interval") .defaultValue(30000L) - .withDeprecatedKeys("task.cancellation-interval"); + .withDeprecatedKeys("task.cancellation-interval") + .withDescription("Time interval between two successive task cancellation attempts in milliseconds."); /** * Timeout in milliseconds after which a task cancellation times out and @@ -204,8 +327,10 @@ public class TaskManagerOptions { */ public static final ConfigOption<Long> TASK_CANCELLATION_TIMEOUT = key("task.cancellation.timeout") - .defaultValue(180000L); - + .defaultValue(180000L) + .withDescription("Timeout in milliseconds after which a task cancellation times out and" + + " leads to a fatal TaskManager error. A value of 0 deactivates" + + " the watch dog."); /** * The maximum number of bytes that a checkpoint alignment may buffer. 
* If the checkpoint alignment buffers more than the configured amount of @@ -215,7 +340,10 @@ public class TaskManagerOptions { */ public static final ConfigOption<Long> TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT = key("task.checkpoint.alignment.max-size") - .defaultValue(-1L); + .defaultValue(-1L) + .withDescription("The maximum number of bytes that a checkpoint alignment may buffer. If the checkpoint" + + " alignment buffers more than the configured amount of data, the checkpoint is aborted (skipped)." + + " A value of -1 indicates that there is no limit."); // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/57ec03ed/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java index ecfbc60..eab7382 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java @@ -23,6 +23,7 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.configuration.WebOptions; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.concurrent.ScheduledExecutor; @@ -244,7 +245,7 @@ public class BootstrapTools { cfg.setInteger(JobManagerOptions.PORT, jobManagerPort); } - cfg.setString(ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION, registrationTimeout.toString()); + 
cfg.setString(TaskManagerOptions.MAX_REGISTRATION_DURATION, registrationTimeout.toString()); if (numSlots != -1){ cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots); } http://git-wip-us.apache.org/repos/asf/flink/blob/57ec03ed/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java index 4ae77bec..b5bebc7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java @@ -18,10 +18,10 @@ package org.apache.flink.runtime.io.network.netty; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.net.SSLUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -212,8 +212,7 @@ public class NettyConfig { } public boolean getSSLEnabled() { - return config.getBoolean(ConfigConstants.TASK_MANAGER_DATA_SSL_ENABLED, - ConfigConstants.DEFAULT_TASK_MANAGER_DATA_SSL_ENABLED) + return config.getBoolean(TaskManagerOptions.DATA_SSL_ENABLED) && SSLUtils.getSSLEnabled(config); } http://git-wip-us.apache.org/repos/asf/flink/blob/57ec03ed/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java index cbde0d5..aebefd6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java @@ -185,9 +185,7 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo { final Time finiteRegistrationDuration; try { - Duration maxRegistrationDuration = Duration.create(configuration.getString( - ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION, - ConfigConstants.DEFAULT_TASK_MANAGER_MAX_REGISTRATION_DURATION)); + Duration maxRegistrationDuration = Duration.create(configuration.getString(TaskManagerOptions.MAX_REGISTRATION_DURATION)); if (maxRegistrationDuration.isFinite()) { finiteRegistrationDuration = Time.milliseconds(maxRegistrationDuration.toMillis()); } else { @@ -195,14 +193,12 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo { } } catch (NumberFormatException e) { throw new IllegalArgumentException("Invalid format for parameter " + - ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION, e); + TaskManagerOptions.MAX_REGISTRATION_DURATION.key(), e); } final Time initialRegistrationPause; try { - Duration pause = Duration.create(configuration.getString( - ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE, - ConfigConstants.DEFAULT_TASK_MANAGER_INITIAL_REGISTRATION_PAUSE)); + Duration pause = Duration.create(configuration.getString(TaskManagerOptions.INITIAL_REGISTRATION_PAUSE)); if (pause.isFinite()) { initialRegistrationPause = Time.milliseconds(pause.toMillis()); } else { @@ -210,14 +206,13 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo { } } catch (NumberFormatException e) { throw new IllegalArgumentException("Invalid format for parameter " + - ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE, e); + 
TaskManagerOptions.INITIAL_REGISTRATION_PAUSE.key(), e); } final Time maxRegistrationPause; try { Duration pause = Duration.create(configuration.getString( - ConfigConstants.TASK_MANAGER_MAX_REGISTARTION_PAUSE, - ConfigConstants.DEFAULT_TASK_MANAGER_MAX_REGISTRATION_PAUSE)); + TaskManagerOptions.MAX_REGISTRATION_PAUSE)); if (pause.isFinite()) { maxRegistrationPause = Time.milliseconds(pause.toMillis()); } else { @@ -225,14 +220,12 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo { } } catch (NumberFormatException e) { throw new IllegalArgumentException("Invalid format for parameter " + - ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE, e); + TaskManagerOptions.INITIAL_REGISTRATION_PAUSE.key(), e); } final Time refusedRegistrationPause; try { - Duration pause = Duration.create(configuration.getString( - ConfigConstants.TASK_MANAGER_REFUSED_REGISTRATION_PAUSE, - ConfigConstants.DEFAULT_TASK_MANAGER_REFUSED_REGISTRATION_PAUSE)); + Duration pause = Duration.create(configuration.getString(TaskManagerOptions.REFUSED_REGISTRATION_PAUSE)); if (pause.isFinite()) { refusedRegistrationPause = Time.milliseconds(pause.toMillis()); } else { @@ -240,7 +233,7 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo { } } catch (NumberFormatException e) { throw new IllegalArgumentException("Invalid format for parameter " + - ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE, e); + TaskManagerOptions.INITIAL_REGISTRATION_PAUSE.key(), e); } final boolean exitOnOom = configuration.getBoolean(TaskManagerOptions.KILL_ON_OUT_OF_MEMORY); http://git-wip-us.apache.org/repos/asf/flink/blob/57ec03ed/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java index c86d7c4..07cf660 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java @@ -257,10 +257,9 @@ public class TaskManagerServicesConfiguration { // ----> hosts / ports for communication and data exchange - int dataport = configuration.getInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, - ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT); + int dataport = configuration.getInteger(TaskManagerOptions.DATA_PORT); - checkConfigParameter(dataport >= 0, dataport, ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, + checkConfigParameter(dataport >= 0, dataport, TaskManagerOptions.DATA_PORT.key(), "Leave config parameter empty or use 0 to let the system choose a port automatically."); checkConfigParameter(slots >= 1, slots, ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, http://git-wip-us.apache.org/repos/asf/flink/blob/57ec03ed/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala index 8ef2e36..e864306 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala @@ -205,9 +205,7 @@ class LocalFlinkMiniCluster( val rpcPortIterator = NetUtils.getPortRangeFromString(rpcPortRange) - val dataPort = config.getInteger( - ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, - ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT) + val dataPort = 
config.getInteger(TaskManagerOptions.DATA_PORT) if (rpcPortIterator.hasNext) { val rpcPort = rpcPortIterator.next() @@ -216,7 +214,7 @@ class LocalFlinkMiniCluster( } } if (dataPort > 0) { - config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, dataPort + index) + config.setInteger(TaskManagerOptions.DATA_PORT, dataPort + index) } val localExecution = numTaskManagers == 1 http://git-wip-us.apache.org/repos/asf/flink/blob/57ec03ed/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index e8b043e..8dbb01d 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -1874,14 +1874,12 @@ object TaskManager { // if desired, start the logging daemon that periodically logs the // memory usage information if (LOG.isInfoEnabled && configuration.getBoolean( - ConfigConstants.TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD, - ConfigConstants.DEFAULT_TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD)) + TaskManagerOptions.DEBUG_MEMORY_USAGE_START_LOG_THREAD)) { LOG.info("Starting periodic memory usage logger") val interval = configuration.getLong( - ConfigConstants.TASK_MANAGER_DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS, - ConfigConstants.DEFAULT_TASK_MANAGER_DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS) + TaskManagerOptions.DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS) val logger = new MemoryLogger(LOG.logger, interval, taskManagerSystem) logger.start() http://git-wip-us.apache.org/repos/asf/flink/blob/57ec03ed/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java index 986f3fa..ceb92c4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java @@ -26,6 +26,7 @@ import akka.testkit.JavaTestKit; import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.blob.VoidBlobStore; import org.apache.flink.runtime.clusterframework.FlinkResourceManager; @@ -267,7 +268,7 @@ public class TaskManagerRegistrationTest extends TestLogger { try { // registration timeout of 1 second Configuration tmConfig = new Configuration(); - tmConfig.setString(ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION, "500 ms"); + tmConfig.setString(TaskManagerOptions.MAX_REGISTRATION_DURATION, "500 ms"); highAvailabilityServices.setJobMasterLeaderRetriever( HighAvailabilityServices.DEFAULT_JOB_ID, @@ -325,7 +326,7 @@ public class TaskManagerRegistrationTest extends TestLogger { FiniteDuration refusedRegistrationPause = new FiniteDuration(500, TimeUnit.MILLISECONDS); Configuration tmConfig = new Configuration(config); - tmConfig.setString(ConfigConstants.TASK_MANAGER_REFUSED_REGISTRATION_PAUSE, refusedRegistrationPause.toString()); + tmConfig.setString(TaskManagerOptions.REFUSED_REGISTRATION_PAUSE, refusedRegistrationPause.toString()); highAvailabilityServices.setJobMasterLeaderRetriever( HighAvailabilityServices.DEFAULT_JOB_ID, @@ -407,8 +408,8 @@ public class TaskManagerRegistrationTest extends TestLogger { long maxDelay = 30000; Configuration tmConfig = new 
Configuration(config); - tmConfig.setString(ConfigConstants.TASK_MANAGER_REFUSED_REGISTRATION_PAUSE, refusedRegistrationPause + " ms"); - tmConfig.setString(ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE, initialRegistrationPause + " ms"); + tmConfig.setString(TaskManagerOptions.REFUSED_REGISTRATION_PAUSE, refusedRegistrationPause + " ms"); + tmConfig.setString(TaskManagerOptions.INITIAL_REGISTRATION_PAUSE, initialRegistrationPause + " ms"); // we make the test actor (the test kit) the JobManager to intercept // the messages http://git-wip-us.apache.org/repos/asf/flink/blob/57ec03ed/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java index ade2f3a..107826f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java @@ -244,7 +244,7 @@ public class TaskManagerStartupTest extends TestLogger { final Configuration cfg = new Configuration(); cfg.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, "localhost"); - cfg.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, blocker.getLocalPort()); + cfg.setInteger(TaskManagerOptions.DATA_PORT, blocker.getLocalPort()); cfg.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 1L); TaskManager.startTaskManagerComponentsAndActor( http://git-wip-us.apache.org/repos/asf/flink/blob/57ec03ed/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java index d739bb3..9d41bfb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java @@ -960,7 +960,7 @@ public class TaskManagerTest extends TestLogger { final int dataPort = NetUtils.getAvailablePort(); Configuration config = new Configuration(); - config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, dataPort); + config.setInteger(TaskManagerOptions.DATA_PORT, dataPort); config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100); config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX, 200); @@ -1173,7 +1173,7 @@ public class TaskManagerTest extends TestLogger { final int dataPort = NetUtils.getAvailablePort(); Configuration config = new Configuration(); - config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, dataPort); + config.setInteger(TaskManagerOptions.DATA_PORT, dataPort); config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100); config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX, 200); config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, "/i/dont/exist"); http://git-wip-us.apache.org/repos/asf/flink/blob/57ec03ed/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala index 44f14a0..1a3419b 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala @@ -20,7 +20,7 @@ package 
org.apache.flink.api.scala.runtime.jobmanager import akka.actor.ActorSystem import akka.testkit.{ImplicitSender, TestKit} -import org.apache.flink.configuration.{ConfigConstants, Configuration, JobManagerOptions} +import org.apache.flink.configuration.{ConfigConstants, Configuration, JobManagerOptions, TaskManagerOptions} import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour} import org.apache.flink.runtime.jobgraph.{JobGraph, JobVertex} import org.apache.flink.runtime.messages.Acknowledge @@ -136,8 +136,8 @@ class JobManagerFailsITCase(_system: ActorSystem) config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots) config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskmanagers) config.setInteger(JobManagerOptions.PORT, 0) - config.setString(ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE, "50 ms") - config.setString(ConfigConstants.TASK_MANAGER_MAX_REGISTARTION_PAUSE, "100 ms") + config.setString(TaskManagerOptions.INITIAL_REGISTRATION_PAUSE, "50 ms") + config.setString(TaskManagerOptions.MAX_REGISTRATION_PAUSE, "100 ms") val cluster = new TestingCluster(config, singleActorSystem = false) http://git-wip-us.apache.org/repos/asf/flink/blob/57ec03ed/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java index d72f4ac..8104d49 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java @@ -19,8 +19,8 @@ package org.apache.flink.yarn; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; import 
org.apache.flink.runtime.clusterframework.ApplicationStatus; import org.apache.flink.runtime.clusterframework.BootstrapTools; import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; @@ -427,7 +427,7 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme taskManagerParameters.taskManagerTotalMemoryMB(), taskManagerParameters.taskManagerHeapSizeMB(), taskManagerParameters.taskManagerDirectMemoryLimitMB()); - int timeout = flinkConfig.getInteger(ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION, + int timeout = flinkConfig.getInteger(TaskManagerOptions.MAX_REGISTRATION_DURATION.key(), DEFAULT_TASK_MANAGER_REGISTRATION_DURATION); FiniteDuration teRegistrationTimeout = new FiniteDuration(timeout, TimeUnit.SECONDS); final Configuration taskManagerConfig = BootstrapTools.generateTaskManagerConfiguration(