[FLINK-4253] [config] Rename 'recovery.mode' key to 'high-availability'

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/01ffe34c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/01ffe34c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/01ffe34c

Branch: refs/heads/flip-6
Commit: 01ffe34c682c5f22928fbb476896600d0177d84e
Parents: 7206455
Author: Ramkrishna <ramkrishna.s.vasude...@intel.com>
Authored: Tue Aug 9 14:48:12 2016 +0530
Committer: Ufuk Celebi <u...@apache.org>
Committed: Wed Aug 24 12:09:24 2016 +0200

----------------------------------------------------------------------
 docs/setup/config.md                            |  26 ++--
 docs/setup/jobmanager_high_availability.md      |  39 +++---
 .../org/apache/flink/client/cli/DefaultCLI.java |   2 +-
 .../flink/configuration/ConfigConstants.java    |  83 ++++++++++++-
 .../apache/flink/util/ConfigurationUtil.java    | 101 ++++++++++++++++
 .../flink/util/ConfigurationUtilTest.java       | 115 ++++++++++++++++++
 flink-dist/src/main/flink-bin/bin/config.sh     |  22 +++-
 flink-dist/src/main/resources/flink-conf.yaml   |   2 +-
 .../webmonitor/WebRuntimeMonitorITCase.java     |   4 +-
 .../apache/flink/runtime/blob/BlobServer.java   |  14 +--
 .../flink/runtime/blob/FileSystemBlobStore.java |   7 +-
 .../StandaloneCheckpointIDCounter.java          |   4 +-
 .../StandaloneCheckpointRecoveryFactory.java    |   4 +-
 .../StandaloneCompletedCheckpointStore.java     |   4 +-
 .../ZooKeeperCheckpointIDCounter.java           |   4 +-
 .../ZooKeeperCheckpointRecoveryFactory.java     |   4 +-
 .../ZooKeeperCompletedCheckpointStore.java      |   4 +-
 .../jobmanager/HighAvailabilityMode.java        |  86 +++++++++++++
 .../flink/runtime/jobmanager/RecoveryMode.java  |  72 -----------
 .../StandaloneSubmittedJobGraphStore.java       |   2 +-
 .../ZooKeeperSubmittedJobGraphStore.java        |   2 +-
 .../runtime/util/LeaderRetrievalUtils.java      |  36 ++----
 .../flink/runtime/util/ZooKeeperUtils.java      | 120 +++++++++++++------
 .../flink/runtime/jobmanager/JobManager.scala   |  19 +--
 .../runtime/minicluster/FlinkMiniCluster.scala  |  24 ++--
 .../flink/runtime/blob/BlobRecoveryITCase.java  |   8 +-
 .../BlobLibraryCacheRecoveryITCase.java         |   8 +-
 .../jobmanager/JobManagerHARecoveryTest.java    |   4 +-
 .../JobManagerLeaderElectionTest.java           |   4 +-
 .../ZooKeeperLeaderElectionTest.java            |  28 ++---
 .../ZooKeeperLeaderRetrievalTest.java           |  50 +++++++-
 .../runtime/testutils/JobManagerProcess.java    |   2 +-
 .../runtime/testutils/TaskManagerProcess.java   |   2 +-
 .../runtime/testutils/ZooKeeperTestUtils.java   |  22 ++--
 .../flink/runtime/util/ZooKeeperUtilTest.java   |   2 +-
 .../zookeeper/ZooKeeperTestEnvironment.java     |   6 +-
 .../runtime/testingUtils/TestingUtils.scala     |   6 +-
 .../apache/flink/test/util/TestBaseUtils.java   |   2 +-
 .../test/util/ForkableFlinkMiniCluster.scala    |  10 +-
 .../flink/test/recovery/ChaosMonkeyITCase.java  |   2 +-
 .../JobManagerHAJobGraphRecoveryITCase.java     |   4 +-
 ...agerHAProcessFailureBatchRecoveryITCase.java |   4 +-
 .../ZooKeeperLeaderElectionITCase.java          |   8 +-
 ...CliFrontendYarnAddressConfigurationTest.java |   4 +-
 .../flink/yarn/YARNHighAvailabilityITCase.java  |   2 +-
 .../yarn/AbstractYarnClusterDescriptor.java     |   8 +-
 .../flink/yarn/YarnApplicationMasterRunner.java |   2 +-
 .../flink/yarn/cli/FlinkYarnSessionCli.java     |   6 +-
 48 files changed, 702 insertions(+), 292 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index 6bc9655..e6a335b 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -134,7 +134,7 @@ will be used under the directory specified by 
jobmanager.web.tmpdir.
 
 - `state.backend.fs.checkpointdir`: Directory for storing checkpoints in a 
Flink supported filesystem. Note: State backend must be accessible from the 
JobManager, use `file://` only for local setups.
 
-- `recovery.zookeeper.storageDir`: Required for HA. Directory for storing 
JobManager metadata; this is persisted in the state backend and only a pointer 
to this state is stored in ZooKeeper. Exactly like the checkpoint directory it 
must be accessible from the JobManager and a local filesystem should only be 
used for local deployments.
+- `high-availability.zookeeper.storageDir`: Required for HA. Directory for 
storing JobManager metadata; this is persisted in the state backend and only a 
pointer to this state is stored in ZooKeeper. Exactly like the checkpoint 
directory it must be accessible from the JobManager and a local filesystem 
should only be used for local deployments. Previously this config was named as 
`recovery.zookeeper.storageDir`.
 
 - `blob.storage.directory`: Directory for storing blobs (such as user jar's) 
on the TaskManagers.
 
@@ -285,29 +285,29 @@ of the JobManager, because the same ActorSystem is used. 
Its not possible to use
 
 ## High Availability Mode
 
-- `recovery.mode`: (Default 'standalone') Defines the recovery mode used for 
the cluster execution. Currently, Flink supports the 'standalone' mode where 
only a single JobManager runs and no JobManager state is checkpointed. The high 
availability mode 'zookeeper' supports the execution of multiple JobManagers 
and JobManager state checkpointing. Among the group of JobManagers, ZooKeeper 
elects one of them as the leader which is responsible for the cluster 
execution. In case of a JobManager failure, a standby JobManager will be 
elected as the new leader and is given the last checkpointed JobManager state. 
In order to use the 'zookeeper' mode, it is mandatory to also define the 
`recovery.zookeeper.quorum` configuration value.
+- `high-availability`: (Default 'none') Defines the recovery mode used for the 
cluster execution. Currently, Flink supports the 'none' mode where only a 
single JobManager runs and no JobManager state is checkpointed. The high 
availability mode 'zookeeper' supports the execution of multiple JobManagers 
and JobManager state checkpointing. Among the group of JobManagers, ZooKeeper 
elects one of them as the leader which is responsible for the cluster 
execution. In case of a JobManager failure, a standby JobManager will be 
elected as the new leader and is given the last checkpointed JobManager state. 
In order to use the 'zookeeper' mode, it is mandatory to also define the 
`high-availability.zookeeper.quorum` configuration value.  Previously this config was 
named 'recovery.mode' and the default config was 'standalone'.
 
-- `recovery.zookeeper.quorum`: Defines the ZooKeeper quorum URL which is used 
to connet to the ZooKeeper cluster when the 'zookeeper' recovery mode is 
selected
+- `high-availability.zookeeper.quorum`: Defines the ZooKeeper quorum URL which 
is used to connect to the ZooKeeper cluster when the 'zookeeper' recovery mode 
is selected.  Previously this config was named as `recovery.zookeeper.quorum`.
 
-- `recovery.zookeeper.path.root`: (Default '/flink') Defines the root dir 
under which the ZooKeeper recovery mode will create namespace directories.
+- `high-availability.zookeeper.path.root`: (Default '/flink') Defines the root 
dir under which the ZooKeeper recovery mode will create namespace directories.  
Previously this config was named as `recovery.zookeeper.path.root`.
 
-- `recovery.zookeeper.path.namespace`: (Default '/default_ns' in standalone 
mode, or the <yarn-application-id> under Yarn) Defines the subdirectory under 
the root dir where the ZooKeeper recovery mode will create znodes. This allows 
to isolate multiple applications on the same ZooKeeper.
+- `high-availability.zookeeper.path.namespace`: (Default '/default_ns' in 
standalone mode, or the <yarn-application-id> under Yarn) Defines the 
subdirectory under the root dir where the ZooKeeper recovery mode will create 
znodes. This allows to isolate multiple applications on the same ZooKeeper.  
Previously this config was named as `recovery.zookeeper.path.namespace`.
 
-- `recovery.zookeeper.path.latch`: (Default '/leaderlatch') Defines the znode 
of the leader latch which is used to elect the leader.
+- `high-availability.zookeeper.path.latch`: (Default '/leaderlatch') Defines 
the znode of the leader latch which is used to elect the leader.  Previously 
this config was named as `recovery.zookeeper.path.latch`.
 
-- `recovery.zookeeper.path.leader`: (Default '/leader') Defines the znode of 
the leader which contains the URL to the leader and the current leader session 
ID
+- `high-availability.zookeeper.path.leader`: (Default '/leader') Defines the 
znode of the leader which contains the URL to the leader and the current leader 
session ID.  Previously this config was named as 
`recovery.zookeeper.path.leader`.
 
-- `recovery.zookeeper.storageDir`: Defines the directory in the state backend 
where the JobManager metadata will be stored (ZooKeeper only keeps pointers to 
it). Required for HA.
+- `high-availability.zookeeper.storageDir`: Defines the directory in the state 
backend where the JobManager metadata will be stored (ZooKeeper only keeps 
pointers to it). Required for HA.  Previously this config was named as 
`recovery.zookeeper.storageDir`.
 
-- `recovery.zookeeper.client.session-timeout`: (Default '60000') Defines the 
session timeout for the ZooKeeper session in ms.
+- `high-availability.zookeeper.client.session-timeout`: (Default '60000') 
Defines the session timeout for the ZooKeeper session in ms.  Previously this 
config was named as `recovery.zookeeper.client.session-timeout`.
 
-- `recovery.zookeeper.client.connection-timeout`: (Default '15000') Defines 
the connection timeout for ZooKeeper in ms.
+- `high-availability.zookeeper.client.connection-timeout`: (Default '15000') 
Defines the connection timeout for ZooKeeper in ms.  Previously this config was 
named as `recovery.zookeeper.client.connection-timeout`.
 
-- `recovery.zookeeper.client.retry-wait`: (Default '5000') Defines the pause 
between consecutive retries in ms.
+- `high-availability.zookeeper.client.retry-wait`: (Default '5000') Defines 
the pause between consecutive retries in ms.  Previously this config was named 
as `recovery.zookeeper.client.retry-wait`.
 
-- `recovery.zookeeper.client.max-retry-attempts`: (Default '3') Defines the 
number of connection retries before the client gives up.
+- `high-availability.zookeeper.client.max-retry-attempts`: (Default '3') 
Defines the number of connection retries before the client gives up.  
Previously this config was named as 
`recovery.zookeeper.client.max-retry-attempts`.
 
-- `recovery.job.delay`: (Default 'akka.ask.timeout') Defines the delay before 
persisted jobs are recovered in case of a recovery situation.
+- `high-availability.job.delay`: (Default 'akka.ask.timeout') Defines the 
delay before persisted jobs are recovered in case of a recovery situation.  
Previously this config was named as `recovery.job.delay`.
 
 ## Environment
 

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/docs/setup/jobmanager_high_availability.md
----------------------------------------------------------------------
diff --git a/docs/setup/jobmanager_high_availability.md 
b/docs/setup/jobmanager_high_availability.md
index d4f329a..dd6782d 100644
--- a/docs/setup/jobmanager_high_availability.md
+++ b/docs/setup/jobmanager_high_availability.md
@@ -42,7 +42,7 @@ As an example, consider the following setup with three 
JobManager instances:
 
 ### Configuration
 
-To enable JobManager High Availability you have to set the **recovery mode** 
to *zookeeper*, configure a **ZooKeeper quorum** and set up a **masters file** 
with all JobManagers hosts and their web UI ports.
+To enable JobManager High Availability you have to set the **high-availability 
mode** to *zookeeper*, configure a **ZooKeeper quorum** and set up a **masters 
file** with all JobManagers hosts and their web UI ports.
 
 Flink leverages **[ZooKeeper](http://zookeeper.apache.org)** for  *distributed 
coordination* between all running JobManager instances. ZooKeeper is a separate 
service from Flink, which provides highly reliable distributed coordination via 
leader election and light-weight consistent state storage. Check out 
[ZooKeeper's Getting Started 
Guide](http://zookeeper.apache.org/doc/trunk/zookeeperStarted.html) for more 
information about ZooKeeper. Flink includes scripts to [bootstrap a simple 
ZooKeeper](#bootstrap-zookeeper) installation.
 
@@ -58,38 +58,39 @@ jobManagerAddress1:webUIPort1
 jobManagerAddressX:webUIPortX
   </pre>
 
-By default, the job manager will pick a *random port* for inter process 
communication. You can change this via the **`recovery.jobmanager.port`** key. 
This key accepts single ports (e.g. `50010`), ranges (`50000-50025`), or a 
combination of both (`50010,50011,50020-50025,50050-50075`).
+By default, the job manager will pick a *random port* for inter process 
communication. You can change this via the 
**`high-availability.jobmanager.port`** key. This key accepts single ports 
(e.g. `50010`), ranges (`50000-50025`), or a combination of both 
(`50010,50011,50020-50025,50050-50075`).
 
 #### Config File (flink-conf.yaml)
 
 In order to start an HA-cluster add the following configuration keys to 
`conf/flink-conf.yaml`:
 
-- **Recovery mode** (required): The *recovery mode* has to be set in 
`conf/flink-conf.yaml` to *zookeeper* in order to enable high availability mode.
+- **high-availability mode** (required): The *high-availability mode* has to 
be set in `conf/flink-conf.yaml` to *zookeeper* in order to enable high 
availability mode.
 
-  <pre>recovery.mode: zookeeper</pre>
+  <pre>high-availability: zookeeper</pre>
+- **Note**: Previously this config was named 'recovery.mode' and the default 
config was 'standalone'.
 
 - **ZooKeeper quorum** (required): A *ZooKeeper quorum* is a replicated group 
of ZooKeeper servers, which provide the distributed coordination service.
 
-  <pre>recovery.zookeeper.quorum: address1:2181[,...],addressX:2181</pre>
+  <pre>high-availability.zookeeper.quorum: 
address1:2181[,...],addressX:2181</pre>
 
   Each *addressX:port* refers to a ZooKeeper server, which is reachable by 
Flink at the given address and port.
 
 - **ZooKeeper root** (recommended): The *root ZooKeeper node*, under which all 
cluster namespace nodes are placed.
 
-  <pre>recovery.zookeeper.path.root: /flink
+  <pre>high-availability.zookeeper.path.root: /flink</pre>
 
 - **ZooKeeper namespace** (recommended): The *namespace ZooKeeper node*, under 
which all required coordination data for a cluster is placed.
 
-  <pre>recovery.zookeeper.path.namespace: /default_ns # important: customize 
per cluster</pre>
+  <pre>high-availability.zookeeper.path.namespace: /default_ns # important: 
customize per cluster</pre>
 
-  **Important**: if you are running multiple Flink HA clusters, you have to 
manually configure separate namespaces for each cluster. By default, the Yarn 
cluster and the Yarn session automatically generate namespaces based on Yarn 
application id. A manual configuration overrides this behaviour in Yarn. 
Specifying a namespace with the -z CLI option, in turn, overrides manual 
configuration.
+  **Important**: if you are running multiple Flink HA clusters, you have to 
manually configure separate namespaces for each cluster. By default, the Yarn 
cluster and the Yarn session automatically generate namespaces based on Yarn 
application id. A manual configuration overrides this behaviour in Yarn. 
Specifying a namespace with the -z CLI option, in turn, overrides manual 
configuration. 
 
 - **State backend and storage directory** (required): JobManager meta data is 
persisted in the *state backend* and only a pointer to this state is stored in 
ZooKeeper. Currently, only the file system state backend is supported in HA 
mode.
 
     <pre>
 state.backend: filesystem
 state.backend.fs.checkpointdir: hdfs:///flink/checkpoints
-recovery.zookeeper.storageDir: hdfs:///flink/recovery</pre>
+high-availability.zookeeper.storageDir: hdfs:///flink/recovery</pre>
 
     The `storageDir` stores all meta data needed to recover a JobManager 
failure.
 
@@ -100,13 +101,13 @@ After configuring the masters and the ZooKeeper quorum, 
you can use the provided
 1. **Configure recovery mode and ZooKeeper quorum** in `conf/flink-conf.yaml`:
 
    <pre>
-recovery.mode: zookeeper
-recovery.zookeeper.quorum: localhost:2181
-recovery.zookeeper.path.root: /flink
-recovery.zookeeper.path.namespace: /cluster_one # important: customize per 
cluster
+high-availability: zookeeper
+high-availability.zookeeper.quorum: localhost:2181
+high-availability.zookeeper.path.root: /flink
+high-availability.zookeeper.path.namespace: /cluster_one # important: 
customize per cluster
 state.backend: filesystem
 state.backend.fs.checkpointdir: hdfs:///flink/checkpoints
-recovery.zookeeper.storageDir: hdfs:///flink/recovery</pre>
+high-availability.zookeeper.storageDir: hdfs:///flink/recovery</pre>
 
 2. **Configure masters** in `conf/masters`:
 
@@ -186,13 +187,13 @@ This means that the application can be restarted 10 times 
before YARN fails the
 1. **Configure recovery mode and ZooKeeper quorum** in `conf/flink-conf.yaml`:
 
    <pre>
-recovery.mode: zookeeper
-recovery.zookeeper.quorum: localhost:2181
-recovery.zookeeper.path.root: /flink
-recovery.zookeeper.path.namespace: /cluster_one # important: customize per 
cluster
+high-availability: zookeeper
+high-availability.zookeeper.quorum: localhost:2181
+high-availability.zookeeper.path.root: /flink
+high-availability.zookeeper.path.namespace: /cluster_one # important: 
customize per cluster
 state.backend: filesystem
 state.backend.fs.checkpointdir: hdfs:///flink/checkpoints
-recovery.zookeeper.storageDir: hdfs:///flink/recovery
+high-availability.zookeeper.storageDir: hdfs:///flink/recovery
 yarn.application-attempts: 10</pre>
 
 3. **Configure ZooKeeper server** in `conf/zoo.cfg` (currently it's only 
possible to run a single ZooKeeper server per machine):

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java 
b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
index 5f83c3d..18fa323 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
@@ -64,7 +64,7 @@ public class DefaultCLI implements 
CustomCommandLine<StandaloneClusterClient> {
 
                if 
(commandLine.hasOption(CliFrontendParser.ZOOKEEPER_NAMESPACE_OPTION.getOpt())) {
                        String zkNamespace = 
commandLine.getOptionValue(CliFrontendParser.ZOOKEEPER_NAMESPACE_OPTION.getOpt());
-                       
config.setString(ConfigConstants.ZOOKEEPER_NAMESPACE_KEY, zkNamespace);
+                       
config.setString(ConfigConstants.HA_ZOOKEEPER_NAMESPACE_KEY, zkNamespace);
                }
 
                StandaloneClusterDescriptor descriptor = new 
StandaloneClusterDescriptor(config);

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 98a843d..5cc1161 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -626,52 +626,127 @@ public final class ConfigConstants {
 
        // --------------------------- Recovery 
-----------------------------------
 
-       /** Defines recovery mode used for the cluster execution ("standalone", 
"zookeeper") */
+       /** Defines recovery mode used for the cluster execution ("standalone", 
"zookeeper")
+        *  Use {@link #HIGH_AVAILABILITY} instead
+        * */
+       @Deprecated
        public static final String RECOVERY_MODE = "recovery.mode";
 
+       /** Defines recovery mode used for the cluster execution ("NONE", 
"ZOOKEEPER") */
+       public static final String HIGH_AVAILABILITY = "high-availability";
+
        /** Ports used by the job manager if not in standalone recovery mode */
+       @Deprecated
        public static final String RECOVERY_JOB_MANAGER_PORT = 
"recovery.jobmanager.port";
 
+       /** Ports used by the job manager if not in 'none' recovery mode */
+       public static final String HA_JOB_MANAGER_PORT =
+               "high-availability.jobmanager.port";
+
        /** The time before the JobManager recovers persisted jobs */
+       @Deprecated
        public static final String RECOVERY_JOB_DELAY = "recovery.job.delay";
 
+       /** The time before the JobManager recovers persisted jobs */
+       public static final String HA_JOB_DELAY = "high-availability.job.delay";
+
        // --------------------------- ZooKeeper 
----------------------------------
 
        /** ZooKeeper servers. */
+       @Deprecated
        public static final String ZOOKEEPER_QUORUM_KEY = 
"recovery.zookeeper.quorum";
 
+       /** ZooKeeper servers. */
+       public static final String HA_ZOOKEEPER_QUORUM_KEY =
+               "high-availability.zookeeper.quorum";
+
        /**
         * File system state backend base path for recoverable state handles. 
Recovery state is written
         * to this path and the file state handles are persisted for recovery.
         */
+       @Deprecated
        public static final String ZOOKEEPER_RECOVERY_PATH = 
"recovery.zookeeper.storageDir";
 
+       /**
+        * File system state backend base path for recoverable state handles. 
Recovery state is written
+        * to this path and the file state handles are persisted for recovery.
+        */
+       public static final String ZOOKEEPER_HA_PATH =
+               "high-availability.zookeeper.storageDir";
+
        /** ZooKeeper root path. */
+       @Deprecated
        public static final String ZOOKEEPER_DIR_KEY = 
"recovery.zookeeper.path.root";
 
+       /** ZooKeeper root path. */
+       public static final String HA_ZOOKEEPER_DIR_KEY =
+               "high-availability.zookeeper.path.root";
+
+       @Deprecated
        public static final String ZOOKEEPER_NAMESPACE_KEY = 
"recovery.zookeeper.path.namespace";
 
+       public static final String HA_ZOOKEEPER_NAMESPACE_KEY =
+               "high-availability.zookeeper.path.namespace";
+
+       @Deprecated
        public static final String ZOOKEEPER_LATCH_PATH = 
"recovery.zookeeper.path.latch";
 
+       public static final String HA_ZOOKEEPER_LATCH_PATH =
+               "high-availability.zookeeper.path.latch";
+
+       @Deprecated
        public static final String ZOOKEEPER_LEADER_PATH = 
"recovery.zookeeper.path.leader";
 
+       public static final String HA_ZOOKEEPER_LEADER_PATH = 
"high-availability.zookeeper.path.leader";
+
        /** ZooKeeper root path (ZNode) for job graphs. */
+       @Deprecated
        public static final String ZOOKEEPER_JOBGRAPHS_PATH = 
"recovery.zookeeper.path.jobgraphs";
 
+       /** ZooKeeper root path (ZNode) for job graphs. */
+       public static final String HA_ZOOKEEPER_JOBGRAPHS_PATH =
+               "high-availability.zookeeper.path.jobgraphs";
+
        /** ZooKeeper root path (ZNode) for completed checkpoints. */
+       @Deprecated
        public static final String ZOOKEEPER_CHECKPOINTS_PATH = 
"recovery.zookeeper.path.checkpoints";
 
+       /** ZooKeeper root path (ZNode) for completed checkpoints. */
+       public static final String HA_ZOOKEEPER_CHECKPOINTS_PATH =
+               "high-availability.zookeeper.path.checkpoints";
+
        /** ZooKeeper root path (ZNode) for checkpoint counters. */
+       @Deprecated
        public static final String ZOOKEEPER_CHECKPOINT_COUNTER_PATH = 
"recovery.zookeeper.path.checkpoint-counter";
 
+       /** ZooKeeper root path (ZNode) for checkpoint counters. */
+       public static final String HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH =
+               "high-availability.zookeeper.path.checkpoint-counter";
+
+       @Deprecated
        public static final String ZOOKEEPER_SESSION_TIMEOUT = 
"recovery.zookeeper.client.session-timeout";
 
+       public static final String HA_ZOOKEEPER_SESSION_TIMEOUT =
+               "high-availability.zookeeper.client.session-timeout";
+
+       @Deprecated
        public static final String ZOOKEEPER_CONNECTION_TIMEOUT = 
"recovery.zookeeper.client.connection-timeout";
 
+       public static final String HA_ZOOKEEPER_CONNECTION_TIMEOUT =
+               "high-availability.zookeeper.client.connection-timeout";
+
+       @Deprecated
        public static final String ZOOKEEPER_RETRY_WAIT = 
"recovery.zookeeper.client.retry-wait";
 
+       public static final String HA_ZOOKEEPER_RETRY_WAIT =
+               "high-availability.zookeeper.client.retry-wait";
+
+       @Deprecated
        public static final String ZOOKEEPER_MAX_RETRY_ATTEMPTS = 
"recovery.zookeeper.client.max-retry-attempts";
 
+       public static final String HA_ZOOKEEPER_MAX_RETRY_ATTEMPTS =
+               "high-availability.zookeeper.client.max-retry-attempts";
+
        // ---------------------------- Metrics 
-----------------------------------
 
        /**
@@ -1015,10 +1090,12 @@ public final class ConfigConstants {
 
        public static final String LOCAL_START_WEBSERVER = 
"local.start-webserver";
 
-       // --------------------------- Recovery 
---------------------------------
-
+       // --------------------------- Recovery 
---------------------------------
+       @Deprecated
        public static String DEFAULT_RECOVERY_MODE = "standalone";
 
+       public static String DEFAULT_HIGH_AVAILABILTY = "none";
+
        /**
         * Default port used by the job manager if not in standalone recovery 
mode. If <code>0</code>
         * the OS picks a random port port.

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-core/src/main/java/org/apache/flink/util/ConfigurationUtil.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/util/ConfigurationUtil.java 
b/flink-core/src/main/java/org/apache/flink/util/ConfigurationUtil.java
new file mode 100644
index 0000000..44f098b
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/ConfigurationUtil.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility for accessing deprecated {@link Configuration} values.
+ */
+@Internal
+public class ConfigurationUtil {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(ConfigurationUtil.class);
+
+       /**
+        * Returns the value associated with the given key as an Integer in the
+        * given Configuration.
+        *
+        * <p>The regular key has precedence over any deprecated keys. The
+        * precedence of deprecated keys depends on the argument order, first
+        * deprecated keys have higher precedence than later ones.
+        *
+        * @param config Configuration to access
+        * @param key Configuration key (highest precedence)
+        * @param defaultValue Default value (if no key is set)
+        * @param deprecatedKeys Optional deprecated keys in precedence order
+        * @return Integer value associated with first found key or the default 
value
+        */
+       public static int getIntegerWithDeprecatedKeys(
+                       Configuration config,
+                       String key,
+                       int defaultValue,
+                       String... deprecatedKeys) {
+
+               if (config.containsKey(key)) {
+                       return config.getInteger(key, defaultValue);
+               } else {
+                       // Check deprecated keys
+                       for (String deprecatedKey : deprecatedKeys) {
+                               if (config.containsKey(deprecatedKey)) {
+                                       LOG.warn("Configuration key '{}' has 
been deprecated. Please use '{}' instead.", deprecatedKey, key);
+                                       return config.getInteger(deprecatedKey, 
defaultValue);
+                               }
+                       }
+                       return defaultValue;
+               }
+       }
+
+       /**
+        * Returns the value associated with the given key as a String in the
+        * given Configuration.
+        *
+        * <p>The regular key has precedence over any deprecated keys. The
+        * precedence of deprecated keys depends on the argument order, first
+        * deprecated keys have higher precedence than later ones.
+        *
+        * @param config Configuration to access
+        * @param key Configuration key (highest precedence)
+        * @param defaultValue Default value (if no key is set)
+        * @param deprecatedKeys Optional deprecated keys in precedence order
+        * @return String associated with first found key or the default value
+        */
+       public static String getStringWithDeprecatedKeys(
+                       Configuration config,
+                       String key,
+                       String defaultValue,
+                       String... deprecatedKeys) {
+
+               if (config.containsKey(key)) {
+                       return config.getString(key, defaultValue);
+               } else {
+                       // Check deprecated keys
+                       for (String deprecatedKey : deprecatedKeys) {
+                               if (config.containsKey(deprecatedKey)) {
+                                       LOG.warn("Configuration key {} has been 
deprecated. Please use {} instead.", deprecatedKey, key);
+                                       return config.getString(deprecatedKey, 
defaultValue);
+                               }
+                       }
+                       return defaultValue;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-core/src/test/java/org/apache/flink/util/ConfigurationUtilTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/util/ConfigurationUtilTest.java 
b/flink-core/src/test/java/org/apache/flink/util/ConfigurationUtilTest.java
new file mode 100644
index 0000000..7ecbd3f
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/util/ConfigurationUtilTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.apache.flink.configuration.Configuration;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class ConfigurationUtilTest {
+
+       /**
+        * Tests getInteger without any deprecated keys.
+        */
+       @Test
+       public void testGetIntegerNoDeprecatedKeys() throws Exception {
+               Configuration config = new Configuration();
+               String key = "asdasd";
+               int value = 1223239;
+               int defaultValue = 272770;
+
+               // Nothing configured yet: the default value is returned
+               assertEquals(defaultValue,
+                       ConfigurationUtil.getIntegerWithDeprecatedKeys(config, key, defaultValue));
+
+               config.setInteger(key, value);
+               assertEquals(value,
+                       ConfigurationUtil.getIntegerWithDeprecatedKeys(config, key, defaultValue));
+       }
+
+       /**
+        * Tests getInteger with deprecated keys and checks precedence.
+        *
+        * <p>Every lookup passes all deprecated keys, so that the ordering between
+        * deprecated keys and the precedence of the regular key are really exercised.
+        */
+       @Test
+       public void testGetIntegerWithDeprecatedKeys() throws Exception {
+               Configuration config = new Configuration();
+               String key = "asdasd";
+               int value = 1223239;
+               int defaultValue = 272770;
+
+               String[] deprecatedKeys = new String[] { "deprecated-0", "deprecated-1" };
+               int[] deprecatedValues = new int[] { 99192, 7727 };
+
+               // No key set at all: default value
+               assertEquals(defaultValue,
+                       ConfigurationUtil.getIntegerWithDeprecatedKeys(config, key, defaultValue, deprecatedKeys));
+
+               // Set 2nd deprecated key: the unset 1st deprecated key is skipped
+               config.setInteger(deprecatedKeys[1], deprecatedValues[1]);
+               assertEquals(deprecatedValues[1],
+                       ConfigurationUtil.getIntegerWithDeprecatedKeys(config, key, defaultValue, deprecatedKeys));
+
+               // Set 1st deprecated key (precedence over the 2nd one, which is still set)
+               config.setInteger(deprecatedKeys[0], deprecatedValues[0]);
+               assertEquals(deprecatedValues[0],
+                       ConfigurationUtil.getIntegerWithDeprecatedKeys(config, key, defaultValue, deprecatedKeys));
+
+               // Set current key (precedence over all deprecated keys)
+               config.setInteger(key, value);
+               assertEquals(value,
+                       ConfigurationUtil.getIntegerWithDeprecatedKeys(config, key, defaultValue, deprecatedKeys));
+       }
+
+       /**
+        * Tests getString without any deprecated keys.
+        */
+       @Test
+       public void testGetStringNoDeprecatedKeys() throws Exception {
+               Configuration config = new Configuration();
+               String key = "asdasd";
+               String value = "1223239";
+               String defaultValue = "272770";
+
+               // Nothing configured yet: the default value is returned
+               assertEquals(defaultValue,
+                       ConfigurationUtil.getStringWithDeprecatedKeys(config, key, defaultValue));
+
+               config.setString(key, value);
+               assertEquals(value,
+                       ConfigurationUtil.getStringWithDeprecatedKeys(config, key, defaultValue));
+       }
+
+       /**
+        * Tests getString with deprecated keys and checks precedence.
+        *
+        * <p>Every lookup passes all deprecated keys, so that the ordering between
+        * deprecated keys and the precedence of the regular key are really exercised.
+        */
+       @Test
+       public void testGetStringWithDeprecatedKeys() throws Exception {
+               Configuration config = new Configuration();
+               String key = "asdasd";
+               String value = "1223239";
+               String defaultValue = "272770";
+
+               String[] deprecatedKeys = new String[] { "deprecated-0", "deprecated-1" };
+               String[] deprecatedValues = new String[] { "99192", "7727" };
+
+               // No key set at all: default value
+               assertEquals(defaultValue,
+                       ConfigurationUtil.getStringWithDeprecatedKeys(config, key, defaultValue, deprecatedKeys));
+
+               // Set 2nd deprecated key: the unset 1st deprecated key is skipped
+               config.setString(deprecatedKeys[1], deprecatedValues[1]);
+               assertEquals(deprecatedValues[1],
+                       ConfigurationUtil.getStringWithDeprecatedKeys(config, key, defaultValue, deprecatedKeys));
+
+               // Set 1st deprecated key (precedence over the 2nd one, which is still set)
+               config.setString(deprecatedKeys[0], deprecatedValues[0]);
+               assertEquals(deprecatedValues[0],
+                       ConfigurationUtil.getStringWithDeprecatedKeys(config, key, defaultValue, deprecatedKeys));
+
+               // Set current key (precedence over all deprecated keys)
+               config.setString(key, value);
+               assertEquals(value,
+                       ConfigurationUtil.getStringWithDeprecatedKeys(config, key, defaultValue, deprecatedKeys));
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-dist/src/main/flink-bin/bin/config.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/config.sh 
b/flink-dist/src/main/flink-bin/bin/config.sh
index 9ffa713..687a39c 100755
--- a/flink-dist/src/main/flink-bin/bin/config.sh
+++ b/flink-dist/src/main/flink-bin/bin/config.sh
@@ -104,7 +104,9 @@ KEY_ENV_JAVA_OPTS="env.java.opts"
 KEY_ENV_JAVA_OPTS_JM="env.java.opts.jobmanager"
 KEY_ENV_JAVA_OPTS_TM="env.java.opts.taskmanager"
 KEY_ENV_SSH_OPTS="env.ssh.opts"
+#deprecated
 KEY_RECOVERY_MODE="recovery.mode"
+KEY_HIGH_AVAILABILITY="high-availability"
 KEY_ZK_HEAP_MB="zookeeper.heap.mb"
 
 
########################################################################################################################
@@ -257,8 +259,26 @@ if [ -z "${ZK_HEAP}" ]; then
     ZK_HEAP=$(readFromConfig ${KEY_ZK_HEAP_MB} 0 "${YAML_CONF}")
 fi
 
+# for backward compatibility
+if [ -z "${OLD_RECOVERY_MODE}" ]; then
+    OLD_RECOVERY_MODE=$(readFromConfig ${KEY_RECOVERY_MODE} "standalone" 
"${YAML_CONF}")
+fi
+
 if [ -z "${RECOVERY_MODE}" ]; then
-    RECOVERY_MODE=$(readFromConfig ${KEY_RECOVERY_MODE} "standalone" 
"${YAML_CONF}")
+     # Prefer the new 'high-availability' key; an empty result means it is not set.
+     RECOVERY_MODE=$(readFromConfig "${KEY_HIGH_AVAILABILITY}" "" "${YAML_CONF}")
+     if [ -z "${RECOVERY_MODE}" ]; then
+        # New key not set, so fall back to the value read from the deprecated key.
+        # The variable is quoted so that a value containing whitespace cannot break the test.
+        if [ -z "${OLD_RECOVERY_MODE}" ] || [ "${OLD_RECOVERY_MODE}" = "standalone" ]; then
+            # Nothing configured, or the old 'standalone' value, which is now called 'none'.
+            RECOVERY_MODE="none"
+        else
+            RECOVERY_MODE="${OLD_RECOVERY_MODE}"
+        fi
+     fi
 fi
 
 # Arguments for the JVM. Used for job and task manager JVMs.

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-dist/src/main/resources/flink-conf.yaml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/resources/flink-conf.yaml 
b/flink-dist/src/main/resources/flink-conf.yaml
index 8bd4747..a2586ce 100644
--- a/flink-dist/src/main/resources/flink-conf.yaml
+++ b/flink-dist/src/main/resources/flink-conf.yaml
@@ -138,7 +138,7 @@ jobmanager.web.port: 8081
 # setup. This must be a list of the form:
 # "host1:clientPort,host2[:clientPort],..." (default clientPort: 2181)
 #
-# recovery.mode: zookeeper
+# high-availability: zookeeper
 #
 # recovery.zookeeper.quorum: localhost:2181,...
 #

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
index 677ff54..d9edafe 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
@@ -296,8 +296,8 @@ public class WebRuntimeMonitorITCase extends TestLogger {
                        final Configuration config = new Configuration();
                        
config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
                        
config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, 
logFile.toString());
-                       config.setString(ConfigConstants.RECOVERY_MODE, 
"ZOOKEEPER");
-                       config.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, 
zooKeeper.getConnectString());
+                       config.setString(ConfigConstants.HIGH_AVAILABILITY, 
"ZOOKEEPER");
+                       
config.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, 
zooKeeper.getConnectString());
 
                        actorSystem = AkkaUtils.createDefaultActorSystem();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index b4b6812..d1b78a5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
-import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.util.NetUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -77,7 +77,7 @@ public class BlobServer extends Thread implements BlobService 
{
 
        /**
         * Shutdown hook thread to ensure deletion of the storage directory (or 
<code>null</code> if
-        * the configured recovery mode does not equal{@link 
RecoveryMode#STANDALONE})
+        * the configured high availability mode does not equal {@link HighAvailabilityMode#NONE})
         */
        private final Thread shutdownHook;
 
@@ -90,7 +90,7 @@ public class BlobServer extends Thread implements BlobService 
{
        public BlobServer(Configuration config) throws IOException {
                checkNotNull(config, "Configuration");
 
-               RecoveryMode recoveryMode = RecoveryMode.fromConfig(config);
+               HighAvailabilityMode highAvailabilityMode = 
HighAvailabilityMode.fromConfig(config);
 
                // configure and create the storage directory
                String storageDirectory = 
config.getString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, null);
@@ -98,14 +98,14 @@ public class BlobServer extends Thread implements 
BlobService {
                LOG.info("Created BLOB server storage directory {}", 
storageDir);
 
                // No recovery.
-               if (recoveryMode == RecoveryMode.STANDALONE) {
+               if (highAvailabilityMode == HighAvailabilityMode.NONE) {
                        this.blobStore = new VoidBlobStore();
                }
                // Recovery.
-               else if (recoveryMode == RecoveryMode.ZOOKEEPER) {
+               else if (highAvailabilityMode == 
HighAvailabilityMode.ZOOKEEPER) {
                        this.blobStore = new FileSystemBlobStore(config);
                } else {
-                       throw new IllegalConfigurationException("Unexpected 
recovery mode '" + recoveryMode + ".");
+                       throw new IllegalConfigurationException("Unexpected 
recovery mode '" + highAvailabilityMode + ".");
                }
 
                // configure the maximum number of concurrent connections
@@ -128,7 +128,7 @@ public class BlobServer extends Thread implements 
BlobService {
                        backlog = ConfigConstants.DEFAULT_BLOB_FETCH_BACKLOG;
                }
 
-               if (recoveryMode == RecoveryMode.STANDALONE) {
+               if (highAvailabilityMode == HighAvailabilityMode.NONE) {
                        // Add shutdown hook to delete storage directory
                        this.shutdownHook = BlobUtils.addShutdownHook(this, 
LOG);
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
index 5641d87..f535c35 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
@@ -51,12 +51,15 @@ class FileSystemBlobStore implements BlobStore {
        private final String basePath;
 
        FileSystemBlobStore(Configuration config) throws IOException {
-               String recoveryPath = 
config.getString(ConfigConstants.ZOOKEEPER_RECOVERY_PATH, null);
+               // Prefer the new high-availability storage path key.
+               String recoveryPath = config.getString(ConfigConstants.ZOOKEEPER_HA_PATH, null);
+               if (recoveryPath == null) {
+                       // Fall back to the key that was used before the rename, so that existing
+                       // configurations keep working. Re-reading ZOOKEEPER_HA_PATH here would
+                       // always yield null again.
+                       recoveryPath = config.getString(ConfigConstants.ZOOKEEPER_RECOVERY_PATH, null);
+               }
 
                if (recoveryPath == null) {
                        throw new 
IllegalConfigurationException(String.format("Missing configuration for " +
                                        "file system state backend recovery 
path. Please specify via " +
-                                       "'%s' key.", 
ConfigConstants.ZOOKEEPER_RECOVERY_PATH));
+                                       "'%s' key.", 
ConfigConstants.ZOOKEEPER_HA_PATH));
                }
 
                this.basePath = recoveryPath + "/blob";

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java
index 0a235bc..c2f67f1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java
@@ -18,12 +18,12 @@
 
 package org.apache.flink.runtime.checkpoint;
 
-import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
- * {@link CheckpointIDCounter} instances for JobManagers running in {@link 
RecoveryMode#STANDALONE}.
+ * {@link CheckpointIDCounter} instances for JobManagers running in {@link 
HighAvailabilityMode#NONE}.
  *
  * <p>Simple wrapper of an {@link AtomicLong}. This is sufficient, because job 
managers are not
  * recoverable in this recovery mode.

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
index 05f9e77..aecb51e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
@@ -19,10 +19,10 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 
 /**
- * {@link CheckpointCoordinator} components in {@link RecoveryMode#STANDALONE}.
+ * {@link CheckpointCoordinator} components in {@link 
HighAvailabilityMode#NONE}.
  */
 public class StandaloneCheckpointRecoveryFactory implements 
CheckpointRecoveryFactory {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
index bc111cd..a35ca77 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.checkpoint;
 
-import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 
 import java.util.ArrayDeque;
 import java.util.ArrayList;
@@ -28,7 +28,7 @@ import static 
org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * {@link CompletedCheckpointStore} for JobManagers running in {@link 
RecoveryMode#STANDALONE}.
+ * {@link CompletedCheckpointStore} for JobManagers running in {@link 
HighAvailabilityMode#NONE}.
  */
 public class StandaloneCompletedCheckpointStore implements 
CompletedCheckpointStore {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java
index 12839c1..0bceb8b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java
@@ -23,14 +23,14 @@ import 
org.apache.curator.framework.recipes.shared.SharedCount;
 import org.apache.curator.framework.recipes.shared.VersionedValue;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
-import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * {@link CheckpointIDCounter} instances for JobManagers running in {@link 
RecoveryMode#ZOOKEEPER}.
+ * {@link CheckpointIDCounter} instances for JobManagers running in {@link 
HighAvailabilityMode#ZOOKEEPER}.
  *
  * <p>Each counter creates a ZNode:
  * <pre>

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
index dcd6260..55a0bbb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
@@ -21,13 +21,13 @@ package org.apache.flink.runtime.checkpoint;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * {@link CheckpointCoordinator} components in {@link RecoveryMode#ZOOKEEPER}.
+ * {@link CheckpointCoordinator} components in {@link 
HighAvailabilityMode#ZOOKEEPER}.
  */
 public class ZooKeeperCheckpointRecoveryFactory implements 
CheckpointRecoveryFactory {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index 541629d..376ef70 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -24,7 +24,7 @@ import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.curator.framework.api.CuratorEventType;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.runtime.zookeeper.StateStorageHelper;
 import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
@@ -40,7 +40,7 @@ import static 
org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * {@link CompletedCheckpointStore} for JobManagers running in {@link 
RecoveryMode#ZOOKEEPER}.
+ * {@link CompletedCheckpointStore} for JobManagers running in {@link 
HighAvailabilityMode#ZOOKEEPER}.
  *
  * <p>Checkpoints are added under a ZNode per job:
  * <pre>

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java
new file mode 100644
index 0000000..8e2efa8
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * High availability mode for Flink's cluster execution. Currently supported modes are:
+ *
+ * - NONE: No recovery from JobManager failures (this mode was formerly named STANDALONE)
+ * - ZOOKEEPER: JobManager high availability via ZooKeeper
+ * ZooKeeper is used to select a leader among a group of JobManager. This JobManager
+ * is responsible for the job execution. Upon failure of the leader a new leader is elected
+ * which will take over the responsibilities of the old leader
+ */
+public enum HighAvailabilityMode {
+       /** No high availability; STANDALONE mode renamed to NONE. */
+       NONE,
+       /** High availability via ZooKeeper. */
+       ZOOKEEPER;
+
+       /**
+        * Return the configured {@link HighAvailabilityMode}.
+        *
+        * <p>{@link ConfigConstants#HIGH_AVAILABILITY} is consulted first. Only when it yields an
+        * empty value is the older {@link ConfigConstants#RECOVERY_MODE} key read; in that case
+        * {@link ConfigConstants#DEFAULT_RECOVERY_MODE} is translated to
+        * {@link ConfigConstants#DEFAULT_HIGH_AVAILABILTY}. Values are upper-cased before being
+        * resolved to an enum constant.
+        *
+        * @param config The config to parse
+        * @return Configured high availability mode or
+        * {@link ConfigConstants#DEFAULT_HIGH_AVAILABILTY} if not configured.
+        */
+       public static HighAvailabilityMode fromConfig(Configuration config) {
+               // An empty default lets us tell "not set" apart from an explicitly configured value
+               final String configured = config.getString(ConfigConstants.HIGH_AVAILABILITY, "");
+               if (!configured.isEmpty()) {
+                       return resolve(configured, ConfigConstants.DEFAULT_HIGH_AVAILABILTY);
+               }
+
+               // New key is not set, so consult the older key with its own default
+               final String legacy = config.getString(
+                       ConfigConstants.RECOVERY_MODE,
+                       ConfigConstants.DEFAULT_RECOVERY_MODE);
+               return resolve(legacy, ConfigConstants.DEFAULT_RECOVERY_MODE);
+       }
+
+       /**
+        * Maps a raw config value to a mode, treating {@code defaultAlias} (compared
+        * case-insensitively) as {@link ConfigConstants#DEFAULT_HIGH_AVAILABILTY}.
+        */
+       private static HighAvailabilityMode resolve(String raw, String defaultAlias) {
+               final String name = raw.equalsIgnoreCase(defaultAlias)
+                       ? ConfigConstants.DEFAULT_HIGH_AVAILABILTY
+                       : raw;
+               return HighAvailabilityMode.valueOf(name.toUpperCase());
+       }
+
+       /**
+        * Returns true if the configured mode supports high availability.
+        *
+        * @param configuration Configuration which contains the high availability mode
+        * @return true if high availability is supported by the configured mode, otherwise false
+        */
+       public static boolean isHighAvailabilityModeActivated(Configuration configuration) {
+               // ZOOKEEPER is the only mode for which high availability is reported
+               return fromConfig(configuration) == ZOOKEEPER;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/RecoveryMode.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/RecoveryMode.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/RecoveryMode.java
deleted file mode 100644
index 077e34d..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/RecoveryMode.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager;
-
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-
-/**
- * Recovery mode for Flink's cluster execution. Currently supported modes are:
- *
- * - Standalone: No recovery from JobManager failures
- * - ZooKeeper: JobManager high availability via ZooKeeper
- * ZooKeeper is used to select a leader among a group of JobManager. This 
JobManager
- * is responsible for the job execution. Upon failure of the leader a new 
leader is elected
- * which will take over the responsibilities of the old leader
- */
-public enum RecoveryMode {
-       STANDALONE,
-       ZOOKEEPER;
-
-       /**
-        * Return the configured {@link RecoveryMode}.
-        *
-        * @param config The config to parse
-        * @return Configured recovery mode or {@link 
ConfigConstants#DEFAULT_RECOVERY_MODE} if not
-        * configured.
-        */
-       public static RecoveryMode fromConfig(Configuration config) {
-               return RecoveryMode.valueOf(config.getString(
-                               ConfigConstants.RECOVERY_MODE,
-                               
ConfigConstants.DEFAULT_RECOVERY_MODE).toUpperCase());
-       }
-
-       /**
-        * Returns true if the defined recovery mode supports high availability.
-        *
-        * @param configuration Configuration which contains the recovery mode
-        * @return true if high availability is supported by the recovery mode, 
otherwise false
-        */
-       public static boolean isHighAvailabilityModeActivated(Configuration 
configuration) {
-               String recoveryMode = configuration.getString(
-                       ConfigConstants.RECOVERY_MODE,
-                       ConfigConstants.DEFAULT_RECOVERY_MODE).toUpperCase();
-
-               RecoveryMode mode = RecoveryMode.valueOf(recoveryMode);
-
-               switch(mode) {
-                       case STANDALONE:
-                               return false;
-                       case ZOOKEEPER:
-                               return true;
-                       default:
-                               return false;
-               }
-       }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java
index db36f92..3041cde 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java
@@ -26,7 +26,7 @@ import java.util.Collections;
 import java.util.List;
 
 /**
- * {@link SubmittedJobGraph} instances for JobManagers running in {@link 
RecoveryMode#STANDALONE}.
+ * {@link SubmittedJobGraph} instances for JobManagers running in {@link 
HighAvailabilityMode#NONE}.
  *
  * <p>All operations are NoOps, because {@link JobGraph} instances cannot be 
recovered in this
  * recovery mode.

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
index 128db83..7f7c5fe 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
@@ -44,7 +44,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
 /**
- * {@link SubmittedJobGraph} instances for JobManagers running in {@link 
RecoveryMode#ZOOKEEPER}.
+ * {@link SubmittedJobGraph} instances for JobManagers running in {@link 
HighAvailabilityMode#ZOOKEEPER}.
  *
  * <p>Each job graph creates ZNode:
  * <pre>

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
index 8f88daa..7a656cf 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
@@ -28,7 +28,7 @@ import 
org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.AkkaActorGateway;
-import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
@@ -61,15 +61,15 @@ public class LeaderRetrievalUtils {
        public static LeaderRetrievalService 
createLeaderRetrievalService(Configuration configuration)
                throws Exception {
 
-               RecoveryMode recoveryMode = getRecoveryMode(configuration);
+               HighAvailabilityMode highAvailabilityMode = 
getRecoveryMode(configuration);
 
-               switch (recoveryMode) {
-                       case STANDALONE:
+               switch (highAvailabilityMode) {
+                       case NONE:
                                return 
StandaloneUtils.createLeaderRetrievalService(configuration);
                        case ZOOKEEPER:
                                return 
ZooKeeperUtils.createLeaderRetrievalService(configuration);
                        default:
-                               throw new Exception("Recovery mode " + 
recoveryMode + " is not supported.");
+                               throw new Exception("Recovery mode " + 
highAvailabilityMode + " is not supported.");
                }
        }
 
@@ -86,16 +86,16 @@ public class LeaderRetrievalUtils {
        public static LeaderRetrievalService createLeaderRetrievalService(
                                Configuration configuration, ActorRef 
standaloneRef) throws Exception {
 
-               RecoveryMode recoveryMode = getRecoveryMode(configuration);
+               HighAvailabilityMode highAvailabilityMode = 
getRecoveryMode(configuration);
 
-               switch (recoveryMode) {
-                       case STANDALONE:
+               switch (highAvailabilityMode) {
+                       case NONE:
                                String akkaUrl = 
standaloneRef.path().toSerializationFormat();
                                return new 
StandaloneLeaderRetrievalService(akkaUrl);
                        case ZOOKEEPER:
                                return 
ZooKeeperUtils.createLeaderRetrievalService(configuration);
                        default:
-                               throw new Exception("Recovery mode " + 
recoveryMode + " is not supported.");
+                               throw new Exception("Recovery mode " + 
highAvailabilityMode + " is not supported.");
                }
        }
        
@@ -282,7 +282,7 @@ public class LeaderRetrievalUtils {
        }
 
        /**
-        * Gets the recovery mode as configured, based on the {@link 
ConfigConstants#RECOVERY_MODE}
+        * Gets the recovery mode as configured, based on the {@link 
ConfigConstants#HIGH_AVAILABILITY}
         * config key.
         * 
         * @param config The configuration to read the recovery mode from.
@@ -291,20 +291,8 @@ public class LeaderRetrievalUtils {
         * @throws IllegalConfigurationException Thrown, if the recovery mode 
does not correspond
         *                                       to a known value.
         */
-       public static RecoveryMode getRecoveryMode(Configuration config) {
-               String mode = config.getString(
-                       ConfigConstants.RECOVERY_MODE,
-                       ConfigConstants.DEFAULT_RECOVERY_MODE).toUpperCase();
-               
-               switch (mode) {
-                       case "STANDALONE":
-                               return RecoveryMode.STANDALONE;
-                       case "ZOOKEEPER":
-                               return RecoveryMode.ZOOKEEPER;
-                       default:
-                               throw new IllegalConfigurationException(
-                                       "The value for '" + 
ConfigConstants.RECOVERY_MODE + "' is unknown: " + mode);
-               }
+       public static HighAvailabilityMode getRecoveryMode(Configuration 
config) {
+               return HighAvailabilityMode.fromConfig(config);
        }
        
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
index 3986fed..5fd6f8c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
@@ -29,7 +29,7 @@ import 
org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore;
-import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
 import org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore;
 import org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService;
@@ -56,35 +56,44 @@ public class ZooKeeperUtils {
         * @return {@link CuratorFramework} instance
         */
        public static CuratorFramework startCuratorFramework(Configuration 
configuration) {
-               String zkQuorum = 
configuration.getString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, "");
-
+               String zkQuorum = 
configuration.getString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, "");
+               if (zkQuorum.isEmpty()) {
+                       zkQuorum = 
configuration.getString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, "");
+               }
                if (zkQuorum == null || zkQuorum.equals("")) {
                        throw new RuntimeException("No valid ZooKeeper quorum 
has been specified. " +
                                        "You can specify the quorum via the 
configuration key '" +
-                                       ConfigConstants.ZOOKEEPER_QUORUM_KEY + 
"'.");
+                                       ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY 
+ "'.");
                }
 
-               int sessionTimeout = configuration.getInteger(
-                               ConfigConstants.ZOOKEEPER_SESSION_TIMEOUT,
-                               
ConfigConstants.DEFAULT_ZOOKEEPER_SESSION_TIMEOUT);
+               int sessionTimeout = getConfiguredIntValue(configuration,
+                       ConfigConstants.HA_ZOOKEEPER_SESSION_TIMEOUT,
+                       ConfigConstants.ZOOKEEPER_SESSION_TIMEOUT,
+                       ConfigConstants.DEFAULT_ZOOKEEPER_SESSION_TIMEOUT);
 
-               int connectionTimeout = configuration.getInteger(
-                               ConfigConstants.ZOOKEEPER_CONNECTION_TIMEOUT,
-                               
ConfigConstants.DEFAULT_ZOOKEEPER_CONNECTION_TIMEOUT);
+               int connectionTimeout = getConfiguredIntValue(configuration,
+                       ConfigConstants.HA_ZOOKEEPER_CONNECTION_TIMEOUT,
+                       ConfigConstants.ZOOKEEPER_CONNECTION_TIMEOUT,
+                       ConfigConstants.DEFAULT_ZOOKEEPER_CONNECTION_TIMEOUT);
 
-               int retryWait = configuration.getInteger(
-                               ConfigConstants.ZOOKEEPER_RETRY_WAIT,
-                               ConfigConstants.DEFAULT_ZOOKEEPER_RETRY_WAIT);
+               int retryWait = getConfiguredIntValue(configuration,
+                       ConfigConstants.HA_ZOOKEEPER_RETRY_WAIT,
+                       ConfigConstants.ZOOKEEPER_RETRY_WAIT,
+                       ConfigConstants.DEFAULT_ZOOKEEPER_RETRY_WAIT);
 
-               int maxRetryAttempts = configuration.getInteger(
-                               ConfigConstants.ZOOKEEPER_MAX_RETRY_ATTEMPTS,
-                               
ConfigConstants.DEFAULT_ZOOKEEPER_MAX_RETRY_ATTEMPTS);
+               int maxRetryAttempts = getConfiguredIntValue(configuration,
+                       ConfigConstants.HA_ZOOKEEPER_MAX_RETRY_ATTEMPTS,
+                       ConfigConstants.ZOOKEEPER_MAX_RETRY_ATTEMPTS,
+                       ConfigConstants.DEFAULT_ZOOKEEPER_MAX_RETRY_ATTEMPTS);
 
-               String root = 
configuration.getString(ConfigConstants.ZOOKEEPER_DIR_KEY,
-                               ConfigConstants.DEFAULT_ZOOKEEPER_DIR_KEY);
+               String root = getConfiguredStringValue(configuration, 
ConfigConstants.HA_ZOOKEEPER_DIR_KEY,
+                       ConfigConstants.ZOOKEEPER_DIR_KEY,
+                       ConfigConstants.DEFAULT_ZOOKEEPER_DIR_KEY);
 
-               String namespace = 
configuration.getString(ConfigConstants.ZOOKEEPER_NAMESPACE_KEY,
-                               
ConfigConstants.DEFAULT_ZOOKEEPER_NAMESPACE_KEY);
+               String namespace = getConfiguredStringValue(configuration,
+                       ConfigConstants.HA_ZOOKEEPER_NAMESPACE_KEY,
+                       ConfigConstants.ZOOKEEPER_NAMESPACE_KEY,
+                       ConfigConstants.DEFAULT_ZOOKEEPER_NAMESPACE_KEY);
 
                String rootWithNamespace = generateZookeeperPath(root, 
namespace);
 
@@ -105,11 +114,36 @@ public class ZooKeeperUtils {
                return cf;
        }
 
+       private static int getConfiguredIntValue(Configuration configuration, 
String newConfigName, String oldConfigName, int defaultValue) {
+               int val = configuration.getInteger(newConfigName, -1);
+               if (val == -1) {
+                       val = configuration.getInteger(
+                               oldConfigName, -1);
+               }
+               // -1 marks "not configured": neither the new nor the old key gave a value, so use the default
+               if (val == -1) {
+                       return defaultValue;
+               }
+               return val;
+       }
+
+       private static String getConfiguredStringValue(Configuration 
configuration, String newConfigName, String oldConfigName, String defaultValue) 
{
+               String val = configuration.getString(newConfigName, "");
+               if (val.isEmpty()) {
+                       val = configuration.getString(
+                               oldConfigName, "");
+               }
+               // an empty string counts as unset: neither key gave a value, so use the default
+               if (val.isEmpty()) {
+                       return defaultValue;
+               }
+               return val;
+       }
        /**
-        * Returns whether {@link RecoveryMode#ZOOKEEPER} is configured.
+        * Returns whether {@link HighAvailabilityMode#ZOOKEEPER} is configured.
         */
        public static boolean isZooKeeperRecoveryMode(Configuration flinkConf) {
-               return 
RecoveryMode.fromConfig(flinkConf).equals(RecoveryMode.ZOOKEEPER);
+               return 
HighAvailabilityMode.fromConfig(flinkConf).equals(HighAvailabilityMode.ZOOKEEPER);
        }
 
        /**
@@ -119,7 +153,10 @@ public class ZooKeeperUtils {
        public static String getZooKeeperEnsemble(Configuration flinkConf)
                        throws IllegalConfigurationException {
 
-               String zkQuorum = 
flinkConf.getString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, "");
+               String zkQuorum = 
flinkConf.getString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, "");
+               if (zkQuorum.isEmpty()) {
+                       zkQuorum = 
flinkConf.getString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, "");
+               }
 
                if (zkQuorum == null || zkQuorum.equals("")) {
                        throw new IllegalConfigurationException("No ZooKeeper 
quorum specified in config.");
@@ -141,8 +178,9 @@ public class ZooKeeperUtils {
        public static ZooKeeperLeaderRetrievalService 
createLeaderRetrievalService(
                        Configuration configuration) throws Exception {
                CuratorFramework client = startCuratorFramework(configuration);
-               String leaderPath = 
configuration.getString(ConfigConstants.ZOOKEEPER_LEADER_PATH,
-                               ConfigConstants.DEFAULT_ZOOKEEPER_LEADER_PATH);
+               String leaderPath = getConfiguredStringValue(configuration,
+                       ConfigConstants.HA_ZOOKEEPER_LEADER_PATH, 
ConfigConstants.ZOOKEEPER_LEADER_PATH,
+                       ConfigConstants.DEFAULT_ZOOKEEPER_LEADER_PATH);
 
                return new ZooKeeperLeaderRetrievalService(client, leaderPath);
        }
@@ -175,9 +213,11 @@ public class ZooKeeperUtils {
                        CuratorFramework client,
                        Configuration configuration) throws Exception {
 
-               String latchPath = 
configuration.getString(ConfigConstants.ZOOKEEPER_LATCH_PATH,
-                               ConfigConstants.DEFAULT_ZOOKEEPER_LATCH_PATH);
-               String leaderPath = 
configuration.getString(ConfigConstants.ZOOKEEPER_LEADER_PATH,
+               String latchPath = getConfiguredStringValue(configuration,
+                       ConfigConstants.HA_ZOOKEEPER_LATCH_PATH, 
ConfigConstants.ZOOKEEPER_LATCH_PATH,
+                       ConfigConstants.DEFAULT_ZOOKEEPER_LATCH_PATH);
+               String leaderPath = getConfiguredStringValue(configuration,
+                       ConfigConstants.HA_ZOOKEEPER_LEADER_PATH, 
ConfigConstants.ZOOKEEPER_LEADER_PATH,
                        ConfigConstants.DEFAULT_ZOOKEEPER_LEADER_PATH);
 
                return new ZooKeeperLeaderElectionService(client, latchPath, 
leaderPath);
@@ -199,9 +239,9 @@ public class ZooKeeperUtils {
                StateStorageHelper<SubmittedJobGraph> stateStorage = 
createFileSystemStateStorage(configuration, "submittedJobGraph");
 
                // ZooKeeper submitted jobs root dir
-               String zooKeeperSubmittedJobsPath = configuration.getString(
-                               ConfigConstants.ZOOKEEPER_JOBGRAPHS_PATH,
-                               
ConfigConstants.DEFAULT_ZOOKEEPER_JOBGRAPHS_PATH);
+               String zooKeeperSubmittedJobsPath = 
getConfiguredStringValue(configuration,
+                       ConfigConstants.HA_ZOOKEEPER_JOBGRAPHS_PATH, 
ConfigConstants.ZOOKEEPER_JOBGRAPHS_PATH,
+                       ConfigConstants.DEFAULT_ZOOKEEPER_JOBGRAPHS_PATH);
 
                return new ZooKeeperSubmittedJobGraphStore(
                                client, zooKeeperSubmittedJobsPath, 
stateStorage);
@@ -226,7 +266,8 @@ public class ZooKeeperUtils {
 
                checkNotNull(configuration, "Configuration");
 
-               String checkpointsPath = configuration.getString(
+               String checkpointsPath = getConfiguredStringValue(configuration,
+                       ConfigConstants.HA_ZOOKEEPER_CHECKPOINTS_PATH,
                        ConfigConstants.ZOOKEEPER_CHECKPOINTS_PATH,
                        ConfigConstants.DEFAULT_ZOOKEEPER_CHECKPOINTS_PATH);
 
@@ -257,9 +298,10 @@ public class ZooKeeperUtils {
                        Configuration configuration,
                        JobID jobId) throws Exception {
 
-               String checkpointIdCounterPath = configuration.getString(
-                               
ConfigConstants.ZOOKEEPER_CHECKPOINT_COUNTER_PATH,
-                               
ConfigConstants.DEFAULT_ZOOKEEPER_CHECKPOINT_COUNTER_PATH);
+               String checkpointIdCounterPath = 
getConfiguredStringValue(configuration,
+                       ConfigConstants.HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH,
+                       ConfigConstants.ZOOKEEPER_CHECKPOINT_COUNTER_PATH,
+                       
ConfigConstants.DEFAULT_ZOOKEEPER_CHECKPOINT_COUNTER_PATH);
 
                checkpointIdCounterPath += 
ZooKeeperSubmittedJobGraphStore.getPathForJob(jobId);
 
@@ -280,11 +322,15 @@ public class ZooKeeperUtils {
                        String prefix) throws IOException {
 
                String rootPath = configuration.getString(
-                       ConfigConstants.ZOOKEEPER_RECOVERY_PATH, "");
+                       ConfigConstants.ZOOKEEPER_HA_PATH, "");
+               if (rootPath.isEmpty()) {
+                       rootPath = configuration.getString(
+                               ConfigConstants.ZOOKEEPER_RECOVERY_PATH, "");
+               }
 
                if (rootPath.equals("")) {
                        throw new IllegalConfigurationException("Missing 
recovery path. Specify via " +
-                               "configuration key '" + 
ConfigConstants.ZOOKEEPER_RECOVERY_PATH + "'.");
+                               "configuration key '" + 
ConfigConstants.ZOOKEEPER_HA_PATH + "'.");
                } else {
                        return new FileSystemStateStorageHelper<T>(rootPath, 
prefix);
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index a82e89a..5962afc 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -155,7 +155,7 @@ class JobManager(
   /** Either running or not yet archived jobs (session hasn't been ended). */
   protected val currentJobs = scala.collection.mutable.HashMap[JobID, 
(ExecutionGraph, JobInfo)]()
 
-  protected val recoveryMode = RecoveryMode.fromConfig(flinkConfiguration)
+  protected val recoveryMode = 
HighAvailabilityMode.fromConfig(flinkConfiguration)
 
   var leaderSessionID: Option[UUID] = None
 
@@ -317,7 +317,7 @@ class JobManager(
 
         // TODO (critical next step) This needs to be more flexible and robust 
(e.g. wait for task
         // managers etc.)
-        if (recoveryMode != RecoveryMode.STANDALONE) {
+        if (recoveryMode != HighAvailabilityMode.NONE) {
           log.info(s"Delaying recovery of all jobs by $jobRecoveryTimeout.")
 
           context.system.scheduler.scheduleOnce(
@@ -2026,7 +2026,7 @@ object JobManager {
 
     if (!listeningPortRange.hasNext) {
       if (ZooKeeperUtils.isZooKeeperRecoveryMode(configuration)) {
-        val message = "Config parameter '" + 
ConfigConstants.RECOVERY_JOB_MANAGER_PORT +
+        val message = "Config parameter '" + 
ConfigConstants.HA_JOB_MANAGER_PORT +
           "' does not specify a valid port range."
         LOG.error(message)
         System.exit(STARTUP_FAILURE_RETURN_CODE)
@@ -2568,8 +2568,8 @@ object JobManager {
 
     // Create recovery related components
     val (leaderElectionService, submittedJobGraphs, checkpointRecoveryFactory) 
=
-      RecoveryMode.fromConfig(configuration) match {
-        case RecoveryMode.STANDALONE =>
+      HighAvailabilityMode.fromConfig(configuration) match {
+        case HighAvailabilityMode.NONE =>
           val leaderElectionService = leaderElectionServiceOption match {
             case Some(les) => les
             case None => new StandaloneLeaderElectionService()
@@ -2579,7 +2579,7 @@ object JobManager {
             new StandaloneSubmittedJobGraphStore(),
             new StandaloneCheckpointRecoveryFactory())
 
-        case RecoveryMode.ZOOKEEPER =>
+        case HighAvailabilityMode.ZOOKEEPER =>
           val client = ZooKeeperUtils.startCuratorFramework(configuration)
 
           val leaderElectionService = leaderElectionServiceOption match {
@@ -2594,7 +2594,10 @@ object JobManager {
 
     val savepointStore = SavepointStoreFactory.createFromConfig(configuration)
 
-    val jobRecoveryTimeoutStr = 
configuration.getString(ConfigConstants.RECOVERY_JOB_DELAY, "");
+    var jobRecoveryTimeoutStr = 
configuration.getString(ConfigConstants.HA_JOB_DELAY, "");
+    if (jobRecoveryTimeoutStr.isEmpty) {
+      jobRecoveryTimeoutStr = 
configuration.getString(ConfigConstants.RECOVERY_JOB_DELAY, "");
+    }
 
     val jobRecoveryTimeout = if (jobRecoveryTimeoutStr == null || 
jobRecoveryTimeoutStr.isEmpty) {
       timeout
@@ -2604,7 +2607,7 @@ object JobManager {
       } catch {
         case n: NumberFormatException =>
           throw new Exception(
-            s"Invalid config value for ${ConfigConstants.RECOVERY_JOB_DELAY}: 
" +
+            s"Invalid config value for ${ConfigConstants.HA_JOB_DELAY}: " +
               s"$jobRecoveryTimeoutStr. Value must be a valid duration (such 
as '10 s' or '1 min')")
       }
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index f6e9360..0778aae 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -24,22 +24,18 @@ import java.util.UUID
 import akka.pattern.Patterns.gracefulStop
 import akka.pattern.ask
 import akka.actor.{ActorRef, ActorSystem}
-
 import com.typesafe.config.Config
-
-import org.apache.flink.api.common.{JobID, JobExecutionResult, 
JobSubmissionResult}
+import org.apache.flink.api.common.{JobExecutionResult, JobID, 
JobSubmissionResult}
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
 import org.apache.flink.runtime.akka.AkkaUtils
-import org.apache.flink.runtime.client.{JobExecutionException, JobClient}
-import org.apache.flink.runtime.instance.{AkkaActorGateway, ActorGateway}
+import org.apache.flink.runtime.client.{JobClient, JobExecutionException}
+import org.apache.flink.runtime.instance.{ActorGateway, AkkaActorGateway}
 import org.apache.flink.runtime.jobgraph.JobGraph
-import org.apache.flink.runtime.jobmanager.RecoveryMode
-import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalService, 
LeaderRetrievalListener,
-StandaloneLeaderRetrievalService}
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode
+import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalListener, 
LeaderRetrievalService, StandaloneLeaderRetrievalService}
 import 
org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtJobManager
 import org.apache.flink.runtime.util.ZooKeeperUtils
-import org.apache.flink.runtime.webmonitor.{WebMonitorUtils, WebMonitor}
-
+import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils}
 import org.slf4j.LoggerFactory
 
 import scala.concurrent.duration.{Duration, FiniteDuration}
@@ -88,7 +84,7 @@ abstract class FlinkMiniCluster(
 
   implicit val timeout = AkkaUtils.getTimeout(configuration)
 
-  val recoveryMode = RecoveryMode.fromConfig(configuration)
+  val recoveryMode = HighAvailabilityMode.fromConfig(configuration)
 
   val numJobManagers = getNumberOfJobManagers
 
@@ -126,7 +122,7 @@ abstract class FlinkMiniCluster(
   // --------------------------------------------------------------------------
 
   def getNumberOfJobManagers: Int = {
-    if(recoveryMode == RecoveryMode.STANDALONE) {
+    if(recoveryMode == HighAvailabilityMode.NONE) {
       1
     } else {
       configuration.getInteger(
@@ -137,7 +133,7 @@ abstract class FlinkMiniCluster(
   }
 
   def getNumberOfResourceManagers: Int = {
-    if(recoveryMode == RecoveryMode.STANDALONE) {
+    if(recoveryMode == HighAvailabilityMode.NONE) {
       1
     } else {
       configuration.getInteger(
@@ -528,7 +524,7 @@ abstract class FlinkMiniCluster(
   protected def createLeaderRetrievalService(): LeaderRetrievalService = {
     (jobManagerActorSystems, jobManagerActors) match {
       case (Some(jmActorSystems), Some(jmActors)) =>
-        if (recoveryMode == RecoveryMode.STANDALONE) {
+        if (recoveryMode == HighAvailabilityMode.NONE) {
           new StandaloneLeaderRetrievalService(
             AkkaUtils.getAkkaURL(jmActorSystems(0), jmActors(0)))
         } else {

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
index 528a20c..bd4723f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
@@ -22,7 +22,7 @@ import org.apache.commons.io.FileUtils;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -55,7 +55,7 @@ public class BlobRecoveryITCase {
        }
 
        /**
-        * Tests that with {@link RecoveryMode#ZOOKEEPER} distributed JARs are 
recoverable from any
+        * Tests that with {@link HighAvailabilityMode#ZOOKEEPER} distributed 
JARs are recoverable from any
         * participating BlobServer.
         */
        @Test
@@ -68,9 +68,9 @@ public class BlobRecoveryITCase {
 
                try {
                        Configuration config = new Configuration();
-                       config.setString(ConfigConstants.RECOVERY_MODE, 
"ZOOKEEPER");
+                       config.setString(ConfigConstants.HIGH_AVAILABILITY, 
"ZOOKEEPER");
                        config.setString(ConfigConstants.STATE_BACKEND, 
"FILESYSTEM");
-                       
config.setString(ConfigConstants.ZOOKEEPER_RECOVERY_PATH, 
recoveryDir.getPath());
+                       config.setString(ConfigConstants.ZOOKEEPER_HA_PATH, 
recoveryDir.getPath());
 
                        for (int i = 0; i < server.length; i++) {
                                server[i] = new BlobServer(config);

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
index f1021e6..a3fe0d4 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
@@ -26,7 +26,7 @@ import org.apache.flink.runtime.blob.BlobClient;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -48,7 +48,7 @@ public class BlobLibraryCacheRecoveryITCase {
        @Rule
        public TemporaryFolder temporaryFolder = new TemporaryFolder();
        /**
-        * Tests that with {@link RecoveryMode#ZOOKEEPER} distributed JARs are 
recoverable from any
+        * Tests that with {@link HighAvailabilityMode#ZOOKEEPER} distributed 
JARs are recoverable from any
         * participating BlobLibraryCacheManager.
         */
        @Test
@@ -63,9 +63,9 @@ public class BlobLibraryCacheRecoveryITCase {
 
                try {
                        Configuration config = new Configuration();
-                       config.setString(ConfigConstants.RECOVERY_MODE, 
"ZOOKEEPER");
+                       config.setString(ConfigConstants.HIGH_AVAILABILITY, 
"ZOOKEEPER");
                        config.setString(ConfigConstants.STATE_BACKEND, 
"FILESYSTEM");
-                       
config.setString(ConfigConstants.ZOOKEEPER_RECOVERY_PATH, 
temporaryFolder.getRoot().getAbsolutePath());
+                       config.setString(ConfigConstants.ZOOKEEPER_HA_PATH, 
temporaryFolder.getRoot().getAbsolutePath());
 
                        for (int i = 0; i < server.length; i++) {
                                server[i] = new BlobServer(config);

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index f050e29..d980517 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -123,8 +123,8 @@ public class JobManagerHARecoveryTest {
                ActorRef jobManager = null;
                ActorRef taskManager = null;
 
-               flinkConfiguration.setString(ConfigConstants.RECOVERY_MODE, "zookeeper");
-               flinkConfiguration.setString(ConfigConstants.ZOOKEEPER_RECOVERY_PATH, temporaryFolder.newFolder().toString());
+               flinkConfiguration.setString(ConfigConstants.HIGH_AVAILABILITY, "zookeeper");
+               flinkConfiguration.setString(ConfigConstants.ZOOKEEPER_HA_PATH, temporaryFolder.newFolder().toString());
                flinkConfiguration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, slots);
 
                try {

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
index bdd019d..5c696ce 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
@@ -35,7 +35,7 @@ import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.instance.InstanceManager;
-import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.jobmanager.StandaloneSubmittedJobGraphStore;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
@@ -171,7 +171,7 @@ public class JobManagerLeaderElectionTest extends TestLogger {
 
        private Props createJobManagerProps(Configuration configuration) throws Exception {
                LeaderElectionService leaderElectionService;
-               if (RecoveryMode.fromConfig(configuration) == RecoveryMode.STANDALONE) {
+               if (HighAvailabilityMode.fromConfig(configuration) == HighAvailabilityMode.NONE) {
                        leaderElectionService = new StandaloneLeaderElectionService();
                }
                else {

Reply via email to