Repository: flink
Updated Branches:
  refs/heads/master 5671c77c3 -> a6890b284
http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java new file mode 100644 index 0000000..a0c8312 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java @@ -0,0 +1,713 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.recovery; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import org.apache.commons.io.FileUtils; +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.akka.ListeningBehaviour; +import org.apache.flink.runtime.instance.AkkaActorGateway; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.leaderelection.TestingListener; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.runtime.messages.JobManagerMessages; +import org.apache.flink.runtime.testutils.CommonTestUtils; +import org.apache.flink.runtime.testutils.JobManagerActorTestUtils; +import org.apache.flink.runtime.testutils.JobManagerProcess; +import org.apache.flink.runtime.testutils.TaskManagerProcess; +import org.apache.flink.runtime.testutils.TestJvmProcess; +import org.apache.flink.runtime.testutils.ZooKeeperTestUtils; +import org.apache.flink.runtime.util.ZooKeeperUtils; +import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment; +import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier; +import org.apache.flink.streaming.api.checkpoint.Checkpointed; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.junit.AfterClass; +import org.junit.Ignore; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.Deadline; +import scala.concurrent.duration.FiniteDuration; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static 
com.google.common.base.Preconditions.checkArgument; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +@Ignore +public class ChaosMonkeyITCase { + + private static final Logger LOG = LoggerFactory.getLogger(ChaosMonkeyITCase.class); + + private final static ZooKeeperTestEnvironment ZooKeeper = new ZooKeeperTestEnvironment(1); + + private final static File FileStateBackendBasePath; + + private final static File CheckpointCompletedCoordination; + + private final static File ProceedCoordination; + + private final static String COMPLETED_PREFIX = "completed_"; + + private final static long LastElement = -1; + + private final Random rand = new Random(); + + private int jobManagerPid; + private int taskManagerPid; + + static { + try { + FileStateBackendBasePath = CommonTestUtils.createTempDirectory(); + CheckpointCompletedCoordination = new File(FileStateBackendBasePath, COMPLETED_PREFIX); + ProceedCoordination = new File(FileStateBackendBasePath, "proceed"); + } + catch (IOException e) { + throw new RuntimeException("Error in test setup. Could not create directory.", e); + } + } + + @AfterClass + public static void tearDown() throws Exception { + if (ZooKeeper != null) { + ZooKeeper.shutdown(); + } + + if (FileStateBackendBasePath != null) { + FileUtils.deleteDirectory(FileStateBackendBasePath); + } + } + + @Test + public void testChaosMonkey() throws Exception { + // Test config + final int numberOfJobManagers = 3; + final int numberOfTaskManagers = 3; + final int numberOfSlotsPerTaskManager = 2; + + // The final count each source is counting to: 1...n + final int n = 5000; + + // Parallelism for the program + final int parallelism = numberOfTaskManagers * numberOfSlotsPerTaskManager; + + // The test should not run longer than this + final FiniteDuration testDuration = new FiniteDuration(10, TimeUnit.MINUTES); + + // Every x seconds a random job or task manager is killed + // + // The job will be running for $killEvery seconds and then a random Job/TaskManager + // will be killed. On recovery (which takes some time to bring up the new process etc.), + // this test will wait for task managers to reconnect before starting the next countdown. + // Therefore the delay between retries is not important in this setup. + final FiniteDuration killEvery = new FiniteDuration(30, TimeUnit.SECONDS); + + // Trigger a checkpoint every x milliseconds + final int checkpointingIntervalMs = 2000; + + // Total number of kills + final int totalNumberOfKills = 5; + + // ----------------------------------------------------------------------------------------- + + // Setup + Configuration config = ZooKeeperTestUtils.createZooKeeperRecoveryModeConfig( + ZooKeeper.getConnectString(), FileStateBackendBasePath.getPath()); + + // Akka and restart timeouts + config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1000 ms"); + config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "6 s"); + config.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 9); + + if (checkpointingIntervalMs >= killEvery.toMillis()) { + throw new IllegalArgumentException("Relax! You want to kill processes every " + + killEvery + ", but the checkpointing interval is " + + checkpointingIntervalMs / 1000 + " seconds. Either decrease the interval or " + + "increase the kill interval. 
Otherwise, the program will not complete any " + + "checkpoint."); + } + + // Task manager + config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numberOfSlotsPerTaskManager); + + ActorSystem testActorSystem = null; + LeaderRetrievalService leaderRetrievalService = null; + List<JobManagerProcess> jobManagerProcesses = new ArrayList<>(); + List<TaskManagerProcess> taskManagerProcesses = new ArrayList<>(); + + try { + // Initial state + for (int i = 0; i < numberOfJobManagers; i++) { + jobManagerProcesses.add(createAndStartJobManagerProcess(config)); + } + + for (int i = 0; i < numberOfTaskManagers; i++) { + taskManagerProcesses.add(createAndStartTaskManagerProcess(config)); + } + + testActorSystem = AkkaUtils.createDefaultActorSystem(); + + // Leader listener + leaderRetrievalService = ZooKeeperUtils.createLeaderRetrievalService(config); + TestingListener leaderListener = new TestingListener(); + leaderRetrievalService.start(leaderListener); + + Deadline deadline = testDuration.fromNow(); + + // Wait for the new leader + int leaderIndex = waitForNewLeader( + leaderListener, jobManagerProcesses, deadline.timeLeft()); + + // Wait for the task managers to connect + waitForTaskManagers( + numberOfTaskManagers, + jobManagerProcesses.get(leaderIndex), + testActorSystem, + deadline.timeLeft()); + + // The job + JobGraph jobGraph = createJobGraph(n, CheckpointCompletedCoordination.getPath(), + ProceedCoordination.getPath(), parallelism, checkpointingIntervalMs); + + LOG.info("Submitting job {}", jobGraph.getJobID()); + submitJobGraph(jobGraph, jobManagerProcesses.get(leaderIndex), leaderListener, + testActorSystem, deadline.timeLeft()); + + LOG.info("Waiting for a checkpoint to complete before kicking off chaos"); + + // Wait for a checkpoint to complete + TestJvmProcess.waitForMarkerFiles(FileStateBackendBasePath, COMPLETED_PREFIX, + parallelism, deadline.timeLeft().toMillis()); + + LOG.info("Checkpoint completed... 
ready for chaos"); + + int currentKillNumber = 1; + int currentJobManagerKills = 0; + int currentTaskManagerKills = 0; + + for (int i = 0; i < totalNumberOfKills; i++) { + LOG.info("Waiting for {} before next kill ({}/{})", killEvery, currentKillNumber++, totalNumberOfKills); + Thread.sleep(killEvery.toMillis()); + + LOG.info("Checking job status..."); + + JobStatus jobStatus = requestJobStatus(jobGraph.getJobID(), + jobManagerProcesses.get(leaderIndex), testActorSystem, deadline.timeLeft()); + + if (jobStatus != JobStatus.RUNNING && jobStatus != JobStatus.FINISHED) { + // Wait for it to run + LOG.info("Waiting for job status {}", JobStatus.RUNNING); + waitForJobRunning(jobGraph.getJobID(), jobManagerProcesses.get(leaderIndex), + testActorSystem, deadline.timeLeft()); + } + else if (jobStatus == JobStatus.FINISHED) { + // Early finish + LOG.info("Job finished"); + return; + } + else { + LOG.info("Job status is {}", jobStatus); + } + + if (rand.nextBoolean()) { + LOG.info("Killing the leading JobManager"); + + JobManagerProcess newJobManager = createAndStartJobManagerProcess(config); + + JobManagerProcess leader = jobManagerProcesses.remove(leaderIndex); + leader.destroy(); + currentJobManagerKills++; + + LOG.info("Killed {}", leader); + + // Make sure to add the new job manager before looking for a new leader + jobManagerProcesses.add(newJobManager); + + // Wait for the new leader + leaderIndex = waitForNewLeader( + leaderListener, jobManagerProcesses, deadline.timeLeft()); + + // Wait for the task managers to connect + waitForTaskManagers( + numberOfTaskManagers, + jobManagerProcesses.get(leaderIndex), + testActorSystem, + deadline.timeLeft()); + } + else { + LOG.info("Killing a random TaskManager"); + TaskManagerProcess newTaskManager = createAndStartTaskManagerProcess(config); + + // Wait for this new task manager to be connected + waitForTaskManagers( + numberOfTaskManagers + 1, + jobManagerProcesses.get(leaderIndex), + testActorSystem, + deadline.timeLeft()); + + // Now it's safe to kill a process + int next = rand.nextInt(numberOfTaskManagers); + TaskManagerProcess taskManager = taskManagerProcesses.remove(next); + + LOG.info("{} has been chosen. Killing process...", taskManager); + + taskManager.destroy(); + currentTaskManagerKills++; + + // Add the new task manager after killing an old one + taskManagerProcesses.add(newTaskManager); + } + } + + LOG.info("Chaos is over. Total kills: {} ({} job manager + {} task managers). 
" + + "Checking job status...", + totalNumberOfKills, currentJobManagerKills, currentTaskManagerKills); + + // Signal the job to speed up (if it is not done yet) + TestJvmProcess.touchFile(ProceedCoordination); + + // Wait for the job to finish + LOG.info("Waiting for job status {}", JobStatus.FINISHED); + waitForJobFinished(jobGraph.getJobID(), jobManagerProcesses.get(leaderIndex), + testActorSystem, deadline.timeLeft()); + + LOG.info("Job finished"); + + LOG.info("Waiting for job removal"); + waitForJobRemoved(jobGraph.getJobID(), jobManagerProcesses.get(leaderIndex), + testActorSystem, deadline.timeLeft()); + LOG.info("Job removed"); + + LOG.info("Checking clean recovery state..."); + checkCleanRecoveryState(config); + LOG.info("Recovery state clean"); + } + catch (Throwable t) { + System.out.println("#################################################"); + System.out.println(" TASK MANAGERS"); + System.out.println("#################################################"); + + for (TaskManagerProcess taskManagerProcess : taskManagerProcesses) { + taskManagerProcess.printProcessLog(); + } + + System.out.println("#################################################"); + System.out.println(" JOB MANAGERS"); + System.out.println("#################################################"); + + for (JobManagerProcess jobManagerProcess : jobManagerProcesses) { + jobManagerProcess.printProcessLog(); + } + + t.printStackTrace(); + } + finally { + for (JobManagerProcess jobManagerProcess : jobManagerProcesses) { + if (jobManagerProcess != null) { + jobManagerProcess.destroy(); + } + } + + if (leaderRetrievalService != null) { + leaderRetrievalService.stop(); + } + + if (testActorSystem != null) { + testActorSystem.shutdown(); + } + } + } + + // - The test program -------------------------------------------------------------------------- + + private JobGraph createJobGraph( + int n, + String completedCheckpointMarker, + String proceedMarker, + int parallelism, + int checkpointingIntervalMs) { + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(parallelism); + env.enableCheckpointing(checkpointingIntervalMs); + + int expectedResult = parallelism * n * (n + 1) / 2; + + env.addSource(new CheckpointedSequenceSource(n, completedCheckpointMarker, proceedMarker)) + .addSink(new CountingSink(parallelism, expectedResult)) + .setParallelism(1); + + return env.getStreamGraph().getJobGraph(); + } + + public static class CheckpointedSequenceSource extends RichParallelSourceFunction<Long> + implements Checkpointed<Long>, CheckpointNotifier { + + private static final long serialVersionUID = 0L; + + private final long end; + + private final String completedCheckpointMarkerFilePath; + + private final File proceedFile; + + private long current = 0; + + private volatile boolean isRunning = true; + + public CheckpointedSequenceSource(long end, String completedCheckpointMarkerFilePath, String proceedMarkerFilePath) { + checkArgument(end >= 0, "Negative final count"); + + this.end = end; + this.completedCheckpointMarkerFilePath = completedCheckpointMarkerFilePath; + this.proceedFile = new File(proceedMarkerFilePath); + } + + @Override + public void run(SourceContext<Long> ctx) throws Exception { + while (isRunning) { + + if (!proceedFile.exists()) { + Thread.sleep(50); + } + + synchronized (ctx.getCheckpointLock()) { + if (current <= end) { + ctx.collect(current++); + } + else { + ctx.collect(LastElement); + return; + } + } + } + } + + @Override + public Long 
snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { + LOG.info("Snapshotting state {} @ ID {}.", current, checkpointId); + return current; + } + + @Override + public void restoreState(Long state) { + LOG.info("Restoring state {}/{}", state, end); + current = state; + } + + + @Override + public void cancel() { + isRunning = false; + } + + @Override + public void notifyCheckpointComplete(long checkpointId) throws Exception { + LOG.info("Checkpoint {} completed.", checkpointId); + + int taskIndex = getRuntimeContext().getIndexOfThisSubtask(); + TestJvmProcess.touchFile(new File(completedCheckpointMarkerFilePath + taskIndex)); + } + } + + public static class CountingSink extends RichSinkFunction<Long> + implements Checkpointed<CountingSink>, CheckpointNotifier { + + private static final Logger LOG = LoggerFactory.getLogger(CountingSink.class); + + private static final long serialVersionUID = 0L; + + private final int parallelism; + + private final long expectedFinalCount; + + private long current; + + private int numberOfReceivedLastElements; + + + public CountingSink(int parallelism, long expectedFinalCount) { + this.expectedFinalCount = expectedFinalCount; + this.parallelism = parallelism; + } + + @Override + public void invoke(Long value) throws Exception { + if (value == LastElement) { + numberOfReceivedLastElements++; + + if (numberOfReceivedLastElements == parallelism) { + if (current != expectedFinalCount) { + throw new Exception("Unexpected final result " + current); + } + else { + LOG.info("Final result " + current); + } + } + else if (numberOfReceivedLastElements > parallelism) { + throw new IllegalStateException("Received more elements than parallelism."); + } + } + else { + current += value; + } + } + + @Override + public CountingSink snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { + LOG.info("Snapshotting state {}:{} @ ID {}.", current, numberOfReceivedLastElements, checkpointId); + return this; + } + + @Override + public void restoreState(CountingSink state) { + LOG.info("Restoring state {}:{}", state.current, state.numberOfReceivedLastElements); + this.current = state.current; + this.numberOfReceivedLastElements = state.numberOfReceivedLastElements; + } + + @Override + public void notifyCheckpointComplete(long checkpointId) throws Exception { + LOG.info("Checkpoint {} completed.", checkpointId); + } + } + + // - Utilities --------------------------------------------------------------------------------- + + private void submitJobGraph( + JobGraph jobGraph, + JobManagerProcess jobManager, + TestingListener leaderListener, + ActorSystem actorSystem, + FiniteDuration timeout) throws Exception { + + ActorRef jobManagerRef = jobManager.getActorRef(actorSystem, timeout); + UUID jobManagerLeaderId = leaderListener.getLeaderSessionID(); + AkkaActorGateway jobManagerGateway = new AkkaActorGateway(jobManagerRef, jobManagerLeaderId); + + jobManagerGateway.tell(new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.DETACHED)); + } + + private void checkCleanRecoveryState(Configuration config) throws Exception { + LOG.info("Checking " + ZooKeeper.getClient().getNamespace() + + ConfigConstants.DEFAULT_ZOOKEEPER_JOBGRAPHS_PATH); + List<String> jobGraphs = ZooKeeper.getClient().getChildren() + .forPath(ConfigConstants.DEFAULT_ZOOKEEPER_JOBGRAPHS_PATH); + assertEquals("Unclean job graphs: " + jobGraphs, 0, jobGraphs.size()); + + LOG.info("Checking " + ZooKeeper.getClient().getNamespace() + + 
ConfigConstants.DEFAULT_ZOOKEEPER_CHECKPOINTS_PATH); + List<String> checkpoints = ZooKeeper.getClient().getChildren() + .forPath(ConfigConstants.DEFAULT_ZOOKEEPER_CHECKPOINTS_PATH); + assertEquals("Unclean checkpoints: " + checkpoints, 0, checkpoints.size()); + + LOG.info("Checking " + ZooKeeper.getClient().getNamespace() + + ConfigConstants.DEFAULT_ZOOKEEPER_CHECKPOINT_COUNTER_PATH); + List<String> checkpointCounter = ZooKeeper.getClient().getChildren() + .forPath(ConfigConstants.DEFAULT_ZOOKEEPER_CHECKPOINT_COUNTER_PATH); + assertEquals("Unclean checkpoint counter: " + checkpointCounter, 0, checkpointCounter.size()); + + LOG.info("ZooKeeper state is clean"); + + LOG.info("Checking file system backend state..."); + + File fsCheckpoints = new File(config.getString(ConfigConstants.STATE_BACKEND_FS_DIR, "")); + + LOG.info("Checking " + fsCheckpoints); + + File[] files = fsCheckpoints.listFiles(); + if (files == null) { + fail(fsCheckpoints + " does not exist: " + Arrays.toString(FileStateBackendBasePath.listFiles())); + } + else { + assertEquals("Unclean file system checkpoints: " + Arrays.toString(fsCheckpoints.listFiles()), + 0, files.length); + } + + File fsRecovery = new File(config.getString(ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, "")); + + LOG.info("Checking " + fsRecovery); + + files = fsRecovery.listFiles(); + if (files == null) { + fail(fsRecovery + " does not exist: " + Arrays.toString(FileStateBackendBasePath.listFiles())); + } + else { + assertEquals("Unclean file system checkpoints: " + Arrays.toString(fsRecovery.listFiles()), + 0, files.length); + } + } + + private void waitForJobRemoved( + JobID jobId, JobManagerProcess jobManager, ActorSystem actorSystem, FiniteDuration timeout) + throws Exception { + + ActorRef jobManagerRef = jobManager.getActorRef(actorSystem, timeout); + AkkaActorGateway jobManagerGateway = new AkkaActorGateway(jobManagerRef, null); + + Future<Object> archiveFuture = jobManagerGateway.ask(JobManagerMessages.getRequestArchive(), timeout); + + ActorRef archive = ((JobManagerMessages.ResponseArchive) Await.result(archiveFuture, timeout)).actor(); + + AkkaActorGateway archiveGateway = new AkkaActorGateway(archive, null); + + Deadline deadline = timeout.fromNow(); + + while (deadline.hasTimeLeft()) { + JobManagerMessages.JobStatusResponse resp = JobManagerActorTestUtils + .requestJobStatus(jobId, archiveGateway, deadline.timeLeft()); + + if (resp instanceof JobManagerMessages.JobNotFound) { + Thread.sleep(100); + } + else { + return; + } + } + } + + private JobStatus requestJobStatus( + JobID jobId, JobManagerProcess jobManager, ActorSystem actorSystem, FiniteDuration timeout) + throws Exception { + + ActorRef jobManagerRef = jobManager.getActorRef(actorSystem, timeout); + AkkaActorGateway jobManagerGateway = new AkkaActorGateway(jobManagerRef, null); + + JobManagerMessages.JobStatusResponse resp = JobManagerActorTestUtils + .requestJobStatus(jobId, jobManagerGateway, timeout); + + if (resp instanceof JobManagerMessages.CurrentJobStatus) { + JobManagerMessages.CurrentJobStatus jobStatusResponse = (JobManagerMessages + .CurrentJobStatus) resp; + + return jobStatusResponse.status(); + } + else if (resp instanceof JobManagerMessages.JobNotFound) { + return JobStatus.RESTARTING; + } + + throw new IllegalStateException("Unexpected response from JobManager"); + } + + private void waitForJobRunning( + JobID jobId, JobManagerProcess jobManager, ActorSystem actorSystem, FiniteDuration timeout) + throws Exception { + + ActorRef jobManagerRef = 
jobManager.getActorRef(actorSystem, timeout); + AkkaActorGateway jobManagerGateway = new AkkaActorGateway(jobManagerRef, null); + + JobManagerActorTestUtils.waitForJobStatus(jobId, JobStatus.RUNNING, jobManagerGateway, timeout); + } + + private void waitForJobFinished( + JobID jobId, JobManagerProcess jobManager, ActorSystem actorSystem, FiniteDuration timeout) + throws Exception { + + ActorRef jobManagerRef = jobManager.getActorRef(actorSystem, timeout); + AkkaActorGateway jobManagerGateway = new AkkaActorGateway(jobManagerRef, null); + + JobManagerActorTestUtils.waitForJobStatus(jobId, JobStatus.FINISHED, jobManagerGateway, timeout); + } + + private void waitForTaskManagers( + int minimumNumberOfTaskManagers, + JobManagerProcess jobManager, + ActorSystem actorSystem, + FiniteDuration timeout) throws Exception { + + LOG.info("Waiting for {} task managers to connect to leading {}", + minimumNumberOfTaskManagers, jobManager); + + ActorRef jobManagerRef = jobManager.getActorRef(actorSystem, timeout); + AkkaActorGateway jobManagerGateway = new AkkaActorGateway(jobManagerRef, null); + + JobManagerActorTestUtils.waitForTaskManagers( + minimumNumberOfTaskManagers, jobManagerGateway, timeout); + + LOG.info("All task managers connected"); + } + + private int waitForNewLeader( + TestingListener leaderListener, + List<JobManagerProcess> jobManagerProcesses, + FiniteDuration timeout) throws Exception { + + LOG.info("Waiting for new leader notification"); + leaderListener.waitForNewLeader(timeout.toMillis()); + + LOG.info("Leader: {}:{}", leaderListener.getAddress(), leaderListener.getLeaderSessionID()); + + String currentLeader = leaderListener.getAddress(); + + int leaderIndex = -1; + + for (int i = 0; i < jobManagerProcesses.size(); i++) { + JobManagerProcess jobManager = jobManagerProcesses.get(i); + if (jobManager.getJobManagerAkkaURL().equals(currentLeader)) { + leaderIndex = i; + break; + } + } + + if (leaderIndex == -1) { + throw new IllegalStateException("Failed to determine which process is leader"); + } + + return leaderIndex; + } + + private JobManagerProcess createAndStartJobManagerProcess(Configuration config) + throws Exception { + + JobManagerProcess jobManager = new JobManagerProcess(jobManagerPid++, config); + jobManager.createAndStart(); + LOG.info("Created and started {}.", jobManager); + + return jobManager; + } + + private TaskManagerProcess createAndStartTaskManagerProcess(Configuration config) + throws Exception { + + TaskManagerProcess taskManager = new TaskManagerProcess(taskManagerPid++, config); + taskManager.createAndStart(); + LOG.info("Created and started {}.", taskManager); + + return taskManager; + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerCheckpointRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerCheckpointRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerCheckpointRecoveryITCase.java new file mode 100644 index 0000000..54ddf7e --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerCheckpointRecoveryITCase.java @@ -0,0 +1,395 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.recovery; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import org.apache.commons.io.FileUtils; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.StreamingMode; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.akka.ListeningBehaviour; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.instance.AkkaActorGateway; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.leaderelection.TestingListener; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.runtime.taskmanager.TaskManager; +import org.apache.flink.runtime.testutils.CommonTestUtils; +import org.apache.flink.runtime.testutils.JobManagerActorTestUtils; +import org.apache.flink.runtime.testutils.JobManagerProcess; +import org.apache.flink.runtime.testutils.ZooKeeperTestUtils; +import org.apache.flink.runtime.util.ZooKeeperUtils; +import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment; +import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier; +import org.apache.flink.streaming.api.checkpoint.Checkpointed; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Option; +import scala.Some; +import scala.Tuple2; +import scala.concurrent.duration.Deadline; +import scala.concurrent.duration.FiniteDuration; + +import java.io.File; +import java.io.IOException; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLongArray; +import java.util.concurrent.atomic.AtomicReference; + +import static com.google.common.base.Preconditions.checkArgument; +import static org.apache.flink.runtime.messages.JobManagerMessages.SubmitJob; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; + +public class JobManagerCheckpointRecoveryITCase { + + private final static ZooKeeperTestEnvironment ZooKeeper = new ZooKeeperTestEnvironment(1); + + private final static FiniteDuration TestTimeOut = new FiniteDuration(5, TimeUnit.MINUTES); + + private static final File FileStateBackendBasePath; + + static { + try { + FileStateBackendBasePath = CommonTestUtils.createTempDirectory(); + } + catch (IOException e) { + throw new RuntimeException("Error in test setup. 
Could not create directory.", e); + } + } + + @AfterClass + public static void tearDown() throws Exception { + ZooKeeper.shutdown(); + + if (FileStateBackendBasePath != null) { + FileUtils.deleteDirectory(FileStateBackendBasePath); + } + } + + @Before + public void cleanUp() throws Exception { + if (FileStateBackendBasePath != null) { + FileUtils.cleanDirectory(FileStateBackendBasePath); + } + + ZooKeeper.deleteAll(); + } + + // --------------------------------------------------------------------------------------------- + + private static final int Parallelism = 8; + + private static final CountDownLatch CompletedCheckpointsLatch = new CountDownLatch(2); + + private static final AtomicLongArray RecoveredStates = new AtomicLongArray(Parallelism); + + private static final CountDownLatch FinalCountLatch = new CountDownLatch(1); + + private static final AtomicReference<Long> FinalCount = new AtomicReference<>(); + + private static final long LastElement = -1; + + /** + * Simple checkpointed streaming sum. + * + * <p>The sources (Parallelism) count until sequenceEnd. The sink (1) sums up all counts and + * returns it to the main thread via a static variable. We wait until some checkpoints are + * completed and sanity check that the sources recover with an updated state to make sure that + * this test actually tests something. + */ + @Test + public void testCheckpointedStreamingSumProgram() throws Exception { + // Config + final int checkpointingInterval = 200; + final int sequenceEnd = 5000; + final long expectedSum = Parallelism * sequenceEnd * (sequenceEnd + 1) / 2; + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(); + env.setParallelism(Parallelism); + env.enableCheckpointing(checkpointingInterval); + + env + .addSource(new CheckpointedSequenceSource(sequenceEnd)) + .addSink(new CountingSink()) + .setParallelism(1); + + JobGraph jobGraph = env.getStreamGraph().getJobGraph(); + + Configuration config = ZooKeeperTestUtils.createZooKeeperRecoveryModeConfig(ZooKeeper + .getConnectString(), FileStateBackendBasePath.getPath()); + config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, Parallelism); + + ActorSystem testSystem = null; + JobManagerProcess[] jobManagerProcess = new JobManagerProcess[2]; + LeaderRetrievalService leaderRetrievalService = null; + ActorSystem taskManagerSystem = null; + + try { + final Deadline deadline = TestTimeOut.fromNow(); + + // Test actor system + testSystem = AkkaUtils.createActorSystem(new Configuration(), + new Some<>(new Tuple2<String, Object>("localhost", 0))); + + // The job managers + jobManagerProcess[0] = new JobManagerProcess(0, config); + jobManagerProcess[1] = new JobManagerProcess(1, config); + + jobManagerProcess[0].createAndStart(); + jobManagerProcess[1].createAndStart(); + + // Leader listener + TestingListener leaderListener = new TestingListener(); + leaderRetrievalService = ZooKeeperUtils.createLeaderRetrievalService(config); + leaderRetrievalService.start(leaderListener); + + // The task manager + taskManagerSystem = AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig()); + TaskManager.startTaskManagerComponentsAndActor( + config, taskManagerSystem, "localhost", + Option.<String>empty(), Option.<LeaderRetrievalService>empty(), + false, StreamingMode.STREAMING, TaskManager.class); + + { + // Initial submission + leaderListener.waitForNewLeader(deadline.timeLeft().toMillis()); + + String leaderAddress = leaderListener.getAddress(); + UUID leaderId = leaderListener.getLeaderSessionID(); + 
+ // Get the leader ref + ActorRef leaderRef = AkkaUtils.getActorRef( + leaderAddress, testSystem, deadline.timeLeft()); + ActorGateway leader = new AkkaActorGateway(leaderRef, leaderId); + + // Submit the job in detached mode + leader.tell(new SubmitJob(jobGraph, ListeningBehaviour.DETACHED)); + + JobManagerActorTestUtils.waitForJobStatus( + jobGraph.getJobID(), JobStatus.RUNNING, leader, deadline.timeLeft()); + } + + // Who's the boss? + JobManagerProcess leadingJobManagerProcess; + if (jobManagerProcess[0].getJobManagerAkkaURL().equals(leaderListener.getAddress())) { + leadingJobManagerProcess = jobManagerProcess[0]; + } + else { + leadingJobManagerProcess = jobManagerProcess[1]; + } + + CompletedCheckpointsLatch.await(); + + // Kill the leading job manager process + leadingJobManagerProcess.destroy(); + + { + // Recovery by the standby JobManager + leaderListener.waitForNewLeader(deadline.timeLeft().toMillis()); + + String leaderAddress = leaderListener.getAddress(); + UUID leaderId = leaderListener.getLeaderSessionID(); + + ActorRef leaderRef = AkkaUtils.getActorRef( + leaderAddress, testSystem, deadline.timeLeft()); + ActorGateway leader = new AkkaActorGateway(leaderRef, leaderId); + + JobManagerActorTestUtils.waitForJobStatus(jobGraph.getJobID(), JobStatus.RUNNING, + leader, deadline.timeLeft()); + } + + // Wait to finish + FinalCountLatch.await(); + + assertEquals(expectedSum, (long) FinalCount.get()); + + for (int i = 0; i < Parallelism; i++) { + assertNotEquals(0, RecoveredStates.get(i)); + } + } + catch (Throwable t) { + // In case of an error, print the job manager process logs. + if (jobManagerProcess[0] != null) { + jobManagerProcess[0].printProcessLog(); + } + + if (jobManagerProcess[1] != null) { + jobManagerProcess[1].printProcessLog(); + } + + t.printStackTrace(); + } + finally { + if (jobManagerProcess[0] != null) { + jobManagerProcess[0].destroy(); + } + + if (jobManagerProcess[1] != null) { + jobManagerProcess[1].destroy(); + } + + if (leaderRetrievalService != null) { + leaderRetrievalService.stop(); + } + + if (taskManagerSystem != null) { + taskManagerSystem.shutdown(); + } + + if (testSystem != null) { + testSystem.shutdown(); + } + } + } + + // --------------------------------------------------------------------------------------------- + + /** + * A checkpointed source, which emits elements from 0 to a configured number. 
+ */ + public static class CheckpointedSequenceSource extends RichParallelSourceFunction<Long> + implements Checkpointed<Long> { + + private static final Logger LOG = LoggerFactory.getLogger(CheckpointedSequenceSource.class); + + private static final long serialVersionUID = 0L; + + private static final CountDownLatch sync = new CountDownLatch(Parallelism); + + private final long end; + + private long current = 0; + + private volatile boolean isRunning = true; + + public CheckpointedSequenceSource(long end) { + checkArgument(end >= 0, "Negative final count"); + this.end = end; + } + + @Override + public void run(SourceContext<Long> ctx) throws Exception { + while (isRunning) { + synchronized (ctx.getCheckpointLock()) { + if (current <= end) { + ctx.collect(current++); + } + else { + ctx.collect(LastElement); + return; + } + } + + // Slow down until some checkpoints are completed + if (sync.getCount() != 0) { + Thread.sleep(100); + } + } + } + + @Override + public Long snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { + LOG.debug("Snapshotting state {} @ ID {}.", current, checkpointId); + return current; + } + + @Override + public void restoreState(Long state) { + LOG.debug("Restoring state {}", state); + + // This is necessary to make sure that something is recovered at all. Otherwise it + // might happen that the job is restarted from the beginning. + RecoveredStates.set(getRuntimeContext().getIndexOfThisSubtask(), state); + + sync.countDown(); + + current = state; + } + + @Override + public void cancel() { + isRunning = false; + } + } + + /** + * A checkpointed sink, which sums up its input and notifies the main thread after all inputs + * are exhausted. + */ + public static class CountingSink implements SinkFunction<Long>, Checkpointed<CountingSink>, + CheckpointNotifier { + + private static final Logger LOG = LoggerFactory.getLogger(CountingSink.class); + + private static final long serialVersionUID = 1436484290453629091L; + + private long current = 0; + + private int numberOfReceivedLastElements; + + @Override + public void invoke(Long value) throws Exception { + if (value == LastElement) { + numberOfReceivedLastElements++; + + if (numberOfReceivedLastElements == Parallelism) { + FinalCount.set(current); + FinalCountLatch.countDown(); + } + else if (numberOfReceivedLastElements > Parallelism) { + throw new IllegalStateException("Received more elements than parallelism."); + } + } + else { + current += value; + } + } + + @Override + public CountingSink snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { + LOG.debug("Snapshotting state {}:{} @ ID {}.", current, numberOfReceivedLastElements, checkpointId); + return this; + } + + @Override + public void restoreState(CountingSink state) { + LOG.debug("Restoring state {}:{}", state.current, state.numberOfReceivedLastElements); + this.current = state.current; + this.numberOfReceivedLastElements = state.numberOfReceivedLastElements; + } + + @Override + public void notifyCheckpointComplete(long checkpointId) throws Exception { + LOG.debug("Checkpoint {} completed.", checkpointId); + CompletedCheckpointsLatch.countDown(); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerProcessFailureBatchRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerProcessFailureBatchRecoveryITCase.java 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerProcessFailureBatchRecoveryITCase.java new file mode 100644 index 0000000..66565dd --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerProcessFailureBatchRecoveryITCase.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.recovery; + +import org.apache.flink.api.common.ExecutionMode; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.functions.RichFlatMapFunction; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.DiscardingOutputFormat; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.Collector; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.File; +import java.util.Arrays; +import java.util.Collection; + +import static org.junit.Assert.assertEquals; + +/** + * Test the recovery of a simple batch program in the case of JobManager process failure. + */ +@SuppressWarnings("serial") +@RunWith(Parameterized.class) +public class JobManagerProcessFailureBatchRecoveryITCase extends AbstractJobManagerProcessFailureRecoveryITCase { + + // -------------------------------------------------------------------------------------------- + // Parametrization (run pipelined and batch) + // -------------------------------------------------------------------------------------------- + + private final ExecutionMode executionMode; + + public JobManagerProcessFailureBatchRecoveryITCase(ExecutionMode executionMode) { + this.executionMode = executionMode; + } + + @Parameterized.Parameters + public static Collection<Object[]> executionMode() { + return Arrays.asList(new Object[][]{ + {ExecutionMode.PIPELINED}, + {ExecutionMode.BATCH}}); + } + + // -------------------------------------------------------------------------------------------- + // Test the program + // -------------------------------------------------------------------------------------------- + + // This is a slightly modified copy of the task manager process failure program. 
+ @Override + public void testJobManagerFailure(String zkQuorum, final File coordinateDir) throws Exception { + Configuration config = new Configuration(); + config.setString(ConfigConstants.RECOVERY_MODE, "ZOOKEEPER"); + config.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, zkQuorum); + + ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment( + "leader", 1, config); + env.setParallelism(PARALLELISM); + env.setNumberOfExecutionRetries(1); + env.getConfig().setExecutionMode(executionMode); + env.getConfig().disableSysoutLogging(); + + final long NUM_ELEMENTS = 100000L; + final DataSet<Long> result = env.generateSequence(1, NUM_ELEMENTS) + // make sure every mapper is involved (no one is skipped because of lazy split assignment) + .rebalance() + // the majority of the behavior is in the MapFunction + .map(new RichMapFunction<Long, Long>() { + + private final File proceedFile = new File(coordinateDir, PROCEED_MARKER_FILE); + + private boolean markerCreated = false; + private boolean checkForProceedFile = true; + + @Override + public Long map(Long value) throws Exception { + if (!markerCreated) { + int taskIndex = getRuntimeContext().getIndexOfThisSubtask(); + AbstractTaskManagerProcessFailureRecoveryTest.touchFile( + new File(coordinateDir, READY_MARKER_FILE_PREFIX + taskIndex)); + markerCreated = true; + } + + // check if the proceed file exists + if (checkForProceedFile) { + if (proceedFile.exists()) { + checkForProceedFile = false; + } + else { + // otherwise wait so that we make slow progress + Thread.sleep(100); + } + } + return value; + } + }) + .reduce(new ReduceFunction<Long>() { + @Override + public Long reduce(Long value1, Long value2) { + return value1 + value2; + } + }) + // The check is done in the mapper, because the client can currently not handle + // job manager losses/reconnects. + .flatMap(new RichFlatMapFunction<Long, Long>() { + @Override + public void flatMap(Long value, Collector<Long> out) throws Exception { + assertEquals(NUM_ELEMENTS * (NUM_ELEMENTS + 1L) / 2L, (long) value); + + int taskIndex = getRuntimeContext().getIndexOfThisSubtask(); + AbstractTaskManagerProcessFailureRecoveryTest.touchFile( + new File(coordinateDir, FINISH_MARKER_FILE_PREFIX + taskIndex)); + } + }); + + result.output(new DiscardingOutputFormat<Long>()); + + env.execute(); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureBatchRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureBatchRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureBatchRecoveryITCase.java deleted file mode 100644 index f2b8c31..0000000 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureBatchRecoveryITCase.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.test.recovery; - -import org.apache.flink.api.common.ExecutionMode; -import org.apache.flink.api.common.functions.ReduceFunction; -import org.apache.flink.api.common.functions.RichMapFunction; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.io.File; -import java.util.Arrays; -import java.util.Collection; - -import static org.junit.Assert.assertEquals; - -/** - * Test the recovery of a simple batch program in the case of TaskManager process failure. - */ -@SuppressWarnings("serial") -@RunWith(Parameterized.class) -public class ProcessFailureBatchRecoveryITCase extends AbstractProcessFailureRecoveryTest { - - // -------------------------------------------------------------------------------------------- - // Parametrization (run pipelined and batch) - // -------------------------------------------------------------------------------------------- - - private final ExecutionMode executionMode; - - public ProcessFailureBatchRecoveryITCase(ExecutionMode executionMode) { - this.executionMode = executionMode; - } - - @Parameterized.Parameters - public static Collection<Object[]> executionMode() { - return Arrays.asList(new Object[][]{ - {ExecutionMode.PIPELINED}, - {ExecutionMode.BATCH}}); - } - - // -------------------------------------------------------------------------------------------- - // Test the program - // -------------------------------------------------------------------------------------------- - - @Override - public void testProgram(int jobManagerPort, final File coordinateDir) throws Exception { - - ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", jobManagerPort); - env.setParallelism(PARALLELISM); - env.setNumberOfExecutionRetries(1); - env.getConfig().setExecutionMode(executionMode); - env.getConfig().disableSysoutLogging(); - - final long NUM_ELEMENTS = 100000L; - final DataSet<Long> result = env.generateSequence(1, NUM_ELEMENTS) - - // make sure every mapper is involved (no one is skipped because of lazy split assignment) - .rebalance() - // the majority of the behavior is in the MapFunction - .map(new RichMapFunction<Long, Long>() { - - private final File proceedFile = new File(coordinateDir, PROCEED_MARKER_FILE); - - private boolean markerCreated = false; - private boolean checkForProceedFile = true; - - @Override - public Long map(Long value) throws Exception { - if (!markerCreated) { - int taskIndex = getRuntimeContext().getIndexOfThisSubtask(); - touchFile(new File(coordinateDir, READY_MARKER_FILE_PREFIX + taskIndex)); - markerCreated = true; - } - - // check if the proceed file exists - if (checkForProceedFile) { - if (proceedFile.exists()) { - checkForProceedFile = false; - } else { - // otherwise wait so that we make slow progress - Thread.sleep(100); - } - } - return value; - } - }) - .reduce(new ReduceFunction<Long>() { - @Override - public Long reduce(Long value1, Long value2) { - return value1 + value2; - } - }); - - long sum = 
result.collect().get(0); - assertEquals(NUM_ELEMENTS * (NUM_ELEMENTS + 1L) / 2L, sum); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java index 945a78c..6dce370 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java @@ -115,13 +115,13 @@ public class ProcessFailureCancelingITCase { "-Dlog4j.configuration=file:" + tempLogFile.getAbsolutePath(), "-Xms80m", "-Xmx80m", "-classpath", getCurrentClasspath(), - AbstractProcessFailureRecoveryTest.TaskManagerProcessEntryPoint.class.getName(), + AbstractTaskManagerProcessFailureRecoveryTest.TaskManagerProcessEntryPoint.class.getName(), String.valueOf(jobManagerPort) }; // start the first two TaskManager processes taskManagerProcess = new ProcessBuilder(command).start(); - new AbstractProcessFailureRecoveryTest.PipeForwarder(taskManagerProcess.getErrorStream(), processOutput); + new CommonTestUtils.PipeForwarder(taskManagerProcess.getErrorStream(), processOutput); // we wait for the JobManager to have the two TaskManagers available // since some of the CI environments are very hostile, we need to give this a lot of time (2 minutes) http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java deleted file mode 100644 index 054b321..0000000 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java +++ /dev/null @@ -1,234 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.test.recovery; - -import java.io.File; -import java.io.IOException; -import java.util.UUID; - -import org.apache.commons.io.FileUtils; -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.common.functions.RichMapFunction; -import org.apache.flink.api.common.functions.RuntimeContext; -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.checkpoint.Checkpointed; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; -import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; -import org.apache.flink.streaming.api.state.filesystem.FsStateBackend; - -import org.junit.Assert; - -import static org.junit.Assert.assertTrue; - -/** - * Test for streaming program behaviour in case of TaskManager failure - * based on {@link AbstractProcessFailureRecoveryTest}. - * - * The logic in this test is as follows: - * - The source slowly emits records (every 10 msecs) until the test driver - * gives the "go" for regular execution - * - The "go" is given after the first taskmanager has been killed, so it can only - * happen in the recovery run - * - The mapper must not be slow, because otherwise the checkpoint barrier cannot pass - * the mapper and no checkpoint will be completed before the killing of the first - * TaskManager. - */ -@SuppressWarnings("serial") -public class ProcessFailureStreamingRecoveryITCase extends AbstractProcessFailureRecoveryTest { - - private static final int DATA_COUNT = 10000; - - @Override - public void testProgram(int jobManagerPort, final File coordinateDir) throws Exception { - - final File tempCheckpointDir = new File(new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH), - UUID.randomUUID().toString()); - - assertTrue("Cannot create directory for checkpoints", tempCheckpointDir.mkdirs()); - - StreamExecutionEnvironment env = StreamExecutionEnvironment - .createRemoteEnvironment("localhost", jobManagerPort); - env.setParallelism(PARALLELISM); - env.getConfig().disableSysoutLogging(); - env.setNumberOfExecutionRetries(1); - env.enableCheckpointing(200); - - env.setStateBackend(new FsStateBackend(tempCheckpointDir.getAbsoluteFile().toURI())); - - DataStream<Long> result = env.addSource(new SleepyDurableGenerateSequence(coordinateDir, DATA_COUNT)) - // add a non-chained no-op map to test the chain state restore logic - .map(new MapFunction<Long, Long>() { - @Override - public Long map(Long value) throws Exception { - return value; - } - }).startNewChain() - // populate the coordinate directory so we can proceed to TaskManager failure - .map(new Mapper(coordinateDir)); - - //write result to temporary file - result.addSink(new CheckpointedSink(DATA_COUNT)); - - try { - // blocking call until execution is done - env.execute(); - - // TODO: Figure out why this fails when ran with other tests - // Check whether checkpoints have been cleaned up properly - // assertDirectoryEmpty(tempCheckpointDir); - } - finally { - // clean up - if (tempCheckpointDir.exists()) { - FileUtils.deleteDirectory(tempCheckpointDir); - } - } - } - - public static class SleepyDurableGenerateSequence extends RichParallelSourceFunction<Long> - implements Checkpointed<Long> { - - private static final long SLEEP_TIME = 50; - - private final File coordinateDir; - private 
final long end; - - private volatile boolean isRunning = true; - - private long collected; - - public SleepyDurableGenerateSequence(File coordinateDir, long end) { - this.coordinateDir = coordinateDir; - this.end = end; - } - - @Override - public void run(SourceContext<Long> sourceCtx) throws Exception { - final Object checkpointLock = sourceCtx.getCheckpointLock(); - - RuntimeContext runtimeCtx = getRuntimeContext(); - - final long stepSize = runtimeCtx.getNumberOfParallelSubtasks(); - final long congruence = runtimeCtx.getIndexOfThisSubtask(); - final long toCollect = (end % stepSize > congruence) ? (end / stepSize + 1) : (end / stepSize); - - final File proceedFile = new File(coordinateDir, PROCEED_MARKER_FILE); - boolean checkForProceedFile = true; - - while (isRunning && collected < toCollect) { - // check if the proceed file exists (then we go full speed) - // if not, we always recheck and sleep - if (checkForProceedFile) { - if (proceedFile.exists()) { - checkForProceedFile = false; - } else { - // otherwise wait so that we make slow progress - Thread.sleep(SLEEP_TIME); - } - } - - synchronized (checkpointLock) { - sourceCtx.collect(collected * stepSize + congruence); - collected++; - } - } - } - - @Override - public void cancel() { - isRunning = false; - } - - @Override - public Long snapshotState(long checkpointId, long checkpointTimestamp) { - return collected; - } - - @Override - public void restoreState(Long state) { - collected = state; - } - } - - public static class Mapper extends RichMapFunction<Long, Long> { - private boolean markerCreated = false; - private File coordinateDir; - - public Mapper(File coordinateDir) { - this.coordinateDir = coordinateDir; - } - - @Override - public Long map(Long value) throws Exception { - if (!markerCreated) { - int taskIndex = getRuntimeContext().getIndexOfThisSubtask(); - touchFile(new File(coordinateDir, READY_MARKER_FILE_PREFIX + taskIndex)); - markerCreated = true; - } - return value; - } - } - - private static class CheckpointedSink extends RichSinkFunction<Long> implements Checkpointed<Long> { - - private long stepSize; - private long congruence; - private long toCollect; - private Long collected = 0L; - private long end; - - public CheckpointedSink(long end) { - this.end = end; - } - - @Override - public void open(Configuration parameters) throws IOException { - stepSize = getRuntimeContext().getNumberOfParallelSubtasks(); - congruence = getRuntimeContext().getIndexOfThisSubtask(); - toCollect = (end % stepSize > congruence) ? (end / stepSize + 1) : (end / stepSize); - } - - @Override - public void invoke(Long value) throws Exception { - long expected = collected * stepSize + congruence; - - Assert.assertTrue("Value did not match expected value. 
" + expected + " != " + value, value.equals(expected)); - - collected++; - - if (collected > toCollect) { - Assert.fail("Collected <= toCollect: " + collected + " > " + toCollect); - } - - } - - @Override - public Long snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { - return collected; - } - - @Override - public void restoreState(Long state) { - collected = state; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java new file mode 100644 index 0000000..173c8ea --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.recovery; + +import org.apache.flink.api.common.ExecutionMode; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.File; +import java.util.Arrays; +import java.util.Collection; + +import static org.junit.Assert.assertEquals; + +/** + * Test the recovery of a simple batch program in the case of TaskManager process failure. 
+ */ +@SuppressWarnings("serial") +@RunWith(Parameterized.class) +public class TaskManagerProcessFailureBatchRecoveryITCase extends AbstractTaskManagerProcessFailureRecoveryTest { + + // -------------------------------------------------------------------------------------------- + // Parametrization (run pipelined and batch) + // -------------------------------------------------------------------------------------------- + + private final ExecutionMode executionMode; + + public TaskManagerProcessFailureBatchRecoveryITCase(ExecutionMode executionMode) { + this.executionMode = executionMode; + } + + @Parameterized.Parameters + public static Collection<Object[]> executionMode() { + return Arrays.asList(new Object[][]{ + {ExecutionMode.PIPELINED}, + {ExecutionMode.BATCH}}); + } + + // -------------------------------------------------------------------------------------------- + // Test the program + // -------------------------------------------------------------------------------------------- + + @Override + public void testTaskManagerFailure(int jobManagerPort, final File coordinateDir) throws Exception { + + ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", jobManagerPort); + env.setParallelism(PARALLELISM); + env.setNumberOfExecutionRetries(1); + env.getConfig().setExecutionMode(executionMode); + env.getConfig().disableSysoutLogging(); + + final long NUM_ELEMENTS = 100000L; + final DataSet<Long> result = env.generateSequence(1, NUM_ELEMENTS) + + // make sure every mapper is involved (no one is skipped because of lazy split assignment) + .rebalance() + // the majority of the behavior is in the MapFunction + .map(new RichMapFunction<Long, Long>() { + + private final File proceedFile = new File(coordinateDir, PROCEED_MARKER_FILE); + + private boolean markerCreated = false; + private boolean checkForProceedFile = true; + + @Override + public Long map(Long value) throws Exception { + if (!markerCreated) { + int taskIndex = getRuntimeContext().getIndexOfThisSubtask(); + touchFile(new File(coordinateDir, READY_MARKER_FILE_PREFIX + taskIndex)); + markerCreated = true; + } + + // check if the proceed file exists + if (checkForProceedFile) { + if (proceedFile.exists()) { + checkForProceedFile = false; + } else { + // otherwise wait so that we make slow progress + Thread.sleep(100); + } + } + return value; + } + }) + .reduce(new ReduceFunction<Long>() { + @Override + public Long reduce(Long value1, Long value2) { + return value1 + value2; + } + }); + + long sum = result.collect().get(0); + assertEquals(NUM_ELEMENTS * (NUM_ELEMENTS + 1L) / 2L, sum); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java new file mode 100644 index 0000000..aa634f0 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.recovery; + +import java.io.File; +import java.io.IOException; +import java.util.UUID; + +import org.apache.commons.io.FileUtils; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.checkpoint.Checkpointed; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.apache.flink.streaming.api.state.filesystem.FsStateBackend; + +import org.junit.Assert; + +import static org.junit.Assert.assertTrue; + +/** + * Test for streaming program behaviour in case of TaskManager failure + * based on {@link AbstractTaskManagerProcessFailureRecoveryTest}. + * + * The logic in this test is as follows: + * - The source slowly emits records (every 10 msecs) until the test driver + * gives the "go" for regular execution + * - The "go" is given after the first taskmanager has been killed, so it can only + * happen in the recovery run + * - The mapper must not be slow, because otherwise the checkpoint barrier cannot pass + * the mapper and no checkpoint will be completed before the killing of the first + * TaskManager. 
+ */ +@SuppressWarnings("serial") +public class TaskManagerProcessFailureStreamingRecoveryITCase extends AbstractTaskManagerProcessFailureRecoveryTest { + + private static final int DATA_COUNT = 10000; + + @Override + public void testTaskManagerFailure(int jobManagerPort, final File coordinateDir) throws Exception { + + final File tempCheckpointDir = new File(new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH), + UUID.randomUUID().toString()); + + assertTrue("Cannot create directory for checkpoints", tempCheckpointDir.mkdirs()); + + StreamExecutionEnvironment env = StreamExecutionEnvironment + .createRemoteEnvironment("localhost", jobManagerPort); + env.setParallelism(PARALLELISM); + env.getConfig().disableSysoutLogging(); + env.setNumberOfExecutionRetries(1); + env.enableCheckpointing(200); + + env.setStateBackend(new FsStateBackend(tempCheckpointDir.getAbsoluteFile().toURI())); + + DataStream<Long> result = env.addSource(new SleepyDurableGenerateSequence(coordinateDir, DATA_COUNT)) + // add a non-chained no-op map to test the chain state restore logic + .map(new MapFunction<Long, Long>() { + @Override + public Long map(Long value) throws Exception { + return value; + } + }).startNewChain() + // populate the coordinate directory so we can proceed to TaskManager failure + .map(new Mapper(coordinateDir)); + + //write result to temporary file + result.addSink(new CheckpointedSink(DATA_COUNT)); + + try { + // blocking call until execution is done + env.execute(); + + // TODO: Figure out why this fails when ran with other tests + // Check whether checkpoints have been cleaned up properly + // assertDirectoryEmpty(tempCheckpointDir); + } + finally { + // clean up + if (tempCheckpointDir.exists()) { + FileUtils.deleteDirectory(tempCheckpointDir); + } + } + } + + public static class SleepyDurableGenerateSequence extends RichParallelSourceFunction<Long> + implements Checkpointed<Long> { + + private static final long SLEEP_TIME = 50; + + private final File coordinateDir; + private final long end; + + private volatile boolean isRunning = true; + + private long collected; + + public SleepyDurableGenerateSequence(File coordinateDir, long end) { + this.coordinateDir = coordinateDir; + this.end = end; + } + + @Override + public void run(SourceContext<Long> sourceCtx) throws Exception { + final Object checkpointLock = sourceCtx.getCheckpointLock(); + + RuntimeContext runtimeCtx = getRuntimeContext(); + + final long stepSize = runtimeCtx.getNumberOfParallelSubtasks(); + final long congruence = runtimeCtx.getIndexOfThisSubtask(); + final long toCollect = (end % stepSize > congruence) ? 
(end / stepSize + 1) : (end / stepSize); + + final File proceedFile = new File(coordinateDir, PROCEED_MARKER_FILE); + boolean checkForProceedFile = true; + + while (isRunning && collected < toCollect) { + // check if the proceed file exists (then we go full speed) + // if not, we always recheck and sleep + if (checkForProceedFile) { + if (proceedFile.exists()) { + checkForProceedFile = false; + } else { + // otherwise wait so that we make slow progress + Thread.sleep(SLEEP_TIME); + } + } + + synchronized (checkpointLock) { + sourceCtx.collect(collected * stepSize + congruence); + collected++; + } + } + } + + @Override + public void cancel() { + isRunning = false; + } + + @Override + public Long snapshotState(long checkpointId, long checkpointTimestamp) { + return collected; + } + + @Override + public void restoreState(Long state) { + collected = state; + } + } + + public static class Mapper extends RichMapFunction<Long, Long> { + private boolean markerCreated = false; + private File coordinateDir; + + public Mapper(File coordinateDir) { + this.coordinateDir = coordinateDir; + } + + @Override + public Long map(Long value) throws Exception { + if (!markerCreated) { + int taskIndex = getRuntimeContext().getIndexOfThisSubtask(); + touchFile(new File(coordinateDir, READY_MARKER_FILE_PREFIX + taskIndex)); + markerCreated = true; + } + return value; + } + } + + private static class CheckpointedSink extends RichSinkFunction<Long> implements Checkpointed<Long> { + + private long stepSize; + private long congruence; + private long toCollect; + private Long collected = 0L; + private long end; + + public CheckpointedSink(long end) { + this.end = end; + } + + @Override + public void open(Configuration parameters) throws IOException { + stepSize = getRuntimeContext().getNumberOfParallelSubtasks(); + congruence = getRuntimeContext().getIndexOfThisSubtask(); + toCollect = (end % stepSize > congruence) ? (end / stepSize + 1) : (end / stepSize); + } + + @Override + public void invoke(Long value) throws Exception { + long expected = collected * stepSize + congruence; + + Assert.assertTrue("Value did not match expected value. 
" + expected + " != " + value, value.equals(expected)); + + collected++; + + if (collected > toCollect) { + Assert.fail("Collected <= toCollect: " + collected + " > " + toCollect); + } + + } + + @Override + public Long snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { + return collected; + } + + @Override + public void restoreState(Long state) { + collected = state; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java index 6035c45..ed2113a 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java @@ -21,6 +21,7 @@ package org.apache.flink.test.runtime.leaderelection; import akka.actor.ActorSystem; import akka.actor.Kill; import akka.actor.PoisonPill; +import org.apache.commons.io.FileUtils; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.client.JobClient; @@ -39,17 +40,41 @@ import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.util.LeaderRetrievalUtils; import org.apache.flink.test.util.ForkableFlinkMiniCluster; import org.apache.flink.util.TestLogger; +import org.junit.AfterClass; import org.junit.Test; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; -import static org.junit.Assert.*; +import java.io.File; +import java.io.IOException; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; public class ZooKeeperLeaderElectionITCase extends TestLogger { private static final FiniteDuration timeout = TestingUtils.TESTING_DURATION(); + private static final File tempDirectory; + + static { + try { + tempDirectory = org.apache.flink.runtime.testutils + .CommonTestUtils.createTempDirectory(); + } + catch (IOException e) { + throw new RuntimeException("Test setup failed", e); + } + } + + @AfterClass + public static void tearDown() throws Exception { + if (tempDirectory != null) { + FileUtils.deleteDirectory(tempDirectory); + } + } + /** * Tests that the TaskManagers successfully register at the new leader once the old leader * is terminated. 
@@ -64,13 +89,15 @@ public class ZooKeeperLeaderElectionITCase extends TestLogger { configuration.setString(ConfigConstants.RECOVERY_MODE, "zookeeper"); configuration.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, numJMs); configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs); + configuration.setString(ConfigConstants.STATE_BACKEND, "filesystem"); + configuration.setString(ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, tempDirectory.getPath()); ForkableFlinkMiniCluster cluster = new ForkableFlinkMiniCluster(configuration); try { cluster.start(); - for(int i = 0; i < numJMs; i++) { + for (int i = 0; i < numJMs; i++) { ActorGateway leadingJM = cluster.getLeaderGateway(timeout); cluster.waitForTaskManagersToBeRegisteredAtJobManager(leadingJM.actor()); @@ -86,7 +113,8 @@ public class ZooKeeperLeaderElectionITCase extends TestLogger { cluster.clearLeader(); leadingJM.tell(PoisonPill.getInstance()); } - } finally { + } + finally { cluster.stop(); } } @@ -110,6 +138,13 @@ public class ZooKeeperLeaderElectionITCase extends TestLogger { configuration.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, numJMs); configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs); configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTM); + configuration.setString(ConfigConstants.STATE_BACKEND, "filesystem"); + configuration.setString(ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, tempDirectory.getPath()); + + // @TODO @tillrohrmann temporary "disable" recovery, because currently the client does + // not need to resubmit a failed job to a new leader. Should we keep this test and + // disable recovery fully or will this be subsumed by the real client changes anyways? + configuration.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, timeout.toString()); Tasks.BlockingOnceReceiver$.MODULE$.blocking_$eq(true); @@ -152,7 +187,7 @@ public class ZooKeeperLeaderElectionITCase extends TestLogger { thread.start(); // Kill all JobManager except for two - for(int i = 0; i < numJMs - 2; i++) { + for (int i = 0; i < numJMs - 2; i++) { ActorGateway jm = cluster.getLeaderGateway(timeout); cluster.waitForTaskManagersToBeRegisteredAtJobManager(jm.actor()); @@ -184,17 +219,18 @@ public class ZooKeeperLeaderElectionITCase extends TestLogger { thread.join(timeout.toMillis()); - if(thread.isAlive()) { + if (thread.isAlive()) { jobSubmission.finished = true; fail("The job submission thread did not stop (meaning it did not succeeded in" + "executing the test job."); } - } finally { + } + finally { if (clientActorSystem != null) { cluster.shutdownJobClientActorSystem(clientActorSystem); } - if(thread != null && thread.isAlive() && jobSubmission != null) { + if (thread != null && thread.isAlive() && jobSubmission != null) { jobSubmission.finished = true; } cluster.stop(); @@ -219,7 +255,7 @@ public class ZooKeeperLeaderElectionITCase extends TestLogger { @Override public void run() { - while(!finished) { + while (!finished) { try { LeaderRetrievalService lrService = LeaderRetrievalUtils.createLeaderRetrievalService( @@ -240,11 +276,14 @@ public class ZooKeeperLeaderElectionITCase extends TestLogger { getClass().getClassLoader()); finished = true; - } catch (JobExecutionException e) { + } + catch (JobExecutionException e) { // This was expected, so just try again to submit the job - } catch (LeaderRetrievalException e) { + } + catch (LeaderRetrievalException e) { // This can also happen, so just try again to submit the job - } catch (Exception e) { + } 
+ catch (Exception e) { // This was not expected... fail the test case e.printStackTrace(); fail("Caught unexpected exception in job submission test case."); http://git-wip-us.apache.org/repos/asf/flink/blob/73c73e92/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala index 61eb6a5..4ada21e 100644 --- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala +++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala @@ -29,8 +29,9 @@ import grizzled.slf4j.Logger import org.apache.flink.api.common.JobID import org.apache.flink.configuration.{Configuration => FlinkConfiguration, ConfigConstants} import org.apache.flink.runtime.akka.AkkaUtils +import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory import org.apache.flink.runtime.jobgraph.JobStatus -import org.apache.flink.runtime.jobmanager.JobManager +import org.apache.flink.runtime.jobmanager.{SubmittedJobGraphStore, JobManager} import org.apache.flink.runtime.leaderelection.LeaderElectionService import org.apache.flink.runtime.messages.JobManagerMessages.{RequestJobStatus, CurrentJobStatus, JobNotFound} @@ -88,7 +89,9 @@ class YarnJobManager( delayBetweenRetries: Long, timeout: FiniteDuration, mode: StreamingMode, - leaderElectionService: LeaderElectionService) + leaderElectionService: LeaderElectionService, + submittedJobGraphs : SubmittedJobGraphStore, + checkpointRecoveryFactory : CheckpointRecoveryFactory) extends JobManager( flinkConfiguration, executionContext, @@ -100,7 +103,9 @@ class YarnJobManager( delayBetweenRetries, timeout, mode, - leaderElectionService) { + leaderElectionService, + submittedJobGraphs, + checkpointRecoveryFactory) { import context._ import scala.collection.JavaConverters._
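A note on the TaskManager process-failure tests above: they coordinate the test driver and the running job through marker files in a shared coordination directory, as described in the test javadoc. The condensed sketch below (a hypothetical helper class; the marker file names are assumptions, since the real READY_MARKER_FILE_PREFIX and PROCEED_MARKER_FILE constants live in the abstract base test) shows that handshake in isolation. In the actual tests the sources keep emitting slowly while the proceed marker is absent, rather than blocking as this sketch does.

import java.io.File;
import java.io.IOException;

// Illustration only: the marker-file handshake used by the process-failure recovery tests.
public class MarkerFileHandshakeSketch {

    // Assumed marker names; the tests take them from their abstract base class.
    static final String READY_MARKER_FILE_PREFIX = "ready_";
    static final String PROCEED_MARKER_FILE = "proceed";

    // Each parallel subtask drops a ready marker so the test driver knows it is processing
    // records and may now kill the first TaskManager process.
    static void signalReady(File coordinateDir, int subtaskIndex) throws IOException {
        new File(coordinateDir, READY_MARKER_FILE_PREFIX + subtaskIndex).createNewFile();
    }

    // Wait for the driver to create the proceed marker after the kill; only then run at full speed.
    static void waitForProceedMarker(File coordinateDir, long sleepMillis) throws InterruptedException {
        File proceedFile = new File(coordinateDir, PROCEED_MARKER_FILE);
        while (!proceedFile.exists()) {
            Thread.sleep(sleepMillis);
        }
    }
}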

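A second aside: SleepyDurableGenerateSequence and CheckpointedSink both derive how many elements a parallel subtask owns from the expression (end % stepSize > congruence) ? (end / stepSize + 1) : (end / stepSize), where stepSize is the parallelism and congruence the subtask index. A minimal worked example (hypothetical class, not part of the patch) showing that these per-subtask counts partition the range exactly:

// Illustration only: mirrors the work-partitioning arithmetic of the recovery tests above.
public class PartitioningSketch {

    // Number of elements assigned to the subtask with the given index.
    static long elementsFor(long end, long parallelism, long subtaskIndex) {
        return (end % parallelism > subtaskIndex) ? (end / parallelism + 1) : (end / parallelism);
    }

    public static void main(String[] args) {
        long end = 10000;        // DATA_COUNT in the streaming test
        long parallelism = 4;    // stands in for PARALLELISM from the abstract base test
        long total = 0;
        for (long subtask = 0; subtask < parallelism; subtask++) {
            long count = elementsFor(end, parallelism, subtask);
            // Subtask i emits the values i, i + parallelism, i + 2 * parallelism, ...
            System.out.println("subtask " + subtask + " handles " + count + " elements");
            total += count;
        }
        // The counts always sum to the full range size, so no element is lost or duplicated.
        System.out.println("total = " + total + " (expected " + end + ")");
    }
}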