[FLINK-4768] [core] Migrate high-availability configuration parameters to ConfigOptions
This closes #2607 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/abc1657b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/abc1657b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/abc1657b Branch: refs/heads/master Commit: abc1657bac83c151a1a345220942b02fcde4653a Parents: cdebb0e Author: Stephan Ewen <[email protected]> Authored: Sat Oct 8 01:41:02 2016 +0200 Committer: Stephan Ewen <[email protected]> Committed: Mon Oct 10 12:37:06 2016 +0200 ---------------------------------------------------------------------- .../org/apache/flink/client/cli/DefaultCLI.java | 5 +- .../configuration/HighAvailabilityOptions.java | 139 +++++++++++++++++++ .../webmonitor/WebRuntimeMonitorITCase.java | 7 +- .../flink/runtime/blob/FileSystemBlobStore.java | 22 ++- .../jobmanager/HighAvailabilityMode.java | 8 +- .../flink/runtime/security/SecurityContext.java | 11 +- .../flink/runtime/util/ZooKeeperUtils.java | 68 +++------ .../zookeeper/FlinkZooKeeperQuorumPeer.java | 46 +++--- .../flink/runtime/jobmanager/JobManager.scala | 14 +- .../flink/runtime/blob/BlobRecoveryITCase.java | 5 +- .../BlobLibraryCacheRecoveryITCase.java | 5 +- .../jobmanager/HighAvailabilityModeTest.java | 13 +- .../jobmanager/JobManagerHARecoveryTest.java | 5 +- .../ZooKeeperLeaderElectionTest.java | 25 ++-- .../ZooKeeperLeaderRetrievalTest.java | 15 +- .../runtime/testutils/ZooKeeperTestUtils.java | 13 +- .../flink/runtime/util/ZooKeeperUtilTest.java | 3 +- .../zookeeper/ZooKeeperTestEnvironment.java | 10 +- .../runtime/testingUtils/TestingUtils.scala | 13 +- .../connectors/fs/RollingSinkSecuredITCase.java | 5 +- .../flink/test/util/SecureTestEnvironment.java | 3 +- .../apache/flink/test/util/TestBaseUtils.java | 3 +- .../flink/test/recovery/ChaosMonkeyITCase.java | 3 +- ...agerHAProcessFailureBatchRecoveryITCase.java | 5 +- ...CliFrontendYarnAddressConfigurationTest.java | 11 +- .../flink/yarn/YARNHighAvailabilityITCase.java | 3 +- .../yarn/AbstractYarnClusterDescriptor.java | 5 +- .../flink/yarn/YarnApplicationMasterRunner.java | 3 +- .../flink/yarn/cli/FlinkYarnSessionCli.java | 6 +- 29 files changed, 302 insertions(+), 172 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/abc1657b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java index 18fa323..8f79403 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java @@ -19,11 +19,12 @@ package org.apache.flink.client.cli; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Options; + import org.apache.flink.client.ClientUtils; import org.apache.flink.client.deployment.StandaloneClusterDescriptor; import org.apache.flink.client.program.StandaloneClusterClient; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; import java.net.InetSocketAddress; @@ -64,7 +65,7 @@ public class DefaultCLI implements CustomCommandLine<StandaloneClusterClient> { if (commandLine.hasOption(CliFrontendParser.ZOOKEEPER_NAMESPACE_OPTION.getOpt())) { String zkNamespace = commandLine.getOptionValue(CliFrontendParser.ZOOKEEPER_NAMESPACE_OPTION.getOpt()); - config.setString(ConfigConstants.HA_ZOOKEEPER_NAMESPACE_KEY, zkNamespace); + config.setString(HighAvailabilityOptions.HA_CLUSTER_ID.key(), zkNamespace); } StandaloneClusterDescriptor descriptor = new StandaloneClusterDescriptor(config); http://git-wip-us.apache.org/repos/asf/flink/blob/abc1657b/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java new file mode 100644 index 0000000..1ee988a --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.configuration; + +import org.apache.flink.annotation.PublicEvolving; + +import static org.apache.flink.configuration.ConfigOptions.key; + +/** + * The set of configuration options relating to high-availability settings. + */ +@PublicEvolving +public class HighAvailabilityOptions { + + // ------------------------------------------------------------------------ + // Required High Availability Options + // ------------------------------------------------------------------------ + + /** + * Defines high-availability mode used for the cluster execution. + * A value of "NONE" signals no highly available setup. + * To enable high-availability, set this mode to "ZOOKEEPER". + */ + public static final ConfigOption<String> HA_MODE = + key("high-availability") + .defaultValue("NONE") + .withDeprecatedKeys("recovery.mode"); + + /** + * The ID of the Flink cluster, used to separate multiple Flink clusters + * Needs to be set for standalone clusters, is automatically inferred in YARN and Mesos. + */ + public static final ConfigOption<String> HA_CLUSTER_ID = + key("high-availability.cluster-id") + .defaultValue("/default") + .withDeprecatedKeys("high-availability.zookeeper.path.namespace", "recovery.zookeeper.path.namespace"); + + /** + * File system path (URI) where Flink persists metadata in high-availability setups + */ + public static final ConfigOption<String> HA_STORAGE_PATH = + key("high-availability.storageDir") + .noDefaultValue() + .withDeprecatedKeys("high-availability.zookeeper.storageDir", "recovery.zookeeper.storageDir"); + + /** + * The ZooKeeper quorum to use, when running Flink in a high-availability mode with ZooKeeper. + */ + public static final ConfigOption<String> HA_ZOOKEEPER_QUORUM = + key("high-availability.zookeeper.quorum") + .noDefaultValue() + .withDeprecatedKeys("recovery.zookeeper.quorum"); + + + // ------------------------------------------------------------------------ + // Recovery Options + // ------------------------------------------------------------------------ + + /** + * Optional port (range) used by the job manager in high-availability mode. + */ + public static final ConfigOption<String> HA_JOB_MANAGER_PORT_RANGE = + key("high-availability.jobmanager.port") + .defaultValue("0") + .withDeprecatedKeys("recovery.jobmanager.port"); + + /** + * The time before a JobManager after a fail over recovers the current jobs. + */ + public static final ConfigOption<String> HA_JOB_DELAY = + key("high-availability.job.delay") + .noDefaultValue() + .withDeprecatedKeys("recovery.job.delay"); + + // ------------------------------------------------------------------------ + // ZooKeeper Options + // ------------------------------------------------------------------------ + + /** + * The root path under which Flink stores its entries in ZooKeeper + */ + public static final ConfigOption<String> HA_ZOOKEEPER_ROOT = + key("high-availability.zookeeper.path.root") + .defaultValue("/flink") + .withDeprecatedKeys("recovery.zookeeper.path.root"); + + // ------------------------------------------------------------------------ + // ZooKeeper Client Settings + // ------------------------------------------------------------------------ + + public static final ConfigOption<Integer> ZOOKEEPER_SESSION_TIMEOUT = + key("high-availability.zookeeper.client.session-timeout") + .defaultValue(60000) + .withDeprecatedKeys("recovery.zookeeper.client.session-timeout"); + + public static final ConfigOption<Integer> ZOOKEEPER_CONNECTION_TIMEOUT = + key("high-availability.zookeeper.client.connection-timeout") + .defaultValue(15000) + .withDeprecatedKeys("recovery.zookeeper.client.connection-timeout"); + + public static final ConfigOption<Integer> ZOOKEEPER_RETRY_WAIT = + key("high-availability.zookeeper.client.retry-wait") + .defaultValue(5000) + .withDeprecatedKeys("recovery.zookeeper.client.retry-wait"); + + public static final ConfigOption<Integer> ZOOKEEPER_MAX_RETRY_ATTEMPTS = + key("high-availability.zookeeper.client.max-retry-attempts") + .defaultValue(3) + .withDeprecatedKeys("recovery.zookeeper.client.max-retry-attempts"); + + public static final ConfigOption<Boolean> ZOOKEEPER_SASL_DISABLE = + key("zookeeper.sasl.disable") + .defaultValue(true); + + public static final ConfigOption<String> ZOOKEEPER_SASL_SERVICE_NAME = + key("zookeeper.sasl.service-name") + .noDefaultValue(); + + // ------------------------------------------------------------------------ + + /** Not intended to be instantiated */ + private HighAvailabilityOptions() {} +} http://git-wip-us.apache.org/repos/asf/flink/blob/abc1657b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java index 54c5e76..1ae776c 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java @@ -24,6 +24,7 @@ import io.netty.handler.codec.http.HttpResponseStatus; import org.apache.curator.test.TestingServer; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.jobmanager.JobManager; import org.apache.flink.runtime.jobmanager.MemoryArchivist; @@ -237,7 +238,7 @@ public class WebRuntimeMonitorITCase extends TestLogger { followingClient.sendGetRequest("index.html", deadline.timeLeft()); response = followingClient.getNextResponse(deadline.timeLeft()); assertEquals(HttpResponseStatus.TEMPORARY_REDIRECT, response.getStatus()); - assertTrue(response.getLocation().contains("" + leadingWebMonitor.getServerPort())); + assertTrue(response.getLocation().contains(String.valueOf(leadingWebMonitor.getServerPort()))); // Kill the leader leadingSystem.shutdown(); @@ -296,8 +297,8 @@ public class WebRuntimeMonitorITCase extends TestLogger { final Configuration config = new Configuration(); config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0); config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logFile.toString()); - config.setString(ConfigConstants.HA_MODE, "ZOOKEEPER"); - config.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, zooKeeper.getConnectString()); + config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); + config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeper.getConnectString()); actorSystem = AkkaUtils.createDefaultActorSystem(); http://git-wip-us.apache.org/repos/asf/flink/blob/abc1657b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java index ee189d4..deba738 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java @@ -19,14 +19,17 @@ package org.apache.flink.runtime.blob; import com.google.common.io.Files; + +import org.apache.commons.lang3.StringUtils; + import org.apache.flink.api.common.JobID; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; -import org.apache.flink.util.ConfigurationUtil; import org.apache.flink.util.IOUtils; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,16 +55,11 @@ class FileSystemBlobStore implements BlobStore { private final String basePath; FileSystemBlobStore(Configuration config) throws IOException { - String storagePath = ConfigurationUtil.getStringWithDeprecatedKeys( - config, - ConfigConstants.HA_ZOOKEEPER_STORAGE_PATH, - null, - ConfigConstants.ZOOKEEPER_RECOVERY_PATH); - - if (storagePath == null) { - throw new IllegalConfigurationException(String.format("Missing configuration for " + - "ZooKeeper file system path. Please specify via " + - "'%s' key.", ConfigConstants.HA_ZOOKEEPER_STORAGE_PATH)); + String storagePath = config.getValue(HighAvailabilityOptions.HA_STORAGE_PATH); + + if (storagePath == null || StringUtils.isBlank(storagePath)) { + throw new IllegalConfigurationException("Missing high-availability storage path for metadata." + + " Specify via configuration key '" + HighAvailabilityOptions.HA_STORAGE_PATH + "'."); } this.basePath = storagePath + "/blob"; http://git-wip-us.apache.org/repos/asf/flink/blob/abc1657b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java index 087ad3b..fa2db48 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java @@ -20,7 +20,7 @@ package org.apache.flink.runtime.jobmanager; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.util.ConfigurationUtil; +import org.apache.flink.configuration.HighAvailabilityOptions; /** * High availability mode for Flink's cluster execution. Currently supported modes are: @@ -43,11 +43,7 @@ public enum HighAvailabilityMode { * configured. */ public static HighAvailabilityMode fromConfig(Configuration config) { - String haMode = ConfigurationUtil.getStringWithDeprecatedKeys( - config, - ConfigConstants.HA_MODE, - null, - ConfigConstants.RECOVERY_MODE); + String haMode = config.getValue(HighAvailabilityOptions.HA_MODE); if (haMode == null) { return HighAvailabilityMode.NONE; http://git-wip-us.apache.org/repos/asf/flink/blob/abc1657b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java index be6611f..67dd78c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java @@ -23,6 +23,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.util.Preconditions; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.Credentials; @@ -182,9 +183,9 @@ public class SecurityContext { //with pseudo JAAS configuration file if SASL auth is enabled for ZK System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, ""); - boolean disableSaslClient = configuration.getBoolean(ConfigConstants.ZOOKEEPER_SASL_DISABLE, - ConfigConstants.DEFAULT_ZOOKEEPER_SASL_DISABLE); - if(disableSaslClient) { + boolean disableSaslClient = configuration.getBoolean(HighAvailabilityOptions.ZOOKEEPER_SASL_DISABLE); + + if (disableSaslClient) { LOG.info("SASL client auth for ZK will be disabled"); //SASL auth is disabled by default but will be enabled if specified in configuration System.setProperty(ZOOKEEPER_SASL_CLIENT,"false"); @@ -212,8 +213,8 @@ public class SecurityContext { System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, jaasConfFile.getAbsolutePath()); System.setProperty(ZOOKEEPER_SASL_CLIENT, "true"); - String zkSaslServiceName = configuration.getString(ConfigConstants.ZOOKEEPER_SASL_SERVICE_NAME, null); - if(!StringUtils.isBlank(zkSaslServiceName)) { + String zkSaslServiceName = configuration.getValue(HighAvailabilityOptions.ZOOKEEPER_SASL_SERVICE_NAME); + if (!StringUtils.isBlank(zkSaslServiceName)) { LOG.info("ZK SASL service name: {} is provided in the configuration", zkSaslServiceName); System.setProperty(ZOOKEEPER_SASL_CLIENT_USERNAME,zkSaslServiceName); } http://git-wip-us.apache.org/repos/asf/flink/blob/abc1657b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java index 91db564..e125e10 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java @@ -18,12 +18,14 @@ package org.apache.flink.runtime.util; +import org.apache.commons.lang3.StringUtils; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore; @@ -57,53 +59,25 @@ public class ZooKeeperUtils { * @return {@link CuratorFramework} instance */ public static CuratorFramework startCuratorFramework(Configuration configuration) { - String zkQuorum = ConfigurationUtil.getStringWithDeprecatedKeys( - configuration, - ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, - null, - ConfigConstants.ZOOKEEPER_QUORUM_KEY); + String zkQuorum = configuration.getValue(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM); - if (zkQuorum == null || zkQuorum.equals("")) { + if (zkQuorum == null || StringUtils.isBlank(zkQuorum)) { throw new RuntimeException("No valid ZooKeeper quorum has been specified. " + "You can specify the quorum via the configuration key '" + - ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY + "'."); + HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM.key() + "'."); } - int sessionTimeout = ConfigurationUtil.getIntegerWithDeprecatedKeys( - configuration, - ConfigConstants.HA_ZOOKEEPER_SESSION_TIMEOUT, - ConfigConstants.DEFAULT_ZOOKEEPER_SESSION_TIMEOUT, - ConfigConstants.ZOOKEEPER_SESSION_TIMEOUT); + int sessionTimeout = configuration.getInteger(HighAvailabilityOptions.ZOOKEEPER_SESSION_TIMEOUT); - int connectionTimeout = ConfigurationUtil.getIntegerWithDeprecatedKeys( - configuration, - ConfigConstants.HA_ZOOKEEPER_CONNECTION_TIMEOUT, - ConfigConstants.DEFAULT_ZOOKEEPER_CONNECTION_TIMEOUT, - ConfigConstants.ZOOKEEPER_CONNECTION_TIMEOUT); + int connectionTimeout = configuration.getInteger(HighAvailabilityOptions.ZOOKEEPER_CONNECTION_TIMEOUT); - int retryWait = ConfigurationUtil.getIntegerWithDeprecatedKeys( - configuration, - ConfigConstants.HA_ZOOKEEPER_RETRY_WAIT, - ConfigConstants.DEFAULT_ZOOKEEPER_RETRY_WAIT, - ConfigConstants.ZOOKEEPER_RETRY_WAIT); + int retryWait = configuration.getInteger(HighAvailabilityOptions.ZOOKEEPER_RETRY_WAIT); - int maxRetryAttempts = ConfigurationUtil.getIntegerWithDeprecatedKeys( - configuration, - ConfigConstants.HA_ZOOKEEPER_MAX_RETRY_ATTEMPTS, - ConfigConstants.DEFAULT_ZOOKEEPER_MAX_RETRY_ATTEMPTS, - ConfigConstants.ZOOKEEPER_MAX_RETRY_ATTEMPTS); + int maxRetryAttempts = configuration.getInteger(HighAvailabilityOptions.ZOOKEEPER_MAX_RETRY_ATTEMPTS); - String root = ConfigurationUtil.getStringWithDeprecatedKeys( - configuration, - ConfigConstants.HA_ZOOKEEPER_DIR_KEY, - ConfigConstants.DEFAULT_ZOOKEEPER_DIR_KEY, - ConfigConstants.ZOOKEEPER_DIR_KEY); + String root = configuration.getValue(HighAvailabilityOptions.HA_ZOOKEEPER_ROOT); - String namespace = ConfigurationUtil.getStringWithDeprecatedKeys( - configuration, - ConfigConstants.HA_ZOOKEEPER_NAMESPACE_KEY, - ConfigConstants.DEFAULT_ZOOKEEPER_NAMESPACE_KEY, - ConfigConstants.ZOOKEEPER_NAMESPACE_KEY); + String namespace = configuration.getValue(HighAvailabilityOptions.HA_CLUSTER_ID); String rootWithNamespace = generateZookeeperPath(root, namespace); @@ -138,13 +112,9 @@ public class ZooKeeperUtils { public static String getZooKeeperEnsemble(Configuration flinkConf) throws IllegalConfigurationException { - String zkQuorum = ConfigurationUtil.getStringWithDeprecatedKeys( - flinkConf, - ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, - "", - ConfigConstants.ZOOKEEPER_QUORUM_KEY); + String zkQuorum = flinkConf.getValue(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM); - if (zkQuorum == null || zkQuorum.equals("")) { + if (zkQuorum == null || StringUtils.isBlank(zkQuorum)) { throw new IllegalConfigurationException("No ZooKeeper quorum specified in config."); } @@ -317,15 +287,11 @@ public class ZooKeeperUtils { Configuration configuration, String prefix) throws IOException { - String rootPath = ConfigurationUtil.getStringWithDeprecatedKeys( - configuration, - ConfigConstants.HA_ZOOKEEPER_STORAGE_PATH, - "", - ConfigConstants.ZOOKEEPER_RECOVERY_PATH); + String rootPath = configuration.getValue(HighAvailabilityOptions.HA_STORAGE_PATH); - if (rootPath.equals("")) { - throw new IllegalConfigurationException("Missing recovery path. Specify via " + - "configuration key '" + ConfigConstants.HA_ZOOKEEPER_STORAGE_PATH + "'."); + if (rootPath == null || StringUtils.isBlank(rootPath)) { + throw new IllegalConfigurationException("Missing high-availability storage path for metadata." + + " Specify via configuration key '" + HighAvailabilityOptions.HA_STORAGE_PATH + "'."); } else { return new FileSystemStateStorageHelper<T>(rootPath, prefix); } http://git-wip-us.apache.org/repos/asf/flink/blob/abc1657b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/FlinkZooKeeperQuorumPeer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/FlinkZooKeeperQuorumPeer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/FlinkZooKeeperQuorumPeer.java index 9fba529..c4140c0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/FlinkZooKeeperQuorumPeer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/FlinkZooKeeperQuorumPeer.java @@ -19,9 +19,9 @@ package org.apache.flink.runtime.zookeeper; import org.apache.flink.api.java.utils.ParameterTool; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.runtime.util.EnvironmentInformation; + import org.apache.zookeeper.server.ServerConfig; import org.apache.zookeeper.server.ZooKeeperServer; import org.apache.zookeeper.server.ZooKeeperServerMain; @@ -47,8 +47,25 @@ import java.util.UUID; */ public class FlinkZooKeeperQuorumPeer { + /** ZooKeeper default client port. */ + public static final int DEFAULT_ZOOKEEPER_CLIENT_PORT = 2181; + + /** ZooKeeper default init limit. */ + public static final int DEFAULT_ZOOKEEPER_INIT_LIMIT = 10; + + /** ZooKeeper default sync limit. */ + public static final int DEFAULT_ZOOKEEPER_SYNC_LIMIT = 5; + + /** ZooKeeper default peer port. */ + public static final int DEFAULT_ZOOKEEPER_PEER_PORT = 2888; + + /** ZooKeeper default leader port. */ + public static final int DEFAULT_ZOOKEEPER_LEADER_PORT = 3888; + private static final Logger LOG = LoggerFactory.getLogger(FlinkZooKeeperQuorumPeer.class); + // ------------------------------------------------------------------------ + public static void main(String[] args) { try { // startup checks and logging @@ -67,6 +84,8 @@ public class FlinkZooKeeperQuorumPeer { } } + // ------------------------------------------------------------------------ + /** * Runs a ZooKeeper {@link QuorumPeer} if further peers are configured or a single * {@link ZooKeeperServer} if no further peers are configured. @@ -120,26 +139,23 @@ public class FlinkZooKeeperQuorumPeer { private static void setRequiredProperties(Properties zkProps) { // Set default client port if (zkProps.getProperty("clientPort") == null) { - int clientPort = ConfigConstants.DEFAULT_ZOOKEEPER_CLIENT_PORT; - zkProps.setProperty("clientPort", String.valueOf(clientPort)); + zkProps.setProperty("clientPort", String.valueOf(DEFAULT_ZOOKEEPER_CLIENT_PORT)); - LOG.warn("No 'clientPort' configured. Set to '{}'.", clientPort); + LOG.warn("No 'clientPort' configured. Set to '{}'.", DEFAULT_ZOOKEEPER_CLIENT_PORT); } // Set default init limit if (zkProps.getProperty("initLimit") == null) { - int initLimit = ConfigConstants.DEFAULT_ZOOKEEPER_INIT_LIMIT; - zkProps.setProperty("initLimit", String.valueOf(initLimit)); + zkProps.setProperty("initLimit", String.valueOf(DEFAULT_ZOOKEEPER_INIT_LIMIT)); - LOG.warn("No 'initLimit' configured. Set to '{}'.", initLimit); + LOG.warn("No 'initLimit' configured. Set to '{}'.", DEFAULT_ZOOKEEPER_INIT_LIMIT); } // Set default sync limit if (zkProps.getProperty("syncLimit") == null) { - int syncLimit = ConfigConstants.DEFAULT_ZOOKEEPER_SYNC_LIMIT; - zkProps.setProperty("syncLimit", String.valueOf(syncLimit)); + zkProps.setProperty("syncLimit", String.valueOf(DEFAULT_ZOOKEEPER_SYNC_LIMIT)); - LOG.warn("No 'syncLimit' configured. Set to '{}'.", syncLimit); + LOG.warn("No 'syncLimit' configured. Set to '{}'.", DEFAULT_ZOOKEEPER_SYNC_LIMIT); } // Set default data dir @@ -152,8 +168,8 @@ public class FlinkZooKeeperQuorumPeer { LOG.warn("No 'dataDir' configured. Set to '{}'.", dataDir); } - int peerPort = ConfigConstants.DEFAULT_ZOOKEEPER_PEER_PORT; - int leaderPort = ConfigConstants.DEFAULT_ZOOKEEPER_LEADER_PORT; + int peerPort = DEFAULT_ZOOKEEPER_PEER_PORT; + int leaderPort = DEFAULT_ZOOKEEPER_LEADER_PORT; // Set peer and leader ports if none given, because ZooKeeper complains if multiple // servers are configured, but no ports are given. @@ -220,12 +236,8 @@ public class FlinkZooKeeperQuorumPeer { // Write myid to file. We use a File Writer, because that properly propagates errors, // while the PrintWriter swallows errors - FileWriter writer = new FileWriter(new File(dataDir, "myid")); - try { + try (FileWriter writer = new FileWriter(new File(dataDir, "myid"))) { writer.write(String.valueOf(id)); } - finally { - writer.close(); - } } } http://git-wip-us.apache.org/repos/asf/flink/blob/abc1657b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index e90f2d2..be820ae 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -31,7 +31,7 @@ import akka.pattern.ask import grizzled.slf4j.Logger import org.apache.flink.api.common.JobID import org.apache.flink.api.common.time.Time -import org.apache.flink.configuration.{ConfigConstants, Configuration, GlobalConfiguration} +import org.apache.flink.configuration.{ConfigConstants, Configuration, GlobalConfiguration, HighAvailabilityOptions} import org.apache.flink.core.fs.FileSystem import org.apache.flink.core.io.InputSplitAssigner import org.apache.flink.metrics.{Gauge, MetricGroup} @@ -2367,9 +2367,7 @@ object JobManager { configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0) // The port range of allowed job manager ports or 0 for random - configuration.getString( - ConfigConstants.RECOVERY_JOB_MANAGER_PORT, - ConfigConstants.DEFAULT_HA_JOB_MANAGER_PORT) + configuration.getValue(HighAvailabilityOptions.HA_JOB_MANAGER_PORT_RANGE) } else { LOG.info("Starting JobManager without high-availability") @@ -2501,11 +2499,7 @@ object JobManager { val savepointStore = SavepointStoreFactory.createFromConfig(configuration) - val jobRecoveryTimeoutStr = ConfigurationUtil.getStringWithDeprecatedKeys( - configuration, - ConfigConstants.HA_JOB_DELAY, - null, - ConfigConstants.RECOVERY_JOB_DELAY) + val jobRecoveryTimeoutStr = configuration.getValue(HighAvailabilityOptions.HA_JOB_DELAY) val jobRecoveryTimeout = if (jobRecoveryTimeoutStr == null || jobRecoveryTimeoutStr.isEmpty) { timeout @@ -2515,7 +2509,7 @@ object JobManager { } catch { case n: NumberFormatException => throw new Exception( - s"Invalid config value for ${ConfigConstants.HA_JOB_DELAY}: " + + s"Invalid config value for ${HighAvailabilityOptions.HA_JOB_DELAY.key()}: " + s"$jobRecoveryTimeoutStr. Value must be a valid duration (such as '10 s' or '1 min')") } } http://git-wip-us.apache.org/repos/asf/flink/blob/abc1657b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java index 8464d68..8ba20c9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java @@ -22,6 +22,7 @@ import org.apache.commons.io.FileUtils; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import org.junit.After; import org.junit.Before; @@ -68,9 +69,9 @@ public class BlobRecoveryITCase { try { Configuration config = new Configuration(); - config.setString(ConfigConstants.HA_MODE, "ZOOKEEPER"); + config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM"); - config.setString(ConfigConstants.HA_ZOOKEEPER_STORAGE_PATH, recoveryDir.getPath()); + config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, recoveryDir.getPath()); for (int i = 0; i < server.length; i++) { server[i] = new BlobServer(config); http://git-wip-us.apache.org/repos/asf/flink/blob/abc1657b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java index f6bed56..f6cdf09 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.execution.librarycache; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.runtime.blob.BlobCache; import org.apache.flink.runtime.blob.BlobClient; import org.apache.flink.runtime.blob.BlobKey; @@ -63,9 +64,9 @@ public class BlobLibraryCacheRecoveryITCase { try { Configuration config = new Configuration(); - config.setString(ConfigConstants.HA_MODE, "ZOOKEEPER"); + config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM"); - config.setString(ConfigConstants.HA_ZOOKEEPER_STORAGE_PATH, temporaryFolder.getRoot().getAbsolutePath()); + config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.getRoot().getAbsolutePath()); for (int i = 0; i < server.length; i++) { server[i] = new BlobServer(config); http://git-wip-us.apache.org/repos/asf/flink/blob/abc1657b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/HighAvailabilityModeTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/HighAvailabilityModeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/HighAvailabilityModeTest.java index 04c0e48..91fb514 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/HighAvailabilityModeTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/HighAvailabilityModeTest.java @@ -20,7 +20,8 @@ package org.apache.flink.runtime.jobmanager; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; +import org.apache.flink.configuration.HighAvailabilityOptions; + import org.junit.Test; import static org.junit.Assert.assertEquals; @@ -42,7 +43,7 @@ public class HighAvailabilityModeTest { assertEquals(DEFAULT_HA_MODE, HighAvailabilityMode.fromConfig(config)); // Check not equals default - config.setString(ConfigConstants.HA_MODE, HighAvailabilityMode.ZOOKEEPER.name().toLowerCase()); + config.setString(HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.ZOOKEEPER.name().toLowerCase()); assertEquals(HighAvailabilityMode.ZOOKEEPER, HighAvailabilityMode.fromConfig(config)); } @@ -54,16 +55,16 @@ public class HighAvailabilityModeTest { Configuration config = new Configuration(); // Check mapping of old default to new default - config.setString(ConfigConstants.RECOVERY_MODE, ConfigConstants.DEFAULT_RECOVERY_MODE); + config.setString("recovery.mode", ConfigConstants.DEFAULT_RECOVERY_MODE); assertEquals(DEFAULT_HA_MODE, HighAvailabilityMode.fromConfig(config)); // Check deprecated config - config.setString(ConfigConstants.RECOVERY_MODE, HighAvailabilityMode.ZOOKEEPER.name().toLowerCase()); + config.setString("recovery.mode", HighAvailabilityMode.ZOOKEEPER.name().toLowerCase()); assertEquals(HighAvailabilityMode.ZOOKEEPER, HighAvailabilityMode.fromConfig(config)); // Check precedence over deprecated config - config.setString(ConfigConstants.HA_MODE, HighAvailabilityMode.NONE.name().toLowerCase()); - config.setString(ConfigConstants.RECOVERY_MODE, HighAvailabilityMode.ZOOKEEPER.name().toLowerCase()); + config.setString("high-availability", HighAvailabilityMode.NONE.name().toLowerCase()); + config.setString("recovery.mode", HighAvailabilityMode.ZOOKEEPER.name().toLowerCase()); assertEquals(HighAvailabilityMode.NONE, HighAvailabilityMode.fromConfig(config)); } http://git-wip-us.apache.org/repos/asf/flink/blob/abc1657b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java index b9c2bdf..612fe35 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java @@ -26,6 +26,7 @@ import akka.testkit.JavaTestKit; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.akka.ListeningBehaviour; @@ -133,8 +134,8 @@ public class JobManagerHARecoveryTest { ActorRef jobManager = null; ActorRef taskManager = null; - flinkConfiguration.setString(ConfigConstants.HA_MODE, "zookeeper"); - flinkConfiguration.setString(ConfigConstants.HA_ZOOKEEPER_STORAGE_PATH, temporaryFolder.newFolder().toString()); + flinkConfiguration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper"); + flinkConfiguration.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.newFolder().toString()); flinkConfiguration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, slots); try { http://git-wip-us.apache.org/repos/asf/flink/blob/abc1657b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java index e20985b..1f1eb62 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java @@ -27,6 +27,7 @@ import org.apache.curator.framework.recipes.cache.NodeCacheListener; import org.apache.curator.test.TestingServer; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService; import org.apache.flink.runtime.util.ZooKeeperUtils; import org.apache.flink.util.TestLogger; @@ -89,8 +90,8 @@ public class ZooKeeperLeaderElectionTest extends TestLogger { @Test public void testZooKeeperLeaderElectionRetrieval() throws Exception { Configuration configuration = new Configuration(); - configuration.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString()); - configuration.setString(ConfigConstants.HA_MODE, "zookeeper"); + configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString()); + configuration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper"); ZooKeeperLeaderElectionService leaderElectionService = null; ZooKeeperLeaderRetrievalService leaderRetrievalService = null; @@ -134,8 +135,8 @@ public class ZooKeeperLeaderElectionTest extends TestLogger { @Test public void testZooKeeperReelection() throws Exception { Configuration configuration = new Configuration(); - configuration.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString()); - configuration.setString(ConfigConstants.HA_MODE, "zookeeper"); + configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString()); + configuration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper"); Deadline deadline = new FiniteDuration(5, TimeUnit.MINUTES).fromNow(); @@ -217,8 +218,8 @@ public class ZooKeeperLeaderElectionTest extends TestLogger { @Test public void testZooKeeperReelectionWithReplacement() throws Exception { Configuration configuration = new Configuration(); - configuration.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString()); - configuration.setString(ConfigConstants.HA_MODE, "zookeeper"); + configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString()); + configuration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper"); int num = 3; int numTries = 30; @@ -295,8 +296,8 @@ public class ZooKeeperLeaderElectionTest extends TestLogger { final String leaderPath = "/leader"; Configuration configuration = new Configuration(); - configuration.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString()); - configuration.setString(ConfigConstants.HA_MODE, "zookeeper"); + configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString()); + configuration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper"); configuration.setString(ConfigConstants.HA_ZOOKEEPER_LEADER_PATH, leaderPath); ZooKeeperLeaderElectionService leaderElectionService = null; @@ -379,8 +380,8 @@ public class ZooKeeperLeaderElectionTest extends TestLogger { @Test public void testExceptionForwarding() throws Exception { Configuration configuration = new Configuration(); - configuration.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString()); - configuration.setString(ConfigConstants.HA_MODE, "zookeeper"); + configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString()); + configuration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper"); ZooKeeperLeaderElectionService leaderElectionService = null; ZooKeeperLeaderRetrievalService leaderRetrievalService = null; @@ -448,8 +449,8 @@ public class ZooKeeperLeaderElectionTest extends TestLogger { @Test public void testEphemeralZooKeeperNodes() throws Exception { Configuration configuration = new Configuration(); - configuration.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString()); - configuration.setString(ConfigConstants.HA_MODE, "zookeeper"); + configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString()); + configuration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper"); ZooKeeperLeaderElectionService leaderElectionService; ZooKeeperLeaderRetrievalService leaderRetrievalService = null; http://git-wip-us.apache.org/repos/asf/flink/blob/abc1657b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java index 0fe0644..70b1da0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java @@ -20,16 +20,19 @@ package org.apache.flink.runtime.leaderelection; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.test.TestingServer; -import org.apache.flink.configuration.ConfigConstants; + import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.runtime.jobmanager.JobManager; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.util.LeaderRetrievalUtils; import org.apache.flink.runtime.util.ZooKeeperUtils; import org.apache.flink.util.TestLogger; + import org.junit.After; import org.junit.Before; import org.junit.Test; + import scala.Option; import scala.concurrent.duration.FiniteDuration; @@ -82,8 +85,8 @@ public class ZooKeeperLeaderRetrievalTest extends TestLogger{ long sleepingTime = 1000; - config.setString(ConfigConstants.HA_MODE, "zookeeper"); - config.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString()); + config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper"); + config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString()); LeaderElectionService leaderElectionService = null; LeaderElectionService faultyLeaderElectionService; @@ -179,8 +182,8 @@ public class ZooKeeperLeaderRetrievalTest extends TestLogger{ @Test public void testTimeoutOfFindConnectingAddress() throws Exception { Configuration config = new Configuration(); - config.setString(ConfigConstants.HA_MODE, "zookeeper"); - config.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString()); + config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper"); + config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString()); FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS); @@ -190,7 +193,7 @@ public class ZooKeeperLeaderRetrievalTest extends TestLogger{ assertEquals(InetAddress.getLocalHost(), result); } - class FindConnectingAddress implements Runnable { + static class FindConnectingAddress implements Runnable { private final Configuration config; private final FiniteDuration timeout; http://git-wip-us.apache.org/repos/asf/flink/blob/abc1657b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java index 7dd7067..07cec32 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.testutils; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory; @@ -66,8 +67,8 @@ public class ZooKeeperTestUtils { config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, -1); // ZooKeeper recovery mode - config.setString(ConfigConstants.HA_MODE, "ZOOKEEPER"); - config.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, zooKeeperQuorum); + config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); + config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeperQuorum); int connTimeout = 5000; if (System.getenv().containsKey("CI")) { @@ -75,20 +76,20 @@ public class ZooKeeperTestUtils { connTimeout = 30000; } - config.setInteger(ConfigConstants.HA_ZOOKEEPER_CONNECTION_TIMEOUT, connTimeout); - config.setInteger(ConfigConstants.HA_ZOOKEEPER_SESSION_TIMEOUT, connTimeout); + config.setInteger(HighAvailabilityOptions.ZOOKEEPER_CONNECTION_TIMEOUT, connTimeout); + config.setInteger(HighAvailabilityOptions.ZOOKEEPER_SESSION_TIMEOUT, connTimeout); // File system state backend config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM"); config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY, fsStateHandlePath + "/checkpoints"); - config.setString(ConfigConstants.HA_ZOOKEEPER_STORAGE_PATH, fsStateHandlePath + "/recovery"); + config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, fsStateHandlePath + "/recovery"); // Akka failure detection and execution retries config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1000 ms"); config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "6 s"); config.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 9); config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "100 s"); - config.setString(ConfigConstants.HA_JOB_DELAY, "10 s"); + config.setString(HighAvailabilityOptions.HA_JOB_DELAY, "10 s"); return config; } http://git-wip-us.apache.org/repos/asf/flink/blob/abc1657b/flink-runtime/src/test/java/org/apache/flink/runtime/util/ZooKeeperUtilTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/ZooKeeperUtilTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/ZooKeeperUtilTest.java index daed4a4..d5895ff 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/ZooKeeperUtilTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/ZooKeeperUtilTest.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.util; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.util.TestLogger; import org.junit.Test; @@ -71,7 +72,7 @@ public class ZooKeeperUtilTest extends TestLogger { } private Configuration setQuorum(Configuration conf, String quorum) { - conf.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, quorum); + conf.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, quorum); return conf; } } http://git-wip-us.apache.org/repos/asf/flink/blob/abc1657b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java index bd58515..66c4fac 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java @@ -22,9 +22,11 @@ import org.apache.curator.framework.CuratorFramework; import org.apache.curator.test.TestingCluster; import org.apache.curator.test.TestingServer; import org.apache.curator.utils.ZKPaths; -import org.apache.flink.configuration.ConfigConstants; + import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.runtime.util.ZooKeeperUtils; + import org.apache.zookeeper.KeeperException; import java.util.List; @@ -58,7 +60,7 @@ public class ZooKeeperTestEnvironment { zooKeeperServer = new TestingServer(true); zooKeeperCluster = null; - conf.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, + conf.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeperServer.getConnectString()); } else { @@ -67,7 +69,7 @@ public class ZooKeeperTestEnvironment { zooKeeperCluster.start(); - conf.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, + conf.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeperCluster.getConnectString()); } @@ -127,7 +129,7 @@ public class ZooKeeperTestEnvironment { */ public CuratorFramework createClient() { Configuration config = new Configuration(); - config.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, getConnectString()); + config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, getConnectString()); return ZooKeeperUtils.startCuratorFramework(config); } http://git-wip-us.apache.org/repos/asf/flink/blob/abc1657b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala index 5628f3c..a268c83 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala @@ -28,12 +28,12 @@ import com.typesafe.config.ConfigFactory import grizzled.slf4j.Logger import org.apache.flink.api.common.JobExecutionResult -import org.apache.flink.configuration.{ConfigConstants, Configuration} +import org.apache.flink.configuration.{HighAvailabilityOptions, ConfigConstants, Configuration} import org.apache.flink.runtime.client.JobClient import org.apache.flink.runtime.clusterframework.FlinkResourceManager import org.apache.flink.runtime.jobgraph.JobGraph import org.apache.flink.runtime.clusterframework.types.ResourceID -import org.apache.flink.runtime.jobmanager.{MemoryArchivist, JobManager} +import org.apache.flink.runtime.jobmanager.{HighAvailabilityMode, MemoryArchivist, JobManager} import org.apache.flink.runtime.testutils.TestingResourceManager import org.apache.flink.runtime.util.LeaderRetrievalUtils import org.apache.flink.runtime.{LogMessages, LeaderSessionMessageFilter, FlinkActor} @@ -413,8 +413,7 @@ object TestingUtils { * @param configuration Configuration to use * @param jobManagerClass JobManager class to instantiate * @param prefix The prefix to use for the Actor names - * - * @return + * @return */ def createJobManager( actorSystem: ActorSystem, @@ -423,7 +422,8 @@ object TestingUtils { prefix: String) : ActorGateway = { - configuration.setString(ConfigConstants.HA_MODE, + configuration.setString( + HighAvailabilityOptions.HA_MODE, ConfigConstants.DEFAULT_HA_MODE) val (actor, _) = JobManager.startJobManagerActors( @@ -503,7 +503,8 @@ object TestingUtils { configuration: Configuration) : ActorGateway = { - configuration.setString(ConfigConstants.HA_MODE, + configuration.setString( + HighAvailabilityOptions.HA_MODE, ConfigConstants.DEFAULT_HA_MODE) val actor = FlinkResourceManager.startResourceManagerActors( http://git-wip-us.apache.org/repos/asf/flink/blob/abc1657b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java index 051175a..c005814 100644 --- a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java +++ b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java @@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.fs; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.runtime.security.SecurityContext; import org.apache.flink.streaming.util.TestStreamEnvironment; import org.apache.flink.test.util.SecureTestEnvironment; @@ -215,10 +216,10 @@ public class RollingSinkSecuredITCase extends RollingSinkITCase { config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, DEFAULT_PARALLELISM); config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false); config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 3); - config.setString(ConfigConstants.RECOVERY_MODE, "zookeeper"); + config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper"); config.setString(ConfigConstants.STATE_BACKEND, "filesystem"); config.setString(ConfigConstants.ZOOKEEPER_CHECKPOINTS_PATH, hdfsURI + "/flink/checkpoints"); - config.setString(ConfigConstants.ZOOKEEPER_RECOVERY_PATH, hdfsURI + "/flink/recovery"); + config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, hdfsURI + "/flink/recovery"); config.setString("state.backend.fs.checkpointdir", hdfsURI + "/flink/checkpoints"); SecureTestEnvironment.populateFlinkSecureConfigurations(config); http://git-wip-us.apache.org/repos/asf/flink/blob/abc1657b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java index b5e622b..0250c16 100644 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java @@ -21,6 +21,7 @@ package org.apache.flink.test.util; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.runtime.security.SecurityContext; import org.apache.hadoop.minikdc.MiniKdc; import org.junit.rules.TemporaryFolder; @@ -115,7 +116,7 @@ public class SecureTestEnvironment { Configuration flinkConfig = GlobalConfiguration.loadConfiguration(); flinkConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY, testKeytab); flinkConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, testPrincipal); - flinkConfig.setBoolean(ConfigConstants.ZOOKEEPER_SASL_DISABLE, false); + flinkConfig.setBoolean(HighAvailabilityOptions.ZOOKEEPER_SASL_DISABLE, false); ctx.setFlinkConfiguration(flinkConfig); TestingSecurityContext.install(ctx, getClientSecurityConfigurationMap()); http://git-wip-us.apache.org/repos/asf/flink/blob/abc1657b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java ---------------------------------------------------------------------- diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java index b774f97..aa5e7d3 100644 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java @@ -32,6 +32,7 @@ import org.apache.commons.io.IOUtils; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.runtime.messages.TaskManagerMessages; import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.util.TestLogger; @@ -121,7 +122,7 @@ public class TestBaseUtils extends TestLogger { if (startZooKeeper) { config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 3); - config.setString(ConfigConstants.HA_MODE, "zookeeper"); + config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper"); } return startCluster(config, singleActorSystem); http://git-wip-us.apache.org/repos/asf/flink/blob/abc1657b/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java index cc8ab80..4d10bf1 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java @@ -24,6 +24,7 @@ import org.apache.commons.io.FileUtils; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.akka.ListeningBehaviour; import org.apache.flink.runtime.instance.AkkaActorGateway; @@ -564,7 +565,7 @@ public class ChaosMonkeyITCase extends TestLogger { fail(fsCheckpoints + " does not exist: " + Arrays.toString(FileStateBackendBasePath.listFiles())); } - File fsRecovery = new File(new URI(config.getString(ConfigConstants.HA_ZOOKEEPER_STORAGE_PATH, "")).getPath()); + File fsRecovery = new File(new URI(config.getString(HighAvailabilityOptions.HA_STORAGE_PATH)).getPath()); LOG.info("Checking " + fsRecovery); http://git-wip-us.apache.org/repos/asf/flink/blob/abc1657b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java index 9b0d9de..a51f88b 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java @@ -30,6 +30,7 @@ import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.instance.ActorGateway; @@ -149,8 +150,8 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger { */ public void testJobManagerFailure(String zkQuorum, final File coordinateDir) throws Exception { Configuration config = new Configuration(); - config.setString(ConfigConstants.HA_MODE, "ZOOKEEPER"); - config.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, zkQuorum); + config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); + config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkQuorum); ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment( "leader", 1, config); http://git-wip-us.apache.org/repos/asf/flink/blob/abc1657b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java index 48ad7f5..4bcde16 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java @@ -19,6 +19,7 @@ package org.apache.flink.yarn; import org.apache.commons.cli.CommandLine; + import org.apache.flink.client.CliFrontend; import org.apache.flink.client.cli.CliFrontendParser; import org.apache.flink.client.cli.CommandLineOptions; @@ -27,7 +28,7 @@ import org.apache.flink.client.cli.RunOptions; import org.apache.flink.client.program.ClusterClient; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.test.util.TestBaseUtils; import org.apache.flink.yarn.cli.FlinkYarnSessionCli; @@ -38,20 +39,20 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl; import org.apache.hadoop.yarn.exceptions.YarnException; + import org.junit.AfterClass; import org.junit.Assert; -import org.junit.Before; import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; + import org.mockito.Mockito; import java.io.File; import java.io.IOException; import java.io.OutputStream; import java.io.PrintStream; -import java.lang.reflect.Field; import java.net.InetSocketAddress; import java.nio.file.Files; import java.nio.file.StandardOpenOption; @@ -202,7 +203,7 @@ public class CliFrontendYarnAddressConfigurationTest { CliFrontendParser.parseRunCommand(new String[] {"-yid", TEST_YARN_APPLICATION_ID.toString()}); frontend.retrieveClient(options); - String zkNs = frontend.getConfiguration().getString(ConfigConstants.HA_ZOOKEEPER_NAMESPACE_KEY, "error"); + String zkNs = frontend.getConfiguration().getValue(HighAvailabilityOptions.HA_CLUSTER_ID); Assert.assertTrue(zkNs.matches("application_\\d+_0042")); } @@ -216,7 +217,7 @@ public class CliFrontendYarnAddressConfigurationTest { CliFrontendParser.parseRunCommand(new String[] {"-yid", TEST_YARN_APPLICATION_ID.toString(), "-yz", overrideZkNamespace}); frontend.retrieveClient(options); - String zkNs = frontend.getConfiguration().getString(ConfigConstants.HA_ZOOKEEPER_NAMESPACE_KEY, "error"); + String zkNs = frontend.getConfiguration().getValue(HighAvailabilityOptions.HA_CLUSTER_ID); Assert.assertEquals(overrideZkNamespace, zkNs); } http://git-wip-us.apache.org/repos/asf/flink/blob/abc1657b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java index 9d6ff85..79f790f 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java @@ -26,6 +26,7 @@ import org.apache.flink.client.program.ClusterClient; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.instance.AkkaActorGateway; @@ -119,7 +120,7 @@ public class YARNHighAvailabilityITCase extends YarnTestBase { zkServer.getConnectString() + "@@yarn.application-attempts=" + numberApplicationAttempts + "@@" + ConfigConstants.STATE_BACKEND + "=FILESYSTEM" + "@@" + FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY + "=" + fsStateHandlePath + "/checkpoints" + - "@@" + ConfigConstants.HA_ZOOKEEPER_STORAGE_PATH + "=" + fsStateHandlePath + "/recovery"); + "@@" + HighAvailabilityOptions.HA_STORAGE_PATH.key() + "=" + fsStateHandlePath + "/recovery"); flinkYarnClient.setConfigurationFilePath(new Path(confDirPath + File.separator + "flink-conf.yaml")); ClusterClient yarnCluster = null; http://git-wip-us.apache.org/repos/asf/flink/blob/abc1657b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java index 848013c..9481c24 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java @@ -22,6 +22,7 @@ import org.apache.flink.client.CliFrontend; import org.apache.flink.client.deployment.ClusterDescriptor; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import org.apache.flink.runtime.security.SecurityContext; @@ -539,11 +540,11 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor // no user specified cli argument for namespace? if (zkNamespace == null || zkNamespace.isEmpty()) { // namespace defined in config? else use applicationId as default. - zkNamespace = flinkConfiguration.getString(ConfigConstants.HA_ZOOKEEPER_NAMESPACE_KEY, String.valueOf(appId)); + zkNamespace = flinkConfiguration.getString(HighAvailabilityOptions.HA_CLUSTER_ID, String.valueOf(appId)); setZookeeperNamespace(zkNamespace); } - flinkConfiguration.setString(ConfigConstants.HA_ZOOKEEPER_NAMESPACE_KEY, zkNamespace); + flinkConfiguration.setString(HighAvailabilityOptions.HA_CLUSTER_ID, zkNamespace); if (HighAvailabilityMode.isHighAvailabilityModeActivated(flinkConfiguration)) { // activate re-execution of failed applications http://git-wip-us.apache.org/repos/asf/flink/blob/abc1657b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java index b27876b..10e229e 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java @@ -25,6 +25,7 @@ import akka.actor.Props; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.clusterframework.BootstrapTools; import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; @@ -472,7 +473,7 @@ public class YarnApplicationMasterRunner { // override zookeeper namespace with user cli argument (if provided) String cliZKNamespace = ENV.get(YarnConfigKeys.ENV_ZOOKEEPER_NAMESPACE); if (cliZKNamespace != null && !cliZKNamespace.isEmpty()) { - configuration.setString(ConfigConstants.HA_ZOOKEEPER_NAMESPACE_KEY, cliZKNamespace); + configuration.setString(HighAvailabilityOptions.HA_CLUSTER_ID, cliZKNamespace); } // if a web monitor shall be started, set the port to random binding http://git-wip-us.apache.org/repos/asf/flink/blob/abc1657b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java index d09340c..e4da140 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java @@ -29,6 +29,7 @@ import org.apache.flink.client.cli.CustomCommandLine; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.runtime.security.SecurityContext; import org.apache.flink.yarn.AbstractYarnClusterDescriptor; @@ -60,7 +61,6 @@ import java.util.Map; import java.util.Properties; import static org.apache.flink.client.cli.CliFrontendParser.ADDRESS_OPTION; -import static org.apache.flink.configuration.ConfigConstants.HA_ZOOKEEPER_NAMESPACE_KEY; /** * Class handling the command line interface to the YARN session. @@ -513,8 +513,8 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient> if(null != applicationID) { String zkNamespace = cmdLine.hasOption(ZOOKEEPER_NAMESPACE.getOpt()) ? cmdLine.getOptionValue(ZOOKEEPER_NAMESPACE.getOpt()) - : config.getString(HA_ZOOKEEPER_NAMESPACE_KEY, applicationID); - config.setString(HA_ZOOKEEPER_NAMESPACE_KEY, zkNamespace); + : config.getString(HighAvailabilityOptions.HA_CLUSTER_ID, applicationID); + config.setString(HighAvailabilityOptions.HA_CLUSTER_ID, zkNamespace); AbstractYarnClusterDescriptor yarnDescriptor = getClusterDescriptor(); yarnDescriptor.setFlinkConfiguration(config);
