[FLINK-8475][config][docs] Integrate TM options

This closes #5471.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/57ec03ed
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/57ec03ed
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/57ec03ed

Branch: refs/heads/master
Commit: 57ec03ed3dc6ce3fbb563d3274b032945ba53510
Parents: 4bbe2dc
Author: zentol <ches...@apache.org>
Authored: Mon Jan 22 18:15:34 2018 +0100
Committer: zentol <ches...@apache.org>
Committed: Wed Feb 14 12:28:47 2018 +0100

----------------------------------------------------------------------
 .../generated/task_manager_configuration.html   | 161 +++++++++++++++++
 docs/ops/config.md                              |  54 +-----
 .../flink/configuration/ConfigConstants.java    |  51 ++++++
 .../flink/configuration/TaskManagerOptions.java | 174 ++++++++++++++++---
 .../clusterframework/BootstrapTools.java        |   3 +-
 .../runtime/io/network/netty/NettyConfig.java   |   5 +-
 .../taskexecutor/TaskManagerConfiguration.java  |  23 +--
 .../TaskManagerServicesConfiguration.java       |   5 +-
 .../minicluster/LocalFlinkMiniCluster.scala     |   6 +-
 .../flink/runtime/taskmanager/TaskManager.scala |   6 +-
 .../TaskManagerRegistrationTest.java            |   9 +-
 .../taskmanager/TaskManagerStartupTest.java     |   2 +-
 .../runtime/taskmanager/TaskManagerTest.java    |   4 +-
 .../jobmanager/JobManagerFailsITCase.scala      |   6 +-
 .../apache/flink/yarn/YarnResourceManager.java  |   4 +-
 15 files changed, 395 insertions(+), 118 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/57ec03ed/docs/_includes/generated/task_manager_configuration.html
----------------------------------------------------------------------
diff --git a/docs/_includes/generated/task_manager_configuration.html 
b/docs/_includes/generated/task_manager_configuration.html
new file mode 100644
index 0000000..080010c
--- /dev/null
+++ b/docs/_includes/generated/task_manager_configuration.html
@@ -0,0 +1,161 @@
+<table class="table table-bordered">
+    <thead>
+        <tr>
+            <th class="text-left" style="width: 20%">Key</th>
+            <th class="text-left" style="width: 15%">Default</th>
+            <th class="text-left" style="width: 65%">Description</th>
+        </tr>
+    </thead>
+    <tbody>
+        <tr>
+            <td><h5>task.cancellation.interval</h5></td>
+            <td>30000</td>
+            <td>Time interval between two successive task cancellation 
attempts in milliseconds.</td>
+        </tr>
+        <tr>
+            <td><h5>task.cancellation.timeout</h5></td>
+            <td>180000</td>
+            <td>Timeout in milliseconds after which a task cancellation times 
out and leads to a fatal TaskManager error. A value of 0 deactivates the watch 
dog.</td>
+        </tr>
+        <tr>
+            <td><h5>task.checkpoint.alignment.max-size</h5></td>
+            <td>-1</td>
+            <td>The maximum number of bytes that a checkpoint alignment may 
buffer. If the checkpoint alignment buffers more than the configured amount of 
data, the checkpoint is aborted (skipped). A value of -1 indicates that there 
is no limit.</td>
+        </tr>
+        <tr>
+            <td><h5>taskmanager.data.port</h5></td>
+            <td>0</td>
+            <td>The task manager’s port used for data exchange 
operations.</td>
+        </tr>
+        <tr>
+            <td><h5>taskmanager.data.ssl.enabled</h5></td>
+            <td>true</td>
+            <td>Enable SSL support for the taskmanager data transport. This is 
applicable only when the global ssl flag security.ssl.enabled is set to 
true</td>
+        </tr>
+        <tr>
+            <td><h5>taskmanager.debug.memory.logIntervalMs</h5></td>
+            <td>5000</td>
+            <td>The interval (in ms) for the log thread to log the current 
memory usage.</td>
+        </tr>
+        <tr>
+            <td><h5>taskmanager.debug.memory.startLogThread</h5></td>
+            <td>false</td>
+            <td>Flag indicating whether to start a thread, which repeatedly 
logs the memory usage of the JVM.</td>
+        </tr>
+        <tr>
+            <td><h5>taskmanager.exit-on-fatal-akka-error</h5></td>
+            <td>false</td>
+            <td>Whether the quarantine monitor for task managers shall be 
started. The quarantine monitor shuts down the actor system if it detects that 
it has quarantined another actor system or if it has been quarantined by 
another actor system.</td>
+        </tr>
+        <tr>
+            <td><h5>taskmanager.heap.mb</h5></td>
+            <td>1024</td>
+            <td>JVM heap size (in megabytes) for the TaskManagers, which are 
the parallel workers of the system. On YARN setups, this value is automatically 
configured to the size of the TaskManager's YARN container, minus a certain 
tolerance value.</td>
+        </tr>
+        <tr>
+            <td><h5>taskmanager.host</h5></td>
+            <td>(none)</td>
+            <td>The hostname of the network interface that the TaskManager 
binds to. By default, the TaskManager searches for network interfaces that can 
connect to the JobManager and other TaskManagers. This option can be used to 
define a hostname if that strategy fails for some reason. Because different 
TaskManagers need different values for this option, it usually is specified in 
an additional non-shared TaskManager-specific config file.</td>
+        </tr>
+        <tr>
+            <td><h5>taskmanager.initial-registration-pause</h5></td>
+            <td>"500 ms"</td>
+            <td>The initial registration pause between two consecutive 
registration attempts. The pause is doubled for each new registration attempt 
until it reaches the maximum registration pause.</td>
+        </tr>
+        <tr>
+            <td><h5>taskmanager.jvm-exit-on-oom</h5></td>
+            <td>false</td>
+            <td>Whether to kill the TaskManager when the task thread throws an 
OutOfMemoryError.</td>
+        </tr>
+        <tr>
+            <td><h5>taskmanager.max-registration-pause</h5></td>
+            <td>"30 s"</td>
+            <td>The maximum registration pause between two consecutive 
registration attempts. The max registration pause requires a time unit 
specifier (ms/s/min/h/d).</td>
+        </tr>
+        <tr>
+            <td><h5>taskmanager.maxRegistrationDuration</h5></td>
+            <td>"Inf"</td>
+            <td>Defines the maximum time it can take for the TaskManager 
registration. If the duration is exceeded without a successful registration, 
then the TaskManager terminates.</td>
+        </tr>
+        <tr>
+            <td><h5>taskmanager.memory.fraction</h5></td>
+            <td>0.7</td>
+            <td>The relative amount of memory (after subtracting the amount of 
memory used by network buffers) that the task manager reserves for sorting, 
hash tables, and caching of intermediate results. For example, a value of `0.8` 
means that a task manager reserves 80% of its memory for internal data buffers, 
leaving 20% of free memory for the task manager's heap for objects created by 
user-defined functions. This parameter is only evaluated, if 
taskmanager.memory.size is not set.</td>
+        </tr>
+        <tr>
+            <td><h5>taskmanager.memory.off-heap</h5></td>
+            <td>false</td>
+            <td>Memory allocation method (JVM heap or off-heap), used for 
managed memory of the TaskManager as well as the network buffers.</td>
+        </tr>
+        <tr>
+            <td><h5>taskmanager.memory.preallocate</h5></td>
+            <td>false</td>
+            <td>Whether TaskManager managed memory should be pre-allocated 
when the TaskManager is starting.</td>
+        </tr>
+        <tr>
+            <td><h5>taskmanager.memory.segment-size</h5></td>
+            <td>32768</td>
+            <td>Size of memory buffers used by the network stack and the 
memory manager (in bytes).</td>
+        </tr>
+        <tr>
+            <td><h5>taskmanager.memory.size</h5></td>
+            <td>-1</td>
+            <td>Amount of memory to be allocated by the task manager's memory 
manager (in megabytes). If not set, a relative fraction will be allocated.</td>
+        </tr>
+        <tr>
+            <td><h5>taskmanager.network.detailed-metrics</h5></td>
+            <td>false</td>
+            <td>Boolean flag to enable/disable more detailed metrics about 
inbound/outbound network queue lengths.</td>
+        </tr>
+        <tr>
+            <td><h5>taskmanager.network.memory.buffers-per-channel</h5></td>
+            <td>2</td>
+            <td>Number of network buffers to use for each outgoing/incoming 
channel (subpartition/input channel).</td>
+        </tr>
+        <tr>
+            
<td><h5>taskmanager.network.memory.floating-buffers-per-gate</h5></td>
+            <td>8</td>
+            <td>Number of extra network buffers to use for each 
outgoing/incoming gate (result partition/input gate).</td>
+        </tr>
+        <tr>
+            <td><h5>taskmanager.network.memory.fraction</h5></td>
+            <td>0.1</td>
+            <td>Fraction of JVM memory to use for network buffers. This 
determines how many streaming data exchange channels a TaskManager can have at 
the same time and how well buffered the channels are. If a job is rejected or 
you get a warning that the system has not enough buffers available, increase 
this value or the min/max values below. Also note, that 
"taskmanager.network.memory.min"` and "taskmanager.network.memory.max" may 
override this fraction.</td>
+        </tr>
+        <tr>
+            <td><h5>taskmanager.network.memory.max</h5></td>
+            <td>1073741824</td>
+            <td>Maximum memory size for network buffers (in bytes).</td>
+        </tr>
+        <tr>
+            <td><h5>taskmanager.network.memory.min</h5></td>
+            <td>67108864</td>
+            <td>Minimum memory size for network buffers (in bytes).</td>
+        </tr>
+        <tr>
+            <td><h5>taskmanager.network.request-backoff.initial</h5></td>
+            <td>100</td>
+            <td>Minimum backoff for partition requests of input channels.</td>
+        </tr>
+        <tr>
+            <td><h5>taskmanager.network.request-backoff.max</h5></td>
+            <td>10000</td>
+            <td>Maximum backoff for partition requests of input channels.</td>
+        </tr>
+        <tr>
+            <td><h5>taskmanager.numberOfTaskSlots</h5></td>
+            <td>1</td>
+            <td>The number of parallel operator or user function instances 
that a single TaskManager can run. If this value is larger than 1, a single 
TaskManager takes multiple instances of a function or operator. That way, the 
TaskManager can utilize multiple CPU cores, but at the same time, the available 
memory is divided between the different operator or function instances. This 
value is typically proportional to the number of physical CPU cores that the 
TaskManager's machine has (e.g., equal to the number of cores, or half the 
number of cores).</td>
+        </tr>
+        <tr>
+            <td><h5>taskmanager.refused-registration-pause</h5></td>
+            <td>"10 s"</td>
+            <td>The pause after a registration has been refused by the job 
manager before retrying to connect.</td>
+        </tr>
+        <tr>
+            <td><h5>taskmanager.rpc.port</h5></td>
+            <td>"0"</td>
+            <td> The task manager’s IPC port. Accepts a list of ports 
(“50100,50101”), ranges (“50100-50200”) or a combination of both. It is 
recommended to set a range of ports to avoid collisions when multiple 
TaskManagers are running on the same machine.</td>
+        </tr>
+    </tbody>
+</table>

http://git-wip-us.apache.org/repos/asf/flink/blob/57ec03ed/docs/ops/config.md
----------------------------------------------------------------------
diff --git a/docs/ops/config.md b/docs/ops/config.md
index 41d73c4..9e3b24f 100644
--- a/docs/ops/config.md
+++ b/docs/ops/config.md
@@ -268,59 +268,7 @@ These parameters configure the default HDFS used by Flink. 
Setups that do not sp
 
 ### TaskManager
 
-The following parameters configure Flink's TaskManagers.
-
-
-- `taskmanager.hostname`: The hostname of the network interface that the 
TaskManager binds to. By default, the TaskManager searches for network 
interfaces that can connect to the JobManager and other TaskManagers. This 
option can be used to define a hostname if that strategy fails for some reason. 
Because different TaskManagers need different values for this option, it 
usually is specified in an additional non-shared TaskManager-specific config 
file.
-
-- `taskmanager.rpc.port`: The task manager's IPC port (DEFAULT: **0**, which 
lets the OS choose a free port). Flink also accepts a list of ports 
("50100,50101"), ranges ("50100-50200") or a combination of both. It is 
recommended to set a range of ports to avoid collisions when multiple 
TaskManagers are running on the same machine.
-
-- `taskmanager.data.port`: The task manager's port used for data exchange 
operations (DEFAULT: **0**, which lets the OS choose a free port).
-
-- `taskmanager.data.ssl.enabled`: Enable SSL support for the taskmanager data 
transport. This is applicable only when the global ssl flag 
security.ssl.enabled is set to true (DEFAULT: **true**)
-
-- `taskmanager.heap.mb`: JVM heap size (in megabytes) for the TaskManagers, 
which are the parallel workers of the system. In contrast to Hadoop, Flink runs 
operators (e.g., join, aggregate) and user-defined functions (e.g., Map, 
Reduce, CoGroup) inside the TaskManager (including sorting/hashing/caching), so 
this value should be as large as possible (DEFAULT: **512**). On YARN setups, 
this value is automatically configured to the size of the TaskManager's YARN 
container, minus a certain tolerance value.
-
-- `taskmanager.numberOfTaskSlots`: The number of parallel operator or user 
function instances that a single TaskManager can run (DEFAULT: **1**). If this 
value is larger than 1, a single TaskManager takes multiple instances of a 
function or operator. That way, the TaskManager can utilize multiple CPU cores, 
but at the same time, the available memory is divided between the different 
operator or function instances. This value is typically proportional to the 
number of physical CPU cores that the TaskManager's machine has (e.g., equal to 
the number of cores, or half the number of cores).
-
-- `taskmanager.tmp.dirs`: The directory for temporary files, or a list of 
directories separated by the system's directory delimiter (for example ':' 
(colon) on Linux/Unix). If multiple directories are specified, then the 
temporary files will be distributed across the directories in a round robin 
fashion. The I/O manager component will spawn one reading and one writing 
thread per directory. A directory may be listed multiple times to have the I/O 
manager use multiple threads for it (for example if it is physically stored on 
a very fast disc or RAID) (DEFAULT: **The system's tmp dir**).
-
-- `taskmanager.network.memory.fraction`: Fraction of JVM memory to use for 
network buffers. This determines how many streaming data exchange channels a 
TaskManager can have at the same time and how well buffered the channels are. 
If a job is rejected or you get a warning that the system has not enough 
buffers available, increase this value or the min/max values below. Also note, 
that `taskmanager.network.memory.min` and `taskmanager.network.memory.max` may 
override this fraction. (DEFAULT: **0.1**)
-
-- `taskmanager.network.memory.min`: Minimum memory size for network buffers in 
bytes (DEFAULT: **64 MB**). Previously, this was determined from 
`taskmanager.network.numberOfBuffers` and `taskmanager.memory.segment-size`.
-
-- `taskmanager.network.memory.max`: Maximum memory size for network buffers in 
bytes (DEFAULT: **1 GB**). Previously, this was determined from 
`taskmanager.network.numberOfBuffers` and `taskmanager.memory.segment-size`.
-
-- `taskmanager.network.numberOfBuffers` (deprecated, replaced by the three 
parameters above): The number of buffers available to the network stack. This 
number determines how many streaming data exchange channels a TaskManager can 
have at the same time and how well buffered the channels are. If a job is 
rejected or you get a warning that the system has not enough buffers available, 
increase this value (DEFAULT: **2048**). If set, it will be mapped to 
`taskmanager.network.memory.min` and `taskmanager.network.memory.max` based on 
`taskmanager.memory.segment-size`.
-
-- `taskmanager.memory.size`: The amount of memory (in megabytes) that the task 
manager reserves on the JVM's heap space for sorting, hash tables, and caching 
of intermediate results. If unspecified (-1), the memory manager will take a 
fixed ratio of the heap memory available to the JVM, as specified by 
`taskmanager.memory.fraction`. (DEFAULT: **-1**)
-
-- `taskmanager.memory.fraction`: The relative amount of memory (with respect 
to `taskmanager.heap.mb`, after subtracting the amount of memory used by 
network buffers) that the task manager reserves for sorting, hash tables, and 
caching of intermediate results. For example, a value of `0.8` means that a 
task manager reserves 80% of its memory (on-heap or off-heap depending on 
`taskmanager.memory.off-heap`) for internal data buffers, leaving 20% of free 
memory for the task manager's heap for objects created by user-defined 
functions. (DEFAULT: 0.7) This parameter is only evaluated, if 
`taskmanager.memory.size` is not set.
-
-- `taskmanager.debug.memory.startLogThread`: Causes the TaskManagers to 
periodically log memory and Garbage collection statistics. The statistics 
include current heap-, off-heap, and other memory pool utilization, as well as 
the time spent on garbage collection, by heap memory pool.
-
-- `taskmanager.debug.memory.logIntervalMs`: The interval (in milliseconds) in 
which the TaskManagers log the memory and garbage collection statistics. Only 
has an effect, if `taskmanager.debug.memory.startLogThread` is set to true.
-
-- `taskmanager.maxRegistrationDuration`: Defines the maximum time it can take 
for the TaskManager registration. If the duration is exceeded without a 
successful registration, then the TaskManager terminates. The max registration 
duration requires a time unit specifier (ms/s/min/h/d) (e.g. "10 min"). 
(DEFAULT: **Inf**)
-
-- `taskmanager.initial-registration-pause`: The initial registration pause 
between two consecutive registration attempts. The pause is doubled for each 
new registration attempt until it reaches the maximum registration pause. The 
initial registration pause requires a time unit specifier (ms/s/min/h/d) (e.g. 
"5 s"). (DEFAULT: **500 ms**)
-
-- `taskmanager.max-registration-pause`: The maximum registration pause between 
two consecutive registration attempts. The max registration pause requires a 
time unit specifier (ms/s/min/h/d) (e.g. "5 s"). (DEFAULT: **30 s**)
-
-- `taskmanager.refused-registration-pause`: The pause after a registration has 
been refused by the job manager before retrying to connect. The refused 
registration pause requires a time unit specifier (ms/s/min/h/d) (e.g. "5 s"). 
(DEFAULT: **10 s**)
-
-- `taskmanager.jvm-exit-on-oom`: Indicates that the TaskManager should 
immediately terminate the JVM if the task thread throws an `OutOfMemoryError` 
(DEFAULT: **false**).
-
-- `blob.fetch.retries`: The number of retries for the TaskManager to download 
BLOBs (such as JAR files) from the JobManager (DEFAULT: **50**).
-
-- `blob.fetch.num-concurrent`: The number concurrent BLOB fetches (such as JAR 
file downloads) that the JobManager serves (DEFAULT: **50**).
-
-- `blob.fetch.backlog`: The maximum number of queued BLOB fetches (such as JAR 
file downloads) that the JobManager allows (DEFAULT: **1000**).
-
-- `task.cancellation-interval`: Time interval between two successive task 
cancellation attempts in milliseconds (DEFAULT: **30000**).
-
-- `taskmanager.exit-on-fatal-akka-error`: Whether the TaskManager shall be 
terminated in case of a fatal Akka error (quarantining event). (DEFAULT: 
**false**)
-
+{% include generated/task_manager_configuration.html %}
 
 ### Distributed Coordination (via Akka)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/57ec03ed/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index b916048..b716d9e 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -191,7 +191,10 @@ public final class ConfigConstants {
 
        /**
         * The config parameter defining the task manager's hostname.
+        *
+        * @deprecated use {@link TaskManagerOptions#HOST} instead
         */
+       @Deprecated
        public static final String TASK_MANAGER_HOSTNAME_KEY = 
"taskmanager.hostname";
 
        /**
@@ -207,7 +210,10 @@ public final class ConfigConstants {
 
        /**
         * Config parameter to override SSL support for taskmanager's data 
transport.
+        *
+        * @deprecated use {@link TaskManagerOptions#DATA_SSL_ENABLED} instead
         */
+       @Deprecated
        public static final String TASK_MANAGER_DATA_SSL_ENABLED = 
"taskmanager.data.ssl.enabled";
 
        /**
@@ -284,39 +290,60 @@ public final class ConfigConstants {
 
        /**
         * The config parameter defining the number of task slots of a task 
manager.
+        *
+        * @deprecated use {@link TaskManagerOptions#NUM_TASK_SLOTS} instead
         */
+       @Deprecated
        public static final String TASK_MANAGER_NUM_TASK_SLOTS = 
"taskmanager.numberOfTaskSlots";
 
        /**
         * Flag indicating whether to start a thread, which repeatedly logs the 
memory usage of the JVM.
+        *
+        * @deprecated use {@link 
TaskManagerOptions#DEBUG_MEMORY_USAGE_START_LOG_THREAD} instead
         */
+       @Deprecated
        public static final String 
TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD = 
"taskmanager.debug.memory.startLogThread";
 
        /**
         * The interval (in ms) for the log thread to log the current memory 
usage.
+        *
+        * @deprecated use {@link 
TaskManagerOptions#DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS} instead
         */
+       @Deprecated
        public static final String 
TASK_MANAGER_DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS = 
"taskmanager.debug.memory.logIntervalMs";
 
        /**
         * Defines the maximum time it can take for the TaskManager 
registration. If the duration is
         * exceeded without a successful registration, then the TaskManager 
terminates.
+        *
+        * @deprecated use {@link TaskManagerOptions#MAX_REGISTRATION_DURATION} 
instead
         */
+       @Deprecated
        public static final String TASK_MANAGER_MAX_REGISTRATION_DURATION = 
"taskmanager.maxRegistrationDuration";
 
        /**
         * The initial registration pause between two consecutive registration 
attempts. The pause
         * is doubled for each new registration attempt until it reaches the 
maximum registration pause.
+        *
+        * @deprecated use {@link 
TaskManagerOptions#INITIAL_REGISTRATION_PAUSE} instead
         */
+       @Deprecated
        public static final String TASK_MANAGER_INITIAL_REGISTRATION_PAUSE = 
"taskmanager.initial-registration-pause";
 
        /**
         * The maximum registration pause between two consecutive registration 
attempts.
+        *
+        * @deprecated use {@link TaskManagerOptions#MAX_REGISTRATION_PAUSE} 
instead
         */
+       @Deprecated
        public static final String TASK_MANAGER_MAX_REGISTARTION_PAUSE = 
"taskmanager.max-registration-pause";
 
        /**
         * The pause after a registration has been refused by the job manager 
before retrying to connect.
+        *
+        * @deprecated use {@link 
TaskManagerOptions#REFUSED_REGISTRATION_PAUSE} instead
         */
+       @Deprecated
        public static final String TASK_MANAGER_REFUSED_REGISTRATION_PAUSE = 
"taskmanager.refused-registration-pause";
 
        /**
@@ -1359,12 +1386,18 @@ public final class ConfigConstants {
        /**
         * The default network port the task manager expects to receive 
transfer envelopes on. The {@code 0} means that
         * the TaskManager searches for a free port.
+        *
+        * @deprecated use {@link TaskManagerOptions#DATA_PORT} instead
         */
+       @Deprecated
        public static final int DEFAULT_TASK_MANAGER_DATA_PORT = 0;
 
        /**
         * The default value to override ssl support for task manager's data 
transport.
+        *
+        * @deprecated use {@link TaskManagerOptions#DATA_SSL_ENABLED} instead
         */
+       @Deprecated
        public static final boolean DEFAULT_TASK_MANAGER_DATA_SSL_ENABLED = 
true;
 
        /**
@@ -1407,32 +1440,50 @@ public final class ConfigConstants {
 
        /**
         * Flag indicating whether to start a thread, which repeatedly logs the 
memory usage of the JVM.
+        *
+        * @deprecated use {@link 
TaskManagerOptions#DEBUG_MEMORY_USAGE_START_LOG_THREAD} instead
         */
+       @Deprecated
        public static final boolean 
DEFAULT_TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD = false;
 
        /**
         * The interval (in ms) for the log thread to log the current memory 
usage.
+        *
+        * @deprecated use {@link 
TaskManagerOptions#DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS} instead
         */
+       @Deprecated
        public static final long 
DEFAULT_TASK_MANAGER_DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS = 5000L;
 
        /**
         * The default task manager's maximum registration duration.
+        *
+        * @deprecated use {@link TaskManagerOptions#MAX_REGISTRATION_DURATION} 
instead
         */
+       @Deprecated
        public static final String 
DEFAULT_TASK_MANAGER_MAX_REGISTRATION_DURATION = "Inf";
 
        /**
         * The default task manager's initial registration pause.
+        *
+        * @deprecated use {@link 
TaskManagerOptions#INITIAL_REGISTRATION_PAUSE} instead
         */
+       @Deprecated
        public static final String 
DEFAULT_TASK_MANAGER_INITIAL_REGISTRATION_PAUSE = "500 ms";
 
        /**
         * The default task manager's maximum registration pause.
+        *
+        * @deprecated use {@link TaskManagerOptions#MAX_REGISTRATION_PAUSE} 
instead
         */
+       @Deprecated
        public static final String DEFAULT_TASK_MANAGER_MAX_REGISTRATION_PAUSE 
= "30 s";
 
        /**
         * The default task manager's refused registration pause.
+        *
+        * @deprecated use {@link 
TaskManagerOptions#REFUSED_REGISTRATION_PAUSE} instead
         */
+       @Deprecated
        public static final String 
DEFAULT_TASK_MANAGER_REFUSED_REGISTRATION_PAUSE = "10 s";
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/57ec03ed/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
index f2c2289..04b49ed 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
@@ -32,21 +32,23 @@ public class TaskManagerOptions {
        //  General TaskManager Options
        // 
------------------------------------------------------------------------
 
-       // @TODO Migrate 'taskmanager.*' config options from ConfigConstants
-
        /**
         * JVM heap size (in megabytes) for the TaskManagers.
         */
        public static final ConfigOption<Integer> TASK_MANAGER_HEAP_MEMORY =
                        key("taskmanager.heap.mb")
-                       .defaultValue(1024);
+                       .defaultValue(1024)
+                       .withDescription("JVM heap size (in megabytes) for the 
TaskManagers, which are the parallel workers of" +
+                               " the system. On YARN setups, this value is 
automatically configured to the size of the TaskManager's" +
+                               " YARN container, minus a certain tolerance 
value.");
 
        /**
         * Whether to kill the TaskManager when the task thread throws an 
OutOfMemoryError.
         */
        public static final ConfigOption<Boolean> KILL_ON_OUT_OF_MEMORY =
                        key("taskmanager.jvm-exit-on-oom")
-                       .defaultValue(false);
+                       .defaultValue(false)
+                       .withDescription("Whether to kill the TaskManager when 
the task thread throws an OutOfMemoryError.");
 
        /**
         * Whether the quarantine monitor for task managers shall be started. 
The quarantine monitor
@@ -55,7 +57,22 @@ public class TaskManagerOptions {
         */
        public static final ConfigOption<Boolean> EXIT_ON_FATAL_AKKA_ERROR =
                        key("taskmanager.exit-on-fatal-akka-error")
-                       .defaultValue(false);
+                       .defaultValue(false)
+                       .withDescription("Whether the quarantine monitor for 
task managers shall be started. The quarantine monitor" +
+                               " shuts down the actor system if it detects 
that it has quarantined another actor system" +
+                               " or if it has been quarantined by another 
actor system.");
+
+       /**
+        * The config parameter defining the task manager's hostname.
+        */
+       public static final ConfigOption<String> HOST =
+               key("taskmanager.host")
+                       .noDefaultValue()
+                       .withDescription("The hostname of the network interface 
that the TaskManager binds to. By default, the" +
+                               " TaskManager searches for network interfaces 
that can connect to the JobManager and other TaskManagers." +
+                               " This option can be used to define a hostname 
if that strategy fails for some reason. Because" +
+                               " different TaskManagers need different values 
for this option, it usually is specified in an" +
+                               " additional non-shared TaskManager-specific 
config file.");
 
        /**
         * The default network port range the task manager expects incoming IPC 
connections. The {@code "0"} means that
@@ -63,7 +80,88 @@ public class TaskManagerOptions {
         */
        public static final ConfigOption<String> RPC_PORT =
                key("taskmanager.rpc.port")
-                       .defaultValue("0");
+                       .defaultValue("0")
+                       .withDescription(" The task manager’s IPC port. 
Accepts a list of ports (“50100,50101”), ranges" +
+                               " (“50100-50200”) or a combination of both. 
It is recommended to set a range of ports to avoid" +
+                               " collisions when multiple TaskManagers are 
running on the same machine.");
+
+       /**
+        * The default network port the task manager expects to receive 
transfer envelopes on. The {@code 0} means that
+        * the TaskManager searches for a free port.
+        */
+       public static final ConfigOption<Integer> DATA_PORT =
+               key("taskmanager.data.port")
+                       .defaultValue(0)
+                       .withDescription("The task manager’s port used for 
data exchange operations.");
+
+       /**
+        * Config parameter to override SSL support for taskmanager's data 
transport.
+        */
+       public static final ConfigOption<Boolean> DATA_SSL_ENABLED =
+               key("taskmanager.data.ssl.enabled")
+                       .defaultValue(true)
+                       .withDescription("Enable SSL support for the 
taskmanager data transport. This is applicable only when the" +
+                               " global ssl flag " + 
SecurityOptions.SSL_ENABLED.key() + " is set to true");
+
+       /**
+        * The initial registration pause between two consecutive registration 
attempts. The pause
+        * is doubled for each new registration attempt until it reaches the 
maximum registration pause.
+        */
+       public static final ConfigOption<String> INITIAL_REGISTRATION_PAUSE =
+               key("taskmanager.initial-registration-pause")
+                       .defaultValue("500 ms")
+                       .withDescription("The initial registration pause 
between two consecutive registration attempts. The pause" +
+                               " is doubled for each new registration attempt 
until it reaches the maximum registration pause.");
+
+       /**
+        * The maximum registration pause between two consecutive registration 
attempts.
+        */
+       public static final ConfigOption<String> MAX_REGISTRATION_PAUSE =
+               key("taskmanager.max-registration-pause")
+                       .defaultValue("30 s")
+                       .withDescription("The maximum registration pause 
between two consecutive registration attempts. The max" +
+                               " registration pause requires a time unit 
specifier (ms/s/min/h/d).");
+
+       /**
+        * The pause after a registration has been refused by the job manager 
before retrying to connect.
+        */
+       public static final ConfigOption<String> REFUSED_REGISTRATION_PAUSE =
+               key("taskmanager.refused-registration-pause")
+                       .defaultValue("10 s")
+                       .withDescription("The pause after a registration has 
been refused by the job manager before retrying to connect.");
+
+       /**
+        * Defines the maximum time it can take for the TaskManager 
registration. If the duration is
+        * exceeded without a successful registration, then the TaskManager 
terminates.
+        */
+       public static final ConfigOption<String> MAX_REGISTRATION_DURATION =
+               key("taskmanager.maxRegistrationDuration")
+                       .defaultValue("Inf")
+                       .withDescription("Defines the maximum time it can take 
for the TaskManager registration. If the duration is" +
+                               " exceeded without a successful registration, 
then the TaskManager terminates.");
+
+       /**
+        * The config parameter defining the number of task slots of a task 
manager.
+        */
+       public static final ConfigOption<Integer> NUM_TASK_SLOTS =
+               key("taskmanager.numberOfTaskSlots")
+                       .defaultValue(1)
+                       .withDescription("The number of parallel operator or 
user function instances that a single TaskManager can" +
+                               " run. If this value is larger than 1, a single 
TaskManager takes multiple instances of a function or" +
+                               " operator. That way, the TaskManager can 
utilize multiple CPU cores, but at the same time, the" +
+                               " available memory is divided between the 
different operator or function instances. This value" +
+                               " is typically proportional to the number of 
physical CPU cores that the TaskManager's machine has" +
+                               " (e.g., equal to the number of cores, or half 
the number of cores).");
+
+       public static final ConfigOption<Boolean> 
DEBUG_MEMORY_USAGE_START_LOG_THREAD =
+               key("taskmanager.debug.memory.startLogThread")
+                       .defaultValue(false)
+                       .withDescription("Flag indicating whether to start a 
thread, which repeatedly logs the memory usage of the JVM.");
+
+       public static final ConfigOption<Long> 
DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS =
+               key("taskmanager.debug.memory.logIntervalMs")
+                       .defaultValue(5000L)
+                       .withDescription("The interval (in ms) for the log 
thread to log the current memory usage.");
 
        // 
------------------------------------------------------------------------
        //  Managed Memory Options
@@ -74,7 +172,8 @@ public class TaskManagerOptions {
         */
        public static final ConfigOption<Integer> MEMORY_SEGMENT_SIZE =
                        key("taskmanager.memory.segment-size")
-                       .defaultValue(32768);
+                       .defaultValue(32768)
+                       .withDescription("Size of memory buffers used by the 
network stack and the memory manager (in bytes).");
 
        /**
         * Amount of memory to be allocated by the task manager's memory 
manager (in megabytes). If not
@@ -82,7 +181,9 @@ public class TaskManagerOptions {
         */
        public static final ConfigOption<Long> MANAGED_MEMORY_SIZE =
                        key("taskmanager.memory.size")
-                       .defaultValue(-1L);
+                       .defaultValue(-1L)
+                       .withDescription("Amount of memory to be allocated by 
the task manager's memory manager (in megabytes)." +
+                               " If not set, a relative fraction will be 
allocated.");
 
        /**
         * Fraction of free memory allocated by the memory manager if {@link 
#MANAGED_MEMORY_SIZE} is
@@ -90,7 +191,13 @@ public class TaskManagerOptions {
         */
        public static final ConfigOption<Float> MANAGED_MEMORY_FRACTION =
                        key("taskmanager.memory.fraction")
-                       .defaultValue(0.7f);
+                       .defaultValue(0.7f)
+                       .withDescription("The relative amount of memory (after 
subtracting the amount of memory used by network" +
+                               " buffers) that the task manager reserves for 
sorting, hash tables, and caching of intermediate results." +
+                               " For example, a value of `0.8` means that a 
task manager reserves 80% of its memory" +
+                               " for internal data buffers, leaving 20% of 
free memory for the task manager's heap for objects" +
+                               " created by user-defined functions. This 
parameter is only evaluated, if " + MANAGED_MEMORY_SIZE.key() +
+                               " is not set.");
 
        /**
         * Memory allocation method (JVM heap or off-heap), used for managed 
memory of the TaskManager
@@ -98,14 +205,17 @@ public class TaskManagerOptions {
         **/
        public static final ConfigOption<Boolean> MEMORY_OFF_HEAP =
                        key("taskmanager.memory.off-heap")
-                       .defaultValue(false);
+                       .defaultValue(false)
+                       .withDescription("Memory allocation method (JVM heap or 
off-heap), used for managed memory of the" +
+                               " TaskManager as well as the network buffers.");
 
        /**
         * Whether TaskManager managed memory should be pre-allocated when the 
TaskManager is starting.
         */
        public static final ConfigOption<Boolean> MANAGED_MEMORY_PRE_ALLOCATE =
                        key("taskmanager.memory.preallocate")
-                       .defaultValue(false);
+                       .defaultValue(false)
+                       .withDescription("Whether TaskManager managed memory 
should be pre-allocated when the TaskManager is starting.");
 
        // 
------------------------------------------------------------------------
        //  Network Options
@@ -128,21 +238,28 @@ public class TaskManagerOptions {
         */
        public static final ConfigOption<Float> NETWORK_BUFFERS_MEMORY_FRACTION 
=
                        key("taskmanager.network.memory.fraction")
-                       .defaultValue(0.1f);
+                       .defaultValue(0.1f)
+                       .withDescription("Fraction of JVM memory to use for 
network buffers. This determines how many streaming" +
+                               " data exchange channels a TaskManager can have 
at the same time and how well buffered the channels" +
+                               " are. If a job is rejected or you get a 
warning that the system has not enough buffers available," +
+                               " increase this value or the min/max values 
below. Also note, that \"taskmanager.network.memory.min\"" +
+                               "` and \"taskmanager.network.memory.max\" may 
override this fraction.");
 
        /**
         * Minimum memory size for network buffers (in bytes).
         */
        public static final ConfigOption<Long> NETWORK_BUFFERS_MEMORY_MIN =
                        key("taskmanager.network.memory.min")
-                       .defaultValue(64L << 20); // 64 MB
+                       .defaultValue(64L << 20) // 64 MB
+                       .withDescription("Minimum memory size for network 
buffers (in bytes).");
 
        /**
         * Maximum memory size for network buffers (in bytes).
         */
        public static final ConfigOption<Long> NETWORK_BUFFERS_MEMORY_MAX =
                        key("taskmanager.network.memory.max")
-                       .defaultValue(1024L << 20); // 1 GB
+                       .defaultValue(1024L << 20) // 1 GB
+                       .withDescription("Maximum memory size for network 
buffers (in bytes).");
 
        /**
         * Number of network buffers to use for each outgoing/incoming channel 
(subpartition/input channel).
@@ -151,14 +268,16 @@ public class TaskManagerOptions {
         */
        public static final ConfigOption<Integer> NETWORK_BUFFERS_PER_CHANNEL =
                        key("taskmanager.network.memory.buffers-per-channel")
-                       .defaultValue(2);
+                       .defaultValue(2)
+                       .withDescription("Number of network buffers to use for 
each outgoing/incoming channel (subpartition/input channel).");
 
        /**
         * Number of extra network buffers to use for each outgoing/incoming 
gate (result partition/input gate).
         */
        public static final ConfigOption<Integer> 
NETWORK_EXTRA_BUFFERS_PER_GATE =
                        
key("taskmanager.network.memory.floating-buffers-per-gate")
-                       .defaultValue(8);
+                       .defaultValue(8)
+                       .withDescription("Number of extra network buffers to 
use for each outgoing/incoming gate (result partition/input gate).");
 
        /**
         * Minimum backoff for partition requests of input channels.
@@ -166,7 +285,8 @@ public class TaskManagerOptions {
        public static final ConfigOption<Integer> 
NETWORK_REQUEST_BACKOFF_INITIAL =
                        key("taskmanager.network.request-backoff.initial")
                        .defaultValue(100)
-                       
.withDeprecatedKeys("taskmanager.net.request-backoff.initial");
+                       
.withDeprecatedKeys("taskmanager.net.request-backoff.initial")
+                       .withDescription("Minimum backoff for partition 
requests of input channels.");
 
        /**
         * Maximum backoff for partition requests of input channels.
@@ -174,7 +294,8 @@ public class TaskManagerOptions {
        public static final ConfigOption<Integer> NETWORK_REQUEST_BACKOFF_MAX =
                        key("taskmanager.network.request-backoff.max")
                        .defaultValue(10000)
-                       
.withDeprecatedKeys("taskmanager.net.request-backoff.max");
+                       
.withDeprecatedKeys("taskmanager.net.request-backoff.max")
+                       .withDescription("Maximum backoff for partition 
requests of input channels.");
 
        /**
         * Boolean flag to enable/disable more detailed metrics about 
inbound/outbound network queue
@@ -182,7 +303,8 @@ public class TaskManagerOptions {
         */
        public static final ConfigOption<Boolean> NETWORK_DETAILED_METRICS =
                        key("taskmanager.network.detailed-metrics")
-                       .defaultValue(false);
+                       .defaultValue(false)
+                       .withDescription("Boolean flag to enable/disable more 
detailed metrics about inbound/outbound network queue lengths.");
 
        // 
------------------------------------------------------------------------
        //  Task Options
@@ -195,7 +317,8 @@ public class TaskManagerOptions {
        public static final ConfigOption<Long> TASK_CANCELLATION_INTERVAL =
                        key("task.cancellation.interval")
                        .defaultValue(30000L)
-                       .withDeprecatedKeys("task.cancellation-interval");
+                       .withDeprecatedKeys("task.cancellation-interval")
+                       .withDescription("Time interval between two successive 
task cancellation attempts in milliseconds.");
 
        /**
         * Timeout in milliseconds after which a task cancellation times out and
@@ -204,8 +327,10 @@ public class TaskManagerOptions {
         */
        public static final ConfigOption<Long> TASK_CANCELLATION_TIMEOUT =
                        key("task.cancellation.timeout")
-                       .defaultValue(180000L);
-
+                       .defaultValue(180000L)
+                       .withDescription("Timeout in milliseconds after which a 
task cancellation times out and" +
+                               " leads to a fatal TaskManager error. A value 
of 0 deactivates" +
+                               " the watch dog.");
        /**
         * The maximum number of bytes that a checkpoint alignment may buffer.
         * If the checkpoint alignment buffers more than the configured amount 
of
@@ -215,7 +340,10 @@ public class TaskManagerOptions {
         */
        public static final ConfigOption<Long> 
TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT =
                        key("task.checkpoint.alignment.max-size")
-                       .defaultValue(-1L);
+                       .defaultValue(-1L)
+                       .withDescription("The maximum number of bytes that a 
checkpoint alignment may buffer. If the checkpoint" +
+                               " alignment buffers more than the configured 
amount of data, the checkpoint is aborted (skipped)." +
+                               " A value of -1 indicates that there is no 
limit.");
 
        // 
------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/57ec03ed/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
index ecfbc60..eab7382 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
@@ -244,7 +245,7 @@ public class BootstrapTools {
                        cfg.setInteger(JobManagerOptions.PORT, jobManagerPort);
                }
 
-               
cfg.setString(ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION, 
registrationTimeout.toString());
+               cfg.setString(TaskManagerOptions.MAX_REGISTRATION_DURATION, 
registrationTimeout.toString());
                if (numSlots != -1){
                        
cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots);
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/57ec03ed/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
index 4ae77bec..b5bebc7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
@@ -18,10 +18,10 @@
 
 package org.apache.flink.runtime.io.network.netty;
 
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.net.SSLUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -212,8 +212,7 @@ public class NettyConfig {
        }
 
        public boolean getSSLEnabled() {
-               return 
config.getBoolean(ConfigConstants.TASK_MANAGER_DATA_SSL_ENABLED,
-                               
ConfigConstants.DEFAULT_TASK_MANAGER_DATA_SSL_ENABLED)
+               return config.getBoolean(TaskManagerOptions.DATA_SSL_ENABLED)
                        && SSLUtils.getSSLEnabled(config);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/57ec03ed/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
index cbde0d5..aebefd6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
@@ -185,9 +185,7 @@ public class TaskManagerConfiguration implements 
TaskManagerRuntimeInfo {
                final Time finiteRegistrationDuration;
 
                try {
-                       Duration maxRegistrationDuration = 
Duration.create(configuration.getString(
-                               
ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION,
-                               
ConfigConstants.DEFAULT_TASK_MANAGER_MAX_REGISTRATION_DURATION));
+                       Duration maxRegistrationDuration = 
Duration.create(configuration.getString(TaskManagerOptions.MAX_REGISTRATION_DURATION));
                        if (maxRegistrationDuration.isFinite()) {
                                finiteRegistrationDuration = 
Time.milliseconds(maxRegistrationDuration.toMillis());
                        } else {
@@ -195,14 +193,12 @@ public class TaskManagerConfiguration implements 
TaskManagerRuntimeInfo {
                        }
                } catch (NumberFormatException e) {
                        throw new IllegalArgumentException("Invalid format for 
parameter " +
-                               
ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION, e);
+                               
TaskManagerOptions.MAX_REGISTRATION_DURATION.key(), e);
                }
 
                final Time initialRegistrationPause;
                try {
-                       Duration pause = 
Duration.create(configuration.getString(
-                               
ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE,
-                               
ConfigConstants.DEFAULT_TASK_MANAGER_INITIAL_REGISTRATION_PAUSE));
+                       Duration pause = 
Duration.create(configuration.getString(TaskManagerOptions.INITIAL_REGISTRATION_PAUSE));
                        if (pause.isFinite()) {
                                initialRegistrationPause = 
Time.milliseconds(pause.toMillis());
                        } else {
@@ -210,14 +206,13 @@ public class TaskManagerConfiguration implements 
TaskManagerRuntimeInfo {
                        }
                } catch (NumberFormatException e) {
                        throw new IllegalArgumentException("Invalid format for 
parameter " +
-                               
ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE, e);
+                               
TaskManagerOptions.INITIAL_REGISTRATION_PAUSE.key(), e);
                }
 
                final Time maxRegistrationPause;
                try {
                        Duration pause = 
Duration.create(configuration.getString(
-                               
ConfigConstants.TASK_MANAGER_MAX_REGISTARTION_PAUSE,
-                               
ConfigConstants.DEFAULT_TASK_MANAGER_MAX_REGISTRATION_PAUSE));
+                               TaskManagerOptions.MAX_REGISTRATION_PAUSE));
                        if (pause.isFinite()) {
                                maxRegistrationPause = 
Time.milliseconds(pause.toMillis());
                        } else {
@@ -225,14 +220,12 @@ public class TaskManagerConfiguration implements 
TaskManagerRuntimeInfo {
                        }
                } catch (NumberFormatException e) {
                        throw new IllegalArgumentException("Invalid format for 
parameter " +
-                               
ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE, e);
+                               
TaskManagerOptions.INITIAL_REGISTRATION_PAUSE.key(), e);
                }
 
                final Time refusedRegistrationPause;
                try {
-                       Duration pause = 
Duration.create(configuration.getString(
-                               
ConfigConstants.TASK_MANAGER_REFUSED_REGISTRATION_PAUSE,
-                               
ConfigConstants.DEFAULT_TASK_MANAGER_REFUSED_REGISTRATION_PAUSE));
+                       Duration pause = 
Duration.create(configuration.getString(TaskManagerOptions.REFUSED_REGISTRATION_PAUSE));
                        if (pause.isFinite()) {
                                refusedRegistrationPause = 
Time.milliseconds(pause.toMillis());
                        } else {
@@ -240,7 +233,7 @@ public class TaskManagerConfiguration implements 
TaskManagerRuntimeInfo {
                        }
                } catch (NumberFormatException e) {
                        throw new IllegalArgumentException("Invalid format for 
parameter " +
-                               
ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE, e);
+                               
TaskManagerOptions.INITIAL_REGISTRATION_PAUSE.key(), e);
                }
 
                final boolean exitOnOom = 
configuration.getBoolean(TaskManagerOptions.KILL_ON_OUT_OF_MEMORY);

http://git-wip-us.apache.org/repos/asf/flink/blob/57ec03ed/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
index c86d7c4..07cf660 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
@@ -257,10 +257,9 @@ public class TaskManagerServicesConfiguration {
 
                // ----> hosts / ports for communication and data exchange
 
-               int dataport = 
configuration.getInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
-                       ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT);
+               int dataport = 
configuration.getInteger(TaskManagerOptions.DATA_PORT);
 
-               checkConfigParameter(dataport >= 0, dataport, 
ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
+               checkConfigParameter(dataport >= 0, dataport, 
TaskManagerOptions.DATA_PORT.key(),
                        "Leave config parameter empty or use 0 to let the 
system choose a port automatically.");
 
                checkConfigParameter(slots >= 1, slots, 
ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS,

http://git-wip-us.apache.org/repos/asf/flink/blob/57ec03ed/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index 8ef2e36..e864306 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -205,9 +205,7 @@ class LocalFlinkMiniCluster(
 
     val rpcPortIterator = NetUtils.getPortRangeFromString(rpcPortRange)
 
-    val dataPort = config.getInteger(
-      ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
-      ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT)
+    val dataPort = config.getInteger(TaskManagerOptions.DATA_PORT)
 
     if (rpcPortIterator.hasNext) {
       val rpcPort = rpcPortIterator.next()
@@ -216,7 +214,7 @@ class LocalFlinkMiniCluster(
       }
     }
     if (dataPort > 0) {
-      config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, dataPort + 
index)
+      config.setInteger(TaskManagerOptions.DATA_PORT, dataPort + index)
     }
 
     val localExecution = numTaskManagers == 1

http://git-wip-us.apache.org/repos/asf/flink/blob/57ec03ed/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index e8b043e..8dbb01d 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -1874,14 +1874,12 @@ object TaskManager {
       // if desired, start the logging daemon that periodically logs the
       // memory usage information
       if (LOG.isInfoEnabled && configuration.getBoolean(
-        ConfigConstants.TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD,
-        
ConfigConstants.DEFAULT_TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD))
+        TaskManagerOptions.DEBUG_MEMORY_USAGE_START_LOG_THREAD))
       {
         LOG.info("Starting periodic memory usage logger")
 
         val interval = configuration.getLong(
-          ConfigConstants.TASK_MANAGER_DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS,
-          
ConfigConstants.DEFAULT_TASK_MANAGER_DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS)
+          TaskManagerOptions.DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS)
 
         val logger = new MemoryLogger(LOG.logger, interval, taskManagerSystem)
         logger.start()

http://git-wip-us.apache.org/repos/asf/flink/blob/57ec03ed/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
index 986f3fa..ceb92c4 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
@@ -26,6 +26,7 @@ import akka.testkit.JavaTestKit;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
@@ -267,7 +268,7 @@ public class TaskManagerRegistrationTest extends TestLogger 
{
                        try {
                                // registration timeout of 1 second
                                Configuration tmConfig = new Configuration();
-                               
tmConfig.setString(ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION, "500 
ms");
+                               
tmConfig.setString(TaskManagerOptions.MAX_REGISTRATION_DURATION, "500 ms");
 
                                
highAvailabilityServices.setJobMasterLeaderRetriever(
                                        HighAvailabilityServices.DEFAULT_JOB_ID,
@@ -325,7 +326,7 @@ public class TaskManagerRegistrationTest extends TestLogger 
{
 
                                FiniteDuration refusedRegistrationPause = new 
FiniteDuration(500, TimeUnit.MILLISECONDS);
                                Configuration tmConfig = new 
Configuration(config);
-                               
tmConfig.setString(ConfigConstants.TASK_MANAGER_REFUSED_REGISTRATION_PAUSE, 
refusedRegistrationPause.toString());
+                               
tmConfig.setString(TaskManagerOptions.REFUSED_REGISTRATION_PAUSE, 
refusedRegistrationPause.toString());
 
                                
highAvailabilityServices.setJobMasterLeaderRetriever(
                                        HighAvailabilityServices.DEFAULT_JOB_ID,
@@ -407,8 +408,8 @@ public class TaskManagerRegistrationTest extends TestLogger 
{
                                long maxDelay = 30000;
 
                                Configuration tmConfig = new 
Configuration(config);
-                               
tmConfig.setString(ConfigConstants.TASK_MANAGER_REFUSED_REGISTRATION_PAUSE, 
refusedRegistrationPause + " ms");
-                               
tmConfig.setString(ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE, 
initialRegistrationPause + " ms");
+                               
tmConfig.setString(TaskManagerOptions.REFUSED_REGISTRATION_PAUSE, 
refusedRegistrationPause + " ms");
+                               
tmConfig.setString(TaskManagerOptions.INITIAL_REGISTRATION_PAUSE, 
initialRegistrationPause + " ms");
 
                                // we make the test actor (the test kit) the 
JobManager to intercept
                                // the messages

http://git-wip-us.apache.org/repos/asf/flink/blob/57ec03ed/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
index ade2f3a..107826f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
@@ -244,7 +244,7 @@ public class TaskManagerStartupTest extends TestLogger {
 
                        final Configuration cfg = new Configuration();
                        
cfg.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, "localhost");
-                       
cfg.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, 
blocker.getLocalPort());
+                       cfg.setInteger(TaskManagerOptions.DATA_PORT, 
blocker.getLocalPort());
                        cfg.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 1L);
 
                        TaskManager.startTaskManagerComponentsAndActor(

http://git-wip-us.apache.org/repos/asf/flink/blob/57ec03ed/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index d739bb3..9d41bfb 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -960,7 +960,7 @@ public class TaskManagerTest extends TestLogger {
 
                                final int dataPort = 
NetUtils.getAvailablePort();
                                Configuration config = new Configuration();
-                               
config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, dataPort);
+                               config.setInteger(TaskManagerOptions.DATA_PORT, 
dataPort);
                                
config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
                                
config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
 
@@ -1173,7 +1173,7 @@ public class TaskManagerTest extends TestLogger {
 
                                final int dataPort = 
NetUtils.getAvailablePort();
                                Configuration config = new Configuration();
-                               
config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, dataPort);
+                               config.setInteger(TaskManagerOptions.DATA_PORT, 
dataPort);
                                
config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
                                
config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
                                
config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, "/i/dont/exist");

http://git-wip-us.apache.org/repos/asf/flink/blob/57ec03ed/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
index 44f14a0..1a3419b 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
@@ -20,7 +20,7 @@ package org.apache.flink.api.scala.runtime.jobmanager
 
 import akka.actor.ActorSystem
 import akka.testkit.{ImplicitSender, TestKit}
-import org.apache.flink.configuration.{ConfigConstants, Configuration, 
JobManagerOptions}
+import org.apache.flink.configuration.{ConfigConstants, Configuration, 
JobManagerOptions, TaskManagerOptions}
 import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour}
 import org.apache.flink.runtime.jobgraph.{JobGraph, JobVertex}
 import org.apache.flink.runtime.messages.Acknowledge
@@ -136,8 +136,8 @@ class JobManagerFailsITCase(_system: ActorSystem)
     config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
     config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 
numTaskmanagers)
     config.setInteger(JobManagerOptions.PORT, 0)
-    config.setString(ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE, 
"50 ms")
-    config.setString(ConfigConstants.TASK_MANAGER_MAX_REGISTARTION_PAUSE, "100 
ms")
+    config.setString(TaskManagerOptions.INITIAL_REGISTRATION_PAUSE, "50 ms")
+    config.setString(TaskManagerOptions.MAX_REGISTRATION_PAUSE, "100 ms")
 
     val cluster = new TestingCluster(config, singleActorSystem = false)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/57ec03ed/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index d72f4ac..8104d49 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -19,8 +19,8 @@
 package org.apache.flink.yarn;
 
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
@@ -427,7 +427,7 @@ public class YarnResourceManager extends 
ResourceManager<YarnWorkerNode> impleme
                                
taskManagerParameters.taskManagerTotalMemoryMB(),
                                taskManagerParameters.taskManagerHeapSizeMB(),
                                
taskManagerParameters.taskManagerDirectMemoryLimitMB());
-               int timeout = 
flinkConfig.getInteger(ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION,
+               int timeout = 
flinkConfig.getInteger(TaskManagerOptions.MAX_REGISTRATION_DURATION.key(),
                                DEFAULT_TASK_MANAGER_REGISTRATION_DURATION);
                FiniteDuration teRegistrationTimeout = new 
FiniteDuration(timeout, TimeUnit.SECONDS);
                final Configuration taskManagerConfig = 
BootstrapTools.generateTaskManagerConfiguration(

Reply via email to