zentol commented on a change in pull request #7514: [FLINK-11349][tests] Port
CoordinatorShutdownTest to new code base
URL: https://github.com/apache/flink/pull/7514#discussion_r248648156
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
##########
@@ -20,59 +20,118 @@
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.DummyJobInformation;
+import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.TestingSlotProvider;
import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
+import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
+import org.hamcrest.Matchers;
import org.junit.Test;
import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
-public class ExecutionGraphCheckpointCoordinatorTest {
+/**
+ * Tests for the interaction between the {@link ExecutionGraph} and the {@link
CheckpointCoordinator}.
+ */
+public class ExecutionGraphCheckpointCoordinatorTest extends TestLogger {
/**
- * Tests that a shut down checkpoint coordinator calls shutdown on
- * the store and counter.
+ * Tests that the checkpoint coordinator is shut down if the execution
graph
+ * is failed.
*/
@Test
- public void testShutdownCheckpointCoordinator() throws Exception {
- CheckpointIDCounter counter = mock(CheckpointIDCounter.class);
- CompletedCheckpointStore store =
mock(CompletedCheckpointStore.class);
+ public void testShutdownCheckpointCoordinatorOnFailure() throws
Exception {
+ final CompletableFuture<JobStatus> counterShutdownFuture = new
CompletableFuture<>();
+ CheckpointIDCounter counter = new
TestingCheckpointIDCounter(counterShutdownFuture);
+
+ final CompletableFuture<JobStatus> storeShutdownFuture = new
CompletableFuture<>();
+ CompletedCheckpointStore store = new
TestingCompletedCheckpointStore(storeShutdownFuture);
ExecutionGraph graph =
createExecutionGraphAndEnableCheckpointing(counter, store);
+ final CheckpointCoordinator checkpointCoordinator =
graph.getCheckpointCoordinator();
+
+ assertThat(checkpointCoordinator, Matchers.notNullValue());
+ assertThat(checkpointCoordinator.isShutdown(), is(false));
+
graph.failGlobal(new Exception("Test Exception"));
- verify(counter, times(1)).shutdown(JobStatus.FAILED);
- verify(store, times(1)).shutdown(eq(JobStatus.FAILED));
+ assertThat(checkpointCoordinator.isShutdown(), is(true));
+ assertThat(counterShutdownFuture.get(), is(JobStatus.FAILED));
+ assertThat(storeShutdownFuture.get(), is(JobStatus.FAILED));
}
/**
- * Tests that a suspended checkpoint coordinator calls suspend on
- * the store and counter.
+ * Tests that the checkpoint coordinator is shut down if the execution
graph
+ * is suspended.
*/
@Test
- public void testSuspendCheckpointCoordinator() throws Exception {
- CheckpointIDCounter counter = mock(CheckpointIDCounter.class);
- CompletedCheckpointStore store =
mock(CompletedCheckpointStore.class);
+ public void testShutdownCheckpointCoordinatorOnSuspend() throws
Exception {
+ final CompletableFuture<JobStatus> counterShutdownFuture = new
CompletableFuture<>();
+ CheckpointIDCounter counter = new
TestingCheckpointIDCounter(counterShutdownFuture);
+
+ final CompletableFuture<JobStatus> storeShutdownFuture = new
CompletableFuture<>();
+ CompletedCheckpointStore store = new
TestingCompletedCheckpointStore(storeShutdownFuture);
ExecutionGraph graph =
createExecutionGraphAndEnableCheckpointing(counter, store);
+ final CheckpointCoordinator checkpointCoordinator =
graph.getCheckpointCoordinator();
+
+ assertThat(checkpointCoordinator, Matchers.notNullValue());
+ assertThat(checkpointCoordinator.isShutdown(), is(false));
+
graph.suspend(new Exception("Test Exception"));
- // No shutdown
- verify(counter, times(1)).shutdown(eq(JobStatus.SUSPENDED));
- verify(store, times(1)).shutdown(eq(JobStatus.SUSPENDED));
+ assertThat(checkpointCoordinator.isShutdown(), is(true));
+ assertThat(counterShutdownFuture.get(),
is(JobStatus.SUSPENDED));
+ assertThat(storeShutdownFuture.get(), is(JobStatus.SUSPENDED));
+ }
+
+ /**
+ * Tests that the checkpoint coordinator is shut down if the execution
graph
+ * is finished.
+ */
+ @Test
+ public void testShutdownCheckpointCoordinatorOnFinished() throws
Exception {
+ final CompletableFuture<JobStatus> counterShutdownFuture = new
CompletableFuture<>();
+ CheckpointIDCounter counter = new
TestingCheckpointIDCounter(counterShutdownFuture);
+
+ final CompletableFuture<JobStatus> storeShutdownFuture = new
CompletableFuture<>();
+ CompletedCheckpointStore store = new
TestingCompletedCheckpointStore(storeShutdownFuture);
+
+ ExecutionGraph graph =
createExecutionGraphAndEnableCheckpointing(counter, store);
+ final CheckpointCoordinator checkpointCoordinator =
graph.getCheckpointCoordinator();
+
+ assertThat(checkpointCoordinator, Matchers.notNullValue());
+ assertThat(checkpointCoordinator.isShutdown(), is(false));
+
+ graph.scheduleForExecution();
Review comment:
Kind of unrelated: where have we documented which state transitions are valid?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services