[FLINK-6495] Migrate Akka configuration options

This closes #3935.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/302c6741
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/302c6741
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/302c6741

Branch: refs/heads/master
Commit: 302c6741f862f13c6ea3d5490a31fadc20e976c8
Parents: 9a9e193
Author: zjureel <[email protected]>
Authored: Thu May 18 12:34:40 2017 +0800
Committer: zentol <[email protected]>
Committed: Mon May 22 20:28:22 2017 +0200

----------------------------------------------------------------------
 .../client/program/ClientConnectionTest.java    |   5 +-
 .../apache/flink/client/program/ClientTest.java |   3 +-
 .../org/apache/flink/storm/api/FlinkClient.java |   5 +-
 .../apache/flink/configuration/AkkaOptions.java | 102 +++++++++++++++++--
 .../flink/configuration/ConfigConstants.java    |  92 +++++++++++++++++
 .../MesosTaskManagerRunner.java                 |   3 +-
 .../webmonitor/metrics/MetricFetcher.java       |   4 +-
 .../client/JobAttachmentClientActor.java        |   4 +-
 .../client/JobSubmissionClientActor.java        |   4 +-
 .../clusterframework/FlinkResourceManager.java  |   4 +-
 .../restart/FailureRateRestartStrategy.java     |   3 +-
 .../restart/FixedDelayRestartStrategy.java      |   6 +-
 .../restart/RestartStrategyFactory.java         |   6 +-
 .../runtime/query/QueryableStateClient.java     |   8 +-
 .../ResourceManagerConfiguration.java           |   8 +-
 .../slotmanager/SlotManagerConfiguration.java   |  12 +--
 .../runtime/rpc/akka/AkkaRpcServiceUtils.java   |   6 +-
 .../taskexecutor/TaskManagerConfiguration.java  |   3 +-
 .../apache/flink/runtime/akka/AkkaUtils.scala   |  65 ++++--------
 .../runtime/minicluster/FlinkMiniCluster.scala  |   8 +-
 .../clusterframework/ResourceManagerTest.java   |   6 +-
 .../PartialConsumePipelinedResultTest.java      |   3 +-
 .../runtime/jobmanager/JobManagerTest.java      |   3 +-
 ...askManagerComponentsStartupShutdownTest.java |   7 +-
 .../TaskManagerRegistrationTest.java            |   9 +-
 .../runtime/testutils/ZooKeeperTestUtils.java   |   9 +-
 .../flink/runtime/akka/AkkaSslITCase.scala      |   6 +-
 .../jobmanager/JobManagerConnectionTest.scala   |   4 +-
 .../runtime/jobmanager/RecoveryITCase.scala     |   4 +-
 .../runtime/testingUtils/TestingUtils.scala     |   4 +-
 .../apache/flink/test/util/TestBaseUtils.java   |  11 +-
 .../accumulators/AccumulatorLiveITCase.java     |   3 +-
 .../test/cancelling/CancelingTestBase.java      |   3 +-
 .../EventTimeAllWindowCheckpointingITCase.java  |   5 +-
 .../jar/StreamingCustomInputSplitProgram.java   |   4 +-
 .../RemoteEnvironmentITCase.java                |   5 +-
 ...ctTaskManagerProcessFailureRecoveryTest.java |  11 +-
 .../flink/test/recovery/ChaosMonkeyITCase.java  |   7 +-
 .../recovery/ProcessFailureCancelingITCase.java |  10 +-
 .../TaskManagerFailureRecoveryITCase.java       |   7 +-
 .../ZooKeeperLeaderElectionITCase.java          |   3 +-
 .../flink/yarn/YarnTaskExecutorRunner.java      |   3 +-
 .../flink/yarn/YarnTaskManagerRunner.java       |   3 +-
 43 files changed, 324 insertions(+), 157 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/302c6741/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
index 246a75c..eb9f3c5 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.client.program;
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.Props;
+import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.CommonTestUtils;
@@ -92,8 +93,8 @@ public class ClientConnectionTest extends TestLogger {
        private static void testFailureBehavior(final InetSocketAddress 
unreachableEndpoint) throws Exception {
 
                final Configuration config = new Configuration();
-               config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, 
(ASK_STARTUP_TIMEOUT) + " ms");
-               config.setString(ConfigConstants.AKKA_LOOKUP_TIMEOUT, 
(CONNECT_TIMEOUT) + " ms");
+               config.setString(AkkaOptions.ASK_TIMEOUT, ASK_STARTUP_TIMEOUT + 
" ms");
+               config.setString(AkkaOptions.LOOKUP_TIMEOUT, CONNECT_TIMEOUT + 
" ms");
                config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, 
unreachableEndpoint.getHostName());
                config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 
unreachableEndpoint.getPort());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/302c6741/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
index b7ade2a..13a2564 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
 import 
org.apache.flink.client.program.DetachedEnvironment.DetachedJobExecutionResult;
+import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.optimizer.DataStatistics;
@@ -97,7 +98,7 @@ public class ClientTest extends TestLogger {
                config = new Configuration();
                config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, 
"localhost");
                config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 
freePort);
-               config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, 
ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT);
+               config.setString(AkkaOptions.ASK_TIMEOUT, 
AkkaOptions.ASK_TIMEOUT.defaultValue());
 
                try {
                        scala.Tuple2<String, Object> address = new 
scala.Tuple2<String, Object>("localhost", freePort);

http://git-wip-us.apache.org/repos/asf/flink/blob/302c6741/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
index 21794f9..626335d 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
@@ -39,6 +39,7 @@ import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.JobWithJars;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.client.program.StandaloneClusterClient;
+import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
@@ -269,7 +270,7 @@ public class FlinkClient {
        JobID getTopologyJobId(final String id) {
                final Configuration configuration = 
GlobalConfiguration.loadConfiguration();
                if (this.timeout != null) {
-                       
configuration.setString(ConfigConstants.AKKA_ASK_TIMEOUT, this.timeout);
+                       configuration.setString(AkkaOptions.ASK_TIMEOUT, 
this.timeout);
                }
 
                try {
@@ -309,7 +310,7 @@ public class FlinkClient {
        private FiniteDuration getTimeout() {
                final Configuration configuration = 
GlobalConfiguration.loadConfiguration();
                if (this.timeout != null) {
-                       
configuration.setString(ConfigConstants.AKKA_ASK_TIMEOUT, this.timeout);
+                       configuration.setString(AkkaOptions.ASK_TIMEOUT, 
this.timeout);
                }
 
                return AkkaUtils.getClientTimeout(configuration);

http://git-wip-us.apache.org/repos/asf/flink/blob/302c6741/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
index 97b209e..9bfc237 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
@@ -29,30 +29,114 @@ import org.apache.flink.annotation.PublicEvolving;
 public class AkkaOptions {
 
        /**
-        * Timeout for akka ask calls
+        * Timeout for akka ask calls.
         */
-       public static final ConfigOption<String> AKKA_ASK_TIMEOUT = 
ConfigOptions
+       public static final ConfigOption<String> ASK_TIMEOUT = ConfigOptions
                .key("akka.ask.timeout")
                .defaultValue("10 s");
 
        /**
+        * The Akka death watch heartbeat interval.
+        */
+       public static final ConfigOption<String> WATCH_HEARTBEAT_INTERVAL = 
ConfigOptions
+               .key("akka.watch.heartbeat.interval")
+               .defaultValue(ASK_TIMEOUT.defaultValue());
+
+       /**
+        * The maximum acceptable Akka death watch heartbeat pause.
+        */
+       public static final ConfigOption<String> WATCH_HEARTBEAT_PAUSE = 
ConfigOptions
+               .key("akka.watch.heartbeat.pause")
+               .defaultValue(ASK_TIMEOUT.defaultValue());
+
+       /**
         * The Akka tcp connection timeout.
         */
-       public static final ConfigOption<String> AKKA_TCP_TIMEOUT = 
ConfigOptions
+       public static final ConfigOption<String> TCP_TIMEOUT = ConfigOptions
                .key("akka.tcp.timeout")
                .defaultValue("20 s");
 
        /**
-        * The Akka death watch heartbeat interval.
+        * Timeout for the startup of the actor system.
         */
-       public static final ConfigOption<String> AKKA_WATCH_HEARTBEAT_INTERVAL 
= ConfigOptions
-               .key("akka.watch.heartbeat.interval")
+       public static final ConfigOption<String> STARTUP_TIMEOUT = ConfigOptions
+               .key("akka.startup-timeout")
+               .noDefaultValue();
+
+       /**
+        * Heartbeat interval of the transport failure detector.
+        */
+       public static final ConfigOption<String> TRANSPORT_HEARTBEAT_INTERVAL = 
ConfigOptions
+               .key("akka.transport.heartbeat.interval")
+               .defaultValue("1000 s");
+
+       /**
+        * Allowed heartbeat pause for the transport failure detector.
+        */
+       public static final ConfigOption<String> TRANSPORT_HEARTBEAT_PAUSE = 
ConfigOptions
+               .key("akka.transport.heartbeat.pause")
+               .defaultValue("6000 s");
+
+       /**
+        * Detection threshold of transport failure detector.
+        */
+       public static final ConfigOption<Double> TRANSPORT_THRESHOLD = 
ConfigOptions
+               .key("akka.transport.threshold")
+               .defaultValue(300.0);
+
+       /**
+        * Detection threshold for the phi accrual watch failure detector.
+        */
+       public static final ConfigOption<Integer> WATCH_THRESHOLD = 
ConfigOptions
+               .key("akka.watch.threshold")
+               .defaultValue(12);
+
+       /**
+        * Override SSL support for the Akka transport.
+        */
+       public static final ConfigOption<Boolean> SSL_ENABLED = ConfigOptions
+               .key("akka.ssl.enabled")
+               .defaultValue(true);
+
+       /**
+        * Maximum framesize of akka messages.
+        */
+       public static final ConfigOption<String> FRAMESIZE = ConfigOptions
+               .key("akka.framesize")
+               .defaultValue("10485760b");
+
+       /**
+        * Maximum number of messages until another actor is executed by the 
same thread.
+        */
+       public static final ConfigOption<Integer> DISPATCHER_THROUGHPUT = 
ConfigOptions
+               .key("akka.throughput")
+               .defaultValue(15);
+
+       /**
+        * Log lifecycle events.
+        */
+       public static final ConfigOption<Boolean> LOG_LIFECYCLE_EVENTS = 
ConfigOptions
+               .key("akka.log.lifecycle.events")
+               .defaultValue(false);
+
+       /**
+        * Timeout for all blocking calls that look up remote actors.
+        */
+       public static final ConfigOption<String> LOOKUP_TIMEOUT = ConfigOptions
+               .key("akka.lookup.timeout")
                .defaultValue("10 s");
 
        /**
-        * The maximum acceptable Akka death watch heartbeat pause.
+        * Timeout for all blocking calls on the client side.
         */
-       public static final ConfigOption<String> AKKA_WATCH_HEARTBEAT_PAUSE = 
ConfigOptions
-               .key("akka.watch.heartbeat.pause")
+       public static final ConfigOption<String> CLIENT_TIMEOUT = ConfigOptions
+               .key("akka.client.timeout")
                .defaultValue("60 s");
+
+       /**
+        * Exit JVM on fatal Akka errors.
+        */
+       public static final ConfigOption<Boolean> JVM_EXIT_ON_FATAL_ERROR = 
ConfigOptions
+               .key("akka.jvm-exit-on-fatal-error")
+               .defaultValue(true);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/302c6741/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index b5b5486..65e6c76 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -708,82 +708,130 @@ public final class ConfigConstants {
 
        /**
         * Timeout for the startup of the actor system
+        *
+        * @deprecated Use {@link AkkaOptions#STARTUP_TIMEOUT} instead.
         */
+       @Deprecated
        public static final String AKKA_STARTUP_TIMEOUT = 
"akka.startup-timeout";
 
        /**
         * Heartbeat interval of the transport failure detector
+        *
+        * @deprecated Use {@link AkkaOptions#TRANSPORT_HEARTBEAT_INTERVAL} 
instead.
         */
+       @Deprecated
        public static final String AKKA_TRANSPORT_HEARTBEAT_INTERVAL = 
"akka.transport.heartbeat.interval";
 
        /**
         * Allowed heartbeat pause for the transport failure detector
+        *
+        * @deprecated Use {@link AkkaOptions#TRANSPORT_HEARTBEAT_PAUSE} 
instead.
         */
+       @Deprecated
        public static final String AKKA_TRANSPORT_HEARTBEAT_PAUSE = 
"akka.transport.heartbeat.pause";
 
        /**
         * Detection threshold of transport failure detector
+        *
+        * @deprecated Use {@link AkkaOptions#TRANSPORT_THRESHOLD} instead.
         */
+       @Deprecated
        public static final String AKKA_TRANSPORT_THRESHOLD = 
"akka.transport.threshold";
 
        /**
         * Heartbeat interval of watch failure detector
+        *
+        * @deprecated Use {@link AkkaOptions#WATCH_HEARTBEAT_INTERVAL} instead.
         */
+       @Deprecated
        public static final String AKKA_WATCH_HEARTBEAT_INTERVAL = 
"akka.watch.heartbeat.interval";
 
        /**
         * Allowed heartbeat pause for the watch failure detector
+        *
+        * @deprecated Use {@link AkkaOptions#WATCH_HEARTBEAT_PAUSE} instead.
         */
+       @Deprecated
        public static final String AKKA_WATCH_HEARTBEAT_PAUSE = 
"akka.watch.heartbeat.pause";
 
        /**
         * Detection threshold for the phi accrual watch failure detector
+        *
+        * @deprecated Use {@link AkkaOptions#WATCH_THRESHOLD} instead.
         */
+       @Deprecated
        public static final String AKKA_WATCH_THRESHOLD = 
"akka.watch.threshold";
 
        /**
         * Akka TCP timeout
+        *
+        * @deprecated Use {@link AkkaOptions#TCP_TIMEOUT} instead.
         */
+       @Deprecated
        public static final String AKKA_TCP_TIMEOUT = "akka.tcp.timeout";
 
        /**
         * Override SSL support for the Akka transport
+        *
+        * @deprecated Use {@link AkkaOptions#SSL_ENABLED} instead.
         */
+       @Deprecated
        public static final String AKKA_SSL_ENABLED = "akka.ssl.enabled";
 
        /**
         * Maximum framesize of akka messages
+        *
+        * @deprecated Use {@link AkkaOptions#FRAMESIZE} instead.
         */
+       @Deprecated
        public static final String AKKA_FRAMESIZE = "akka.framesize";
 
        /**
         * Maximum number of messages until another actor is executed by the 
same thread
+        *
+        * @deprecated Use {@link AkkaOptions#DISPATCHER_THROUGHPUT} instead.
         */
+       @Deprecated
        public static final String AKKA_DISPATCHER_THROUGHPUT = 
"akka.throughput";
 
        /**
         * Log lifecycle events
+        *
+        * @deprecated Use {@link AkkaOptions#LOG_LIFECYCLE_EVENTS} instead.
         */
+       @Deprecated
        public static final String AKKA_LOG_LIFECYCLE_EVENTS = 
"akka.log.lifecycle.events";
 
        /**
         * Timeout for all blocking calls on the cluster side
+        *
+        * @deprecated Use {@link AkkaOptions#ASK_TIMEOUT} instead.
         */
+       @Deprecated
        public static final String AKKA_ASK_TIMEOUT = "akka.ask.timeout";
 
        /**
         * Timeout for all blocking calls that look up remote actors
+        *
+        * @deprecated Use {@link AkkaOptions#LOOKUP_TIMEOUT} instead.
         */
+       @Deprecated
        public static final String AKKA_LOOKUP_TIMEOUT = "akka.lookup.timeout";
 
        /**
         * Timeout for all blocking calls on the client side
+        *
+        * @deprecated Use {@link AkkaOptions#CLIENT_TIMEOUT} instead.
         */
+       @Deprecated
        public static final String AKKA_CLIENT_TIMEOUT = "akka.client.timeout";
 
        /**
         * Exit JVM on fatal Akka errors
+        *
+        * @deprecated Use {@link AkkaOptions#JVM_EXIT_ON_FATAL_ERROR} instead.
         */
+       @Deprecated
        public static final String AKKA_JVM_EXIT_ON_FATAL_ERROR = 
"akka.jvm-exit-on-fatal-error";
        
        // ----------------------------- Transport SSL 
Settings--------------------
@@ -1425,26 +1473,70 @@ public final class ConfigConstants {
 
        // ------------------------------ Akka Values 
------------------------------
 
+       /**
+        * @deprecated Use {@link AkkaOptions#TRANSPORT_HEARTBEAT_INTERVAL} 
instead.
+        */
+       @Deprecated
        public static String DEFAULT_AKKA_TRANSPORT_HEARTBEAT_INTERVAL = "1000 
s";
 
+       /**
+        * @deprecated Use {@link AkkaOptions#TRANSPORT_HEARTBEAT_PAUSE} 
instead.
+        */
+       @Deprecated
        public static String DEFAULT_AKKA_TRANSPORT_HEARTBEAT_PAUSE = "6000 s";
 
+       /**
+        * @deprecated Use {@link AkkaOptions#TRANSPORT_THRESHOLD} instead.
+        */
+       @Deprecated
        public static double DEFAULT_AKKA_TRANSPORT_THRESHOLD = 300.0;
 
+       /**
+        * @deprecated Use {@link AkkaOptions#WATCH_THRESHOLD} instead.
+        */
+       @Deprecated
        public static double DEFAULT_AKKA_WATCH_THRESHOLD = 12;
 
+       /**
+        * @deprecated Use {@link AkkaOptions#DISPATCHER_THROUGHPUT} instead.
+        */
+       @Deprecated
        public static int DEFAULT_AKKA_DISPATCHER_THROUGHPUT = 15;
 
+       /**
+        * @deprecated Use {@link AkkaOptions#LOG_LIFECYCLE_EVENTS} instead.
+        */
+       @Deprecated
        public static boolean DEFAULT_AKKA_LOG_LIFECYCLE_EVENTS = false;
 
+       /**
+        * @deprecated Use {@link AkkaOptions#FRAMESIZE} instead.
+        */
+       @Deprecated
        public static String DEFAULT_AKKA_FRAMESIZE = "10485760b";
 
+       /**
+        * @deprecated Use {@link AkkaOptions#ASK_TIMEOUT} instead.
+        */
+       @Deprecated
        public static String DEFAULT_AKKA_ASK_TIMEOUT = "10 s";
 
+       /**
+        * @deprecated Use {@link AkkaOptions#LOOKUP_TIMEOUT} instead.
+        */
+       @Deprecated
        public static String DEFAULT_AKKA_LOOKUP_TIMEOUT = "10 s";
 
+       /**
+        * @deprecated Use {@link AkkaOptions#CLIENT_TIMEOUT} instead.
+        */
+       @Deprecated
        public static String DEFAULT_AKKA_CLIENT_TIMEOUT = "60 s";
 
+       /**
+        * @deprecated Use {@link AkkaOptions#SSL_ENABLED} instead.
+        */
+       @Deprecated
        public static boolean DEFAULT_AKKA_SSL_ENABLED = true;
 
        // ----------------------------- SSL Values 
--------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/302c6741/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
index 206c71b..625880b 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
@@ -26,6 +26,7 @@ import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
+import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
@@ -108,7 +109,7 @@ public class MesosTaskManagerRunner {
                }
 
                // tell akka to die in case of an error
-               
configuration.setBoolean(ConfigConstants.AKKA_JVM_EXIT_ON_FATAL_ERROR, true);
+               configuration.setBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR, 
true);
 
                // Infer the resource identifier from the environment variable
                String containerID = 
Preconditions.checkNotNull(envs.get(MesosConfigKeys.ENV_FLINK_CONTAINER_ID));

http://git-wip-us.apache.org/repos/asf/flink/blob/302c6741/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java
index 4f92148..c0dcc99 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java
@@ -23,7 +23,7 @@ import akka.dispatch.OnFailure;
 import akka.dispatch.OnSuccess;
 import akka.pattern.Patterns;
 import akka.util.Timeout;
-import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.messages.JobManagerMessages;
@@ -61,7 +61,7 @@ public class MetricFetcher {
        private final ActorSystem actorSystem;
        private final JobManagerRetriever retriever;
        private final ExecutionContext ctx;
-       private final FiniteDuration timeout = new 
FiniteDuration(Duration.create(ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT).toMillis(),
 TimeUnit.MILLISECONDS);
+       private final FiniteDuration timeout = new 
FiniteDuration(Duration.create(AkkaOptions.ASK_TIMEOUT.defaultValue()).toMillis(),
 TimeUnit.MILLISECONDS);
 
        private MetricStore metrics = new MetricStore();
        private MetricDumpDeserializer deserializer = new 
MetricDumpDeserializer();

http://git-wip-us.apache.org/repos/asf/flink/blob/302c6741/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobAttachmentClientActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobAttachmentClientActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobAttachmentClientActor.java
index ffab9cc..9451e20 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobAttachmentClientActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobAttachmentClientActor.java
@@ -23,7 +23,7 @@ import akka.actor.Props;
 import akka.actor.Status;
 import akka.dispatch.Futures;
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.JobClientMessages;
@@ -114,7 +114,7 @@ public class JobAttachmentClientActor extends 
JobClientActor {
                                        client.tell(
                                                decorateMessage(new 
Status.Failure(
                                                        new 
JobClientActorRegistrationTimeoutException("Registration for Job at the 
JobManager " +
-                                                               "timed out. " + 
"You may increase '" + ConfigConstants.AKKA_CLIENT_TIMEOUT +
+                                                               "timed out. " + 
"You may increase '" + AkkaOptions.CLIENT_TIMEOUT.key() +
                                                                "' in case the 
JobManager needs more time to confirm the job client registration."))),
                                                getSelf());
                                }

http://git-wip-us.apache.org/repos/asf/flink/blob/302c6741/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java
index a3fee21..babb0f6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java
@@ -22,7 +22,7 @@ import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.actor.Status;
 import akka.dispatch.Futures;
-import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
 import org.apache.flink.runtime.instance.ActorGateway;
@@ -119,7 +119,7 @@ public class JobSubmissionClientActor extends 
JobClientActor {
                                        client.tell(
                                                decorateMessage(new 
Status.Failure(
                                                        new 
JobClientActorSubmissionTimeoutException("Job submission to the JobManager 
timed out. " +
-                                                               "You may 
increase '" + ConfigConstants.AKKA_CLIENT_TIMEOUT + "' in case the JobManager " 
+
+                                                               "You may 
increase '" + AkkaOptions.CLIENT_TIMEOUT.key() + "' in case the JobManager " +
                                                                "needs more 
time to configure and confirm the job submission."))),
                                                getSelf());
                                }

http://git-wip-us.apache.org/repos/asf/flink/blob/302c6741/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
index 77dbad4..f9c39c1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
@@ -26,7 +26,7 @@ import akka.dispatch.OnComplete;
 import akka.pattern.Patterns;
 import akka.util.Timeout;
 
-import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.FlinkUntypedActor;
@@ -148,7 +148,7 @@ public abstract class FlinkResourceManager<WorkerType 
extends ResourceIDRetrieva
                }
                catch (Exception e) {
                        lt = new FiniteDuration(
-                               
Duration.apply(ConfigConstants.DEFAULT_AKKA_LOOKUP_TIMEOUT).toMillis(),
+                               
Duration.apply(AkkaOptions.LOOKUP_TIMEOUT.defaultValue()).toMillis(),
                                TimeUnit.MILLISECONDS);
                }
                this.messageTimeout = lt;

http://git-wip-us.apache.org/repos/asf/flink/blob/302c6741/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java
index d95e1c3..36d81e6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.executiongraph.restart;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
@@ -92,7 +93,7 @@ public class FailureRateRestartStrategy implements 
RestartStrategy {
                String failuresIntervalString = configuration.getString(
                                
ConfigConstants.RESTART_STRATEGY_FAILURE_RATE_FAILURE_RATE_INTERVAL, 
Duration.apply(1, TimeUnit.MINUTES).toString()
                );
-               String timeoutString = 
configuration.getString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, 
ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT);
+               String timeoutString = 
configuration.getString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL);
                String delayString = 
configuration.getString(ConfigConstants.RESTART_STRATEGY_FAILURE_RATE_DELAY, 
timeoutString);
 
                Duration failuresInterval = 
Duration.apply(failuresIntervalString);

http://git-wip-us.apache.org/repos/asf/flink/blob/302c6741/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
index f51ea7c..2b62c00 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.executiongraph.restart;
 
+import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
@@ -72,8 +73,7 @@ public class FixedDelayRestartStrategy implements 
RestartStrategy {
                int maxAttempts = 
configuration.getInteger(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 
1);
 
                String timeoutString = configuration.getString(
-                       ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL,
-                       ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT);
+                       AkkaOptions.WATCH_HEARTBEAT_INTERVAL);
 
                String delayString = configuration.getString(
                        ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY,
@@ -87,7 +87,7 @@ public class FixedDelayRestartStrategy implements 
RestartStrategy {
                } catch (NumberFormatException nfe) {
                        if (delayString.equals(timeoutString)) {
                                throw new Exception("Invalid config value for " 
+
-                                               
ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE + ": " + timeoutString +
+                                               
AkkaOptions.WATCH_HEARTBEAT_PAUSE.key() + ": " + timeoutString +
                                                ". Value must be a valid 
duration (such as '10 s' or '1 min')");
                        } else {
                                throw new Exception("Invalid config value for " 
+

http://git-wip-us.apache.org/repos/asf/flink/blob/302c6741/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java
index 27ee9b6..d1f547f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.executiongraph.restart;
 
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.slf4j.Logger;
@@ -88,8 +89,7 @@ public abstract class RestartStrategyFactory implements 
Serializable {
                                // support deprecated ConfigConstants values
                                final int numberExecutionRetries = 
configuration.getInteger(ConfigConstants.EXECUTION_RETRIES_KEY,
                                        
ConfigConstants.DEFAULT_EXECUTION_RETRIES);
-                               String pauseString = 
configuration.getString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE,
-                                       
ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT);
+                               String pauseString = 
configuration.getString(AkkaOptions.WATCH_HEARTBEAT_PAUSE);
                                String delayString = 
configuration.getString(ConfigConstants.EXECUTION_RETRY_DELAY_KEY,
                                        pauseString);
 
@@ -100,7 +100,7 @@ public abstract class RestartStrategyFactory implements 
Serializable {
                                } catch (NumberFormatException nfe) {
                                        if (delayString.equals(pauseString)) {
                                                throw new Exception("Invalid 
config value for " +
-                                                       
ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE + ": " + pauseString +
+                                                       
AkkaOptions.WATCH_HEARTBEAT_PAUSE.key() + ": " + pauseString +
                                                        ". Value must be a 
valid duration (such as '10 s' or '1 min')");
                                        } else {
                                                throw new Exception("Invalid 
config value for " +

http://git-wip-us.apache.org/repos/asf/flink/blob/302c6741/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
index 9b05273..003d803 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
@@ -24,7 +24,7 @@ import akka.dispatch.Mapper;
 import akka.dispatch.Recover;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.state.StateDescriptor;
-import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.QueryableStateOptions;
@@ -114,13 +114,11 @@ public class QueryableStateClient {
                LeaderRetrievalService leaderRetrievalService = 
highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID);
 
                // Get the ask timeout
-               String askTimeoutString = config.getString(
-                               ConfigConstants.AKKA_ASK_TIMEOUT,
-                               ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT);
+               String askTimeoutString = 
config.getString(AkkaOptions.ASK_TIMEOUT);
 
                Duration timeout = FiniteDuration.apply(askTimeoutString);
                if (!timeout.isFinite()) {
-                       throw new 
IllegalConfigurationException(ConfigConstants.AKKA_ASK_TIMEOUT
+                       throw new 
IllegalConfigurationException(AkkaOptions.ASK_TIMEOUT.key()
                                        + " is not a finite timeout ('" + 
askTimeoutString + "')");
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/302c6741/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerConfiguration.java
index 2c64f08..0216789 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerConfiguration.java
@@ -53,24 +53,24 @@ public class ResourceManagerConfiguration {
        // 
--------------------------------------------------------------------------
 
        public static ResourceManagerConfiguration 
fromConfiguration(Configuration configuration) throws ConfigurationException {
-               final String strTimeout = 
configuration.getString(AkkaOptions.AKKA_ASK_TIMEOUT);
+               final String strTimeout = 
configuration.getString(AkkaOptions.ASK_TIMEOUT);
                final Time timeout;
 
                try {
                        timeout = 
Time.milliseconds(Duration.apply(strTimeout).toMillis());
                } catch (NumberFormatException e) {
                        throw new ConfigurationException("Could not parse the 
resource manager's timeout " +
-                               "value " + AkkaOptions.AKKA_ASK_TIMEOUT + '.', 
e);
+                               "value " + AkkaOptions.ASK_TIMEOUT + '.', e);
                }
 
-               final String strHeartbeatInterval = 
configuration.getString(AkkaOptions.AKKA_WATCH_HEARTBEAT_INTERVAL);
+               final String strHeartbeatInterval = 
configuration.getString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL);
                final Time heartbeatInterval;
 
                try {
                        heartbeatInterval = 
Time.milliseconds(Duration.apply(strHeartbeatInterval).toMillis());
                } catch (NumberFormatException e) {
                        throw new ConfigurationException("Could not parse the 
resource manager's heartbeat interval " +
-                               "value " + 
AkkaOptions.AKKA_WATCH_HEARTBEAT_INTERVAL + '.', e);
+                               "value " + AkkaOptions.WATCH_HEARTBEAT_INTERVAL 
+ '.', e);
                }
 
                return new ResourceManagerConfiguration(timeout, 
heartbeatInterval);

http://git-wip-us.apache.org/repos/asf/flink/blob/302c6741/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java
index a651168..75cad07 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java
@@ -19,9 +19,7 @@
 package org.apache.flink.runtime.resourcemanager.slotmanager;
 
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.ConfigurationException;
 import org.apache.flink.util.Preconditions;
@@ -55,18 +53,14 @@ public class SlotManagerConfiguration {
        }
 
        public static SlotManagerConfiguration fromConfiguration(Configuration 
configuration) throws ConfigurationException {
-               ConfigOption<String> timeoutOption = ConfigOptions
-                       .key(ConfigConstants.AKKA_ASK_TIMEOUT)
-                       .defaultValue(ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT);
-
-               final String strTimeout = 
configuration.getString(timeoutOption);
+               final String strTimeout = 
configuration.getString(AkkaOptions.ASK_TIMEOUT);
                final Time timeout;
 
                try {
                        timeout = 
Time.milliseconds(Duration.apply(strTimeout).toMillis());
                } catch (NumberFormatException e) {
                        throw new ConfigurationException("Could not parse the 
resource manager's timeout " +
-                               "value " + timeoutOption + '.', e);
+                               "value " + AkkaOptions.ASK_TIMEOUT + '.', e);
                }
 
                return new SlotManagerConfiguration(timeout, timeout, timeout);

http://git-wip-us.apache.org/repos/asf/flink/blob/302c6741/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
index 8789eed..810efff 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.rpc.akka;
 import akka.actor.ActorSystem;
 import com.typesafe.config.Config;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
@@ -130,9 +130,7 @@ public class AkkaRpcServiceUtils {
 
                checkNotNull(config, "config is null");
 
-               final boolean sslEnabled = config.getBoolean(
-                                       ConfigConstants.AKKA_SSL_ENABLED,
-                                       
ConfigConstants.DEFAULT_AKKA_SSL_ENABLED) &&
+               final boolean sslEnabled = 
config.getBoolean(AkkaOptions.SSL_ENABLED) &&
                                SSLUtils.getSSLEnabled(config);
 
                return getRpcUrl(

http://git-wip-us.apache.org/repos/asf/flink/blob/302c6741/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
index a6e4748..ea9f576 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.taskexecutor;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
@@ -146,7 +147,7 @@ public class TaskManagerConfiguration implements 
TaskManagerRuntimeInfo {
                        timeout = 
Time.milliseconds(AkkaUtils.getTimeout(configuration).toMillis());
                } catch (Exception e) {
                        throw new IllegalArgumentException(
-                               "Invalid format for '" + 
ConfigConstants.AKKA_ASK_TIMEOUT +
+                               "Invalid format for '" + 
AkkaOptions.ASK_TIMEOUT.key() +
                                        "'.Use formats like '50 s' or '1 min' 
to specify the timeout.");
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/302c6741/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index 62fa73d..60a33ba 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -184,13 +184,11 @@ object AkkaUtils {
    * @return Flink's basic Akka config
    */
   private def getBasicAkkaConfig(configuration: Configuration): Config = {
-    val akkaThroughput = 
configuration.getInteger(ConfigConstants.AKKA_DISPATCHER_THROUGHPUT,
-      ConfigConstants.DEFAULT_AKKA_DISPATCHER_THROUGHPUT)
-    val lifecycleEvents = 
configuration.getBoolean(ConfigConstants.AKKA_LOG_LIFECYCLE_EVENTS,
-      ConfigConstants.DEFAULT_AKKA_LOG_LIFECYCLE_EVENTS)
+    val akkaThroughput = 
configuration.getInteger(AkkaOptions.DISPATCHER_THROUGHPUT)
+    val lifecycleEvents = 
configuration.getBoolean(AkkaOptions.LOG_LIFECYCLE_EVENTS)
 
     val jvmExitOnFatalError = if (
-      configuration.getBoolean(ConfigConstants.AKKA_JVM_EXIT_ON_FATAL_ERROR, 
true)){
+      configuration.getBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR)){
       "on"
     } else {
       "off"
@@ -269,48 +267,36 @@ object AkkaUtils {
                                   bindAddress: String, port: Int,
                                   externalHostname: String, externalPort: 
Int): Config = {
 
-    val akkaAskTimeout = Duration(configuration.getString(
-      ConfigConstants.AKKA_ASK_TIMEOUT,
-      ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT))
+    val akkaAskTimeout = 
Duration(configuration.getString(AkkaOptions.ASK_TIMEOUT))
 
     val startupTimeout = configuration.getString(
-      ConfigConstants.AKKA_STARTUP_TIMEOUT,
+      AkkaOptions.STARTUP_TIMEOUT,
       (akkaAskTimeout * 10).toString)
 
     val transportHeartbeatInterval = configuration.getString(
-      ConfigConstants.AKKA_TRANSPORT_HEARTBEAT_INTERVAL,
-      ConfigConstants.DEFAULT_AKKA_TRANSPORT_HEARTBEAT_INTERVAL)
+      AkkaOptions.TRANSPORT_HEARTBEAT_INTERVAL)
 
     val transportHeartbeatPause = configuration.getString(
-      ConfigConstants.AKKA_TRANSPORT_HEARTBEAT_PAUSE,
-      ConfigConstants.DEFAULT_AKKA_TRANSPORT_HEARTBEAT_PAUSE)
+      AkkaOptions.TRANSPORT_HEARTBEAT_PAUSE)
 
-    val transportThreshold = configuration.getDouble(
-      ConfigConstants.AKKA_TRANSPORT_THRESHOLD,
-      ConfigConstants.DEFAULT_AKKA_TRANSPORT_THRESHOLD)
+    val transportThreshold = 
configuration.getDouble(AkkaOptions.TRANSPORT_THRESHOLD)
 
-    val watchHeartbeatInterval = 
configuration.getString(AkkaOptions.AKKA_WATCH_HEARTBEAT_INTERVAL);
+    val watchHeartbeatInterval = configuration.getString(
+      AkkaOptions.WATCH_HEARTBEAT_INTERVAL)
 
-    val watchHeartbeatPause = 
configuration.getString(AkkaOptions.AKKA_WATCH_HEARTBEAT_PAUSE);
+    val watchHeartbeatPause = 
configuration.getString(AkkaOptions.WATCH_HEARTBEAT_PAUSE)
 
-    val watchThreshold = configuration.getDouble(
-      ConfigConstants.AKKA_WATCH_THRESHOLD,
-      ConfigConstants.DEFAULT_AKKA_WATCH_THRESHOLD)
+    val watchThreshold = configuration.getInteger(AkkaOptions.WATCH_THRESHOLD)
 
-    val akkaTCPTimeout = configuration.getString(AkkaOptions.AKKA_TCP_TIMEOUT);
+    val akkaTCPTimeout = configuration.getString(AkkaOptions.TCP_TIMEOUT)
 
-    val akkaFramesize = configuration.getString(
-      ConfigConstants.AKKA_FRAMESIZE,
-      ConfigConstants.DEFAULT_AKKA_FRAMESIZE)
+    val akkaFramesize = configuration.getString(AkkaOptions.FRAMESIZE)
 
-    val lifecycleEvents = configuration.getBoolean(
-      ConfigConstants.AKKA_LOG_LIFECYCLE_EVENTS,
-      ConfigConstants.DEFAULT_AKKA_LOG_LIFECYCLE_EVENTS)
+    val lifecycleEvents = 
configuration.getBoolean(AkkaOptions.LOG_LIFECYCLE_EVENTS)
 
     val logLifecycleEvents = if (lifecycleEvents) "on" else "off"
 
-    val akkaEnableSSLConfig = 
configuration.getBoolean(ConfigConstants.AKKA_SSL_ENABLED,
-        ConfigConstants.DEFAULT_AKKA_SSL_ENABLED) &&
+    val akkaEnableSSLConfig = 
configuration.getBoolean(AkkaOptions.SSL_ENABLED) &&
           SSLUtils.getSSLEnabled(configuration)
 
     val akkaEnableSSL = if (akkaEnableSSLConfig) "on" else "off"
@@ -588,14 +574,13 @@ object AkkaUtils {
   }
 
   def getTimeout(config: Configuration): FiniteDuration = {
-    val duration = Duration(config.getString(ConfigConstants.AKKA_ASK_TIMEOUT,
-      ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT))
+    val duration = Duration(config.getString(AkkaOptions.ASK_TIMEOUT))
 
     new FiniteDuration(duration.toMillis, TimeUnit.MILLISECONDS)
   }
 
   def getDefaultTimeout: Time = {
-    val duration = Duration(ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT)
+    val duration = Duration(AkkaOptions.ASK_TIMEOUT.defaultValue())
 
     Time.milliseconds(duration.toMillis)
   }
@@ -607,30 +592,24 @@ object AkkaUtils {
   }
 
   def getLookupTimeout(config: Configuration): FiniteDuration = {
-    val duration = Duration(config.getString(
-      ConfigConstants.AKKA_LOOKUP_TIMEOUT,
-      ConfigConstants.DEFAULT_AKKA_LOOKUP_TIMEOUT))
+    val duration = Duration(config.getString(AkkaOptions.LOOKUP_TIMEOUT))
 
     new FiniteDuration(duration.toMillis, TimeUnit.MILLISECONDS)
   }
 
   def getDefaultLookupTimeout: FiniteDuration = {
-    val duration = Duration(ConfigConstants.DEFAULT_AKKA_LOOKUP_TIMEOUT)
+    val duration = Duration(AkkaOptions.LOOKUP_TIMEOUT.defaultValue())
     new FiniteDuration(duration.toMillis, TimeUnit.MILLISECONDS)
   }
 
   def getClientTimeout(config: Configuration): FiniteDuration = {
-    val duration = Duration(
-      config.getString(
-        ConfigConstants.AKKA_CLIENT_TIMEOUT,
-        ConfigConstants.DEFAULT_AKKA_CLIENT_TIMEOUT
-      ))
+    val duration = Duration(config.getString(AkkaOptions.CLIENT_TIMEOUT))
 
     new FiniteDuration(duration.toMillis, TimeUnit.MILLISECONDS)
   }
 
   def getDefaultClientTimeout: FiniteDuration = {
-    val duration = Duration(ConfigConstants.DEFAULT_AKKA_CLIENT_TIMEOUT)
+    val duration = Duration(AkkaOptions.CLIENT_TIMEOUT.defaultValue())
 
     new FiniteDuration(duration.toMillis, TimeUnit.MILLISECONDS)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/302c6741/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 2ace8db..abc8946 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -27,7 +27,7 @@ import akka.pattern.ask
 import akka.actor.{ActorRef, ActorSystem}
 import com.typesafe.config.Config
 import org.apache.flink.api.common.{JobExecutionResult, JobID, 
JobSubmissionResult}
-import org.apache.flink.configuration.{ConfigConstants, Configuration}
+import org.apache.flink.configuration.{AkkaOptions, ConfigConstants, 
Configuration}
 import org.apache.flink.core.fs.Path
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.client.{JobClient, JobExecutionException}
@@ -265,9 +265,9 @@ abstract class FlinkMiniCluster(
     // 
https://docs.travis-ci.com/user/environment-variables#Default-Environment-Variables
     if (sys.env.contains("CI")) {
       // Only set if nothing specified in config
-      if (config.getString(ConfigConstants.AKKA_ASK_TIMEOUT, null) == null) {
-        val duration = Duration(ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT) * 10
-        config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, 
s"${duration.toSeconds}s")
+      if (!config.contains(AkkaOptions.ASK_TIMEOUT)) {
+        val duration = Duration(AkkaOptions.ASK_TIMEOUT.defaultValue()) * 10
+        config.setString(AkkaOptions.ASK_TIMEOUT, s"${duration.toSeconds}s")
 
         LOG.info(s"Akka ask timeout set to ${duration.toSeconds}s")
       }

http://git-wip-us.apache.org/repos/asf/flink/blob/302c6741/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
index 5aa31ff..f1bc43b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
@@ -22,7 +22,7 @@ import akka.actor.ActorSystem;
 import akka.testkit.JavaTestKit;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import 
org.apache.flink.runtime.clusterframework.messages.NotifyResourceStarted;
@@ -192,7 +192,7 @@ public class ResourceManagerTest extends TestLogger {
 
                        // set a short timeout for lookups
                        Configuration shortTimeoutConfig = config.clone();
-                       
shortTimeoutConfig.setString(ConfigConstants.AKKA_LOOKUP_TIMEOUT, "1 s");
+                       
shortTimeoutConfig.setString(AkkaOptions.LOOKUP_TIMEOUT, "1 s");
 
                        fakeJobManager = TestingUtils.createForwardingActor(
                                system,
@@ -234,7 +234,7 @@ public class ResourceManagerTest extends TestLogger {
 
                        // set a long timeout for lookups such that the test 
fails in case of timeouts
                        Configuration shortTimeoutConfig = config.clone();
-                       
shortTimeoutConfig.setString(ConfigConstants.AKKA_LOOKUP_TIMEOUT, "99999 s");
+                       
shortTimeoutConfig.setString(AkkaOptions.LOOKUP_TIMEOUT, "99999 s");
 
                        fakeJobManager = TestingUtils.createForwardingActor(
                                system,

http://git-wip-us.apache.org/repos/asf/flink/blob/302c6741/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
index f19ca4e..0346e48 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
+import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
@@ -51,7 +52,7 @@ public class PartialConsumePipelinedResultTest {
                final Configuration config = new Configuration();
                config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 
NUMBER_OF_TMS);
                config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
NUMBER_OF_SLOTS_PER_TM);
-               config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, 
TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
+               config.setString(AkkaOptions.ASK_TIMEOUT, 
TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
                config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 
NUMBER_OF_NETWORK_BUFFERS);
 
                flink = new TestingCluster(config, true);

http://git-wip-us.apache.org/repos/asf/flink/blob/302c6741/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index c8459e7..1a4396e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -26,6 +26,7 @@ import akka.testkit.JavaTestKit;
 import akka.testkit.TestProbe;
 import com.typesafe.config.Config;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
@@ -601,7 +602,7 @@ public class JobManagerTest extends TestLogger {
                Deadline deadline = new FiniteDuration(100, 
TimeUnit.SECONDS).fromNow();
 
                Configuration config = new Configuration();
-               config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "100ms");
+               config.setString(AkkaOptions.ASK_TIMEOUT, "100ms");
 
                ActorRef jobManagerActor = JobManager.startJobManagerActors(
                        config,

http://git-wip-us.apache.org/repos/asf/flink/blob/302c6741/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index 9dcfc70..7234fea 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -27,6 +27,7 @@ import akka.actor.Props;
 import akka.testkit.JavaTestKit;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.memory.MemoryType;
@@ -77,9 +78,9 @@ public class TaskManagerComponentsStartupShutdownTest extends 
TestLogger {
                final int BUFFER_SIZE = 32 * 1024;
 
                Configuration config = new Configuration();
-               config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, 
"200 ms");
-               config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "1 
s");
-               config.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 1);
+               config.setString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL, "200 
ms");
+               config.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, "1 s");
+               config.setInteger(AkkaOptions.WATCH_THRESHOLD, 1);
 
                ActorSystem actorSystem = null;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/302c6741/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
index 0844aad..3953072 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
@@ -23,6 +23,7 @@ import akka.actor.ActorSystem;
 import akka.actor.InvalidActorNameException;
 import akka.actor.Terminated;
 import akka.testkit.JavaTestKit;
+import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -90,10 +91,10 @@ public class TaskManagerRegistrationTest extends TestLogger {
        @BeforeClass
        public static void startActorSystem() {
                config = new Configuration();
-               config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "5 s");
-               config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "200 ms");
-               config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "2 s");
-               config.setDouble(ConfigConstants.AKKA_WATCH_THRESHOLD, 2.0);
+               config.setString(AkkaOptions.ASK_TIMEOUT, "5 s");
+               config.setString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL, "200 ms");
+               config.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, "2 s");
+               config.setInteger(AkkaOptions.WATCH_THRESHOLD, 2);
 
                actorSystem = AkkaUtils.createLocalActorSystem(config);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/302c6741/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
index 42338cd..48eb392 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.testutils;
 
+import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
@@ -86,10 +87,10 @@ public class ZooKeeperTestUtils {
                config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, fsStateHandlePath + "/recovery");
 
                // Akka failure detection and execution retries
-               config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1000 ms");
-               config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "6 s");
-               config.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 9);
-               config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "100 s");
+               config.setString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL, "1000 ms");
+               config.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, "6 s");
+               config.setInteger(AkkaOptions.WATCH_THRESHOLD, 9);
+               config.setString(AkkaOptions.ASK_TIMEOUT, "100 s");
                config.setString(HighAvailabilityOptions.HA_JOB_DELAY, "10 s");
 
                return config;

http://git-wip-us.apache.org/repos/asf/flink/blob/302c6741/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala
index 9f8e3e1..daf0f47 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.akka
 
 import akka.actor.ActorSystem
 import akka.testkit.{ImplicitSender, TestKit}
-import org.apache.flink.configuration.{ConfigConstants, Configuration}
+import org.apache.flink.configuration.{AkkaOptions, ConfigConstants, Configuration}
 import org.apache.flink.runtime.testingUtils.{TestingCluster, TestingUtils, ScalaTestingUtils}
 import org.junit.runner.RunWith
 import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
@@ -119,7 +119,7 @@ class AkkaSslITCase(_system: ActorSystem)
         val config = new Configuration()
         config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1)
         config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
-        config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "2 s")
+        config.setString(AkkaOptions.ASK_TIMEOUT, "2 s")
 
         config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true)
         config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "invalid.keystore")
@@ -141,7 +141,7 @@ class AkkaSslITCase(_system: ActorSystem)
         val config = new Configuration()
         config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1)
         config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
-        config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "2 s")
+        config.setString(AkkaOptions.ASK_TIMEOUT, "2 s")
 
         config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/302c6741/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerConnectionTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerConnectionTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerConnectionTest.scala
index 97a001d..6d7d87c 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerConnectionTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerConnectionTest.scala
@@ -23,7 +23,7 @@ import java.net.{InetAddress, InetSocketAddress}
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
 
-import org.apache.flink.configuration.{ConfigConstants, Configuration}
+import org.apache.flink.configuration.{AkkaOptions, ConfigConstants, Configuration}
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.util.NetUtils
 import org.junit.Assert._
@@ -122,7 +122,7 @@ class JobManagerConnectionTest {
 
   private def createConfigWithLowTimeout() : Configuration = {
     val config = new Configuration()
-    config.setString(ConfigConstants.AKKA_LOOKUP_TIMEOUT,
+    config.setString(AkkaOptions.LOOKUP_TIMEOUT,
                      Duration(timeout, TimeUnit.MILLISECONDS).toSeconds + " s")
     config
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/302c6741/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
index f3ab409..4fc4042 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.jobmanager
 import akka.actor.{ActorSystem, PoisonPill}
 import akka.testkit.{ImplicitSender, TestKit}
 import org.apache.flink.api.common.ExecutionConfig
-import org.apache.flink.configuration.{ConfigConstants, Configuration}
+import org.apache.flink.configuration.{AkkaOptions, ConfigConstants, Configuration}
 import org.apache.flink.runtime.akka.ListeningBehaviour
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType
 import org.apache.flink.runtime.jobgraph.{DistributionPattern, JobGraph, JobStatus, JobVertex}
@@ -60,7 +60,7 @@ class RecoveryITCase(_system: ActorSystem)
     val config = new Configuration()
     config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
     config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers)
-    config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, heartbeatTimeout)
+    config.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, heartbeatTimeout)
     config.setString(ConfigConstants.RESTART_STRATEGY, "fixeddelay")
     config.setInteger(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1)
     config.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, heartbeatTimeout)

http://git-wip-us.apache.org/repos/asf/flink/blob/302c6741/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index c8977f0..858bbbb 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -28,7 +28,7 @@ import com.google.common.util.concurrent.MoreExecutors
 import com.typesafe.config.ConfigFactory
 import grizzled.slf4j.Logger
 import org.apache.flink.api.common.time.Time
-import org.apache.flink.configuration.{ConfigConstants, Configuration, HighAvailabilityOptions, TaskManagerOptions}
+import org.apache.flink.configuration._
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager
 import org.apache.flink.runtime.clusterframework.types.ResourceID
@@ -89,7 +89,7 @@ object TestingUtils {
     val config = new Configuration()
     config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
     config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs)
-    config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, timeout)
+    config.setString(AkkaOptions.ASK_TIMEOUT, timeout)
 
     val cluster = new TestingCluster(config)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/302c6741/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index 437dd5f..5f6f5c4 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -27,6 +27,7 @@ import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
 
 import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
@@ -146,8 +147,8 @@ public class TestBaseUtils extends TestLogger {
                config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, TASK_MANAGER_MEMORY_SIZE);
                config.setBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY, true);
 
-               config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, DEFAULT_AKKA_ASK_TIMEOUT + "s");
-               config.setString(ConfigConstants.AKKA_STARTUP_TIMEOUT, DEFAULT_AKKA_STARTUP_TIMEOUT);
+               config.setString(AkkaOptions.ASK_TIMEOUT, DEFAULT_AKKA_ASK_TIMEOUT + "s");
+               config.setString(AkkaOptions.STARTUP_TIMEOUT, DEFAULT_AKKA_STARTUP_TIMEOUT);
 
                config.setInteger(JobManagerOptions.WEB_PORT, 8081);
                config.setString(JobManagerOptions.WEB_LOG_PATH, logFile.toString());
@@ -287,7 +288,7 @@ public class TestBaseUtils extends TestLogger {
                        String resultPath,
                        String[] excludePrefixes,
                        boolean inOrderOfFiles) throws IOException {
-               
+
                checkArgument(resultPath != null, "resultPath cannot be be null");
 
                final BufferedReader[] readers = getResultReader(resultPath, excludePrefixes, inOrderOfFiles);
@@ -328,8 +329,8 @@ public class TestBaseUtils extends TestLogger {
                        String msg = String.format(
                                        "Different elements in arrays: expected %d elements and received %d\n" +
                                        "files: %s\n expected: %s\n received: %s",
-                                       expected.length, result.length, 
-                                       
Arrays.toString(getAllInvolvedFiles(resultPath, excludePrefixes)), 
+                                       expected.length, result.length,
+                                       
Arrays.toString(getAllInvolvedFiles(resultPath, excludePrefixes)),
                                        Arrays.toString(expected), Arrays.toString(result));
                        fail(msg);
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/302c6741/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
index 49ff744..92e5768 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
@@ -34,6 +34,7 @@ import org.apache.flink.api.common.io.OutputFormat;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.LocalEnvironment;
+import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.optimizer.DataStatistics;
@@ -120,7 +121,7 @@ public class AccumulatorLiveITCase extends TestLogger {
                Configuration config = new Configuration();
                config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
                config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
-               config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
+               config.setString(AkkaOptions.ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
                TestingCluster testingCluster = new TestingCluster(config, false, true);
                testingCluster.start();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/302c6741/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
index 06233d6..2767312 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
@@ -20,6 +20,7 @@
 package org.apache.flink.test.cancelling;
 
 import org.apache.flink.api.common.Plan;
+import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
@@ -88,7 +89,7 @@ public abstract class CancelingTestBase extends TestLogger {
                config.setBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY, true);
                config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
                config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
-               config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
+               config.setString(AkkaOptions.ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
                config.setInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE, 4096);
                config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 2048);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/302c6741/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
index a573be6..bda1679 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
@@ -74,8 +75,8 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
                config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
                config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM / 2);
                config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 48L);
-               config.setString(ConfigConstants.DEFAULT_AKKA_LOOKUP_TIMEOUT, "60 s");
-               config.setString(ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT, "60 s");
+               config.setString(AkkaOptions.LOOKUP_TIMEOUT, "60 s");
+               config.setString(AkkaOptions.ASK_TIMEOUT, "60 s");
                cluster = new LocalFlinkMiniCluster(config, false);
                cluster.start();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/302c6741/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingCustomInputSplitProgram.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingCustomInputSplitProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingCustomInputSplitProgram.java
index e7bd522..4905d43 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingCustomInputSplitProgram.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingCustomInputSplitProgram.java
@@ -25,7 +25,7 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.InputSplitAssigner;
@@ -44,7 +44,7 @@ public class StreamingCustomInputSplitProgram {
        public static void main(String[] args) throws Exception {
                                Configuration config = new Configuration();
 
-               config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "5 s");
+               config.setString(AkkaOptions.ASK_TIMEOUT, "5 s");
 
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/302c6741/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java
index 7c6f73a..85961db 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
 import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.GenericInputSplit;
@@ -78,7 +79,7 @@ public class RemoteEnvironmentITCase extends TestLogger {
        @Test(expected=FlinkException.class)
        public void testInvalidAkkaConfiguration() throws Throwable {
                Configuration config = new Configuration();
-               config.setString(ConfigConstants.AKKA_STARTUP_TIMEOUT, INVALID_STARTUP_TIMEOUT);
+               config.setString(AkkaOptions.STARTUP_TIMEOUT, INVALID_STARTUP_TIMEOUT);
 
                final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
                                cluster.getHostname(),
@@ -103,7 +104,7 @@ public class RemoteEnvironmentITCase extends TestLogger {
        @Test
        public void testUserSpecificParallelism() throws Exception {
                Configuration config = new Configuration();
-               config.setString(ConfigConstants.AKKA_STARTUP_TIMEOUT, VALID_STARTUP_TIMEOUT);
+               config.setString(AkkaOptions.STARTUP_TIMEOUT, VALID_STARTUP_TIMEOUT);
 
                final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
                                cluster.getHostname(),

http://git-wip-us.apache.org/repos/asf/flink/blob/302c6741/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
index c7c07ce..5c65a7f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
@@ -24,6 +24,7 @@ import akka.pattern.Patterns;
 import akka.util.Timeout;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
@@ -127,11 +128,11 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
                        Tuple2<String, Object> localAddress = new Tuple2<String, Object>("localhost", jobManagerPort);
 
                        Configuration jmConfig = new Configuration();
-                       jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1000 ms");
-                       jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "6 s");
-                       jmConfig.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 9);
+                       jmConfig.setString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL, "1000 ms");
+                       jmConfig.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, "6 s");
+                       jmConfig.setInteger(AkkaOptions.WATCH_THRESHOLD, 9);
                        jmConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "10 s");
-                       jmConfig.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "100 s");
+                       jmConfig.setString(AkkaOptions.ASK_TIMEOUT, "100 s");
                        jmConfig.setString(JobManagerOptions.ADDRESS, localAddress._1());
                        jmConfig.setInteger(JobManagerOptions.PORT, jobManagerPort);
 
@@ -409,7 +410,7 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
                                cfg.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
                                cfg.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100);
                                cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
-                               cfg.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "100 s");
+                               cfg.setString(AkkaOptions.ASK_TIMEOUT, "100 s");
 
                                TaskManager.selectNetworkInterfaceAndRunTaskManager(cfg,
                                        ResourceID.generate(), TaskManager.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/302c6741/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
index 6d53b9f..c8c8d2a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
@@ -22,6 +22,7 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import org.apache.commons.io.FileUtils;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
@@ -156,9 +157,9 @@ public class ChaosMonkeyITCase extends TestLogger {
                                ZooKeeper.getConnectString(), FileStateBackendBasePath.toURI().toString());
 
                // Akka and restart timeouts
-               config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1000 ms");
-               config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "6 s");
-               config.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 9);
+               config.setString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL, "1000 ms");
+               config.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, "6 s");
+               config.setInteger(AkkaOptions.WATCH_THRESHOLD, 9);
 
                if (checkpointingIntervalMs >= killEvery.toMillis()) {
                        throw new IllegalArgumentException("Relax! You want to kill processes every " +

http://git-wip-us.apache.org/repos/asf/flink/blob/302c6741/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
index 9d2806c..59d5a51 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
@@ -29,7 +29,7 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.client.program.ProgramInvocationException;
-import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -101,10 +101,10 @@ public class ProcessFailureCancelingITCase extends TestLogger {
                        Tuple2<String, Object> localAddress = new Tuple2<String, Object>("localhost", jobManagerPort);
 
                        Configuration jmConfig = new Configuration();
-                       jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "5 s");
-                       jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "2000 s");
-                       jmConfig.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 10);
-                       jmConfig.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "100 s");
+                       jmConfig.setString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL, "5 s");
+                       jmConfig.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, "2000 s");
+                       jmConfig.setInteger(AkkaOptions.WATCH_THRESHOLD, 10);
+                       jmConfig.setString(AkkaOptions.ASK_TIMEOUT, "100 s");
                        jmConfig.setString(JobManagerOptions.ADDRESS, localAddress._1());
                        jmConfig.setInteger(JobManagerOptions.PORT, jobManagerPort);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/302c6741/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
index bafdd9f..93d369a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
+import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
@@ -77,9 +78,9 @@ public class TaskManagerFailureRecoveryITCase extends TestLogger {
                        config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM);
                        config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16L);
                        
-                       config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "500 ms");
-                       config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "20 s");
-                       config.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 20);
+                       config.setString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL, "500 ms");
+                       config.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, "20 s");
+                       config.setInteger(AkkaOptions.WATCH_THRESHOLD, 20);
 
                        cluster = new LocalFlinkMiniCluster(config, false);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/302c6741/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
index 40a8f09..37e89e9 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
@@ -23,6 +23,7 @@ import akka.actor.Kill;
 import akka.actor.PoisonPill;
 import org.apache.curator.test.TestingServer;
 import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -151,7 +152,7 @@ public class ZooKeeperLeaderElectionITCase extends TestLogger {
 
                // we "effectively" disable the automatic RecoverAllJobs message and sent it manually to make
                // sure that all TMs have registered to the JM prior to issueing the RecoverAllJobs message
-               configuration.setString(ConfigConstants.AKKA_ASK_TIMEOUT, AkkaUtils.INF_TIMEOUT().toString());
+               configuration.setString(AkkaOptions.ASK_TIMEOUT, AkkaUtils.INF_TIMEOUT().toString());
 
                Tasks.BlockingOnceReceiver$.MODULE$.blocking_$eq(true);
 

Reply via email to