[FLINK-4768] [core] Migrate high-availability configuration parameters to 
ConfigOptions

This closes #2607


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c8dc074a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c8dc074a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c8dc074a

Branch: refs/heads/flip-6
Commit: c8dc074a1899fa0f7d6ce7c6377c5e3d30159c18
Parents: d71a09c
Author: Stephan Ewen <se...@apache.org>
Authored: Sat Oct 8 01:41:02 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Oct 13 16:25:49 2016 +0200

----------------------------------------------------------------------
 .../org/apache/flink/client/cli/DefaultCLI.java |   5 +-
 .../configuration/HighAvailabilityOptions.java  | 139 +++++++++++++++++++
 .../webmonitor/WebRuntimeMonitorITCase.java     |   7 +-
 .../flink/runtime/blob/FileSystemBlobStore.java |  22 ++-
 .../jobmanager/HighAvailabilityMode.java        |   8 +-
 .../flink/runtime/security/SecurityContext.java |  11 +-
 .../flink/runtime/util/ZooKeeperUtils.java      |  68 +++------
 .../zookeeper/FlinkZooKeeperQuorumPeer.java     |  46 +++---
 .../flink/runtime/jobmanager/JobManager.scala   |  14 +-
 .../flink/runtime/blob/BlobRecoveryITCase.java  |   5 +-
 .../BlobLibraryCacheRecoveryITCase.java         |   5 +-
 .../jobmanager/HighAvailabilityModeTest.java    |  13 +-
 .../jobmanager/JobManagerHARecoveryTest.java    |   5 +-
 .../ZooKeeperLeaderElectionTest.java            |  25 ++--
 .../ZooKeeperLeaderRetrievalTest.java           |  15 +-
 .../runtime/testutils/ZooKeeperTestUtils.java   |  13 +-
 .../flink/runtime/util/ZooKeeperUtilTest.java   |   3 +-
 .../zookeeper/ZooKeeperTestEnvironment.java     |  10 +-
 .../runtime/testingUtils/TestingUtils.scala     |  13 +-
 .../connectors/fs/RollingSinkSecuredITCase.java |   5 +-
 .../flink/test/util/SecureTestEnvironment.java  |   3 +-
 .../apache/flink/test/util/TestBaseUtils.java   |   3 +-
 .../flink/test/recovery/ChaosMonkeyITCase.java  |   3 +-
 ...agerHAProcessFailureBatchRecoveryITCase.java |   5 +-
 ...CliFrontendYarnAddressConfigurationTest.java |  11 +-
 .../flink/yarn/YARNHighAvailabilityITCase.java  |   3 +-
 .../yarn/AbstractYarnClusterDescriptor.java     |   5 +-
 .../flink/yarn/YarnApplicationMasterRunner.java |   3 +-
 .../flink/yarn/cli/FlinkYarnSessionCli.java     |   6 +-
 29 files changed, 302 insertions(+), 172 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java 
b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
index 18fa323..8f79403 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
@@ -19,11 +19,12 @@ package org.apache.flink.client.cli;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Options;
+
 import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.deployment.StandaloneClusterDescriptor;
 import org.apache.flink.client.program.StandaloneClusterClient;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 
 import java.net.InetSocketAddress;
 
@@ -64,7 +65,7 @@ public class DefaultCLI implements 
CustomCommandLine<StandaloneClusterClient> {
 
                if 
(commandLine.hasOption(CliFrontendParser.ZOOKEEPER_NAMESPACE_OPTION.getOpt())) {
                        String zkNamespace = 
commandLine.getOptionValue(CliFrontendParser.ZOOKEEPER_NAMESPACE_OPTION.getOpt());
-                       
config.setString(ConfigConstants.HA_ZOOKEEPER_NAMESPACE_KEY, zkNamespace);
+                       
config.setString(HighAvailabilityOptions.HA_CLUSTER_ID.key(), zkNamespace);
                }
 
                StandaloneClusterDescriptor descriptor = new 
StandaloneClusterDescriptor(config);

http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
new file mode 100644
index 0000000..1ee988a
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * The set of configuration options relating to high-availability settings.
+ */
+@PublicEvolving
+public class HighAvailabilityOptions {
+
+       // 
------------------------------------------------------------------------
+       //  Required High Availability Options
+       // 
------------------------------------------------------------------------
+
+       /** 
+        * Defines the high-availability mode used for the cluster execution.
+        * A value of "NONE" signals no highly available setup.
+        * To enable high-availability, set this mode to "ZOOKEEPER".
+        */
+       public static final ConfigOption<String> HA_MODE = 
+                       key("high-availability")
+                       .defaultValue("NONE")
+                       .withDeprecatedKeys("recovery.mode");
+
+       /**
+        * The ID of the Flink cluster, used to separate multiple Flink 
clusters.
+        * Needs to be set for standalone clusters; it is automatically inferred 
in YARN and Mesos.
+        */
+       public static final ConfigOption<String> HA_CLUSTER_ID = 
+                       key("high-availability.cluster-id")
+                       .defaultValue("/default")
+                       
.withDeprecatedKeys("high-availability.zookeeper.path.namespace", 
"recovery.zookeeper.path.namespace");
+
+       /**
+        * File system path (URI) where Flink persists metadata in 
high-availability setups.
+        */
+       public static final ConfigOption<String> HA_STORAGE_PATH =
+                       key("high-availability.storageDir")
+                       .noDefaultValue()
+                       
.withDeprecatedKeys("high-availability.zookeeper.storageDir", 
"recovery.zookeeper.storageDir");
+
+       /**
+        * The ZooKeeper quorum to use, when running Flink in a 
high-availability mode with ZooKeeper.
+        */
+       public static final ConfigOption<String> HA_ZOOKEEPER_QUORUM =
+                       key("high-availability.zookeeper.quorum")
+                       .noDefaultValue()
+                       .withDeprecatedKeys("recovery.zookeeper.quorum");
+       
+
+       // 
------------------------------------------------------------------------
+       //  Recovery Options
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Optional port (range) used by the job manager in high-availability 
mode.
+        */
+       public static final ConfigOption<String> HA_JOB_MANAGER_PORT_RANGE = 
+                       key("high-availability.jobmanager.port")
+                       .defaultValue("0")
+                       .withDeprecatedKeys("recovery.jobmanager.port");
+
+       /**
+        * The delay after a failover before a JobManager recovers the current 
jobs.
+        */
+       public static final ConfigOption<String> HA_JOB_DELAY = 
+                       key("high-availability.job.delay")
+                       .noDefaultValue()
+                       .withDeprecatedKeys("recovery.job.delay");
+
+       // 
------------------------------------------------------------------------
+       //  ZooKeeper Options
+       // 
------------------------------------------------------------------------
+
+       /**
+        * The root path under which Flink stores its entries in ZooKeeper.
+        */
+       public static final ConfigOption<String> HA_ZOOKEEPER_ROOT =
+                       key("high-availability.zookeeper.path.root")
+                       .defaultValue("/flink")
+                       .withDeprecatedKeys("recovery.zookeeper.path.root");
+
+       // 
------------------------------------------------------------------------
+       //  ZooKeeper Client Settings
+       // 
------------------------------------------------------------------------
+
+       public static final ConfigOption<Integer> ZOOKEEPER_SESSION_TIMEOUT = 
+                       
key("high-availability.zookeeper.client.session-timeout")
+                       .defaultValue(60000)
+                       
.withDeprecatedKeys("recovery.zookeeper.client.session-timeout");
+
+       public static final ConfigOption<Integer> ZOOKEEPER_CONNECTION_TIMEOUT =
+                       
key("high-availability.zookeeper.client.connection-timeout")
+                       .defaultValue(15000)
+                       
.withDeprecatedKeys("recovery.zookeeper.client.connection-timeout");
+
+       public static final ConfigOption<Integer> ZOOKEEPER_RETRY_WAIT = 
+                       key("high-availability.zookeeper.client.retry-wait")
+                       .defaultValue(5000)
+                       
.withDeprecatedKeys("recovery.zookeeper.client.retry-wait");
+
+       public static final ConfigOption<Integer> ZOOKEEPER_MAX_RETRY_ATTEMPTS 
= 
+                       
key("high-availability.zookeeper.client.max-retry-attempts")
+                       .defaultValue(3)
+                       
.withDeprecatedKeys("recovery.zookeeper.client.max-retry-attempts");
+
+       public static final ConfigOption<Boolean> ZOOKEEPER_SASL_DISABLE = 
+                       key("zookeeper.sasl.disable")
+                       .defaultValue(true);
+
+       public static final ConfigOption<String> ZOOKEEPER_SASL_SERVICE_NAME = 
+                       key("zookeeper.sasl.service-name")
+                       .noDefaultValue();
+
+       // 
------------------------------------------------------------------------
+
+       /** Not intended to be instantiated */
+       private HighAvailabilityOptions() {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
index 54c5e76..1ae776c 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
@@ -24,6 +24,7 @@ import io.netty.handler.codec.http.HttpResponseStatus;
 import org.apache.curator.test.TestingServer;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
@@ -237,7 +238,7 @@ public class WebRuntimeMonitorITCase extends TestLogger {
                                followingClient.sendGetRequest("index.html", 
deadline.timeLeft());
                                response = 
followingClient.getNextResponse(deadline.timeLeft());
                                
assertEquals(HttpResponseStatus.TEMPORARY_REDIRECT, response.getStatus());
-                               assertTrue(response.getLocation().contains("" + 
leadingWebMonitor.getServerPort()));
+                               
assertTrue(response.getLocation().contains(String.valueOf(leadingWebMonitor.getServerPort())));
 
                                // Kill the leader
                                leadingSystem.shutdown();
@@ -296,8 +297,8 @@ public class WebRuntimeMonitorITCase extends TestLogger {
                        final Configuration config = new Configuration();
                        
config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
                        
config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, 
logFile.toString());
-                       config.setString(ConfigConstants.HA_MODE, "ZOOKEEPER");
-                       
config.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, 
zooKeeper.getConnectString());
+                       config.setString(HighAvailabilityOptions.HA_MODE, 
"ZOOKEEPER");
+                       
config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, 
zooKeeper.getConnectString());
 
                        actorSystem = AkkaUtils.createDefaultActorSystem();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
index ee189d4..deba738 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
@@ -19,14 +19,17 @@
 package org.apache.flink.runtime.blob;
 
 import com.google.common.io.Files;
+
+import org.apache.commons.lang3.StringUtils;
+
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.util.ConfigurationUtil;
 import org.apache.flink.util.IOUtils;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,16 +55,11 @@ class FileSystemBlobStore implements BlobStore {
        private final String basePath;
 
        FileSystemBlobStore(Configuration config) throws IOException {
-               String storagePath = 
ConfigurationUtil.getStringWithDeprecatedKeys(
-                               config,
-                               ConfigConstants.HA_ZOOKEEPER_STORAGE_PATH,
-                               null,
-                               ConfigConstants.ZOOKEEPER_RECOVERY_PATH);
-
-               if (storagePath == null) {
-                       throw new 
IllegalConfigurationException(String.format("Missing configuration for " +
-                                       "ZooKeeper file system path. Please 
specify via " +
-                                       "'%s' key.", 
ConfigConstants.HA_ZOOKEEPER_STORAGE_PATH));
+               String storagePath = 
config.getValue(HighAvailabilityOptions.HA_STORAGE_PATH);
+
+               if (storagePath == null || StringUtils.isBlank(storagePath)) {
+                       throw new IllegalConfigurationException("Missing 
high-availability storage path for metadata." +
+                                       " Specify via configuration key '" + 
HighAvailabilityOptions.HA_STORAGE_PATH + "'.");
                }
 
                this.basePath = storagePath + "/blob";

http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java
index 087ad3b..fa2db48 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.jobmanager;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.util.ConfigurationUtil;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 
 /**
  * High availability mode for Flink's cluster execution. Currently supported 
modes are:
@@ -43,11 +43,7 @@ public enum HighAvailabilityMode {
         * configured.
         */
        public static HighAvailabilityMode fromConfig(Configuration config) {
-               String haMode = ConfigurationUtil.getStringWithDeprecatedKeys(
-                               config,
-                               ConfigConstants.HA_MODE,
-                               null,
-                               ConfigConstants.RECOVERY_MODE);
+               String haMode = 
config.getValue(HighAvailabilityOptions.HA_MODE);
 
                if (haMode == null) {
                        return HighAvailabilityMode.NONE;

http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java
index be6611f..67dd78c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java
@@ -23,6 +23,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.util.Preconditions;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.Credentials;
@@ -182,9 +183,9 @@ public class SecurityContext {
                //with pseudo JAAS configuration file if SASL auth is enabled 
for ZK
                System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, "");
 
-               boolean disableSaslClient = 
configuration.getBoolean(ConfigConstants.ZOOKEEPER_SASL_DISABLE,
-                               ConfigConstants.DEFAULT_ZOOKEEPER_SASL_DISABLE);
-               if(disableSaslClient) {
+               boolean disableSaslClient = 
configuration.getBoolean(HighAvailabilityOptions.ZOOKEEPER_SASL_DISABLE);
+
+               if (disableSaslClient) {
                        LOG.info("SASL client auth for ZK will be disabled");
                        //SASL auth is disabled by default but will be enabled 
if specified in configuration
                        System.setProperty(ZOOKEEPER_SASL_CLIENT,"false");
@@ -212,8 +213,8 @@ public class SecurityContext {
                System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, 
jaasConfFile.getAbsolutePath());
                System.setProperty(ZOOKEEPER_SASL_CLIENT, "true");
 
-               String zkSaslServiceName = 
configuration.getString(ConfigConstants.ZOOKEEPER_SASL_SERVICE_NAME, null);
-               if(!StringUtils.isBlank(zkSaslServiceName)) {
+               String zkSaslServiceName = 
configuration.getValue(HighAvailabilityOptions.ZOOKEEPER_SASL_SERVICE_NAME);
+               if (!StringUtils.isBlank(zkSaslServiceName)) {
                        LOG.info("ZK SASL service name: {} is provided in the 
configuration", zkSaslServiceName);
                        
System.setProperty(ZOOKEEPER_SASL_CLIENT_USERNAME,zkSaslServiceName);
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
index 5e69875..137a85b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
@@ -18,12 +18,14 @@
 
 package org.apache.flink.runtime.util;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
@@ -57,53 +59,25 @@ public class ZooKeeperUtils {
         * @return {@link CuratorFramework} instance
         */
        public static CuratorFramework startCuratorFramework(Configuration 
configuration) {
-               String zkQuorum = ConfigurationUtil.getStringWithDeprecatedKeys(
-                               configuration,
-                               ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY,
-                               null,
-                               ConfigConstants.ZOOKEEPER_QUORUM_KEY);
+               String zkQuorum = 
configuration.getValue(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM);
 
-               if (zkQuorum == null || zkQuorum.equals("")) {
+               if (zkQuorum == null || StringUtils.isBlank(zkQuorum)) {
                        throw new RuntimeException("No valid ZooKeeper quorum 
has been specified. " +
                                        "You can specify the quorum via the 
configuration key '" +
-                                       ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY 
+ "'.");
+                                       
HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM.key() + "'.");
                }
 
-               int sessionTimeout = 
ConfigurationUtil.getIntegerWithDeprecatedKeys(
-                               configuration,
-                               ConfigConstants.HA_ZOOKEEPER_SESSION_TIMEOUT,
-                               
ConfigConstants.DEFAULT_ZOOKEEPER_SESSION_TIMEOUT,
-                               ConfigConstants.ZOOKEEPER_SESSION_TIMEOUT);
+               int sessionTimeout = 
configuration.getInteger(HighAvailabilityOptions.ZOOKEEPER_SESSION_TIMEOUT);
 
-               int connectionTimeout = 
ConfigurationUtil.getIntegerWithDeprecatedKeys(
-                               configuration,
-                               ConfigConstants.HA_ZOOKEEPER_CONNECTION_TIMEOUT,
-                               
ConfigConstants.DEFAULT_ZOOKEEPER_CONNECTION_TIMEOUT,
-                               ConfigConstants.ZOOKEEPER_CONNECTION_TIMEOUT);
+               int connectionTimeout = 
configuration.getInteger(HighAvailabilityOptions.ZOOKEEPER_CONNECTION_TIMEOUT);
 
-               int retryWait = ConfigurationUtil.getIntegerWithDeprecatedKeys(
-                               configuration,
-                               ConfigConstants.HA_ZOOKEEPER_RETRY_WAIT,
-                               ConfigConstants.DEFAULT_ZOOKEEPER_RETRY_WAIT,
-                               ConfigConstants.ZOOKEEPER_RETRY_WAIT);
+               int retryWait = 
configuration.getInteger(HighAvailabilityOptions.ZOOKEEPER_RETRY_WAIT);
 
-               int maxRetryAttempts = 
ConfigurationUtil.getIntegerWithDeprecatedKeys(
-                               configuration,
-                               ConfigConstants.HA_ZOOKEEPER_MAX_RETRY_ATTEMPTS,
-                               
ConfigConstants.DEFAULT_ZOOKEEPER_MAX_RETRY_ATTEMPTS,
-                               ConfigConstants.ZOOKEEPER_MAX_RETRY_ATTEMPTS);
+               int maxRetryAttempts = 
configuration.getInteger(HighAvailabilityOptions.ZOOKEEPER_MAX_RETRY_ATTEMPTS);
 
-               String root = ConfigurationUtil.getStringWithDeprecatedKeys(
-                               configuration,
-                               ConfigConstants.HA_ZOOKEEPER_DIR_KEY,
-                               ConfigConstants.DEFAULT_ZOOKEEPER_DIR_KEY,
-                               ConfigConstants.ZOOKEEPER_DIR_KEY);
+               String root = 
configuration.getValue(HighAvailabilityOptions.HA_ZOOKEEPER_ROOT);
 
-               String namespace = 
ConfigurationUtil.getStringWithDeprecatedKeys(
-                               configuration,
-                               ConfigConstants.HA_ZOOKEEPER_NAMESPACE_KEY,
-                               ConfigConstants.DEFAULT_ZOOKEEPER_NAMESPACE_KEY,
-                               ConfigConstants.ZOOKEEPER_NAMESPACE_KEY);
+               String namespace = 
configuration.getValue(HighAvailabilityOptions.HA_CLUSTER_ID);
 
                String rootWithNamespace = generateZookeeperPath(root, 
namespace);
 
@@ -138,13 +112,9 @@ public class ZooKeeperUtils {
        public static String getZooKeeperEnsemble(Configuration flinkConf)
                        throws IllegalConfigurationException {
 
-               String zkQuorum = ConfigurationUtil.getStringWithDeprecatedKeys(
-                               flinkConf,
-                               ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY,
-                               "",
-                               ConfigConstants.ZOOKEEPER_QUORUM_KEY);
+               String zkQuorum = 
flinkConf.getValue(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM);
 
-               if (zkQuorum == null || zkQuorum.equals("")) {
+               if (zkQuorum == null || StringUtils.isBlank(zkQuorum)) {
                        throw new IllegalConfigurationException("No ZooKeeper 
quorum specified in config.");
                }
 
@@ -367,15 +337,11 @@ public class ZooKeeperUtils {
                        Configuration configuration,
                        String prefix) throws IOException {
 
-               String rootPath = ConfigurationUtil.getStringWithDeprecatedKeys(
-                               configuration,
-                               ConfigConstants.HA_ZOOKEEPER_STORAGE_PATH,
-                               "",
-                               ConfigConstants.ZOOKEEPER_RECOVERY_PATH);
+               String rootPath = 
configuration.getValue(HighAvailabilityOptions.HA_STORAGE_PATH);
 
-               if (rootPath.equals("")) {
-                       throw new IllegalConfigurationException("Missing 
recovery path. Specify via " +
-                               "configuration key '" + 
ConfigConstants.HA_ZOOKEEPER_STORAGE_PATH + "'.");
+               if (rootPath == null || StringUtils.isBlank(rootPath)) {
+                       throw new IllegalConfigurationException("Missing 
high-availability storage path for metadata." +
+                                       " Specify via configuration key '" + 
HighAvailabilityOptions.HA_STORAGE_PATH + "'.");
                } else {
                        return new FileSystemStateStorageHelper<T>(rootPath, 
prefix);
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/FlinkZooKeeperQuorumPeer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/FlinkZooKeeperQuorumPeer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/FlinkZooKeeperQuorumPeer.java
index 9fba529..c4140c0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/FlinkZooKeeperQuorumPeer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/FlinkZooKeeperQuorumPeer.java
@@ -19,9 +19,9 @@
 package org.apache.flink.runtime.zookeeper;
 
 import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.runtime.util.EnvironmentInformation;
+
 import org.apache.zookeeper.server.ServerConfig;
 import org.apache.zookeeper.server.ZooKeeperServer;
 import org.apache.zookeeper.server.ZooKeeperServerMain;
@@ -47,8 +47,25 @@ import java.util.UUID;
  */
 public class FlinkZooKeeperQuorumPeer {
 
+       /** ZooKeeper default client port. */
+       public static final int DEFAULT_ZOOKEEPER_CLIENT_PORT = 2181;
+
+       /** ZooKeeper default init limit. */
+       public static final int DEFAULT_ZOOKEEPER_INIT_LIMIT = 10;
+
+       /** ZooKeeper default sync limit. */
+       public static final int DEFAULT_ZOOKEEPER_SYNC_LIMIT = 5;
+
+       /** ZooKeeper default peer port. */
+       public static final int DEFAULT_ZOOKEEPER_PEER_PORT = 2888;
+
+       /** ZooKeeper default leader port. */
+       public static final int DEFAULT_ZOOKEEPER_LEADER_PORT = 3888;
+
        private static final Logger LOG = 
LoggerFactory.getLogger(FlinkZooKeeperQuorumPeer.class);
 
+       // 
------------------------------------------------------------------------
+
        public static void main(String[] args) {
                try {
                        // startup checks and logging
@@ -67,6 +84,8 @@ public class FlinkZooKeeperQuorumPeer {
                }
        }
 
+       // 
------------------------------------------------------------------------
+
        /**
         * Runs a ZooKeeper {@link QuorumPeer} if further peers are configured 
or a single
         * {@link ZooKeeperServer} if no further peers are configured.
@@ -120,26 +139,23 @@ public class FlinkZooKeeperQuorumPeer {
        private static void setRequiredProperties(Properties zkProps) {
                // Set default client port
                if (zkProps.getProperty("clientPort") == null) {
-                       int clientPort = 
ConfigConstants.DEFAULT_ZOOKEEPER_CLIENT_PORT;
-                       zkProps.setProperty("clientPort", 
String.valueOf(clientPort));
+                       zkProps.setProperty("clientPort", 
String.valueOf(DEFAULT_ZOOKEEPER_CLIENT_PORT));
 
-                       LOG.warn("No 'clientPort' configured. Set to '{}'.", 
clientPort);
+                       LOG.warn("No 'clientPort' configured. Set to '{}'.", 
DEFAULT_ZOOKEEPER_CLIENT_PORT);
                }
 
                // Set default init limit
                if (zkProps.getProperty("initLimit") == null) {
-                       int initLimit = 
ConfigConstants.DEFAULT_ZOOKEEPER_INIT_LIMIT;
-                       zkProps.setProperty("initLimit", 
String.valueOf(initLimit));
+                       zkProps.setProperty("initLimit", 
String.valueOf(DEFAULT_ZOOKEEPER_INIT_LIMIT));
 
-                       LOG.warn("No 'initLimit' configured. Set to '{}'.", 
initLimit);
+                       LOG.warn("No 'initLimit' configured. Set to '{}'.", 
DEFAULT_ZOOKEEPER_INIT_LIMIT);
                }
 
                // Set default sync limit
                if (zkProps.getProperty("syncLimit") == null) {
-                       int syncLimit = 
ConfigConstants.DEFAULT_ZOOKEEPER_SYNC_LIMIT;
-                       zkProps.setProperty("syncLimit", 
String.valueOf(syncLimit));
+                       zkProps.setProperty("syncLimit", 
String.valueOf(DEFAULT_ZOOKEEPER_SYNC_LIMIT));
 
-                       LOG.warn("No 'syncLimit' configured. Set to '{}'.", 
syncLimit);
+                       LOG.warn("No 'syncLimit' configured. Set to '{}'.", 
DEFAULT_ZOOKEEPER_SYNC_LIMIT);
                }
 
                // Set default data dir
@@ -152,8 +168,8 @@ public class FlinkZooKeeperQuorumPeer {
                        LOG.warn("No 'dataDir' configured. Set to '{}'.", 
dataDir);
                }
 
-               int peerPort = ConfigConstants.DEFAULT_ZOOKEEPER_PEER_PORT;
-               int leaderPort = ConfigConstants.DEFAULT_ZOOKEEPER_LEADER_PORT;
+               int peerPort = DEFAULT_ZOOKEEPER_PEER_PORT;
+               int leaderPort = DEFAULT_ZOOKEEPER_LEADER_PORT;
 
                // Set peer and leader ports if none given, because ZooKeeper 
complains if multiple
                // servers are configured, but no ports are given.
@@ -220,12 +236,8 @@ public class FlinkZooKeeperQuorumPeer {
                
                // Write myid to file. We use a File Writer, because that 
properly propagates errors,
                // while the PrintWriter swallows errors
-               FileWriter writer = new FileWriter(new File(dataDir, "myid"));
-               try {
+               try (FileWriter writer = new FileWriter(new File(dataDir, 
"myid"))) {
                        writer.write(String.valueOf(id));
                }
-               finally {
-                       writer.close();
-               }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index e90f2d2..be820ae 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -31,7 +31,7 @@ import akka.pattern.ask
 import grizzled.slf4j.Logger
 import org.apache.flink.api.common.JobID
 import org.apache.flink.api.common.time.Time
-import org.apache.flink.configuration.{ConfigConstants, Configuration, 
GlobalConfiguration}
+import org.apache.flink.configuration.{ConfigConstants, Configuration, 
GlobalConfiguration, HighAvailabilityOptions}
 import org.apache.flink.core.fs.FileSystem
 import org.apache.flink.core.io.InputSplitAssigner
 import org.apache.flink.metrics.{Gauge, MetricGroup}
@@ -2367,9 +2367,7 @@ object JobManager {
         configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
 
         // The port range of allowed job manager ports or 0 for random
-        configuration.getString(
-          ConfigConstants.RECOVERY_JOB_MANAGER_PORT,
-          ConfigConstants.DEFAULT_HA_JOB_MANAGER_PORT)
+        
configuration.getValue(HighAvailabilityOptions.HA_JOB_MANAGER_PORT_RANGE)
       }
       else {
         LOG.info("Starting JobManager without high-availability")
@@ -2501,11 +2499,7 @@ object JobManager {
 
     val savepointStore = SavepointStoreFactory.createFromConfig(configuration)
 
-    val jobRecoveryTimeoutStr = ConfigurationUtil.getStringWithDeprecatedKeys(
-      configuration,
-      ConfigConstants.HA_JOB_DELAY,
-      null,
-      ConfigConstants.RECOVERY_JOB_DELAY)
+    val jobRecoveryTimeoutStr = 
configuration.getValue(HighAvailabilityOptions.HA_JOB_DELAY)
 
     val jobRecoveryTimeout = if (jobRecoveryTimeoutStr == null || 
jobRecoveryTimeoutStr.isEmpty) {
       timeout
@@ -2515,7 +2509,7 @@ object JobManager {
       } catch {
         case n: NumberFormatException =>
           throw new Exception(
-            s"Invalid config value for ${ConfigConstants.HA_JOB_DELAY}: " +
+            s"Invalid config value for 
${HighAvailabilityOptions.HA_JOB_DELAY.key()}: " +
               s"$jobRecoveryTimeoutStr. Value must be a valid duration (such 
as '10 s' or '1 min')")
       }
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
index 8464d68..8ba20c9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
@@ -22,6 +22,7 @@ import org.apache.commons.io.FileUtils;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.junit.After;
 import org.junit.Before;
@@ -68,9 +69,9 @@ public class BlobRecoveryITCase {
 
                try {
                        Configuration config = new Configuration();
-                       config.setString(ConfigConstants.HA_MODE, "ZOOKEEPER");
+                       config.setString(HighAvailabilityOptions.HA_MODE, 
"ZOOKEEPER");
                        config.setString(ConfigConstants.STATE_BACKEND, 
"FILESYSTEM");
-                       
config.setString(ConfigConstants.HA_ZOOKEEPER_STORAGE_PATH, 
recoveryDir.getPath());
+                       
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, 
recoveryDir.getPath());
 
                        for (int i = 0; i < server.length; i++) {
                                server[i] = new BlobServer(config);

http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
index f6bed56..f6cdf09 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.execution.librarycache;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.runtime.blob.BlobCache;
 import org.apache.flink.runtime.blob.BlobClient;
 import org.apache.flink.runtime.blob.BlobKey;
@@ -63,9 +64,9 @@ public class BlobLibraryCacheRecoveryITCase {
 
                try {
                        Configuration config = new Configuration();
-                       config.setString(ConfigConstants.HA_MODE, "ZOOKEEPER");
+                       config.setString(HighAvailabilityOptions.HA_MODE, 
"ZOOKEEPER");
                        config.setString(ConfigConstants.STATE_BACKEND, 
"FILESYSTEM");
-                       
config.setString(ConfigConstants.HA_ZOOKEEPER_STORAGE_PATH, 
temporaryFolder.getRoot().getAbsolutePath());
+                       
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, 
temporaryFolder.getRoot().getAbsolutePath());
 
                        for (int i = 0; i < server.length; i++) {
                                server[i] = new BlobServer(config);

http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/HighAvailabilityModeTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/HighAvailabilityModeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/HighAvailabilityModeTest.java
index 04c0e48..91fb514 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/HighAvailabilityModeTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/HighAvailabilityModeTest.java
@@ -20,7 +20,8 @@ package org.apache.flink.runtime.jobmanager;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
@@ -42,7 +43,7 @@ public class HighAvailabilityModeTest {
                assertEquals(DEFAULT_HA_MODE, 
HighAvailabilityMode.fromConfig(config));
 
                // Check not equals default
-               config.setString(ConfigConstants.HA_MODE, 
HighAvailabilityMode.ZOOKEEPER.name().toLowerCase());
+               config.setString(HighAvailabilityOptions.HA_MODE, 
HighAvailabilityMode.ZOOKEEPER.name().toLowerCase());
                assertEquals(HighAvailabilityMode.ZOOKEEPER, 
HighAvailabilityMode.fromConfig(config));
        }
 
@@ -54,16 +55,16 @@ public class HighAvailabilityModeTest {
                Configuration config = new Configuration();
 
                // Check mapping of old default to new default
-               config.setString(ConfigConstants.RECOVERY_MODE, 
ConfigConstants.DEFAULT_RECOVERY_MODE);
+               config.setString("recovery.mode", 
ConfigConstants.DEFAULT_RECOVERY_MODE);
                assertEquals(DEFAULT_HA_MODE, 
HighAvailabilityMode.fromConfig(config));
 
                // Check deprecated config
-               config.setString(ConfigConstants.RECOVERY_MODE, 
HighAvailabilityMode.ZOOKEEPER.name().toLowerCase());
+               config.setString("recovery.mode", 
HighAvailabilityMode.ZOOKEEPER.name().toLowerCase());
                assertEquals(HighAvailabilityMode.ZOOKEEPER, 
HighAvailabilityMode.fromConfig(config));
 
                // Check precedence over deprecated config
-               config.setString(ConfigConstants.HA_MODE, 
HighAvailabilityMode.NONE.name().toLowerCase());
-               config.setString(ConfigConstants.RECOVERY_MODE, 
HighAvailabilityMode.ZOOKEEPER.name().toLowerCase());
+               config.setString("high-availability", 
HighAvailabilityMode.NONE.name().toLowerCase());
+               config.setString("recovery.mode", 
HighAvailabilityMode.ZOOKEEPER.name().toLowerCase());
 
                assertEquals(HighAvailabilityMode.NONE, 
HighAvailabilityMode.fromConfig(config));
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index 360588d..5b12eee 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -26,6 +26,7 @@ import akka.testkit.JavaTestKit;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
@@ -131,8 +132,8 @@ public class JobManagerHARecoveryTest {
                ActorRef jobManager = null;
                ActorRef taskManager = null;
 
-               flinkConfiguration.setString(ConfigConstants.HA_MODE, 
"zookeeper");
-               
flinkConfiguration.setString(ConfigConstants.HA_ZOOKEEPER_STORAGE_PATH, 
temporaryFolder.newFolder().toString());
+               flinkConfiguration.setString(HighAvailabilityOptions.HA_MODE, 
"zookeeper");
+               
flinkConfiguration.setString(HighAvailabilityOptions.HA_STORAGE_PATH, 
temporaryFolder.newFolder().toString());
                
flinkConfiguration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
slots);
 
                try {

http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
index e20985b..1f1eb62 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
@@ -27,6 +27,7 @@ import 
org.apache.curator.framework.recipes.cache.NodeCacheListener;
 import org.apache.curator.test.TestingServer;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import 
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.util.TestLogger;
@@ -89,8 +90,8 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
        @Test
        public void testZooKeeperLeaderElectionRetrieval() throws Exception {
                Configuration configuration = new Configuration();
-               
configuration.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, 
testingServer.getConnectString());
-               configuration.setString(ConfigConstants.HA_MODE, "zookeeper");
+               
configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, 
testingServer.getConnectString());
+               configuration.setString(HighAvailabilityOptions.HA_MODE, 
"zookeeper");
 
                ZooKeeperLeaderElectionService leaderElectionService = null;
                ZooKeeperLeaderRetrievalService leaderRetrievalService = null;
@@ -134,8 +135,8 @@ public class ZooKeeperLeaderElectionTest extends TestLogger 
{
        @Test
        public void testZooKeeperReelection() throws Exception {
                Configuration configuration = new Configuration();
-               
configuration.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, 
testingServer.getConnectString());
-               configuration.setString(ConfigConstants.HA_MODE, "zookeeper");
+               
configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, 
testingServer.getConnectString());
+               configuration.setString(HighAvailabilityOptions.HA_MODE, 
"zookeeper");
 
                Deadline deadline = new FiniteDuration(5, 
TimeUnit.MINUTES).fromNow();
 
@@ -217,8 +218,8 @@ public class ZooKeeperLeaderElectionTest extends TestLogger 
{
        @Test
        public void testZooKeeperReelectionWithReplacement() throws Exception {
                Configuration configuration = new Configuration();
-               
configuration.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, 
testingServer.getConnectString());
-               configuration.setString(ConfigConstants.HA_MODE, "zookeeper");
+               
configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, 
testingServer.getConnectString());
+               configuration.setString(HighAvailabilityOptions.HA_MODE, 
"zookeeper");
 
                int num = 3;
                int numTries = 30;
@@ -295,8 +296,8 @@ public class ZooKeeperLeaderElectionTest extends TestLogger 
{
                final String leaderPath = "/leader";
 
                Configuration configuration = new Configuration();
-               
configuration.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, 
testingServer.getConnectString());
-               configuration.setString(ConfigConstants.HA_MODE, "zookeeper");
+               
configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, 
testingServer.getConnectString());
+               configuration.setString(HighAvailabilityOptions.HA_MODE, 
"zookeeper");
                
configuration.setString(ConfigConstants.HA_ZOOKEEPER_LEADER_PATH, leaderPath);
 
                ZooKeeperLeaderElectionService leaderElectionService = null;
@@ -379,8 +380,8 @@ public class ZooKeeperLeaderElectionTest extends TestLogger 
{
        @Test
        public void testExceptionForwarding() throws Exception {
                Configuration configuration = new Configuration();
-               
configuration.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, 
testingServer.getConnectString());
-               configuration.setString(ConfigConstants.HA_MODE, "zookeeper");
+               
configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, 
testingServer.getConnectString());
+               configuration.setString(HighAvailabilityOptions.HA_MODE, 
"zookeeper");
 
                ZooKeeperLeaderElectionService leaderElectionService = null;
                ZooKeeperLeaderRetrievalService leaderRetrievalService = null;
@@ -448,8 +449,8 @@ public class ZooKeeperLeaderElectionTest extends TestLogger 
{
        @Test
        public void testEphemeralZooKeeperNodes() throws Exception {
                Configuration configuration = new Configuration();
-               
configuration.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, 
testingServer.getConnectString());
-               configuration.setString(ConfigConstants.HA_MODE, "zookeeper");
+               
configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, 
testingServer.getConnectString());
+               configuration.setString(HighAvailabilityOptions.HA_MODE, 
"zookeeper");
 
                ZooKeeperLeaderElectionService leaderElectionService;
                ZooKeeperLeaderRetrievalService leaderRetrievalService = null;

http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
index 0fe0644..70b1da0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
@@ -20,16 +20,19 @@ package org.apache.flink.runtime.leaderelection;
 
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.test.TestingServer;
-import org.apache.flink.configuration.ConfigConstants;
+
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+
 import scala.Option;
 import scala.concurrent.duration.FiniteDuration;
 
@@ -82,8 +85,8 @@ public class ZooKeeperLeaderRetrievalTest extends TestLogger{
 
                long sleepingTime = 1000;
 
-               config.setString(ConfigConstants.HA_MODE, "zookeeper");
-               config.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, 
testingServer.getConnectString());
+               config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
+               config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, 
testingServer.getConnectString());
 
                LeaderElectionService leaderElectionService = null;
                LeaderElectionService faultyLeaderElectionService;
@@ -179,8 +182,8 @@ public class ZooKeeperLeaderRetrievalTest extends 
TestLogger{
        @Test
        public void testTimeoutOfFindConnectingAddress() throws Exception {
                Configuration config = new Configuration();
-               config.setString(ConfigConstants.HA_MODE, "zookeeper");
-               config.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, 
testingServer.getConnectString());
+               config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
+               config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, 
testingServer.getConnectString());
 
                FiniteDuration timeout = new FiniteDuration(10, 
TimeUnit.SECONDS);
 
@@ -190,7 +193,7 @@ public class ZooKeeperLeaderRetrievalTest extends 
TestLogger{
                assertEquals(InetAddress.getLocalHost(), result);
        }
 
-       class FindConnectingAddress implements Runnable {
+       static class FindConnectingAddress implements Runnable {
 
                private final Configuration config;
                private final FiniteDuration timeout;

http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
index 7dd7067..07cec32 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.testutils;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
 
@@ -66,8 +67,8 @@ public class ZooKeeperTestUtils {
                config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, -1);
 
                // ZooKeeper recovery mode
-               config.setString(ConfigConstants.HA_MODE, "ZOOKEEPER");
-               config.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, 
zooKeeperQuorum);
+               config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
+               config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, 
zooKeeperQuorum);
 
                int connTimeout = 5000;
                if (System.getenv().containsKey("CI")) {
@@ -75,20 +76,20 @@ public class ZooKeeperTestUtils {
                        connTimeout = 30000;
                }
 
-               
config.setInteger(ConfigConstants.HA_ZOOKEEPER_CONNECTION_TIMEOUT, connTimeout);
-               config.setInteger(ConfigConstants.HA_ZOOKEEPER_SESSION_TIMEOUT, 
connTimeout);
+               
config.setInteger(HighAvailabilityOptions.ZOOKEEPER_CONNECTION_TIMEOUT, 
connTimeout);
+               
config.setInteger(HighAvailabilityOptions.ZOOKEEPER_SESSION_TIMEOUT, 
connTimeout);
 
                // File system state backend
                config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM");
                
config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY, 
fsStateHandlePath + "/checkpoints");
-               config.setString(ConfigConstants.HA_ZOOKEEPER_STORAGE_PATH, 
fsStateHandlePath + "/recovery");
+               config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, 
fsStateHandlePath + "/recovery");
 
                // Akka failure detection and execution retries
                config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, 
"1000 ms");
                config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "6 
s");
                config.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 9);
                config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "100 s");
-               config.setString(ConfigConstants.HA_JOB_DELAY, "10 s");
+               config.setString(HighAvailabilityOptions.HA_JOB_DELAY, "10 s");
 
                return config;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-runtime/src/test/java/org/apache/flink/runtime/util/ZooKeeperUtilTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/ZooKeeperUtilTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/ZooKeeperUtilTest.java
index daed4a4..d5895ff 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/ZooKeeperUtilTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/ZooKeeperUtilTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.util;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
@@ -71,7 +72,7 @@ public class ZooKeeperUtilTest extends TestLogger {
        }
 
        private Configuration setQuorum(Configuration conf, String quorum) {
-               conf.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, quorum);
+               conf.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, 
quorum);
                return conf;
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java
index bd58515..66c4fac 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java
@@ -22,9 +22,11 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.test.TestingCluster;
 import org.apache.curator.test.TestingServer;
 import org.apache.curator.utils.ZKPaths;
-import org.apache.flink.configuration.ConfigConstants;
+
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
+
 import org.apache.zookeeper.KeeperException;
 
 import java.util.List;
@@ -58,7 +60,7 @@ public class ZooKeeperTestEnvironment {
                                zooKeeperServer = new TestingServer(true);
                                zooKeeperCluster = null;
 
-                               
conf.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY,
+                               
conf.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM,
                                                
zooKeeperServer.getConnectString());
                        }
                        else {
@@ -67,7 +69,7 @@ public class ZooKeeperTestEnvironment {
 
                                zooKeeperCluster.start();
 
-                               
conf.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY,
+                               
conf.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM,
                                                
zooKeeperCluster.getConnectString());
                        }
 
@@ -127,7 +129,7 @@ public class ZooKeeperTestEnvironment {
         */
        public CuratorFramework createClient() {
                Configuration config = new Configuration();
-               config.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, 
getConnectString());
+               config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, 
getConnectString());
                return ZooKeeperUtils.startCuratorFramework(config);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index e878097..97016e4 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -28,12 +28,12 @@ import com.typesafe.config.ConfigFactory
 import grizzled.slf4j.Logger
 import org.apache.flink.api.common.JobExecutionResult
 
-import org.apache.flink.configuration.{ConfigConstants, Configuration}
+import org.apache.flink.configuration.{HighAvailabilityOptions, 
ConfigConstants, Configuration}
 import org.apache.flink.runtime.client.JobClient
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager
 import org.apache.flink.runtime.jobgraph.JobGraph
 import org.apache.flink.runtime.clusterframework.types.ResourceID
-import org.apache.flink.runtime.jobmanager.{MemoryArchivist, JobManager}
+import org.apache.flink.runtime.jobmanager.{HighAvailabilityMode, 
MemoryArchivist, JobManager}
 import org.apache.flink.runtime.testutils.TestingResourceManager
 import org.apache.flink.runtime.util.LeaderRetrievalUtils
 import org.apache.flink.runtime.{LogMessages, LeaderSessionMessageFilter, 
FlinkActor}
@@ -412,8 +412,7 @@ object TestingUtils {
     * @param configuration Configuration to use
     * @param jobManagerClass JobManager class to instantiate
     * @param prefix The prefix to use for the Actor names
-    *
-    * @return
+   * @return
     */
   def createJobManager(
       actorSystem: ActorSystem,
@@ -422,7 +421,8 @@ object TestingUtils {
       prefix: String)
     : ActorGateway = {
 
-    configuration.setString(ConfigConstants.HA_MODE,
+    configuration.setString(
+      HighAvailabilityOptions.HA_MODE,
       ConfigConstants.DEFAULT_HA_MODE)
 
       val (actor, _) = JobManager.startJobManagerActors(
@@ -502,7 +502,8 @@ object TestingUtils {
       configuration: Configuration)
   : ActorGateway = {
 
-    configuration.setString(ConfigConstants.HA_MODE,
+    configuration.setString(
+      HighAvailabilityOptions.HA_MODE,
       ConfigConstants.DEFAULT_HA_MODE)
 
     val actor = FlinkResourceManager.startResourceManagerActors(

http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
index 051175a..c005814 100644
--- a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.fs;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.runtime.security.SecurityContext;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.test.util.SecureTestEnvironment;
@@ -215,10 +216,10 @@ public class RollingSinkSecuredITCase extends RollingSinkITCase {
                        config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, DEFAULT_PARALLELISM);
                        config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false);
                        config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 3);
-                       config.setString(ConfigConstants.RECOVERY_MODE, "zookeeper");
+                       config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
                        config.setString(ConfigConstants.STATE_BACKEND, "filesystem");
                        config.setString(ConfigConstants.ZOOKEEPER_CHECKPOINTS_PATH, hdfsURI + "/flink/checkpoints");
-                       config.setString(ConfigConstants.ZOOKEEPER_RECOVERY_PATH, hdfsURI + "/flink/recovery");
+                       config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, hdfsURI + "/flink/recovery");
                        config.setString("state.backend.fs.checkpointdir", hdfsURI + "/flink/checkpoints");
 
                        SecureTestEnvironment.populateFlinkSecureConfigurations(config);

http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java
index b5e622b..0250c16 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java
@@ -21,6 +21,7 @@ package org.apache.flink.test.util;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.runtime.security.SecurityContext;
 import org.apache.hadoop.minikdc.MiniKdc;
 import org.junit.rules.TemporaryFolder;
@@ -115,7 +116,7 @@ public class SecureTestEnvironment {
                        Configuration flinkConfig = GlobalConfiguration.loadConfiguration();
                        flinkConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY, testKeytab);
                        flinkConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, testPrincipal);
-                       flinkConfig.setBoolean(ConfigConstants.ZOOKEEPER_SASL_DISABLE, false);
+                       flinkConfig.setBoolean(HighAvailabilityOptions.ZOOKEEPER_SASL_DISABLE, false);
                        ctx.setFlinkConfiguration(flinkConfig);
                        TestingSecurityContext.install(ctx, getClientSecurityConfigurationMap());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index b774f97..aa5e7d3 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -32,6 +32,7 @@ import org.apache.commons.io.IOUtils;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.runtime.messages.TaskManagerMessages;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.util.TestLogger;
@@ -121,7 +122,7 @@ public class TestBaseUtils extends TestLogger {
 
                if (startZooKeeper) {
                        config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 3);
-                       config.setString(ConfigConstants.HA_MODE, "zookeeper");
+                       config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
                }
 
                return startCluster(config, singleActorSystem);

http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
index cc8ab80..4d10bf1 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
@@ -24,6 +24,7 @@ import org.apache.commons.io.FileUtils;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
 import org.apache.flink.runtime.instance.AkkaActorGateway;
@@ -564,7 +565,7 @@ public class ChaosMonkeyITCase extends TestLogger {
                        fail(fsCheckpoints + " does not exist: " + Arrays.toString(FileStateBackendBasePath.listFiles()));
                }
 
-               File fsRecovery = new File(new URI(config.getString(ConfigConstants.HA_ZOOKEEPER_STORAGE_PATH, "")).getPath());
+               File fsRecovery = new File(new URI(config.getString(HighAvailabilityOptions.HA_STORAGE_PATH)).getPath());
 
                LOG.info("Checking " + fsRecovery);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
index 9b0d9de..a51f88b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
@@ -30,6 +30,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.instance.ActorGateway;
@@ -149,8 +150,8 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
         */
        public void testJobManagerFailure(String zkQuorum, final File coordinateDir) throws Exception {
                Configuration config = new Configuration();
-               config.setString(ConfigConstants.HA_MODE, "ZOOKEEPER");
-               config.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, zkQuorum);
+               config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
+               config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkQuorum);
 
                ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
                                "leader", 1, config);

http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
index 48ad7f5..4bcde16 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.yarn;
 
 import org.apache.commons.cli.CommandLine;
+
 import org.apache.flink.client.CliFrontend;
 import org.apache.flink.client.cli.CliFrontendParser;
 import org.apache.flink.client.cli.CommandLineOptions;
@@ -27,7 +28,7 @@ import org.apache.flink.client.cli.RunOptions;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
@@ -38,20 +39,20 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+
 import org.junit.AfterClass;
 import org.junit.Assert;
-import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+
 import org.mockito.Mockito;
 
 import java.io.File;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.PrintStream;
-import java.lang.reflect.Field;
 import java.net.InetSocketAddress;
 import java.nio.file.Files;
 import java.nio.file.StandardOpenOption;
@@ -202,7 +203,7 @@ public class CliFrontendYarnAddressConfigurationTest {
                                CliFrontendParser.parseRunCommand(new String[] {"-yid", TEST_YARN_APPLICATION_ID.toString()});
 
                frontend.retrieveClient(options);
-               String zkNs = frontend.getConfiguration().getString(ConfigConstants.HA_ZOOKEEPER_NAMESPACE_KEY, "error");
+               String zkNs = frontend.getConfiguration().getValue(HighAvailabilityOptions.HA_CLUSTER_ID);
                Assert.assertTrue(zkNs.matches("application_\\d+_0042"));
        }
 
@@ -216,7 +217,7 @@ public class CliFrontendYarnAddressConfigurationTest {
                                CliFrontendParser.parseRunCommand(new String[] {"-yid", TEST_YARN_APPLICATION_ID.toString(), "-yz", overrideZkNamespace});
 
                frontend.retrieveClient(options);
-               String zkNs = frontend.getConfiguration().getString(ConfigConstants.HA_ZOOKEEPER_NAMESPACE_KEY, "error");
+               String zkNs = frontend.getConfiguration().getValue(HighAvailabilityOptions.HA_CLUSTER_ID);
                Assert.assertEquals(overrideZkNamespace, zkNs);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
index 9d6ff85..79f790f 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
@@ -26,6 +26,7 @@ import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.AkkaActorGateway;
@@ -119,7 +120,7 @@ public class YARNHighAvailabilityITCase extends YarnTestBase {
                        zkServer.getConnectString() + "@@yarn.application-attempts=" + numberApplicationAttempts +
                        "@@" + ConfigConstants.STATE_BACKEND + "=FILESYSTEM" +
                        "@@" + FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY + "=" + fsStateHandlePath + "/checkpoints" +
-                       "@@" + ConfigConstants.HA_ZOOKEEPER_STORAGE_PATH + "=" + fsStateHandlePath + "/recovery");
+                       "@@" + HighAvailabilityOptions.HA_STORAGE_PATH.key() + "=" + fsStateHandlePath + "/recovery");
                flinkYarnClient.setConfigurationFilePath(new Path(confDirPath + File.separator + "flink-conf.yaml"));
 
                ClusterClient yarnCluster = null;

http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index 848013c..9481c24 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -22,6 +22,7 @@ import org.apache.flink.client.CliFrontend;
 import org.apache.flink.client.deployment.ClusterDescriptor;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.security.SecurityContext;
@@ -539,11 +540,11 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
                // no user specified cli argument for namespace?
                if (zkNamespace == null || zkNamespace.isEmpty()) {
                        // namespace defined in config? else use applicationId as default.
-                       zkNamespace = flinkConfiguration.getString(ConfigConstants.HA_ZOOKEEPER_NAMESPACE_KEY, String.valueOf(appId));
+                       zkNamespace = flinkConfiguration.getString(HighAvailabilityOptions.HA_CLUSTER_ID, String.valueOf(appId));
                        setZookeeperNamespace(zkNamespace);
                }
 
-               flinkConfiguration.setString(ConfigConstants.HA_ZOOKEEPER_NAMESPACE_KEY, zkNamespace);
+               flinkConfiguration.setString(HighAvailabilityOptions.HA_CLUSTER_ID, zkNamespace);
 
                if (HighAvailabilityMode.isHighAvailabilityModeActivated(flinkConfiguration)) {
                        // activate re-execution of failed applications

http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index b27876b..10e229e 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -25,6 +25,7 @@ import akka.actor.Props;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
@@ -472,7 +473,7 @@ public class YarnApplicationMasterRunner {
                // override zookeeper namespace with user cli argument (if provided)
                String cliZKNamespace = ENV.get(YarnConfigKeys.ENV_ZOOKEEPER_NAMESPACE);
                if (cliZKNamespace != null && !cliZKNamespace.isEmpty()) {
-                       configuration.setString(ConfigConstants.HA_ZOOKEEPER_NAMESPACE_KEY, cliZKNamespace);
+                       configuration.setString(HighAvailabilityOptions.HA_CLUSTER_ID, cliZKNamespace);
                }
 
                // if a web monitor shall be started, set the port to random binding

http://git-wip-us.apache.org/repos/asf/flink/blob/c8dc074a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index d09340c..e4da140 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -29,6 +29,7 @@ import org.apache.flink.client.cli.CustomCommandLine;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.runtime.security.SecurityContext;
 import org.apache.flink.yarn.AbstractYarnClusterDescriptor;
@@ -60,7 +61,6 @@ import java.util.Map;
 import java.util.Properties;
 
 import static org.apache.flink.client.cli.CliFrontendParser.ADDRESS_OPTION;
-import static org.apache.flink.configuration.ConfigConstants.HA_ZOOKEEPER_NAMESPACE_KEY;
 
 /**
  * Class handling the command line interface to the YARN session.
@@ -513,8 +513,8 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
                if(null != applicationID) {
                        String zkNamespace = cmdLine.hasOption(ZOOKEEPER_NAMESPACE.getOpt()) ?
                                        cmdLine.getOptionValue(ZOOKEEPER_NAMESPACE.getOpt())
-                                       : config.getString(HA_ZOOKEEPER_NAMESPACE_KEY, applicationID);
-                       config.setString(HA_ZOOKEEPER_NAMESPACE_KEY, zkNamespace);
+                                       : config.getString(HighAvailabilityOptions.HA_CLUSTER_ID, applicationID);
+                       config.setString(HighAvailabilityOptions.HA_CLUSTER_ID, zkNamespace);
 
                        AbstractYarnClusterDescriptor yarnDescriptor = getClusterDescriptor();
                        yarnDescriptor.setFlinkConfiguration(config);

Reply via email to