http://git-wip-us.apache.org/repos/asf/flink/blob/58165d69/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala ---------------------------------------------------------------------- diff --git a/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala b/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala index 5dd4188..fa3135a 100644 --- a/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala +++ b/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala @@ -263,7 +263,7 @@ class ForkableFlinkMiniCluster( override def start(): Unit = { val zookeeperURL = configuration.getString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, "") - zookeeperCluster = if (recoveryMode == HighAvailabilityMode.ZOOKEEPER && + zookeeperCluster = if (haMode == HighAvailabilityMode.ZOOKEEPER && zookeeperURL.equals("")) { LOG.info("Starting ZooKeeper cluster.")
http://git-wip-us.apache.org/repos/asf/flink/blob/58165d69/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java index 22bf62a..cc8ab80 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java @@ -148,7 +148,7 @@ public class ChaosMonkeyITCase extends TestLogger { // ----------------------------------------------------------------------------------------- // Setup - Configuration config = ZooKeeperTestUtils.createZooKeeperRecoveryModeConfig( + Configuration config = ZooKeeperTestUtils.createZooKeeperHAConfig( ZooKeeper.getConnectString(), FileStateBackendBasePath.toURI().toString()); // Akka and restart timeouts @@ -564,7 +564,7 @@ public class ChaosMonkeyITCase extends TestLogger { fail(fsCheckpoints + " does not exist: " + Arrays.toString(FileStateBackendBasePath.listFiles())); } - File fsRecovery = new File(new URI(config.getString(ConfigConstants.ZOOKEEPER_HA_PATH, "")).getPath()); + File fsRecovery = new File(new URI(config.getString(ConfigConstants.HA_ZOOKEEPER_STORAGE_PATH, "")).getPath()); LOG.info("Checking " + fsRecovery); http://git-wip-us.apache.org/repos/asf/flink/blob/58165d69/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java index f66e52c..49eaeb7 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java @@ -160,7 +160,7 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger { JobGraph jobGraph = env.getStreamGraph().getJobGraph(); - Configuration config = ZooKeeperTestUtils.createZooKeeperRecoveryModeConfig(ZooKeeper + Configuration config = ZooKeeperTestUtils.createZooKeeperHAConfig(ZooKeeper .getConnectString(), FileStateBackendBasePath.getAbsoluteFile().toURI().toString()); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, Parallelism); @@ -311,7 +311,7 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger { final String zooKeeperQuorum = ZooKeeper.getConnectString(); final String fileStateBackendPath = FileStateBackendBasePath.getAbsoluteFile().toString(); - Configuration config = ZooKeeperTestUtils.createZooKeeperRecoveryModeConfig( + Configuration config = ZooKeeperTestUtils.createZooKeeperHAConfig( zooKeeperQuorum, fileStateBackendPath); http://git-wip-us.apache.org/repos/asf/flink/blob/58165d69/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java index e0e165d..bf39c4b 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java @@ -125,7 +125,7 @@ public class JobManagerHAJobGraphRecoveryITCase extends TestLogger { */ @Test public void testJobPersistencyWhenJobManagerShutdown() throws Exception { - Configuration config = ZooKeeperTestUtils.createZooKeeperRecoveryModeConfig( + Configuration config = ZooKeeperTestUtils.createZooKeeperHAConfig( ZooKeeper.getConnectString(), FileStateBackendBasePath.getPath()); // Configure the cluster @@ -172,7 +172,7 @@ public class JobManagerHAJobGraphRecoveryITCase extends TestLogger { */ @Test public void testSubmitJobToNonLeader() throws Exception { - Configuration config = ZooKeeperTestUtils.createZooKeeperRecoveryModeConfig( + Configuration config = ZooKeeperTestUtils.createZooKeeperHAConfig( ZooKeeper.getConnectString(), FileStateBackendBasePath.getPath()); // Configure the cluster @@ -257,7 +257,7 @@ public class JobManagerHAJobGraphRecoveryITCase extends TestLogger { */ @Test public void testClientNonDetachedListeningBehaviour() throws Exception { - Configuration config = ZooKeeperTestUtils.createZooKeeperRecoveryModeConfig( + Configuration config = ZooKeeperTestUtils.createZooKeeperHAConfig( ZooKeeper.getConnectString(), FileStateBackendBasePath.getPath()); // Test actor system http://git-wip-us.apache.org/repos/asf/flink/blob/58165d69/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java index 0c52204..9b0d9de 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java @@ -149,7 +149,7 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger { */ public void testJobManagerFailure(String zkQuorum, final File coordinateDir) throws Exception { Configuration config = new Configuration(); - config.setString(ConfigConstants.HIGH_AVAILABILITY, "ZOOKEEPER"); + config.setString(ConfigConstants.HA_MODE, "ZOOKEEPER"); config.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, zkQuorum); ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment( @@ -249,7 +249,7 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger { coordinateTempDir = createTempDirectory(); // Job Managers - Configuration config = ZooKeeperTestUtils.createZooKeeperRecoveryModeConfig( + Configuration config = ZooKeeperTestUtils.createZooKeeperHAConfig( ZooKeeper.getConnectString(), FileStateBackendBasePath.getPath()); // Start first process http://git-wip-us.apache.org/repos/asf/flink/blob/58165d69/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java index 7091339..9bd8cc3 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java @@ -91,11 +91,11 @@ public class ZooKeeperLeaderElectionITCase extends TestLogger { int numJMs = 10; int numTMs = 3; - configuration.setString(ConfigConstants.HIGH_AVAILABILITY, "zookeeper"); + configuration.setString(ConfigConstants.HA_MODE, "zookeeper"); configuration.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, numJMs); configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs); configuration.setString(ConfigConstants.STATE_BACKEND, "filesystem"); - configuration.setString(ConfigConstants.ZOOKEEPER_HA_PATH, tempDirectory.getAbsoluteFile().toURI().toString()); + configuration.setString(ConfigConstants.HA_ZOOKEEPER_STORAGE_PATH, tempDirectory.getAbsoluteFile().toURI().toString()); ForkableFlinkMiniCluster cluster = new ForkableFlinkMiniCluster(configuration); @@ -139,12 +139,12 @@ public class ZooKeeperLeaderElectionITCase extends TestLogger { Configuration configuration = new Configuration(); - configuration.setString(ConfigConstants.HIGH_AVAILABILITY, "zookeeper"); + configuration.setString(ConfigConstants.HA_MODE, "zookeeper"); configuration.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, numJMs); configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs); configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTM); configuration.setString(ConfigConstants.STATE_BACKEND, "filesystem"); - configuration.setString(ConfigConstants.ZOOKEEPER_HA_PATH, tempDirectory.getAbsoluteFile().toURI().toString()); + configuration.setString(ConfigConstants.HA_ZOOKEEPER_STORAGE_PATH, tempDirectory.getAbsoluteFile().toURI().toString()); // we "effectively" disable the automatic RecoverAllJobs message and sent it manually to make // sure that all TMs have registered to the JM prior to issueing the RecoverAllJobs message http://git-wip-us.apache.org/repos/asf/flink/blob/58165d69/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java index 25dbe53..a293348 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java @@ -115,7 +115,7 @@ public class YARNHighAvailabilityITCase extends YarnTestBase { zkServer.getConnectString() + "@@yarn.application-attempts=" + numberApplicationAttempts + "@@" + ConfigConstants.STATE_BACKEND + "=FILESYSTEM" + "@@" + FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY + "=" + fsStateHandlePath + "/checkpoints" + - "@@" + ConfigConstants.ZOOKEEPER_HA_PATH + "=" + fsStateHandlePath + "/recovery"); + "@@" + ConfigConstants.HA_ZOOKEEPER_STORAGE_PATH + "=" + fsStateHandlePath + "/recovery"); flinkYarnClient.setConfigurationFilePath(new Path(confDirPath + File.separator + "flink-conf.yaml")); ClusterClient yarnCluster = null;
