[FLINK-8835] [taskmanager] Cleanup TaskManager config keys

This closes #5808.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9e2581e0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9e2581e0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9e2581e0

Branch: refs/heads/release-1.5
Commit: 9e2581e0443ff47124de41a8cdcd9c18e64b0fab
Parents: c8e0a31
Author: zhangminglei <zml13856086...@163.com>
Authored: Wed Apr 4 17:05:22 2018 +0800
Committer: zentol <ches...@apache.org>
Committed: Fri Apr 6 15:24:03 2018 +0200

----------------------------------------------------------------------
 .../flink/configuration/ConfigConstants.java    | 20 +++----
 .../flink/configuration/TaskManagerOptions.java | 55 +++++++++++---------
 .../clusterframework/BootstrapTools.java        |  2 +-
 .../runtime/io/network/netty/NettyConfig.java   |  2 +-
 .../taskexecutor/TaskManagerConfiguration.java  | 16 +++---
 .../flink/runtime/taskmanager/TaskManager.scala |  2 +-
 .../TaskManagerRegistrationTest.java            |  8 +--
 .../runtime/io/InputProcessorUtil.java          |  2 +-
 .../jobmanager/JobManagerFailsITCase.scala      |  4 +-
 9 files changed, 59 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9e2581e0/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index b716d9e..f148360 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -299,7 +299,7 @@ public final class ConfigConstants {
        /**
         * Flag indicating whether to start a thread, which repeatedly logs the 
memory usage of the JVM.
         *
-        * @deprecated use {@link 
TaskManagerOptions#DEBUG_MEMORY_USAGE_START_LOG_THREAD} instead
+        * @deprecated use {@link TaskManagerOptions#DEBUG_MEMORY_LOG} instead
         */
        @Deprecated
        public static final String 
TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD = 
"taskmanager.debug.memory.startLogThread";
@@ -316,7 +316,7 @@ public final class ConfigConstants {
         * Defines the maximum time it can take for the TaskManager 
registration. If the duration is
         * exceeded without a successful registration, then the TaskManager 
terminates.
         *
-        * @deprecated use {@link TaskManagerOptions#MAX_REGISTRATION_DURATION} 
instead
+        * @deprecated use {@link TaskManagerOptions#REGISTRATION_TIMEOUT} 
instead
         */
        @Deprecated
        public static final String TASK_MANAGER_MAX_REGISTRATION_DURATION = 
"taskmanager.maxRegistrationDuration";
@@ -325,7 +325,7 @@ public final class ConfigConstants {
         * The initial registration pause between two consecutive registration 
attempts. The pause
         * is doubled for each new registration attempt until it reaches the 
maximum registration pause.
         *
-        * @deprecated use {@link 
TaskManagerOptions#INITIAL_REGISTRATION_PAUSE} instead
+        * @deprecated use {@link 
TaskManagerOptions#INITIAL_REGISTRATION_BACKOFF} instead
         */
        @Deprecated
        public static final String TASK_MANAGER_INITIAL_REGISTRATION_PAUSE = 
"taskmanager.initial-registration-pause";
@@ -333,7 +333,7 @@ public final class ConfigConstants {
        /**
         * The maximum registration pause between two consecutive registration 
attempts.
         *
-        * @deprecated use {@link TaskManagerOptions#MAX_REGISTRATION_PAUSE} 
instead
+        * @deprecated use {@link TaskManagerOptions#REGISTRATION_MAX_BACKOFF} 
instead
         */
        @Deprecated
        public static final String TASK_MANAGER_MAX_REGISTARTION_PAUSE = 
"taskmanager.max-registration-pause";
@@ -341,7 +341,7 @@ public final class ConfigConstants {
        /**
         * The pause after a registration has been refused by the job manager 
before retrying to connect.
         *
-        * @deprecated use {@link 
TaskManagerOptions#REFUSED_REGISTRATION_PAUSE} instead
+        * @deprecated use {@link 
TaskManagerOptions#REFUSED_REGISTRATION_BACKOFF} instead
         */
        @Deprecated
        public static final String TASK_MANAGER_REFUSED_REGISTRATION_PAUSE = 
"taskmanager.refused-registration-pause";
@@ -1441,7 +1441,7 @@ public final class ConfigConstants {
        /**
         * Flag indicating whether to start a thread, which repeatedly logs the 
memory usage of the JVM.
         *
-        * @deprecated use {@link 
TaskManagerOptions#DEBUG_MEMORY_USAGE_START_LOG_THREAD} instead
+        * @deprecated use {@link TaskManagerOptions#DEBUG_MEMORY_LOG} instead
         */
        @Deprecated
        public static final boolean 
DEFAULT_TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD = false;
@@ -1457,7 +1457,7 @@ public final class ConfigConstants {
        /**
         * The default task manager's maximum registration duration.
         *
-        * @deprecated use {@link TaskManagerOptions#MAX_REGISTRATION_DURATION} 
instead
+        * @deprecated use {@link TaskManagerOptions#REGISTRATION_TIMEOUT} 
instead
         */
        @Deprecated
        public static final String 
DEFAULT_TASK_MANAGER_MAX_REGISTRATION_DURATION = "Inf";
@@ -1465,7 +1465,7 @@ public final class ConfigConstants {
        /**
         * The default task manager's initial registration pause.
         *
-        * @deprecated use {@link 
TaskManagerOptions#INITIAL_REGISTRATION_PAUSE} instead
+        * @deprecated use {@link 
TaskManagerOptions#INITIAL_REGISTRATION_BACKOFF} instead
         */
        @Deprecated
        public static final String 
DEFAULT_TASK_MANAGER_INITIAL_REGISTRATION_PAUSE = "500 ms";
@@ -1473,7 +1473,7 @@ public final class ConfigConstants {
        /**
         * The default task manager's maximum registration pause.
         *
-        * @deprecated use {@link TaskManagerOptions#MAX_REGISTRATION_PAUSE} 
instead
+        * @deprecated use {@link TaskManagerOptions#REGISTRATION_MAX_BACKOFF} 
instead
         */
        @Deprecated
        public static final String DEFAULT_TASK_MANAGER_MAX_REGISTRATION_PAUSE 
= "30 s";
@@ -1481,7 +1481,7 @@ public final class ConfigConstants {
        /**
         * The default task manager's refused registration pause.
         *
-        * @deprecated use {@link 
TaskManagerOptions#REFUSED_REGISTRATION_PAUSE} instead
+        * @deprecated use {@link 
TaskManagerOptions#REFUSED_REGISTRATION_BACKOFF} instead
         */
        @Deprecated
        public static final String 
DEFAULT_TASK_MANAGER_REFUSED_REGISTRATION_PAUSE = "10 s";

http://git-wip-us.apache.org/repos/asf/flink/blob/9e2581e0/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
index c7b0782..2bd3091 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
@@ -104,40 +104,44 @@ public class TaskManagerOptions {
                                " global ssl flag " + 
SecurityOptions.SSL_ENABLED.key() + " is set to true");
 
        /**
-        * The initial registration pause between two consecutive registration 
attempts. The pause
-        * is doubled for each new registration attempt until it reaches the 
maximum registration pause.
+        * The initial registration backoff between two consecutive 
registration attempts. The backoff
+        * is doubled for each new registration attempt until it reaches the 
maximum registration backoff.
         */
-       public static final ConfigOption<String> INITIAL_REGISTRATION_PAUSE =
-               key("taskmanager.initial-registration-pause")
+       public static final ConfigOption<String> INITIAL_REGISTRATION_BACKOFF =
+               key("taskmanager.registration.initial-backoff")
                        .defaultValue("500 ms")
-                       .withDescription("The initial registration pause 
between two consecutive registration attempts. The pause" +
-                               " is doubled for each new registration attempt 
until it reaches the maximum registration pause.");
+                       
.withDeprecatedKeys("taskmanager.initial-registration-pause")
+                       .withDescription("The initial registration backoff 
between two consecutive registration attempts. The backoff" +
+                               " is doubled for each new registration attempt 
until it reaches the maximum registration backoff.");
 
        /**
-        * The maximum registration pause between two consecutive registration 
attempts.
+        * The maximum registration backoff between two consecutive 
registration attempts.
         */
-       public static final ConfigOption<String> MAX_REGISTRATION_PAUSE =
-               key("taskmanager.max-registration-pause")
+       public static final ConfigOption<String> REGISTRATION_MAX_BACKOFF =
+               key("taskmanager.registration.max-backoff")
                        .defaultValue("30 s")
-                       .withDescription("The maximum registration pause 
between two consecutive registration attempts. The max" +
-                               " registration pause requires a time unit 
specifier (ms/s/min/h/d).");
+                       
.withDeprecatedKeys("taskmanager.max-registration-pause")
+                       .withDescription("The maximum registration backoff 
between two consecutive registration attempts. The max" +
+                               " registration backoff requires a time unit 
specifier (ms/s/min/h/d).");
 
        /**
-        * The pause after a registration has been refused by the job manager 
before retrying to connect.
+        * The backoff after a registration has been refused by the job manager 
before retrying to connect.
         */
-       public static final ConfigOption<String> REFUSED_REGISTRATION_PAUSE =
-               key("taskmanager.refused-registration-pause")
+       public static final ConfigOption<String> REFUSED_REGISTRATION_BACKOFF =
+               key("taskmanager.registration.refused-backoff")
                        .defaultValue("10 s")
-                       .withDescription("The pause after a registration has 
been refused by the job manager before retrying to connect.");
+                       
.withDeprecatedKeys("taskmanager.refused-registration-pause")
+                       .withDescription("The backoff after a registration has 
been refused by the job manager before retrying to connect.");
 
        /**
-        * Defines the maximum time it can take for the TaskManager 
registration. If the duration is
+        * Defines the timeout for the TaskManager registration. If 
the duration is
         * exceeded without a successful registration, then the TaskManager 
terminates.
         */
-       public static final ConfigOption<String> MAX_REGISTRATION_DURATION =
-               key("taskmanager.maxRegistrationDuration")
+       public static final ConfigOption<String> REGISTRATION_TIMEOUT =
+               key("taskmanager.registration.timeout")
                        .defaultValue("Inf")
-                       .withDescription("Defines the maximum time it can take 
for the TaskManager registration. If the duration is" +
+                       
.withDeprecatedKeys("taskmanager.maxRegistrationDuration")
+                       .withDescription("Defines the timeout for the 
TaskManager registration. If the duration is" +
                                " exceeded without a successful registration, 
then the TaskManager terminates.");
 
        /**
@@ -153,14 +157,16 @@ public class TaskManagerOptions {
                                " is typically proportional to the number of 
physical CPU cores that the TaskManager's machine has" +
                                " (e.g., equal to the number of cores, or half 
the number of cores).");
 
-       public static final ConfigOption<Boolean> 
DEBUG_MEMORY_USAGE_START_LOG_THREAD =
-               key("taskmanager.debug.memory.startLogThread")
+       public static final ConfigOption<Boolean> DEBUG_MEMORY_LOG =
+               key("taskmanager.debug.memory.log")
                        .defaultValue(false)
+                       
.withDeprecatedKeys("taskmanager.debug.memory.startLogThread")
                        .withDescription("Flag indicating whether to start a 
thread, which repeatedly logs the memory usage of the JVM.");
 
        public static final ConfigOption<Long> 
DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS =
-               key("taskmanager.debug.memory.logIntervalMs")
+               key("taskmanager.debug.memory.log-interval")
                        .defaultValue(5000L)
+                       
.withDeprecatedKeys("taskmanager.debug.memory.logIntervalMs")
                        .withDescription("The interval (in ms) for the log 
thread to log the current memory usage.");
 
        // 
------------------------------------------------------------------------
@@ -321,9 +327,10 @@ public class TaskManagerOptions {
         * credit-based flow control.
         */
        @Deprecated
-       public static final ConfigOption<Boolean> 
NETWORK_CREDIT_BASED_FLOW_CONTROL_ENABLED =
-                       
key("taskmanager.network.credit-based-flow-control.enabled")
+       public static final ConfigOption<Boolean> NETWORK_CREDIT_MODEL =
+                       key("taskmanager.network.credit-model")
                        .defaultValue(true)
+                       
.withDeprecatedKeys("taskmanager.network.credit-based-flow-control.enabled")
                        .withDescription("Boolean flag to enable/disable 
network credit-based flow control.");
 
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/9e2581e0/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
index eab7382..102274d1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
@@ -245,7 +245,7 @@ public class BootstrapTools {
                        cfg.setInteger(JobManagerOptions.PORT, jobManagerPort);
                }
 
-               cfg.setString(TaskManagerOptions.MAX_REGISTRATION_DURATION, 
registrationTimeout.toString());
+               cfg.setString(TaskManagerOptions.REGISTRATION_TIMEOUT, 
registrationTimeout.toString());
                if (numSlots != -1){
                        
cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots);
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/9e2581e0/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
index 8572361..18527c4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
@@ -225,7 +225,7 @@ public class NettyConfig {
        }
 
        public boolean isCreditBasedEnabled() {
-               return 
config.getBoolean(TaskManagerOptions.NETWORK_CREDIT_BASED_FLOW_CONTROL_ENABLED);
+               return 
config.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/9e2581e0/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
index cb6fe51..1bf42ee 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
@@ -185,7 +185,7 @@ public class TaskManagerConfiguration implements 
TaskManagerRuntimeInfo {
                final Time finiteRegistrationDuration;
 
                try {
-                       Duration maxRegistrationDuration = 
Duration.create(configuration.getString(TaskManagerOptions.MAX_REGISTRATION_DURATION));
+                       Duration maxRegistrationDuration = 
Duration.create(configuration.getString(TaskManagerOptions.REGISTRATION_TIMEOUT));
                        if (maxRegistrationDuration.isFinite()) {
                                finiteRegistrationDuration = 
Time.milliseconds(maxRegistrationDuration.toMillis());
                        } else {
@@ -193,12 +193,12 @@ public class TaskManagerConfiguration implements 
TaskManagerRuntimeInfo {
                        }
                } catch (NumberFormatException e) {
                        throw new IllegalArgumentException("Invalid format for 
parameter " +
-                               
TaskManagerOptions.MAX_REGISTRATION_DURATION.key(), e);
+                               TaskManagerOptions.REGISTRATION_TIMEOUT.key(), 
e);
                }
 
                final Time initialRegistrationPause;
                try {
-                       Duration pause = 
Duration.create(configuration.getString(TaskManagerOptions.INITIAL_REGISTRATION_PAUSE));
+                       Duration pause = 
Duration.create(configuration.getString(TaskManagerOptions.INITIAL_REGISTRATION_BACKOFF));
                        if (pause.isFinite()) {
                                initialRegistrationPause = 
Time.milliseconds(pause.toMillis());
                        } else {
@@ -206,13 +206,13 @@ public class TaskManagerConfiguration implements 
TaskManagerRuntimeInfo {
                        }
                } catch (NumberFormatException e) {
                        throw new IllegalArgumentException("Invalid format for 
parameter " +
-                               
TaskManagerOptions.INITIAL_REGISTRATION_PAUSE.key(), e);
+                               
TaskManagerOptions.INITIAL_REGISTRATION_BACKOFF.key(), e);
                }
 
                final Time maxRegistrationPause;
                try {
                        Duration pause = 
Duration.create(configuration.getString(
-                               TaskManagerOptions.MAX_REGISTRATION_PAUSE));
+                               TaskManagerOptions.REGISTRATION_MAX_BACKOFF));
                        if (pause.isFinite()) {
                                maxRegistrationPause = 
Time.milliseconds(pause.toMillis());
                        } else {
@@ -220,12 +220,12 @@ public class TaskManagerConfiguration implements 
TaskManagerRuntimeInfo {
                        }
                } catch (NumberFormatException e) {
                        throw new IllegalArgumentException("Invalid format for 
parameter " +
-                               
TaskManagerOptions.INITIAL_REGISTRATION_PAUSE.key(), e);
+                               
TaskManagerOptions.INITIAL_REGISTRATION_BACKOFF.key(), e);
                }
 
                final Time refusedRegistrationPause;
                try {
-                       Duration pause = 
Duration.create(configuration.getString(TaskManagerOptions.REFUSED_REGISTRATION_PAUSE));
+                       Duration pause = 
Duration.create(configuration.getString(TaskManagerOptions.REFUSED_REGISTRATION_BACKOFF));
                        if (pause.isFinite()) {
                                refusedRegistrationPause = 
Time.milliseconds(pause.toMillis());
                        } else {
@@ -233,7 +233,7 @@ public class TaskManagerConfiguration implements 
TaskManagerRuntimeInfo {
                        }
                } catch (NumberFormatException e) {
                        throw new IllegalArgumentException("Invalid format for 
parameter " +
-                               
TaskManagerOptions.INITIAL_REGISTRATION_PAUSE.key(), e);
+                               
TaskManagerOptions.INITIAL_REGISTRATION_BACKOFF.key(), e);
                }
 
                final boolean exitOnOom = 
configuration.getBoolean(TaskManagerOptions.KILL_ON_OUT_OF_MEMORY);

http://git-wip-us.apache.org/repos/asf/flink/blob/9e2581e0/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 0aaeae3..071a333 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -1884,7 +1884,7 @@ object TaskManager {
       // if desired, start the logging daemon that periodically logs the
       // memory usage information
       if (LOG.isInfoEnabled && configuration.getBoolean(
-        TaskManagerOptions.DEBUG_MEMORY_USAGE_START_LOG_THREAD))
+        TaskManagerOptions.DEBUG_MEMORY_LOG))
       {
         LOG.info("Starting periodic memory usage logger")
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9e2581e0/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
index 6b65095..ad32a4f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
@@ -267,7 +267,7 @@ public class TaskManagerRegistrationTest extends TestLogger 
{
                        try {
                                // registration timeout of 500 milliseconds
                                Configuration tmConfig = new Configuration();
-                               
tmConfig.setString(TaskManagerOptions.MAX_REGISTRATION_DURATION, "500 ms");
+                               
tmConfig.setString(TaskManagerOptions.REGISTRATION_TIMEOUT, "500 ms");
 
                                
highAvailabilityServices.setJobMasterLeaderRetriever(
                                        HighAvailabilityServices.DEFAULT_JOB_ID,
@@ -325,7 +325,7 @@ public class TaskManagerRegistrationTest extends TestLogger 
{
 
                                FiniteDuration refusedRegistrationPause = new 
FiniteDuration(500, TimeUnit.MILLISECONDS);
                                Configuration tmConfig = new 
Configuration(config);
-                               
tmConfig.setString(TaskManagerOptions.REFUSED_REGISTRATION_PAUSE, 
refusedRegistrationPause.toString());
+                               
tmConfig.setString(TaskManagerOptions.REFUSED_REGISTRATION_BACKOFF, 
refusedRegistrationPause.toString());
 
                                
highAvailabilityServices.setJobMasterLeaderRetriever(
                                        HighAvailabilityServices.DEFAULT_JOB_ID,
@@ -407,8 +407,8 @@ public class TaskManagerRegistrationTest extends TestLogger 
{
                                long maxDelay = 30000;
 
                                Configuration tmConfig = new 
Configuration(config);
-                               
tmConfig.setString(TaskManagerOptions.REFUSED_REGISTRATION_PAUSE, 
refusedRegistrationPause + " ms");
-                               
tmConfig.setString(TaskManagerOptions.INITIAL_REGISTRATION_PAUSE, 
initialRegistrationPause + " ms");
+                               
tmConfig.setString(TaskManagerOptions.REFUSED_REGISTRATION_BACKOFF, 
refusedRegistrationPause + " ms");
+                               
tmConfig.setString(TaskManagerOptions.INITIAL_REGISTRATION_BACKOFF, 
initialRegistrationPause + " ms");
 
                                // we make the test actor (the test kit) the 
JobManager to intercept
                                // the messages

http://git-wip-us.apache.org/repos/asf/flink/blob/9e2581e0/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
index 1ae34b3..d1c5b72 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
@@ -51,7 +51,7 @@ public class InputProcessorUtil {
                                        + " must be positive or -1 (infinite)");
                        }
 
-                       if 
(taskManagerConfig.getBoolean(TaskManagerOptions.NETWORK_CREDIT_BASED_FLOW_CONTROL_ENABLED))
 {
+                       if 
(taskManagerConfig.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL)) {
                                barrierHandler = new BarrierBuffer(inputGate, 
new CachedBufferBlocker(inputGate.getPageSize()), maxAlign);
                        } else {
                                barrierHandler = new BarrierBuffer(inputGate, 
new BufferSpiller(ioManager, inputGate.getPageSize()), maxAlign);

http://git-wip-us.apache.org/repos/asf/flink/blob/9e2581e0/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
index 1a3419b..5fe7b1d 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
@@ -136,8 +136,8 @@ class JobManagerFailsITCase(_system: ActorSystem)
     config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
     config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 
numTaskmanagers)
     config.setInteger(JobManagerOptions.PORT, 0)
-    config.setString(TaskManagerOptions.INITIAL_REGISTRATION_PAUSE, "50 ms")
-    config.setString(TaskManagerOptions.MAX_REGISTRATION_PAUSE, "100 ms")
+    config.setString(TaskManagerOptions.INITIAL_REGISTRATION_BACKOFF, "50 ms")
+    config.setString(TaskManagerOptions.REGISTRATION_MAX_BACKOFF, "100 ms")
 
     val cluster = new TestingCluster(config, singleActorSystem = false)
 

Reply via email to