http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java index 1c0290a..048fbee 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java @@ -89,8 +89,8 @@ public class ZooKeeperLeaderElectionTest extends TestLogger { @Test public void testZooKeeperLeaderElectionRetrieval() throws Exception { Configuration configuration = new Configuration(); - configuration.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString()); - configuration.setString(ConfigConstants.RECOVERY_MODE, "zookeeper"); + configuration.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString()); + configuration.setString(ConfigConstants.HIGH_AVAILABILITY, "zookeeper"); ZooKeeperLeaderElectionService leaderElectionService = null; ZooKeeperLeaderRetrievalService leaderRetrievalService = null; @@ -134,8 +134,8 @@ public class ZooKeeperLeaderElectionTest extends TestLogger { @Test public void testZooKeeperReelection() throws Exception { Configuration configuration = new Configuration(); - configuration.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString()); - configuration.setString(ConfigConstants.RECOVERY_MODE, "zookeeper"); + configuration.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString()); + configuration.setString(ConfigConstants.HIGH_AVAILABILITY, "zookeeper"); Deadline deadline = new FiniteDuration(5, TimeUnit.MINUTES).fromNow(); @@ -217,8 +217,8 @@ public class ZooKeeperLeaderElectionTest extends TestLogger { @Test public void testZooKeeperReelectionWithReplacement() throws Exception { Configuration configuration = new Configuration(); - configuration.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString()); - configuration.setString(ConfigConstants.RECOVERY_MODE, "zookeeper"); + configuration.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString()); + configuration.setString(ConfigConstants.HIGH_AVAILABILITY, "zookeeper"); int num = 3; int numTries = 30; @@ -295,9 +295,9 @@ public class ZooKeeperLeaderElectionTest extends TestLogger { final String leaderPath = "/leader"; Configuration configuration = new Configuration(); - configuration.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString()); - configuration.setString(ConfigConstants.RECOVERY_MODE, "zookeeper"); - configuration.setString(ConfigConstants.ZOOKEEPER_LEADER_PATH, leaderPath); + configuration.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString()); + configuration.setString(ConfigConstants.HIGH_AVAILABILITY, "zookeeper"); + configuration.setString(ConfigConstants.HA_ZOOKEEPER_LEADER_PATH, leaderPath); ZooKeeperLeaderElectionService leaderElectionService = null; ZooKeeperLeaderRetrievalService leaderRetrievalService = null; @@ -379,8 +379,8 @@ public class ZooKeeperLeaderElectionTest extends TestLogger { @Test public void testExceptionForwarding() throws Exception { Configuration configuration = new Configuration(); - configuration.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString()); - configuration.setString(ConfigConstants.RECOVERY_MODE, "zookeeper"); + configuration.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString()); + configuration.setString(ConfigConstants.HIGH_AVAILABILITY, "zookeeper"); ZooKeeperLeaderElectionService leaderElectionService = null; ZooKeeperLeaderRetrievalService leaderRetrievalService = null; @@ -448,8 +448,8 @@ public class ZooKeeperLeaderElectionTest extends TestLogger { @Test public void testEphemeralZooKeeperNodes() throws Exception { Configuration configuration = new Configuration(); - configuration.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString()); - configuration.setString(ConfigConstants.RECOVERY_MODE, "zookeeper"); + configuration.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString()); + configuration.setString(ConfigConstants.HIGH_AVAILABILITY, "zookeeper"); ZooKeeperLeaderElectionService leaderElectionService; ZooKeeperLeaderRetrievalService leaderRetrievalService = null; @@ -466,7 +466,7 @@ public class ZooKeeperLeaderElectionTest extends TestLogger { listener = new TestingListener(); client = ZooKeeperUtils.startCuratorFramework(configuration); - final String leaderPath = configuration.getString(ConfigConstants.ZOOKEEPER_LEADER_PATH, + final String leaderPath = configuration.getString(ConfigConstants.HA_ZOOKEEPER_LEADER_PATH, ConfigConstants.DEFAULT_ZOOKEEPER_LEADER_PATH); cache = new NodeCache(client, leaderPath);
http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java index aae1840..5aace34 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java @@ -23,6 +23,7 @@ import org.apache.curator.test.TestingServer; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.jobmanager.JobManager; +import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.util.LeaderRetrievalUtils; import org.apache.flink.runtime.util.ZooKeeperUtils; @@ -43,6 +44,7 @@ import java.net.UnknownHostException; import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; public class ZooKeeperLeaderRetrievalTest extends TestLogger{ @@ -82,8 +84,8 @@ public class ZooKeeperLeaderRetrievalTest extends TestLogger{ long sleepingTime = 1000; - config.setString(ConfigConstants.RECOVERY_MODE, "zookeeper"); - config.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString()); + config.setString(ConfigConstants.HIGH_AVAILABILITY, "zookeeper"); + config.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString()); LeaderElectionService leaderElectionService = null; LeaderElectionService faultyLeaderElectionService; @@ -179,8 +181,8 @@ public class ZooKeeperLeaderRetrievalTest extends TestLogger{ @Test public void testTimeoutOfFindConnectingAddress() throws Exception { Configuration config = new Configuration(); - config.setString(ConfigConstants.RECOVERY_MODE, "zookeeper"); - config.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString()); + config.setString(ConfigConstants.HIGH_AVAILABILITY, "zookeeper"); + config.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString()); FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS); @@ -190,6 +192,46 @@ public class ZooKeeperLeaderRetrievalTest extends TestLogger{ assertEquals(InetAddress.getLocalHost(), result); } + @Test + public void testConnectionToZookeeperOverridingOldConfig() throws Exception { + Configuration config = new Configuration(); + // The new config will be taken into effect + config.setString(ConfigConstants.RECOVERY_MODE, "standalone"); + config.setString(ConfigConstants.HIGH_AVAILABILITY, "zookeeper"); + config.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString()); + + FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS); + + LeaderRetrievalService leaderRetrievalService = + LeaderRetrievalUtils.createLeaderRetrievalService(config); + InetAddress result = LeaderRetrievalUtils.findConnectingAddress(leaderRetrievalService, timeout); + + assertEquals(InetAddress.getLocalHost(), result); + } + + @Test + public void testConnectionToStandAloneLeaderOverridingOldConfig() throws Exception { + Configuration config = new Configuration(); + // The new config will be taken into effect + config.setString(ConfigConstants.RECOVERY_MODE, "zookeeper"); + config.setString(ConfigConstants.HIGH_AVAILABILITY, "none"); + config.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString()); + + HighAvailabilityMode mode = HighAvailabilityMode.fromConfig(config); + assertTrue(mode == HighAvailabilityMode.NONE); + } + + @Test + public void testConnectionToZookeeperUsingOldConfig() throws Exception { + Configuration config = new Configuration(); + // The new config will be taken into effect + config.setString(ConfigConstants.RECOVERY_MODE, "zookeeper"); + config.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString()); + + HighAvailabilityMode mode = HighAvailabilityMode.fromConfig(config); + assertTrue(mode == HighAvailabilityMode.ZOOKEEPER); + } + class FindConnectingAddress implements Runnable { private final Configuration config; http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java index fac6162..66d523f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java @@ -212,7 +212,7 @@ public class JobManagerProcess extends TestJvmProcess { * <code>--port PORT</code>. * * <p>Other arguments are parsed to a {@link Configuration} and passed to the - * JobManager, for instance: <code>--recovery.mode ZOOKEEPER --recovery.zookeeper.quorum + * JobManager, for instance: <code>--high-availability ZOOKEEPER --recovery.zookeeper.quorum * "xyz:123:456"</code>. */ public static void main(String[] args) { http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java index 97e7cca..417dc88 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java @@ -96,7 +96,7 @@ public class TaskManagerProcess extends TestJvmProcess { /** * All arguments are parsed to a {@link Configuration} and passed to the Taskmanager, - * for instance: <code>--recovery.mode ZOOKEEPER --recovery.zookeeper.quorum "xyz:123:456"</code>. + * for instance: <code>--high-availability ZOOKEEPER --recovery.zookeeper.quorum "xyz:123:456"</code>. */ public static void main(String[] args) throws Exception { try { http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java index 2796337..c94842f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java @@ -20,7 +20,7 @@ package org.apache.flink.runtime.testutils; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.jobmanager.RecoveryMode; +import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -31,12 +31,12 @@ import static org.apache.flink.util.Preconditions.checkNotNull; public class ZooKeeperTestUtils { /** - * Creates a configuration to operate in {@link RecoveryMode#ZOOKEEPER}. + * Creates a configuration to operate in {@link HighAvailabilityMode#ZOOKEEPER}. * * @param zooKeeperQuorum ZooKeeper quorum to connect to * @param fsStateHandlePath Base path for file system state backend (for checkpoints and * recovery) - * @return A new configuration to operate in {@link RecoveryMode#ZOOKEEPER}. + * @return A new configuration to operate in {@link HighAvailabilityMode#ZOOKEEPER}. */ public static Configuration createZooKeeperRecoveryModeConfig( String zooKeeperQuorum, String fsStateHandlePath) { @@ -45,13 +45,13 @@ public class ZooKeeperTestUtils { } /** - * Sets all necessary configuration keys to operate in {@link RecoveryMode#ZOOKEEPER}. + * Sets all necessary configuration keys to operate in {@link HighAvailabilityMode#ZOOKEEPER}. * * @param config Configuration to use * @param zooKeeperQuorum ZooKeeper quorum to connect to * @param fsStateHandlePath Base path for file system state backend (for checkpoints and * recovery) - * @return The modified configuration to operate in {@link RecoveryMode#ZOOKEEPER}. + * @return The modified configuration to operate in {@link HighAvailabilityMode#ZOOKEEPER}. */ public static Configuration setZooKeeperRecoveryMode( Configuration config, @@ -66,8 +66,8 @@ public class ZooKeeperTestUtils { config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, -1); // ZooKeeper recovery mode - config.setString(ConfigConstants.RECOVERY_MODE, "ZOOKEEPER"); - config.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, zooKeeperQuorum); + config.setString(ConfigConstants.HIGH_AVAILABILITY, "ZOOKEEPER"); + config.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, zooKeeperQuorum); int connTimeout = 5000; if (System.getenv().containsKey("CI")) { @@ -75,20 +75,20 @@ public class ZooKeeperTestUtils { connTimeout = 30000; } - config.setInteger(ConfigConstants.ZOOKEEPER_CONNECTION_TIMEOUT, connTimeout); - config.setInteger(ConfigConstants.ZOOKEEPER_SESSION_TIMEOUT, connTimeout); + config.setInteger(ConfigConstants.HA_ZOOKEEPER_CONNECTION_TIMEOUT, connTimeout); + config.setInteger(ConfigConstants.HA_ZOOKEEPER_SESSION_TIMEOUT, connTimeout); // File system state backend config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM"); config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY, fsStateHandlePath + "/checkpoints"); - config.setString(ConfigConstants.ZOOKEEPER_RECOVERY_PATH, fsStateHandlePath + "/recovery"); + config.setString(ConfigConstants.ZOOKEEPER_HA_PATH, fsStateHandlePath + "/recovery"); // Akka failure detection and execution retries config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1000 ms"); config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "6 s"); config.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 9); config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "100 s"); - config.setString(ConfigConstants.RECOVERY_JOB_DELAY, "10 s"); + config.setString(ConfigConstants.HA_JOB_DELAY, "10 s"); return config; } http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/test/java/org/apache/flink/runtime/util/ZooKeeperUtilTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/ZooKeeperUtilTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/ZooKeeperUtilTest.java index 0d01f65..daed4a4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/ZooKeeperUtilTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/ZooKeeperUtilTest.java @@ -71,7 +71,7 @@ public class ZooKeeperUtilTest extends TestLogger { } private Configuration setQuorum(Configuration conf, String quorum) { - conf.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, quorum); + conf.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, quorum); return conf; } } http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java index 8fc80e0..467706f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java @@ -58,7 +58,7 @@ public class ZooKeeperTestEnvironment { zooKeeperServer = new TestingServer(true); zooKeeperCluster = null; - conf.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, + conf.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, zooKeeperServer.getConnectString()); } else { @@ -67,7 +67,7 @@ public class ZooKeeperTestEnvironment { zooKeeperCluster.start(); - conf.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, + conf.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, zooKeeperCluster.getConnectString()); } @@ -127,7 +127,7 @@ public class ZooKeeperTestEnvironment { */ public CuratorFramework createClient() { Configuration config = new Configuration(); - config.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, getConnectString()); + config.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, getConnectString()); return ZooKeeperUtils.startCuratorFramework(config); } http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala index 02a0fec..7d2b86c 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala @@ -423,7 +423,8 @@ object TestingUtils { prefix: String) : ActorGateway = { - configuration.setString(ConfigConstants.RECOVERY_MODE, ConfigConstants.DEFAULT_RECOVERY_MODE) + configuration.setString(ConfigConstants.HIGH_AVAILABILITY, + ConfigConstants.DEFAULT_HIGH_AVAILABILTY) val (actor, _) = JobManager.startJobManagerActors( configuration, @@ -502,7 +503,8 @@ object TestingUtils { configuration: Configuration) : ActorGateway = { - configuration.setString(ConfigConstants.RECOVERY_MODE, ConfigConstants.DEFAULT_RECOVERY_MODE) + configuration.setString(ConfigConstants.HIGH_AVAILABILITY, + ConfigConstants.DEFAULT_HIGH_AVAILABILTY) val actor = FlinkResourceManager.startResourceManagerActors( configuration, http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java ---------------------------------------------------------------------- diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java index c51e666..7e5acee 100644 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java @@ -120,7 +120,7 @@ public class TestBaseUtils extends TestLogger { if (startZooKeeper) { config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 3); - config.setString(ConfigConstants.RECOVERY_MODE, "zookeeper"); + config.setString(ConfigConstants.HIGH_AVAILABILITY, "zookeeper"); } return startCluster(config, singleActorSystem); http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala ---------------------------------------------------------------------- diff --git a/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala b/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala index 42c0a6a..5dd4188 100644 --- a/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala +++ b/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala @@ -29,7 +29,7 @@ import org.apache.flink.configuration.{ConfigConstants, Configuration} import org.apache.flink.runtime.akka.AkkaUtils import org.apache.flink.runtime.clusterframework.FlinkResourceManager import org.apache.flink.runtime.clusterframework.types.ResourceID -import org.apache.flink.runtime.jobmanager.{JobManager, RecoveryMode} +import org.apache.flink.runtime.jobmanager.{JobManager, HighAvailabilityMode} import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster import org.apache.flink.runtime.taskmanager.TaskManager import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager @@ -261,14 +261,16 @@ class ForkableFlinkMiniCluster( } override def start(): Unit = { - val zookeeperURL = configuration.getString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, "") + val zookeeperURL = configuration.getString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, "") - zookeeperCluster = if(recoveryMode == RecoveryMode.ZOOKEEPER && zookeeperURL.equals("")) { + zookeeperCluster = if (recoveryMode == HighAvailabilityMode.ZOOKEEPER && + zookeeperURL.equals("")) { LOG.info("Starting ZooKeeper cluster.") val testingCluster = new TestingCluster(1) - configuration.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, testingCluster.getConnectString) + configuration.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, + testingCluster.getConnectString) testingCluster.start() http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java index e97532c..22bf62a 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java @@ -564,7 +564,7 @@ public class ChaosMonkeyITCase extends TestLogger { fail(fsCheckpoints + " does not exist: " + Arrays.toString(FileStateBackendBasePath.listFiles())); } - File fsRecovery = new File(new URI(config.getString(ConfigConstants.ZOOKEEPER_RECOVERY_PATH, "")).getPath()); + File fsRecovery = new File(new URI(config.getString(ConfigConstants.ZOOKEEPER_HA_PATH, "")).getPath()); LOG.info("Checking " + fsRecovery); http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java index eccf971..e0e165d 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java @@ -482,7 +482,7 @@ public class JobManagerHAJobGraphRecoveryITCase extends TestLogger { // ZooKeeper String currentJobsPath = config.getString( - ConfigConstants.ZOOKEEPER_JOBGRAPHS_PATH, + ConfigConstants.HA_ZOOKEEPER_JOBGRAPHS_PATH, ConfigConstants.DEFAULT_ZOOKEEPER_JOBGRAPHS_PATH); Stat stat = ZooKeeper.getClient().checkExists().forPath(currentJobsPath); @@ -514,7 +514,7 @@ public class JobManagerHAJobGraphRecoveryITCase extends TestLogger { // ZooKeeper String currentJobsPath = config.getString( - ConfigConstants.ZOOKEEPER_JOBGRAPHS_PATH, + ConfigConstants.HA_ZOOKEEPER_JOBGRAPHS_PATH, ConfigConstants.DEFAULT_ZOOKEEPER_JOBGRAPHS_PATH); Stat stat = ZooKeeper.getClient().checkExists().forPath(currentJobsPath); http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java index 88aeb09..0c52204 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java @@ -149,8 +149,8 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger { */ public void testJobManagerFailure(String zkQuorum, final File coordinateDir) throws Exception { Configuration config = new Configuration(); - config.setString(ConfigConstants.RECOVERY_MODE, "ZOOKEEPER"); - config.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, zkQuorum); + config.setString(ConfigConstants.HIGH_AVAILABILITY, "ZOOKEEPER"); + config.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, zkQuorum); ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment( "leader", 1, config); http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java index 45ee839..7091339 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java @@ -91,11 +91,11 @@ public class ZooKeeperLeaderElectionITCase extends TestLogger { int numJMs = 10; int numTMs = 3; - configuration.setString(ConfigConstants.RECOVERY_MODE, "zookeeper"); + configuration.setString(ConfigConstants.HIGH_AVAILABILITY, "zookeeper"); configuration.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, numJMs); configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs); configuration.setString(ConfigConstants.STATE_BACKEND, "filesystem"); - configuration.setString(ConfigConstants.ZOOKEEPER_RECOVERY_PATH, tempDirectory.getAbsoluteFile().toURI().toString()); + configuration.setString(ConfigConstants.ZOOKEEPER_HA_PATH, tempDirectory.getAbsoluteFile().toURI().toString()); ForkableFlinkMiniCluster cluster = new ForkableFlinkMiniCluster(configuration); @@ -139,12 +139,12 @@ public class ZooKeeperLeaderElectionITCase extends TestLogger { Configuration configuration = new Configuration(); - configuration.setString(ConfigConstants.RECOVERY_MODE, "zookeeper"); + configuration.setString(ConfigConstants.HIGH_AVAILABILITY, "zookeeper"); configuration.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, numJMs); configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs); configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTM); configuration.setString(ConfigConstants.STATE_BACKEND, "filesystem"); - configuration.setString(ConfigConstants.ZOOKEEPER_RECOVERY_PATH, tempDirectory.getAbsoluteFile().toURI().toString()); + configuration.setString(ConfigConstants.ZOOKEEPER_HA_PATH, tempDirectory.getAbsoluteFile().toURI().toString()); // we "effectively" disable the automatic RecoverAllJobs message and sent it manually to make // sure that all TMs have registered to the JM prior to issueing the RecoverAllJobs message http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java index 60ae2ef..48ad7f5 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java @@ -202,7 +202,7 @@ public class CliFrontendYarnAddressConfigurationTest { CliFrontendParser.parseRunCommand(new String[] {"-yid", TEST_YARN_APPLICATION_ID.toString()}); frontend.retrieveClient(options); - String zkNs = frontend.getConfiguration().getString(ConfigConstants.ZOOKEEPER_NAMESPACE_KEY, "error"); + String zkNs = frontend.getConfiguration().getString(ConfigConstants.HA_ZOOKEEPER_NAMESPACE_KEY, "error"); Assert.assertTrue(zkNs.matches("application_\\d+_0042")); } @@ -216,7 +216,7 @@ public class CliFrontendYarnAddressConfigurationTest { CliFrontendParser.parseRunCommand(new String[] {"-yid", TEST_YARN_APPLICATION_ID.toString(), "-yz", overrideZkNamespace}); frontend.retrieveClient(options); - String zkNs = frontend.getConfiguration().getString(ConfigConstants.ZOOKEEPER_NAMESPACE_KEY, "error"); + String zkNs = frontend.getConfiguration().getString(ConfigConstants.HA_ZOOKEEPER_NAMESPACE_KEY, "error"); Assert.assertEquals(overrideZkNamespace, zkNs); } http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java index 9b52975..25dbe53 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java @@ -115,7 +115,7 @@ public class YARNHighAvailabilityITCase extends YarnTestBase { zkServer.getConnectString() + "@@yarn.application-attempts=" + numberApplicationAttempts + "@@" + ConfigConstants.STATE_BACKEND + "=FILESYSTEM" + "@@" + FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY + "=" + fsStateHandlePath + "/checkpoints" + - "@@" + ConfigConstants.ZOOKEEPER_RECOVERY_PATH + "=" + fsStateHandlePath + "/recovery"); + "@@" + ConfigConstants.ZOOKEEPER_HA_PATH + "=" + fsStateHandlePath + "/recovery"); flinkYarnClient.setConfigurationFilePath(new Path(confDirPath + File.separator + "flink-conf.yaml")); ClusterClient yarnCluster = null; http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java index ba07af1..f4c2032 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java @@ -23,7 +23,7 @@ import org.apache.flink.client.deployment.ClusterDescriptor; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.runtime.akka.AkkaUtils; -import org.apache.flink.runtime.jobmanager.RecoveryMode; +import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -553,13 +553,13 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor // no user specified cli argument for namespace? if (zkNamespace == null || zkNamespace.isEmpty()) { // namespace defined in config? else use applicationId as default. - zkNamespace = flinkConfiguration.getString(ConfigConstants.ZOOKEEPER_NAMESPACE_KEY, String.valueOf(appId)); + zkNamespace = flinkConfiguration.getString(ConfigConstants.HA_ZOOKEEPER_NAMESPACE_KEY, String.valueOf(appId)); setZookeeperNamespace(zkNamespace); } - flinkConfiguration.setString(ConfigConstants.ZOOKEEPER_NAMESPACE_KEY, zkNamespace); + flinkConfiguration.setString(ConfigConstants.HA_ZOOKEEPER_NAMESPACE_KEY, zkNamespace); - if (RecoveryMode.isHighAvailabilityModeActivated(flinkConfiguration)) { + if (HighAvailabilityMode.isHighAvailabilityModeActivated(flinkConfiguration)) { // activate re-execution of failed applications appContext.setMaxAppAttempts( flinkConfiguration.getInteger( http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java index 39b2510..87a2c98 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java @@ -430,7 +430,7 @@ public class YarnApplicationMasterRunner { // override zookeeper namespace with user cli argument (if provided) String cliZKNamespace = ENV.get(YarnConfigKeys.ENV_ZOOKEEPER_NAMESPACE); if (cliZKNamespace != null && !cliZKNamespace.isEmpty()) { - configuration.setString(ConfigConstants.ZOOKEEPER_NAMESPACE_KEY, cliZKNamespace); + configuration.setString(ConfigConstants.HA_ZOOKEEPER_NAMESPACE_KEY, cliZKNamespace); } // if a web monitor shall be started, set the port to random binding http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java index bee6a7a..3c93e34 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java @@ -58,7 +58,7 @@ import java.util.Map; import java.util.Properties; import static org.apache.flink.client.cli.CliFrontendParser.ADDRESS_OPTION; -import static org.apache.flink.configuration.ConfigConstants.ZOOKEEPER_NAMESPACE_KEY; +import static org.apache.flink.configuration.ConfigConstants.HA_ZOOKEEPER_NAMESPACE_KEY; /** * Class handling the command line interface to the YARN session. @@ -503,8 +503,8 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient> if(null != applicationID) { String zkNamespace = cmdLine.hasOption(ZOOKEEPER_NAMESPACE.getOpt()) ? cmdLine.getOptionValue(ZOOKEEPER_NAMESPACE.getOpt()) - : config.getString(ZOOKEEPER_NAMESPACE_KEY, applicationID); - config.setString(ZOOKEEPER_NAMESPACE_KEY, zkNamespace); + : config.getString(HA_ZOOKEEPER_NAMESPACE_KEY, applicationID); + config.setString(HA_ZOOKEEPER_NAMESPACE_KEY, zkNamespace); AbstractYarnClusterDescriptor yarnDescriptor = getClusterDescriptor(); yarnDescriptor.setFlinkConfiguration(config);