http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
index 1c0290a..048fbee 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
@@ -89,8 +89,8 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
        @Test
        public void testZooKeeperLeaderElectionRetrieval() throws Exception {
                Configuration configuration = new Configuration();
-               configuration.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, 
testingServer.getConnectString());
-               configuration.setString(ConfigConstants.RECOVERY_MODE, 
"zookeeper");
+               
configuration.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, 
testingServer.getConnectString());
+               configuration.setString(ConfigConstants.HIGH_AVAILABILITY, 
"zookeeper");
 
                ZooKeeperLeaderElectionService leaderElectionService = null;
                ZooKeeperLeaderRetrievalService leaderRetrievalService = null;
@@ -134,8 +134,8 @@ public class ZooKeeperLeaderElectionTest extends TestLogger 
{
        @Test
        public void testZooKeeperReelection() throws Exception {
                Configuration configuration = new Configuration();
-               configuration.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, 
testingServer.getConnectString());
-               configuration.setString(ConfigConstants.RECOVERY_MODE, 
"zookeeper");
+               
configuration.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, 
testingServer.getConnectString());
+               configuration.setString(ConfigConstants.HIGH_AVAILABILITY, 
"zookeeper");
 
                Deadline deadline = new FiniteDuration(5, 
TimeUnit.MINUTES).fromNow();
 
@@ -217,8 +217,8 @@ public class ZooKeeperLeaderElectionTest extends TestLogger 
{
        @Test
        public void testZooKeeperReelectionWithReplacement() throws Exception {
                Configuration configuration = new Configuration();
-               configuration.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, 
testingServer.getConnectString());
-               configuration.setString(ConfigConstants.RECOVERY_MODE, 
"zookeeper");
+               
configuration.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, 
testingServer.getConnectString());
+               configuration.setString(ConfigConstants.HIGH_AVAILABILITY, 
"zookeeper");
 
                int num = 3;
                int numTries = 30;
@@ -295,9 +295,9 @@ public class ZooKeeperLeaderElectionTest extends TestLogger 
{
                final String leaderPath = "/leader";
 
                Configuration configuration = new Configuration();
-               configuration.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, 
testingServer.getConnectString());
-               configuration.setString(ConfigConstants.RECOVERY_MODE, 
"zookeeper");
-               configuration.setString(ConfigConstants.ZOOKEEPER_LEADER_PATH, 
leaderPath);
+               
configuration.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, 
testingServer.getConnectString());
+               configuration.setString(ConfigConstants.HIGH_AVAILABILITY, 
"zookeeper");
+               
configuration.setString(ConfigConstants.HA_ZOOKEEPER_LEADER_PATH, leaderPath);
 
                ZooKeeperLeaderElectionService leaderElectionService = null;
                ZooKeeperLeaderRetrievalService leaderRetrievalService = null;
@@ -379,8 +379,8 @@ public class ZooKeeperLeaderElectionTest extends TestLogger 
{
        @Test
        public void testExceptionForwarding() throws Exception {
                Configuration configuration = new Configuration();
-               configuration.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, 
testingServer.getConnectString());
-               configuration.setString(ConfigConstants.RECOVERY_MODE, 
"zookeeper");
+               
configuration.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, 
testingServer.getConnectString());
+               configuration.setString(ConfigConstants.HIGH_AVAILABILITY, 
"zookeeper");
 
                ZooKeeperLeaderElectionService leaderElectionService = null;
                ZooKeeperLeaderRetrievalService leaderRetrievalService = null;
@@ -448,8 +448,8 @@ public class ZooKeeperLeaderElectionTest extends TestLogger 
{
        @Test
        public void testEphemeralZooKeeperNodes() throws Exception {
                Configuration configuration = new Configuration();
-               configuration.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, 
testingServer.getConnectString());
-               configuration.setString(ConfigConstants.RECOVERY_MODE, 
"zookeeper");
+               
configuration.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, 
testingServer.getConnectString());
+               configuration.setString(ConfigConstants.HIGH_AVAILABILITY, 
"zookeeper");
 
                ZooKeeperLeaderElectionService leaderElectionService;
                ZooKeeperLeaderRetrievalService leaderRetrievalService = null;
@@ -466,7 +466,7 @@ public class ZooKeeperLeaderElectionTest extends TestLogger 
{
                        listener = new TestingListener();
 
                        client = 
ZooKeeperUtils.startCuratorFramework(configuration);
-                       final String leaderPath = 
configuration.getString(ConfigConstants.ZOOKEEPER_LEADER_PATH,
+                       final String leaderPath = 
configuration.getString(ConfigConstants.HA_ZOOKEEPER_LEADER_PATH,
                                        
ConfigConstants.DEFAULT_ZOOKEEPER_LEADER_PATH);
                        cache = new NodeCache(client, leaderPath);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
index aae1840..5aace34 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
@@ -23,6 +23,7 @@ import org.apache.curator.test.TestingServer;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
@@ -43,6 +44,7 @@ import java.net.UnknownHostException;
 import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 public class ZooKeeperLeaderRetrievalTest extends TestLogger{
 
@@ -82,8 +84,8 @@ public class ZooKeeperLeaderRetrievalTest extends TestLogger{
 
                long sleepingTime = 1000;
 
-               config.setString(ConfigConstants.RECOVERY_MODE, "zookeeper");
-               config.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, 
testingServer.getConnectString());
+               config.setString(ConfigConstants.HIGH_AVAILABILITY, 
"zookeeper");
+               config.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, 
testingServer.getConnectString());
 
                LeaderElectionService leaderElectionService = null;
                LeaderElectionService faultyLeaderElectionService;
@@ -179,8 +181,8 @@ public class ZooKeeperLeaderRetrievalTest extends 
TestLogger{
        @Test
        public void testTimeoutOfFindConnectingAddress() throws Exception {
                Configuration config = new Configuration();
-               config.setString(ConfigConstants.RECOVERY_MODE, "zookeeper");
-               config.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, 
testingServer.getConnectString());
+               config.setString(ConfigConstants.HIGH_AVAILABILITY, 
"zookeeper");
+               config.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, 
testingServer.getConnectString());
 
                FiniteDuration timeout = new FiniteDuration(10, 
TimeUnit.SECONDS);
 
@@ -190,6 +192,46 @@ public class ZooKeeperLeaderRetrievalTest extends 
TestLogger{
                assertEquals(InetAddress.getLocalHost(), result);
        }
 
+       @Test
+       public void testConnectionToZookeeperOverridingOldConfig() throws 
Exception {
+               Configuration config = new Configuration();
+               // Both keys are set; the new key takes precedence over the old one
+               config.setString(ConfigConstants.RECOVERY_MODE, "standalone");
+               config.setString(ConfigConstants.HIGH_AVAILABILITY, 
"zookeeper");
+               config.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, 
testingServer.getConnectString());
+
+               FiniteDuration timeout = new FiniteDuration(10, 
TimeUnit.SECONDS);
+
+               LeaderRetrievalService leaderRetrievalService =
+                       
LeaderRetrievalUtils.createLeaderRetrievalService(config);
+               InetAddress result = 
LeaderRetrievalUtils.findConnectingAddress(leaderRetrievalService, timeout);
+
+               assertEquals(InetAddress.getLocalHost(), result);
+       }
+
+       @Test
+       public void testConnectionToStandAloneLeaderOverridingOldConfig() 
throws Exception {
+               Configuration config = new Configuration();
+               // Both keys are set; the new key takes precedence over the old one
+               config.setString(ConfigConstants.RECOVERY_MODE, "zookeeper");
+               config.setString(ConfigConstants.HIGH_AVAILABILITY, "none");
+               config.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, 
testingServer.getConnectString());
+
+               HighAvailabilityMode mode = 
HighAvailabilityMode.fromConfig(config);
+               assertTrue(mode == HighAvailabilityMode.NONE);
+       }
+
+       @Test
+       public void testConnectionToZookeeperUsingOldConfig() throws Exception {
+               Configuration config = new Configuration();
+               // Only the old key is set, so it is used as the fallback
+               config.setString(ConfigConstants.RECOVERY_MODE, "zookeeper");
+               config.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, 
testingServer.getConnectString());
+
+               HighAvailabilityMode mode = 
HighAvailabilityMode.fromConfig(config);
+               assertTrue(mode == HighAvailabilityMode.ZOOKEEPER);
+       }
+
        class FindConnectingAddress implements Runnable {
 
                private final Configuration config;

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java
index fac6162..66d523f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java
@@ -212,7 +212,7 @@ public class JobManagerProcess extends TestJvmProcess {
                 * <code>--port PORT</code>.
                 *
                 * <p>Other arguments are parsed to a {@link Configuration} and 
passed to the
-                * JobManager, for instance: <code>--recovery.mode ZOOKEEPER 
--recovery.zookeeper.quorum
+                * JobManager, for instance: <code>--high-availability 
ZOOKEEPER --recovery.zookeeper.quorum
                 * "xyz:123:456"</code>.
                 */
                public static void main(String[] args) {

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java
index 97e7cca..417dc88 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java
@@ -96,7 +96,7 @@ public class TaskManagerProcess extends TestJvmProcess {
 
                /**
                 * All arguments are parsed to a {@link Configuration} and 
passed to the Taskmanager,
-                * for instance: <code>--recovery.mode ZOOKEEPER 
--recovery.zookeeper.quorum "xyz:123:456"</code>.
+                * for instance: <code>--high-availability ZOOKEEPER 
--recovery.zookeeper.quorum "xyz:123:456"</code>.
                 */
                public static void main(String[] args) throws Exception {
                        try {

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
index 2796337..c94842f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.testutils;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -31,12 +31,12 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 public class ZooKeeperTestUtils {
 
        /**
-        * Creates a configuration to operate in {@link RecoveryMode#ZOOKEEPER}.
+        * Creates a configuration to operate in {@link 
HighAvailabilityMode#ZOOKEEPER}.
         *
         * @param zooKeeperQuorum   ZooKeeper quorum to connect to
         * @param fsStateHandlePath Base path for file system state backend 
(for checkpoints and
         *                          recovery)
-        * @return A new configuration to operate in {@link 
RecoveryMode#ZOOKEEPER}.
+        * @return A new configuration to operate in {@link 
HighAvailabilityMode#ZOOKEEPER}.
         */
        public static Configuration createZooKeeperRecoveryModeConfig(
                        String zooKeeperQuorum, String fsStateHandlePath) {
@@ -45,13 +45,13 @@ public class ZooKeeperTestUtils {
        }
 
        /**
-        * Sets all necessary configuration keys to operate in {@link 
RecoveryMode#ZOOKEEPER}.
+        * Sets all necessary configuration keys to operate in {@link 
HighAvailabilityMode#ZOOKEEPER}.
         *
         * @param config            Configuration to use
         * @param zooKeeperQuorum   ZooKeeper quorum to connect to
         * @param fsStateHandlePath Base path for file system state backend 
(for checkpoints and
         *                          recovery)
-        * @return The modified configuration to operate in {@link 
RecoveryMode#ZOOKEEPER}.
+        * @return The modified configuration to operate in {@link 
HighAvailabilityMode#ZOOKEEPER}.
         */
        public static Configuration setZooKeeperRecoveryMode(
                        Configuration config,
@@ -66,8 +66,8 @@ public class ZooKeeperTestUtils {
                config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, -1);
 
                // ZooKeeper recovery mode
-               config.setString(ConfigConstants.RECOVERY_MODE, "ZOOKEEPER");
-               config.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, 
zooKeeperQuorum);
+               config.setString(ConfigConstants.HIGH_AVAILABILITY, 
"ZOOKEEPER");
+               config.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, 
zooKeeperQuorum);
 
                int connTimeout = 5000;
                if (System.getenv().containsKey("CI")) {
@@ -75,20 +75,20 @@ public class ZooKeeperTestUtils {
                        connTimeout = 30000;
                }
 
-               config.setInteger(ConfigConstants.ZOOKEEPER_CONNECTION_TIMEOUT, 
connTimeout);
-               config.setInteger(ConfigConstants.ZOOKEEPER_SESSION_TIMEOUT, 
connTimeout);
+               
config.setInteger(ConfigConstants.HA_ZOOKEEPER_CONNECTION_TIMEOUT, connTimeout);
+               config.setInteger(ConfigConstants.HA_ZOOKEEPER_SESSION_TIMEOUT, 
connTimeout);
 
                // File system state backend
                config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM");
                
config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY, 
fsStateHandlePath + "/checkpoints");
-               config.setString(ConfigConstants.ZOOKEEPER_RECOVERY_PATH, 
fsStateHandlePath + "/recovery");
+               config.setString(ConfigConstants.ZOOKEEPER_HA_PATH, 
fsStateHandlePath + "/recovery");
 
                // Akka failure detection and execution retries
                config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, 
"1000 ms");
                config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "6 
s");
                config.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 9);
                config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "100 s");
-               config.setString(ConfigConstants.RECOVERY_JOB_DELAY, "10 s");
+               config.setString(ConfigConstants.HA_JOB_DELAY, "10 s");
 
                return config;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/test/java/org/apache/flink/runtime/util/ZooKeeperUtilTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/ZooKeeperUtilTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/ZooKeeperUtilTest.java
index 0d01f65..daed4a4 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/ZooKeeperUtilTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/ZooKeeperUtilTest.java
@@ -71,7 +71,7 @@ public class ZooKeeperUtilTest extends TestLogger {
        }
 
        private Configuration setQuorum(Configuration conf, String quorum) {
-               conf.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, quorum);
+               conf.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, quorum);
                return conf;
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java
index 8fc80e0..467706f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java
@@ -58,7 +58,7 @@ public class ZooKeeperTestEnvironment {
                                zooKeeperServer = new TestingServer(true);
                                zooKeeperCluster = null;
 
-                               
conf.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY,
+                               
conf.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY,
                                                
zooKeeperServer.getConnectString());
                        }
                        else {
@@ -67,7 +67,7 @@ public class ZooKeeperTestEnvironment {
 
                                zooKeeperCluster.start();
 
-                               
conf.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY,
+                               
conf.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY,
                                                
zooKeeperCluster.getConnectString());
                        }
 
@@ -127,7 +127,7 @@ public class ZooKeeperTestEnvironment {
         */
        public CuratorFramework createClient() {
                Configuration config = new Configuration();
-               config.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, 
getConnectString());
+               config.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, 
getConnectString());
                return ZooKeeperUtils.startCuratorFramework(config);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index 02a0fec..7d2b86c 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -423,7 +423,8 @@ object TestingUtils {
       prefix: String)
     : ActorGateway = {
 
-    configuration.setString(ConfigConstants.RECOVERY_MODE, 
ConfigConstants.DEFAULT_RECOVERY_MODE)
+    configuration.setString(ConfigConstants.HIGH_AVAILABILITY,
+      ConfigConstants.DEFAULT_HIGH_AVAILABILTY)
 
       val (actor, _) = JobManager.startJobManagerActors(
         configuration,
@@ -502,7 +503,8 @@ object TestingUtils {
       configuration: Configuration)
   : ActorGateway = {
 
-    configuration.setString(ConfigConstants.RECOVERY_MODE, 
ConfigConstants.DEFAULT_RECOVERY_MODE)
+    configuration.setString(ConfigConstants.HIGH_AVAILABILITY,
+      ConfigConstants.DEFAULT_HIGH_AVAILABILTY)
 
     val actor = FlinkResourceManager.startResourceManagerActors(
       configuration,

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
 
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index c51e666..7e5acee 100644
--- 
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ 
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -120,7 +120,7 @@ public class TestBaseUtils extends TestLogger {
 
                if (startZooKeeper) {
                        
config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 3);
-                       config.setString(ConfigConstants.RECOVERY_MODE, 
"zookeeper");
+                       config.setString(ConfigConstants.HIGH_AVAILABILITY, 
"zookeeper");
                }
 
                return startCluster(config, singleActorSystem);

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git 
a/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
 
b/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
index 42c0a6a..5dd4188 100644
--- 
a/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
+++ 
b/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
@@ -29,7 +29,7 @@ import org.apache.flink.configuration.{ConfigConstants, 
Configuration}
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager
 import org.apache.flink.runtime.clusterframework.types.ResourceID
-import org.apache.flink.runtime.jobmanager.{JobManager, RecoveryMode}
+import org.apache.flink.runtime.jobmanager.{JobManager, HighAvailabilityMode}
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
 import org.apache.flink.runtime.taskmanager.TaskManager
 import 
org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager
@@ -261,14 +261,16 @@ class ForkableFlinkMiniCluster(
   }
 
   override def start(): Unit = {
-    val zookeeperURL = 
configuration.getString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, "")
+    val zookeeperURL = 
configuration.getString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, "")
 
-    zookeeperCluster = if(recoveryMode == RecoveryMode.ZOOKEEPER && 
zookeeperURL.equals("")) {
+    zookeeperCluster = if (recoveryMode == HighAvailabilityMode.ZOOKEEPER &&
+      zookeeperURL.equals("")) {
       LOG.info("Starting ZooKeeper cluster.")
 
       val testingCluster = new TestingCluster(1)
 
-      configuration.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, 
testingCluster.getConnectString)
+      configuration.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY,
+        testingCluster.getConnectString)
 
       testingCluster.start()
 

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
index e97532c..22bf62a 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
@@ -564,7 +564,7 @@ public class ChaosMonkeyITCase extends TestLogger {
                        fail(fsCheckpoints + " does not exist: " + 
Arrays.toString(FileStateBackendBasePath.listFiles()));
                }
 
-               File fsRecovery = new File(new 
URI(config.getString(ConfigConstants.ZOOKEEPER_RECOVERY_PATH, "")).getPath());
+               File fsRecovery = new File(new 
URI(config.getString(ConfigConstants.ZOOKEEPER_HA_PATH, "")).getPath());
 
                LOG.info("Checking " + fsRecovery);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
index eccf971..e0e165d 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
@@ -482,7 +482,7 @@ public class JobManagerHAJobGraphRecoveryITCase extends 
TestLogger {
 
                // ZooKeeper
                String currentJobsPath = config.getString(
-                               ConfigConstants.ZOOKEEPER_JOBGRAPHS_PATH,
+                               ConfigConstants.HA_ZOOKEEPER_JOBGRAPHS_PATH,
                                
ConfigConstants.DEFAULT_ZOOKEEPER_JOBGRAPHS_PATH);
 
                Stat stat = 
ZooKeeper.getClient().checkExists().forPath(currentJobsPath);
@@ -514,7 +514,7 @@ public class JobManagerHAJobGraphRecoveryITCase extends 
TestLogger {
 
                // ZooKeeper
                String currentJobsPath = config.getString(
-                       ConfigConstants.ZOOKEEPER_JOBGRAPHS_PATH,
+                       ConfigConstants.HA_ZOOKEEPER_JOBGRAPHS_PATH,
                        ConfigConstants.DEFAULT_ZOOKEEPER_JOBGRAPHS_PATH);
 
                Stat stat = 
ZooKeeper.getClient().checkExists().forPath(currentJobsPath);

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
index 88aeb09..0c52204 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
@@ -149,8 +149,8 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase 
extends TestLogger {
         */
        public void testJobManagerFailure(String zkQuorum, final File 
coordinateDir) throws Exception {
                Configuration config = new Configuration();
-               config.setString(ConfigConstants.RECOVERY_MODE, "ZOOKEEPER");
-               config.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, 
zkQuorum);
+               config.setString(ConfigConstants.HIGH_AVAILABILITY, 
"ZOOKEEPER");
+               config.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, 
zkQuorum);
 
                ExecutionEnvironment env = 
ExecutionEnvironment.createRemoteEnvironment(
                                "leader", 1, config);

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
index 45ee839..7091339 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
@@ -91,11 +91,11 @@ public class ZooKeeperLeaderElectionITCase extends 
TestLogger {
                int numJMs = 10;
                int numTMs = 3;
 
-               configuration.setString(ConfigConstants.RECOVERY_MODE, 
"zookeeper");
+               configuration.setString(ConfigConstants.HIGH_AVAILABILITY, 
"zookeeper");
                
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, numJMs);
                
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
                configuration.setString(ConfigConstants.STATE_BACKEND, 
"filesystem");
-               
configuration.setString(ConfigConstants.ZOOKEEPER_RECOVERY_PATH, 
tempDirectory.getAbsoluteFile().toURI().toString());
+               configuration.setString(ConfigConstants.ZOOKEEPER_HA_PATH, 
tempDirectory.getAbsoluteFile().toURI().toString());
 
                ForkableFlinkMiniCluster cluster = new 
ForkableFlinkMiniCluster(configuration);
 
@@ -139,12 +139,12 @@ public class ZooKeeperLeaderElectionITCase extends 
TestLogger {
 
                Configuration configuration = new Configuration();
 
-               configuration.setString(ConfigConstants.RECOVERY_MODE, 
"zookeeper");
+               configuration.setString(ConfigConstants.HIGH_AVAILABILITY, 
"zookeeper");
                
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, numJMs);
                
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
                
configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
numSlotsPerTM);
                configuration.setString(ConfigConstants.STATE_BACKEND, 
"filesystem");
-               
configuration.setString(ConfigConstants.ZOOKEEPER_RECOVERY_PATH, 
tempDirectory.getAbsoluteFile().toURI().toString());
+               configuration.setString(ConfigConstants.ZOOKEEPER_HA_PATH, 
tempDirectory.getAbsoluteFile().toURI().toString());
 
                // we "effectively" disable the automatic RecoverAllJobs 
message and send it manually to make
                // sure that all TMs have registered to the JM prior to 
issuing the RecoverAllJobs message

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
index 60ae2ef..48ad7f5 100644
--- 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
+++ 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
@@ -202,7 +202,7 @@ public class CliFrontendYarnAddressConfigurationTest {
                                CliFrontendParser.parseRunCommand(new String[] 
{"-yid", TEST_YARN_APPLICATION_ID.toString()});
 
                frontend.retrieveClient(options);
-               String zkNs = 
frontend.getConfiguration().getString(ConfigConstants.ZOOKEEPER_NAMESPACE_KEY, 
"error");
+               String zkNs = 
frontend.getConfiguration().getString(ConfigConstants.HA_ZOOKEEPER_NAMESPACE_KEY,
 "error");
                Assert.assertTrue(zkNs.matches("application_\\d+_0042"));
        }
 
@@ -216,7 +216,7 @@ public class CliFrontendYarnAddressConfigurationTest {
                                CliFrontendParser.parseRunCommand(new String[] 
{"-yid", TEST_YARN_APPLICATION_ID.toString(), "-yz", overrideZkNamespace});
 
                frontend.retrieveClient(options);
-               String zkNs = 
frontend.getConfiguration().getString(ConfigConstants.ZOOKEEPER_NAMESPACE_KEY, 
"error");
+               String zkNs = 
frontend.getConfiguration().getString(ConfigConstants.HA_ZOOKEEPER_NAMESPACE_KEY,
 "error");
                Assert.assertEquals(overrideZkNamespace, zkNs);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
index 9b52975..25dbe53 100644
--- 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
+++ 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
@@ -115,7 +115,7 @@ public class YARNHighAvailabilityITCase extends 
YarnTestBase {
                        zkServer.getConnectString() + 
"@@yarn.application-attempts=" + numberApplicationAttempts +
                        "@@" + ConfigConstants.STATE_BACKEND + "=FILESYSTEM" +
                        "@@" + 
FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY + "=" + 
fsStateHandlePath + "/checkpoints" +
-                       "@@" + ConfigConstants.ZOOKEEPER_RECOVERY_PATH + "=" + 
fsStateHandlePath + "/recovery");
+                       "@@" + ConfigConstants.ZOOKEEPER_HA_PATH + "=" + 
fsStateHandlePath + "/recovery");
                flinkYarnClient.setConfigurationFilePath(new Path(confDirPath + 
File.separator + "flink-conf.yaml"));
 
                ClusterClient yarnCluster = null;

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index ba07af1..f4c2032 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -23,7 +23,7 @@ import org.apache.flink.client.deployment.ClusterDescriptor;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -553,13 +553,13 @@ public abstract class AbstractYarnClusterDescriptor 
implements ClusterDescriptor
                // no user specified cli argument for namespace?
                if (zkNamespace == null || zkNamespace.isEmpty()) {
                        // namespace defined in config? else use applicationId 
as default.
-                       zkNamespace = 
flinkConfiguration.getString(ConfigConstants.ZOOKEEPER_NAMESPACE_KEY, 
String.valueOf(appId));
+                       zkNamespace = 
flinkConfiguration.getString(ConfigConstants.HA_ZOOKEEPER_NAMESPACE_KEY, 
String.valueOf(appId));
                        setZookeeperNamespace(zkNamespace);
                }
 
-               
flinkConfiguration.setString(ConfigConstants.ZOOKEEPER_NAMESPACE_KEY, 
zkNamespace);
+               
flinkConfiguration.setString(ConfigConstants.HA_ZOOKEEPER_NAMESPACE_KEY, 
zkNamespace);
 
-               if 
(RecoveryMode.isHighAvailabilityModeActivated(flinkConfiguration)) {
+               if 
(HighAvailabilityMode.isHighAvailabilityModeActivated(flinkConfiguration)) {
                        // activate re-execution of failed applications
                        appContext.setMaxAppAttempts(
                                flinkConfiguration.getInteger(

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index 39b2510..87a2c98 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -430,7 +430,7 @@ public class YarnApplicationMasterRunner {
                // override zookeeper namespace with user cli argument (if 
provided)
                String cliZKNamespace = 
ENV.get(YarnConfigKeys.ENV_ZOOKEEPER_NAMESPACE);
                if (cliZKNamespace != null && !cliZKNamespace.isEmpty()) {
-                       
configuration.setString(ConfigConstants.ZOOKEEPER_NAMESPACE_KEY, 
cliZKNamespace);
+                       
configuration.setString(ConfigConstants.HA_ZOOKEEPER_NAMESPACE_KEY, 
cliZKNamespace);
                }
 
                // if a web monitor shall be started, set the port to random 
binding

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index bee6a7a..3c93e34 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -58,7 +58,7 @@ import java.util.Map;
 import java.util.Properties;
 
 import static org.apache.flink.client.cli.CliFrontendParser.ADDRESS_OPTION;
-import static 
org.apache.flink.configuration.ConfigConstants.ZOOKEEPER_NAMESPACE_KEY;
+import static 
org.apache.flink.configuration.ConfigConstants.HA_ZOOKEEPER_NAMESPACE_KEY;
 
 /**
  * Class handling the command line interface to the YARN session.
@@ -503,8 +503,8 @@ public class FlinkYarnSessionCli implements 
CustomCommandLine<YarnClusterClient>
                if(null != applicationID) {
                        String zkNamespace = 
cmdLine.hasOption(ZOOKEEPER_NAMESPACE.getOpt()) ?
                                        
cmdLine.getOptionValue(ZOOKEEPER_NAMESPACE.getOpt())
-                                       : 
config.getString(ZOOKEEPER_NAMESPACE_KEY, applicationID);
-                       config.setString(ZOOKEEPER_NAMESPACE_KEY, zkNamespace);
+                                       : 
config.getString(HA_ZOOKEEPER_NAMESPACE_KEY, applicationID);
+                       config.setString(HA_ZOOKEEPER_NAMESPACE_KEY, 
zkNamespace);
 
                        AbstractYarnClusterDescriptor yarnDescriptor = 
getClusterDescriptor();
                        yarnDescriptor.setFlinkConfiguration(config);

Reply via email to