http://git-wip-us.apache.org/repos/asf/flink/blob/47acdead/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorTest.java deleted file mode 100644 index 6b4f354..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorTest.java +++ /dev/null @@ -1,1119 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.checkpoint.savepoint; - -import org.apache.commons.io.FileUtils; -import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; -import org.apache.flink.runtime.checkpoint.PendingCheckpoint; -import org.apache.flink.runtime.checkpoint.SubtaskState; -import org.apache.flink.runtime.checkpoint.TaskState; -import org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker; -import org.apache.flink.runtime.execution.ExecutionState; -import org.apache.flink.runtime.executiongraph.Execution; -import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; -import org.apache.flink.runtime.executiongraph.ExecutionVertex; -import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; -import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; -import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete; -import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint; -import org.apache.flink.runtime.state.LocalStateHandle; -import org.apache.flink.runtime.state.StateHandle; -import org.apache.flink.runtime.testutils.CommonTestUtils; -import org.apache.flink.util.SerializedValue; -import org.apache.flink.util.TestLogger; -import org.junit.Test; -import scala.concurrent.Await; -import scala.concurrent.Future; -import scala.concurrent.Promise; -import scala.concurrent.duration.Deadline; -import scala.concurrent.duration.FiniteDuration; - -import java.io.File; -import java.io.IOException; -import java.io.Serializable; -import java.lang.reflect.Field; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static 
org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyString; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -/** - * Tests for the savepoint coordinator. - */ -public class SavepointCoordinatorTest extends TestLogger { - - // ------------------------------------------------------------------------ - // Trigger and acknowledge - // ------------------------------------------------------------------------ - - /** - * Simple trigger-acknowledge test for a single savepoint. - */ - @Test - public void testSimpleTriggerSavepoint() throws Exception { - JobID jobId = new JobID(); - long checkpointTimeout = 60 * 1000; - long timestamp = 1272635; - ExecutionVertex[] vertices = new ExecutionVertex[] { - mockExecutionVertex(jobId), - mockExecutionVertex(jobId) }; - MockCheckpointIdCounter checkpointIdCounter = new MockCheckpointIdCounter(); - HeapSavepointStore savepointStore = new HeapSavepointStore(); - - SavepointCoordinator coordinator = createSavepointCoordinator( - jobId, - checkpointTimeout, - vertices, - vertices, - vertices, - checkpointIdCounter, - savepointStore); - - // Trigger the savepoint - Future<String> savepointPathFuture = coordinator.triggerSavepoint(timestamp); - assertFalse(savepointPathFuture.isCompleted()); - - long checkpointId = checkpointIdCounter.getLastReturnedCount(); - assertEquals(0, checkpointId); - - // Verify send trigger messages - for (ExecutionVertex vertex : vertices) { - verifyTriggerCheckpoint(vertex, checkpointId, timestamp); - } - - PendingCheckpoint pendingCheckpoint = coordinator.getPendingCheckpoints() - .get(checkpointId); - - 
verifyPendingCheckpoint(pendingCheckpoint, jobId, checkpointId, - timestamp, 0, 2, 0, false, false); - - // Acknowledge tasks - for (ExecutionVertex vertex : vertices) { - coordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint( - jobId, vertex.getCurrentExecutionAttempt().getAttemptId(), - checkpointId, createSerializedStateHandle(vertex), 0)); - } - - // The pending checkpoint is completed - assertTrue(pendingCheckpoint.isDiscarded()); - assertEquals(0, coordinator.getSuccessfulCheckpoints().size()); - - // Verify send notify complete messages - for (ExecutionVertex vertex : vertices) { - verifyNotifyCheckpointComplete(vertex, checkpointId, timestamp); - } - - // Verify that the future has been completed - assertTrue(savepointPathFuture.isCompleted()); - String savepointPath = Await.result(savepointPathFuture, FiniteDuration.Zero()); - - // Verify the savepoint - Savepoint savepoint = savepointStore.loadSavepoint(savepointPath); - verifySavepoint(savepoint, checkpointId, vertices); - - // Verify all promises removed - assertEquals(0, getSavepointPromises(coordinator).size()); - - coordinator.shutdown(); - } - - /** - * This test triggers a checkpoint and then sends a decline checkpoint message from - * one of the tasks. The expected behaviour is that said checkpoint is discarded and a new - * checkpoint is triggered. 
- */ - @Test - public void testTriggerAndDeclineCheckpointSimple() throws Exception { - JobID jobId = new JobID(); - long checkpointTimeout = 60 * 1000; - long timestamp = 1272635; - ExecutionVertex[] vertices = new ExecutionVertex[] { - mockExecutionVertex(jobId), - mockExecutionVertex(jobId) }; - MockCheckpointIdCounter checkpointIdCounter = new MockCheckpointIdCounter(); - SavepointStore savepointStore = new HeapSavepointStore(); - - SavepointCoordinator coordinator = createSavepointCoordinator( - jobId, - checkpointTimeout, - vertices, - vertices, - vertices, - checkpointIdCounter, - savepointStore); - - // Trigger the savepoint - Future<String> savepointPathFuture = coordinator.triggerSavepoint(timestamp); - assertFalse(savepointPathFuture.isCompleted()); - - long checkpointId = checkpointIdCounter.getLastReturnedCount(); - assertEquals(0, checkpointId); - - // Verify send trigger messages - for (ExecutionVertex vertex : vertices) { - verifyTriggerCheckpoint(vertex, checkpointId, timestamp); - } - - PendingCheckpoint pendingCheckpoint = coordinator.getPendingCheckpoints() - .get(checkpointId); - - verifyPendingCheckpoint(pendingCheckpoint, jobId, checkpointId, - timestamp, 0, 2, 0, false, false); - - // Acknowledge and decline tasks - coordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint( - jobId, vertices[0].getCurrentExecutionAttempt().getAttemptId(), - checkpointId, createSerializedStateHandle(vertices[0]), 0)); - - coordinator.receiveDeclineMessage(new DeclineCheckpoint( - jobId, vertices[1].getCurrentExecutionAttempt().getAttemptId(), - checkpointId, 0)); - - - // The pending checkpoint is completed - assertTrue(pendingCheckpoint.isDiscarded()); - assertEquals(0, coordinator.getSuccessfulCheckpoints().size()); - - // Verify that the future has been completed - assertTrue(savepointPathFuture.isCompleted()); - - try { - Await.result(savepointPathFuture.failed(), FiniteDuration.Zero()); - fail("Did not throw expected exception"); - } catch 
(Throwable ignored) {} - - // Verify all promises removed - assertEquals(0, getSavepointPromises(coordinator).size()); - - coordinator.shutdown(); - } - - // ------------------------------------------------------------------------ - // Rollback - // ------------------------------------------------------------------------ - - @Test - @SuppressWarnings("unchecked") - public void testSimpleRollbackSavepoint() throws Exception { - JobID jobId = new JobID(); - - ExecutionJobVertex[] jobVertices = new ExecutionJobVertex[] { - mockExecutionJobVertex(jobId, new JobVertexID(), 4), - mockExecutionJobVertex(jobId, new JobVertexID(), 4) }; - - ExecutionVertex[] triggerVertices = jobVertices[0].getTaskVertices(); - ExecutionVertex[] ackVertices = new ExecutionVertex[8]; - - int i = 0; - for (ExecutionJobVertex jobVertex : jobVertices) { - for (ExecutionVertex vertex : jobVertex.getTaskVertices()) { - ackVertices[i++] = vertex; - } - } - - MockCheckpointIdCounter idCounter = new MockCheckpointIdCounter(); - HeapSavepointStore savepointStore = new HeapSavepointStore(); - - SavepointCoordinator coordinator = createSavepointCoordinator( - jobId, - 60 * 1000, - triggerVertices, - ackVertices, - new ExecutionVertex[] {}, - idCounter, - savepointStore); - - Future<String> savepointPathFuture = coordinator.triggerSavepoint(1231273123); - - // Acknowledge all tasks - for (ExecutionVertex vertex : ackVertices) { - ExecutionAttemptID attemptId = vertex.getCurrentExecutionAttempt().getAttemptId(); - coordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint( - jobId, attemptId, 0, createSerializedStateHandle(vertex), 0)); - } - - String savepointPath = Await.result(savepointPathFuture, FiniteDuration.Zero()); - assertNotNull(savepointPath); - - // Rollback - coordinator.restoreSavepoint(createExecutionJobVertexMap(jobVertices), savepointPath); - - // Verify all executions have been reset - for (ExecutionVertex vertex : ackVertices) { - verify(vertex.getCurrentExecutionAttempt(), 
times(1)).setInitialState( - any(SerializedValue.class), any(Map.class)); - } - - // Verify all promises removed - assertEquals(0, getSavepointPromises(coordinator).size()); - - // Verify checkpoint ID counter started - assertTrue(idCounter.isStarted()); - - coordinator.shutdown(); - } - - @Test - public void testRollbackParallelismMismatch() throws Exception { - JobID jobId = new JobID(); - - ExecutionJobVertex[] jobVertices = new ExecutionJobVertex[] { - mockExecutionJobVertex(jobId, new JobVertexID(), 4), - mockExecutionJobVertex(jobId, new JobVertexID(), 4) }; - - ExecutionVertex[] triggerVertices = jobVertices[0].getTaskVertices(); - ExecutionVertex[] ackVertices = new ExecutionVertex[8]; - - int index = 0; - for (ExecutionJobVertex jobVertex : jobVertices) { - for (ExecutionVertex vertex : jobVertex.getTaskVertices()) { - ackVertices[index++] = vertex; - } - } - - HeapSavepointStore savepointStore = new HeapSavepointStore(); - - SavepointCoordinator coordinator = createSavepointCoordinator( - jobId, - 60 * 1000, - triggerVertices, - ackVertices, - new ExecutionVertex[] {}, - new MockCheckpointIdCounter(), - savepointStore); - - Future<String> savepointPathFuture = coordinator.triggerSavepoint(1231273123); - - // Acknowledge all tasks - for (ExecutionVertex vertex : ackVertices) { - ExecutionAttemptID attemptId = vertex.getCurrentExecutionAttempt().getAttemptId(); - coordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint( - jobId, attemptId, 0, createSerializedStateHandle(vertex), 0)); - } - - String savepointPath = Await.result(savepointPathFuture, FiniteDuration.Zero()); - assertNotNull(savepointPath); - - // Change parallelism lower than original (state without matching subtask). The - // other way around (subtask without matching state) is OK. 
- for (int i = 0; i < jobVertices.length; i++) { - jobVertices[i] = mockExecutionJobVertex(jobId, jobVertices[i].getJobVertexId(), 2); - } - - try { - // Rollback - coordinator.restoreSavepoint( - createExecutionJobVertexMap(jobVertices), - savepointPath); - fail("Did not throw expected Exception after rollback with parallelism mismatch."); - } - catch (Exception ignored) { - } - - // Verify all promises removed - assertEquals(0, getSavepointPromises(coordinator).size()); - - coordinator.shutdown(); - } - - @Test - public void testRollbackStateStoreFailure() throws Exception { - JobID jobId = new JobID(); - ExecutionJobVertex jobVertex = mockExecutionJobVertex(jobId, new JobVertexID(), 4); - HeapSavepointStore savepointStore = spy(new HeapSavepointStore()); - - SavepointCoordinator coordinator = createSavepointCoordinator( - jobId, - 60 * 1000, - jobVertex.getTaskVertices(), - jobVertex.getTaskVertices(), - new ExecutionVertex[] {}, - new MockCheckpointIdCounter(), - savepointStore); - - Future<String> savepointPathFuture = coordinator.triggerSavepoint(1231273123); - - // Acknowledge all tasks - for (ExecutionVertex vertex : jobVertex.getTaskVertices()) { - ExecutionAttemptID attemptId = vertex.getCurrentExecutionAttempt().getAttemptId(); - coordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint( - jobId, attemptId, 0, createSerializedStateHandle(vertex), 0)); - } - - String savepointPath = Await.result(savepointPathFuture, FiniteDuration.Zero()); - assertNotNull(savepointPath); - - // Failure on getState - doThrow(new RuntimeException("TestException")).when(savepointStore).loadSavepoint(anyString()); - - try { - // Rollback - coordinator.restoreSavepoint( - createExecutionJobVertexMap(jobVertex), - savepointPath); - - fail("Did not throw expected Exception after rollback with savepoint store failure."); - } - catch (Exception ignored) { - } - - // Verify all promises removed - assertEquals(0, getSavepointPromises(coordinator).size()); - - 
coordinator.shutdown(); - } - - @Test - public void testRollbackSetsCheckpointID() throws Exception { - SavepointV0 savepoint = new SavepointV0(12312312L, Collections.<TaskState>emptyList()); - - CheckpointIDCounter checkpointIdCounter = mock(CheckpointIDCounter.class); - - SavepointStore savepointStore = mock(SavepointStore.class); - when(savepointStore.loadSavepoint(anyString())).thenReturn(savepoint); - - SavepointCoordinator coordinator = createSavepointCoordinator( - new JobID(), - 60 * 1000, - new ExecutionVertex[] {}, - new ExecutionVertex[] {}, - new ExecutionVertex[] {}, - checkpointIdCounter, - savepointStore); - - coordinator.restoreSavepoint(createExecutionJobVertexMap(), "any"); - - verify(checkpointIdCounter).setCount(eq(12312312L + 1)); - - coordinator.shutdown(); - } - - // ------------------------------------------------------------------------ - // Savepoint aborts and future notifications - // ------------------------------------------------------------------------ - - @Test - public void testAbortSavepointIfTriggerTasksNotExecuted() throws Exception { - JobID jobId = new JobID(); - ExecutionVertex[] triggerVertices = new ExecutionVertex[] { - mock(ExecutionVertex.class), - mock(ExecutionVertex.class) }; - ExecutionVertex[] ackVertices = new ExecutionVertex[] { - mockExecutionVertex(jobId), - mockExecutionVertex(jobId) }; - - SavepointCoordinator coordinator = createSavepointCoordinator( - jobId, - 60 * 1000, - triggerVertices, - ackVertices, - new ExecutionVertex[] {}, - new MockCheckpointIdCounter(), - new HeapSavepointStore()); - - // Trigger savepoint - Future<String> savepointPathFuture = coordinator.triggerSavepoint(1238123); - - // Abort the savepoint, because the vertices are not running - assertTrue(savepointPathFuture.isCompleted()); - - try { - Await.result(savepointPathFuture, FiniteDuration.Zero()); - fail("Did not throw expected Exception after shutdown"); - } - catch (Exception ignored) { - } - - // Verify all promises removed - 
assertEquals(0, getSavepointPromises(coordinator).size()); - - coordinator.shutdown(); - } - - @Test - public void testAbortSavepointIfTriggerTasksAreFinished() throws Exception { - JobID jobId = new JobID(); - ExecutionVertex[] triggerVertices = new ExecutionVertex[] { - mockExecutionVertex(jobId), - mockExecutionVertex(jobId, ExecutionState.FINISHED) }; - ExecutionVertex[] ackVertices = new ExecutionVertex[] { - mockExecutionVertex(jobId), - mockExecutionVertex(jobId) }; - - SavepointCoordinator coordinator = createSavepointCoordinator( - jobId, - 60 * 1000, - triggerVertices, - ackVertices, - new ExecutionVertex[] {}, - new MockCheckpointIdCounter(), - new HeapSavepointStore()); - - // Trigger savepoint - Future<String> savepointPathFuture = coordinator.triggerSavepoint(1238123); - - // Abort the savepoint, because the vertices are not running - assertTrue(savepointPathFuture.isCompleted()); - - try { - Await.result(savepointPathFuture, FiniteDuration.Zero()); - fail("Did not throw expected Exception after shutdown"); - } - catch (Exception ignored) { - } - - // Verify all promises removed - assertEquals(0, getSavepointPromises(coordinator).size()); - - coordinator.shutdown(); - } - - @Test - public void testAbortSavepointIfAckTasksAreNotExecuted() throws Exception { - JobID jobId = new JobID(); - ExecutionVertex[] triggerVertices = new ExecutionVertex[] { - mockExecutionVertex(jobId), - mockExecutionVertex(jobId) }; - ExecutionVertex[] ackVertices = new ExecutionVertex[] { - mock(ExecutionVertex.class), - mock(ExecutionVertex.class) }; - - SavepointCoordinator coordinator = createSavepointCoordinator( - jobId, - 60 * 1000, - triggerVertices, - ackVertices, - new ExecutionVertex[] {}, - new MockCheckpointIdCounter(), - new HeapSavepointStore()); - - // Trigger savepoint - Future<String> savepointPathFuture = coordinator.triggerSavepoint(1238123); - - // Abort the savepoint, because the vertices are not running - assertTrue(savepointPathFuture.isCompleted()); - - 
try { - Await.result(savepointPathFuture, FiniteDuration.Zero()); - fail("Did not throw expected Exception after shutdown"); - } - catch (Exception ignored) { - } - - // Verify all promises removed - assertEquals(0, getSavepointPromises(coordinator).size()); - - coordinator.shutdown(); - } - - @Test - public void testAbortOnCheckpointTimeout() throws Exception { - JobID jobId = new JobID(); - ExecutionVertex[] vertices = new ExecutionVertex[] { - mockExecutionVertex(jobId), - mockExecutionVertex(jobId) }; - ExecutionVertex commitVertex = mockExecutionVertex(jobId); - MockCheckpointIdCounter checkpointIdCounter = new MockCheckpointIdCounter(); - - long checkpointTimeout = 1000; - SavepointCoordinator coordinator = createSavepointCoordinator( - jobId, - checkpointTimeout, - vertices, - vertices, - new ExecutionVertex[] { commitVertex }, - checkpointIdCounter, - new HeapSavepointStore()); - - // Trigger the savepoint - Future<String> savepointPathFuture = coordinator.triggerSavepoint(12731273); - assertFalse(savepointPathFuture.isCompleted()); - - long checkpointId = checkpointIdCounter.getLastReturnedCount(); - PendingCheckpoint pendingCheckpoint = coordinator.getPendingCheckpoints() - .get(checkpointId); - - assertNotNull("Checkpoint not pending (test race)", pendingCheckpoint); - assertFalse("Checkpoint already discarded (test race)", pendingCheckpoint.isDiscarded()); - - // Wait for savepoint to timeout - Deadline deadline = FiniteDuration.apply(60, "s").fromNow(); - while (deadline.hasTimeLeft() - && !pendingCheckpoint.isDiscarded() - && coordinator.getNumberOfPendingCheckpoints() > 0) { - - Thread.sleep(250); - } - - // Verify discarded - assertTrue("Savepoint not discarded within timeout", pendingCheckpoint.isDiscarded()); - assertEquals(0, coordinator.getNumberOfPendingCheckpoints()); - assertEquals(0, coordinator.getNumberOfRetainedSuccessfulCheckpoints()); - - // No commit for timeout - verify(commitVertex, times(0)).sendMessageToCurrentExecution( - 
any(NotifyCheckpointComplete.class), any(ExecutionAttemptID.class)); - - assertTrue(savepointPathFuture.isCompleted()); - - try { - Await.result(savepointPathFuture, FiniteDuration.Zero()); - fail("Did not throw expected Exception after timeout"); - } - catch (Exception ignored) { - } - - // Verify all promises removed - assertEquals(0, getSavepointPromises(coordinator).size()); - - coordinator.shutdown(); - } - - @Test - public void testAbortSavepointsOnShutdown() throws Exception { - JobID jobId = new JobID(); - ExecutionVertex[] vertices = new ExecutionVertex[] { - mockExecutionVertex(jobId), - mockExecutionVertex(jobId) }; - - SavepointCoordinator coordinator = createSavepointCoordinator( - jobId, - 60 * 1000, - vertices, - vertices, - vertices, - new MockCheckpointIdCounter(), - new HeapSavepointStore()); - - // Trigger savepoints - List<Future<String>> savepointPathFutures = new ArrayList<>(); - savepointPathFutures.add(coordinator.triggerSavepoint(12731273)); - savepointPathFutures.add(coordinator.triggerSavepoint(12731273 + 123)); - - for (Future<String> future : savepointPathFutures) { - assertFalse(future.isCompleted()); - } - - coordinator.shutdown(); - - // Verify futures failed - for (Future<String> future : savepointPathFutures) { - assertTrue(future.isCompleted()); - - try { - Await.result(future, FiniteDuration.Zero()); - fail("Did not throw expected Exception after shutdown"); - } - catch (Exception ignored) { - } - } - - // Verify all promises removed - assertEquals(0, getSavepointPromises(coordinator).size()); - } - - @Test - public void testAbortSavepointOnStateStoreFailure() throws Exception { - JobID jobId = new JobID(); - ExecutionJobVertex jobVertex = mockExecutionJobVertex(jobId, new JobVertexID(), 4); - HeapSavepointStore savepointStore = spy(new HeapSavepointStore()); - - SavepointCoordinator coordinator = createSavepointCoordinator( - jobId, - 60 * 1000, - jobVertex.getTaskVertices(), - jobVertex.getTaskVertices(), - new 
ExecutionVertex[] {}, - new MockCheckpointIdCounter(), - savepointStore); - - // Failure on putState - doThrow(new RuntimeException("TestException")) - .when(savepointStore).storeSavepoint(any(Savepoint.class)); - - Future<String> savepointPathFuture = coordinator.triggerSavepoint(1231273123); - - // Acknowledge all tasks - for (ExecutionVertex vertex : jobVertex.getTaskVertices()) { - ExecutionAttemptID attemptId = vertex.getCurrentExecutionAttempt().getAttemptId(); - coordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint( - jobId, attemptId, 0, createSerializedStateHandle(vertex), 0)); - } - - try { - Await.result(savepointPathFuture, FiniteDuration.Zero()); - fail("Did not throw expected Exception after rollback with savepoint store failure."); - } - catch (Exception ignored) { - } - - // Verify all promises removed - assertEquals(0, getSavepointPromises(coordinator).size()); - - coordinator.shutdown(); - } - - @Test - public void testAbortSavepointIfSubsumed() throws Exception { - JobID jobId = new JobID(); - long checkpointTimeout = 60 * 1000; - long[] timestamps = new long[] { 1272635, 1272635 + 10 }; - long[] checkpointIds = new long[2]; - ExecutionVertex[] vertices = new ExecutionVertex[] { - mockExecutionVertex(jobId), - mockExecutionVertex(jobId) }; - MockCheckpointIdCounter checkpointIdCounter = new MockCheckpointIdCounter(); - HeapSavepointStore savepointStore = new HeapSavepointStore(); - - SavepointCoordinator coordinator = createSavepointCoordinator( - jobId, - checkpointTimeout, - vertices, - vertices, - vertices, - checkpointIdCounter, - savepointStore); - - // Trigger the savepoints - List<Future<String>> savepointPathFutures = new ArrayList<>(); - - savepointPathFutures.add(coordinator.triggerSavepoint(timestamps[0])); - checkpointIds[0] = checkpointIdCounter.getLastReturnedCount(); - - savepointPathFutures.add(coordinator.triggerSavepoint(timestamps[1])); - checkpointIds[1] = checkpointIdCounter.getLastReturnedCount(); - - for 
(Future<String> future : savepointPathFutures) { - assertFalse(future.isCompleted()); - } - - // Verify send trigger messages - for (ExecutionVertex vertex : vertices) { - verifyTriggerCheckpoint(vertex, checkpointIds[0], timestamps[0]); - verifyTriggerCheckpoint(vertex, checkpointIds[1], timestamps[1]); - } - - PendingCheckpoint[] pendingCheckpoints = new PendingCheckpoint[] { - coordinator.getPendingCheckpoints().get(checkpointIds[0]), - coordinator.getPendingCheckpoints().get(checkpointIds[1]) }; - - verifyPendingCheckpoint(pendingCheckpoints[0], jobId, checkpointIds[0], - timestamps[0], 0, 2, 0, false, false); - - verifyPendingCheckpoint(pendingCheckpoints[1], jobId, checkpointIds[1], - timestamps[1], 0, 2, 0, false, false); - - // Acknowledge second checkpoint... - for (ExecutionVertex vertex : vertices) { - coordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint( - jobId, vertex.getCurrentExecutionAttempt().getAttemptId(), - checkpointIds[1], createSerializedStateHandle(vertex), 0)); - } - - // ...and one task of first checkpoint - coordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint( - jobId, vertices[0].getCurrentExecutionAttempt().getAttemptId(), - checkpointIds[0], createSerializedStateHandle(vertices[0]), 0)); - - // The second pending checkpoint is completed and subsumes the first one - assertTrue(pendingCheckpoints[0].isDiscarded()); - assertTrue(pendingCheckpoints[1].isDiscarded()); - assertEquals(0, coordinator.getSuccessfulCheckpoints().size()); - - // Verify send notify complete messages for second checkpoint - for (ExecutionVertex vertex : vertices) { - verifyNotifyCheckpointComplete(vertex, checkpointIds[1], timestamps[1]); - } - - Savepoint[] savepoints = new Savepoint[2]; - String[] savepointPaths = new String[2]; - - // Verify that the futures have both been completed - assertTrue(savepointPathFutures.get(0).isCompleted()); - - try { - savepointPaths[0] = Await.result(savepointPathFutures.get(0), FiniteDuration.Zero()); - 
fail("Did not throw expected exception"); - } - catch (Exception ignored) { - } - - // Verify the second savepoint - assertTrue(savepointPathFutures.get(1).isCompleted()); - savepointPaths[1] = Await.result(savepointPathFutures.get(1), FiniteDuration.Zero()); - savepoints[1] = savepointStore.loadSavepoint(savepointPaths[1]); - verifySavepoint(savepoints[1], checkpointIds[1], vertices); - - // Verify all promises removed - assertEquals(0, getSavepointPromises(coordinator).size()); - - coordinator.shutdown(); - } - - @Test - public void testShutdownDoesNotCleanUpCompletedCheckpointsWithFileSystemStore() throws Exception { - JobID jobId = new JobID(); - long checkpointTimeout = 60 * 1000; - long timestamp = 1272635; - ExecutionVertex[] vertices = new ExecutionVertex[] { - mockExecutionVertex(jobId), - mockExecutionVertex(jobId) }; - MockCheckpointIdCounter checkpointIdCounter = new MockCheckpointIdCounter(); - - // Temporary directory for file state backend - final File tmpDir = CommonTestUtils.createTempDirectory(); - - try { - FsSavepointStore savepointStore = new FsSavepointStore(tmpDir.toURI().toString(), "sp-"); - - SavepointCoordinator coordinator = createSavepointCoordinator( - jobId, - checkpointTimeout, - vertices, - vertices, - vertices, - checkpointIdCounter, - savepointStore); - - // Trigger the savepoint - Future<String> savepointPathFuture = coordinator.triggerSavepoint(timestamp); - assertFalse(savepointPathFuture.isCompleted()); - - long checkpointId = checkpointIdCounter.getLastReturnedCount(); - assertEquals(0, checkpointId); - - // Verify send trigger messages - for (ExecutionVertex vertex : vertices) { - verifyTriggerCheckpoint(vertex, checkpointId, timestamp); - } - - PendingCheckpoint pendingCheckpoint = coordinator.getPendingCheckpoints() - .get(checkpointId); - - verifyPendingCheckpoint(pendingCheckpoint, jobId, checkpointId, - timestamp, 0, 2, 0, false, false); - - // Acknowledge tasks - for (ExecutionVertex vertex : vertices) { - 
coordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint( - jobId, vertex.getCurrentExecutionAttempt().getAttemptId(), - checkpointId, createSerializedStateHandle(vertex), 0)); - } - - // The pending checkpoint is completed - assertTrue(pendingCheckpoint.isDiscarded()); - assertEquals(0, coordinator.getSuccessfulCheckpoints().size()); - - // Verify send notify complete messages - for (ExecutionVertex vertex : vertices) { - verifyNotifyCheckpointComplete(vertex, checkpointId, timestamp); - } - - // Verify that the future has been completed - assertTrue(savepointPathFuture.isCompleted()); - String savepointPath = Await.result(savepointPathFuture, FiniteDuration.Zero()); - - // Verify all promises removed - assertEquals(0, getSavepointPromises(coordinator).size()); - - coordinator.shutdown(); - - // Verify the savepoint is still available - Savepoint savepoint = savepointStore.loadSavepoint(savepointPath); - verifySavepoint(savepoint, checkpointId, vertices); - } - finally { - FileUtils.deleteDirectory(tmpDir); - } - } - - // ------------------------------------------------------------------------ - // Test helpers - // ------------------------------------------------------------------------ - - private static SavepointCoordinator createSavepointCoordinator( - JobID jobId, - long checkpointTimeout, - ExecutionVertex[] triggerVertices, - ExecutionVertex[] ackVertices, - ExecutionVertex[] commitVertices, - CheckpointIDCounter checkpointIdCounter, - SavepointStore savepointStore) throws Exception { - - ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); - - return new SavepointCoordinator( - jobId, - checkpointTimeout, - checkpointTimeout, - 42, - triggerVertices, - ackVertices, - commitVertices, - classLoader, - checkpointIdCounter, - savepointStore, - new DisabledCheckpointStatsTracker()); - } - - private static Map<JobVertexID, ExecutionJobVertex> createExecutionJobVertexMap( - ExecutionJobVertex... 
jobVertices) { - - Map<JobVertexID, ExecutionJobVertex> jobVertexMap = new HashMap<>(); - - for (ExecutionJobVertex jobVertex : jobVertices) { - jobVertexMap.put(jobVertex.getJobVertexId(), jobVertex); - } - - return jobVertexMap; - } - - private static SerializedValue<StateHandle<?>> createSerializedStateHandle( - ExecutionVertex vertex) throws IOException { - - return new SerializedValue<StateHandle<?>>(new LocalStateHandle<Serializable>( - vertex.getCurrentExecutionAttempt().getAttemptId())); - } - - @SuppressWarnings("unchecked") - private Map<Long, Promise<String>> getSavepointPromises( - SavepointCoordinator coordinator) - throws NoSuchFieldException, IllegalAccessException { - - Field field = SavepointCoordinator.class.getDeclaredField("savepointPromises"); - field.setAccessible(true); - return (Map<Long, Promise<String>>) field.get(coordinator); - } - - // ---- Verification ------------------------------------------------------ - - private static void verifyTriggerCheckpoint( - ExecutionVertex mockExecutionVertex, - long expectedCheckpointId, - long expectedTimestamp) { - - ExecutionAttemptID attemptId = mockExecutionVertex - .getCurrentExecutionAttempt().getAttemptId(); - - TriggerCheckpoint expectedMsg = new TriggerCheckpoint( - mockExecutionVertex.getJobId(), - attemptId, - expectedCheckpointId, - expectedTimestamp); - - verify(mockExecutionVertex).sendMessageToCurrentExecution( - eq(expectedMsg), eq(attemptId)); - } - - private static void verifyNotifyCheckpointComplete( - ExecutionVertex mockExecutionVertex, - long expectedCheckpointId, - long expectedTimestamp) { - - ExecutionAttemptID attemptId = mockExecutionVertex - .getCurrentExecutionAttempt().getAttemptId(); - - NotifyCheckpointComplete expectedMsg = new NotifyCheckpointComplete( - mockExecutionVertex.getJobId(), - attemptId, - expectedCheckpointId, - expectedTimestamp); - - verify(mockExecutionVertex).sendMessageToCurrentExecution( - eq(expectedMsg), eq(attemptId)); - } - - private static void 
verifyPendingCheckpoint( - PendingCheckpoint checkpoint, - JobID expectedJobId, - long expectedCheckpointId, - long expectedTimestamp, - int expectedNumberOfAcknowledgedTasks, - int expectedNumberOfNonAcknowledgedTasks, - int expectedNumberOfCollectedStates, - boolean expectedIsDiscarded, - boolean expectedIsFullyAcknowledged) { - - assertNotNull(checkpoint); - assertEquals(expectedJobId, checkpoint.getJobId()); - assertEquals(expectedCheckpointId, checkpoint.getCheckpointId()); - assertEquals(expectedTimestamp, checkpoint.getCheckpointTimestamp()); - assertEquals(expectedNumberOfAcknowledgedTasks, checkpoint.getNumberOfAcknowledgedTasks()); - assertEquals(expectedNumberOfNonAcknowledgedTasks, checkpoint.getNumberOfNonAcknowledgedTasks()); - - int actualNumberOfCollectedStates = 0; - - for (TaskState taskState : checkpoint.getTaskStates().values()) { - actualNumberOfCollectedStates += taskState.getNumberCollectedStates(); - } - - assertEquals(expectedNumberOfCollectedStates, actualNumberOfCollectedStates); - assertEquals(expectedIsDiscarded, checkpoint.isDiscarded()); - assertEquals(expectedIsFullyAcknowledged, checkpoint.isFullyAcknowledged()); - } - - private static void verifySavepoint( - Savepoint savepoint, - long expectedCheckpointId, - ExecutionVertex[] expectedVertices) throws Exception { - - assertEquals(expectedCheckpointId, savepoint.getCheckpointId()); - - for (TaskState taskState : savepoint.getTaskStates()) { - JobVertexID jobVertexId = taskState.getJobVertexID(); - - // Find matching execution vertex - ExecutionVertex vertex = null; - for (ExecutionVertex executionVertex : expectedVertices) { - if (executionVertex.getJobvertexId().equals(jobVertexId)) { - vertex = executionVertex; - break; - } - } - - if (vertex == null) { - fail("Did not find matching vertex"); - } else { - SubtaskState subtaskState = taskState.getState(vertex.getParallelSubtaskIndex()); - ExecutionAttemptID vertexAttemptId = vertex.getCurrentExecutionAttempt().getAttemptId(); - - 
ExecutionAttemptID stateAttemptId = (ExecutionAttemptID) subtaskState.getState() - .deserializeValue(Thread.currentThread().getContextClassLoader()) - .getState(Thread.currentThread().getContextClassLoader()); - - assertEquals(vertexAttemptId, stateAttemptId); - } - } - } - - // ---- Mocking ----------------------------------------------------------- - - private static ExecutionJobVertex mockExecutionJobVertex( - JobID jobId, - JobVertexID jobVertexId, - int parallelism) { - - ExecutionJobVertex jobVertex = mock(ExecutionJobVertex.class); - when(jobVertex.getJobId()).thenReturn(jobId); - when(jobVertex.getJobVertexId()).thenReturn(jobVertexId); - when(jobVertex.getParallelism()).thenReturn(parallelism); - - ExecutionVertex[] vertices = new ExecutionVertex[parallelism]; - - for (int i = 0; i < vertices.length; i++) { - vertices[i] = mockExecutionVertex(jobId, jobVertexId, i, parallelism, ExecutionState.RUNNING); - } - - when(jobVertex.getTaskVertices()).thenReturn(vertices); - - return jobVertex; - } - - private static ExecutionVertex mockExecutionVertex(JobID jobId) { - return mockExecutionVertex(jobId, ExecutionState.RUNNING); - } - - private static ExecutionVertex mockExecutionVertex( - JobID jobId, - ExecutionState state) { - - return mockExecutionVertex(jobId, new JobVertexID(), 0, 1, state); - } - - private static ExecutionVertex mockExecutionVertex( - JobID jobId, - JobVertexID jobVertexId, - int subtaskIndex, - int parallelism, - ExecutionState executionState) { - - Execution exec = mock(Execution.class); - when(exec.getAttemptId()).thenReturn(new ExecutionAttemptID()); - when(exec.getState()).thenReturn(executionState); - - ExecutionVertex vertex = mock(ExecutionVertex.class); - when(vertex.getJobId()).thenReturn(jobId); - when(vertex.getJobvertexId()).thenReturn(jobVertexId); - when(vertex.getParallelSubtaskIndex()).thenReturn(subtaskIndex); - when(vertex.getCurrentExecutionAttempt()).thenReturn(exec); - 
when(vertex.getTotalNumberOfParallelSubtasks()).thenReturn(parallelism); - - return vertex; - } - - private static class MockCheckpointIdCounter implements CheckpointIDCounter { - - private boolean started; - private long count; - private long lastReturnedCount; - - @Override - public void start() throws Exception { - started = true; - } - - @Override - public void shutdown() throws Exception { - started = false; - } - - @Override - public void suspend() throws Exception { - started = false; - } - - @Override - public long getAndIncrement() throws Exception { - lastReturnedCount = count; - return count++; - } - - @Override - public void setCount(long newCount) { - count = newCount; - } - - long getLastReturnedCount() { - return lastReturnedCount; - } - - public boolean isStarted() { - return started; - } - } -}
http://git-wip-us.apache.org/repos/asf/flink/blob/47acdead/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java new file mode 100644 index 0000000..6a85195 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.checkpoint.savepoint; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.checkpoint.TaskState; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class SavepointLoaderTest { + + /** + * Tests loading and validation of savepoints with correct setup, + * parallelism mismatch, and a missing task. + */ + @Test + public void testLoadAndValidateSavepoint() throws Exception { + int parallelism = 128128; + JobVertexID vertexId = new JobVertexID(); + + TaskState state = mock(TaskState.class); + when(state.getParallelism()).thenReturn(parallelism); + when(state.getJobVertexID()).thenReturn(vertexId); + + Map<JobVertexID, TaskState> taskStates = new HashMap<>(); + taskStates.put(vertexId, state); + + CompletedCheckpoint stored = new CompletedCheckpoint( + new JobID(), + Integer.MAX_VALUE + 123123L, + 10200202, + 1020292988, + taskStates, + true); + + // Store savepoint + SavepointV0 savepoint = new SavepointV0(stored.getCheckpointID(), taskStates.values()); + SavepointStore store = new HeapSavepointStore(); + String path = store.storeSavepoint(savepoint); + + JobID jobId = new JobID(); + + ExecutionJobVertex vertex = mock(ExecutionJobVertex.class); + when(vertex.getParallelism()).thenReturn(parallelism); + + Map<JobVertexID, ExecutionJobVertex> tasks = new HashMap<>(); + tasks.put(vertexId, vertex); + + // 1) 
Load and validate: everything correct + CompletedCheckpoint loaded = SavepointLoader.loadAndValidateSavepoint(jobId, tasks, store, path); + + assertEquals(jobId, loaded.getJobId()); + assertEquals(stored.getCheckpointID(), loaded.getCheckpointID()); + + // The loaded checkpoint should not discard state when it's discarded + loaded.discard(ClassLoader.getSystemClassLoader()); + verify(state, times(0)).discard(any(ClassLoader.class)); + + // 2) Load and validate: parallelism mismatch + when(vertex.getParallelism()).thenReturn(222); + + try { + SavepointLoader.loadAndValidateSavepoint(jobId, tasks, store, path); + fail("Did not throw expected Exception"); + } catch (IllegalStateException expected) { + assertTrue(expected.getMessage().contains("Parallelism mismatch")); + } + + // 3) Load and validate: missing vertex (this should be relaxed) + assertNotNull(tasks.remove(vertexId)); + + try { + SavepointLoader.loadAndValidateSavepoint(jobId, tasks, store, path); + fail("Did not throw expected Exception"); + } catch (IllegalStateException expected) { + assertTrue(expected.getMessage().contains("Cannot map old state")); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/47acdead/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java index 9265ab1..12bbf82 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java @@ -22,8 +22,8 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import 
org.apache.flink.runtime.checkpoint.CompletedCheckpoint; import org.apache.flink.runtime.checkpoint.SubtaskState; -import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.checkpoint.TaskState; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.state.StateHandle; import org.apache.flink.util.SerializedValue; @@ -340,8 +340,7 @@ public class SimpleCheckpointStatsTrackerTest { // Add some random delay final long completionTimestamp = triggerTimestamp + completionDuration + RAND.nextInt(10); - checkpoints[i] = new CompletedCheckpoint( - jobId, i, triggerTimestamp, completionTimestamp, taskGroupStates); + checkpoints[i] = new CompletedCheckpoint(jobId, i, triggerTimestamp, completionTimestamp, taskGroupStates, true); } return checkpoints; http://git-wip-us.apache.org/repos/asf/flink/blob/47acdead/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala index a576a58..548bef0 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala @@ -22,7 +22,7 @@ package org.apache.flink.runtime.jobmanager import akka.actor.ActorSystem import akka.testkit.{ImplicitSender, TestKit} import akka.util.Timeout -import org.apache.flink.api.common.{ExecutionConfig, JobID} +import org.apache.flink.api.common.JobID import org.apache.flink.runtime.akka.ListeningBehaviour import org.apache.flink.runtime.checkpoint.CheckpointCoordinator import org.apache.flink.runtime.client.JobExecutionException @@ -31,10 +31,8 @@ import 
org.apache.flink.runtime.jobgraph.{DistributionPattern, JobGraph, JobVert import org.apache.flink.runtime.jobmanager.Tasks._ import org.apache.flink.runtime.jobmanager.scheduler.{NoResourceAvailableException, SlotSharingGroup} import org.apache.flink.runtime.messages.JobManagerMessages._ -import org.apache.flink.runtime.checkpoint.savepoint.SavepointCoordinator import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._ import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingUtils} -import org.apache.flink.runtime.testutils.JobManagerActorTestUtils import org.junit.runner.RunWith import org.mockito.Mockito._ import org.scalatest.junit.JUnitRunner @@ -829,14 +827,14 @@ class JobManagerITCase(_system: ActorSystem) deadline.timeLeft).executionGraph // Mock the checkpoint coordinator - val savepointCoordinator = mock(classOf[SavepointCoordinator]) + val checkpointCoordinator = mock(classOf[CheckpointCoordinator]) doThrow(new Exception("Expected Test Exception")) - .when(savepointCoordinator).triggerSavepoint(org.mockito.Matchers.anyLong()) + .when(checkpointCoordinator).triggerSavepoint(org.mockito.Matchers.anyLong()) // Update the savepoint coordinator field - val field = executionGraph.getClass.getDeclaredField("savepointCoordinator") + val field = executionGraph.getClass.getDeclaredField("checkpointCoordinator") field.setAccessible(true) - field.set(executionGraph, savepointCoordinator) + field.set(executionGraph, checkpointCoordinator) // Trigger savepoint for job jobManager.tell(TriggerSavepoint(jobGraph.getJobID()), testActor) @@ -877,10 +875,12 @@ class JobManagerITCase(_system: ActorSystem) expectMsg(JobSubmitSuccess(jobGraph.getJobID())) // Mock the checkpoint coordinator - val savepointCoordinator = mock(classOf[SavepointCoordinator]) + val checkpointCoordinator = mock(classOf[CheckpointCoordinator]) + doThrow(new Exception("Expected Test Exception")) + .when(checkpointCoordinator).triggerSavepoint(org.mockito.Matchers.anyLong()) 
val savepointPathPromise = scala.concurrent.promise[String] doReturn(savepointPathPromise.future) - .when(savepointCoordinator).triggerSavepoint(org.mockito.Matchers.anyLong()) + .when(checkpointCoordinator).triggerSavepoint(org.mockito.Matchers.anyLong()) // Request the execution graph and set a checkpoint coordinator mock jobManager.tell(RequestExecutionGraph(jobGraph.getJobID), testActor) @@ -888,9 +888,9 @@ class JobManagerITCase(_system: ActorSystem) deadline.timeLeft).executionGraph // Update the savepoint coordinator field - val field = executionGraph.getClass.getDeclaredField("savepointCoordinator") + val field = executionGraph.getClass.getDeclaredField("checkpointCoordinator") field.setAccessible(true) - field.set(executionGraph, savepointCoordinator) + field.set(executionGraph, checkpointCoordinator) // Trigger savepoint for job jobManager.tell(TriggerSavepoint(jobGraph.getJobID()), testActor) @@ -935,10 +935,12 @@ class JobManagerITCase(_system: ActorSystem) expectMsg(JobSubmitSuccess(jobGraph.getJobID())) // Mock the checkpoint coordinator - val savepointCoordinator = mock(classOf[SavepointCoordinator]) + val checkpointCoordinator = mock(classOf[CheckpointCoordinator]) + doThrow(new Exception("Expected Test Exception")) + .when(checkpointCoordinator).triggerSavepoint(org.mockito.Matchers.anyLong()) val savepointPathPromise = scala.concurrent.promise[String] doReturn(savepointPathPromise.future) - .when(savepointCoordinator).triggerSavepoint(org.mockito.Matchers.anyLong()) + .when(checkpointCoordinator).triggerSavepoint(org.mockito.Matchers.anyLong()) // Request the execution graph and set a checkpoint coordinator mock jobManager.tell(RequestExecutionGraph(jobGraph.getJobID), testActor) @@ -946,9 +948,9 @@ class JobManagerITCase(_system: ActorSystem) deadline.timeLeft).executionGraph // Update the savepoint coordinator field - val field = executionGraph.getClass.getDeclaredField("savepointCoordinator") + val field = 
executionGraph.getClass.getDeclaredField("checkpointCoordinator") field.setAccessible(true) - field.set(executionGraph, savepointCoordinator) + field.set(executionGraph, checkpointCoordinator) // Trigger savepoint for job jobManager.tell(TriggerSavepoint(jobGraph.getJobID()), testActor) http://git-wip-us.apache.org/repos/asf/flink/blob/47acdead/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java index 0ed28ad..4fc310c 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java @@ -403,148 +403,6 @@ public class SavepointITCase extends TestLogger { } /** - * Tests that removed checkpoint files which are part of a savepoint throw - * a proper Exception on submission. 
- */ - @Test - @RetryOnFailure(times = 2) - public void testCheckpointHasBeenRemoved() throws Exception { - // Config - int numTaskManagers = 2; - int numSlotsPerTaskManager = 2; - int parallelism = numTaskManagers * numSlotsPerTaskManager; - - // Test deadline - final Deadline deadline = new FiniteDuration(5, TimeUnit.MINUTES).fromNow(); - - // The number of checkpoints to complete before triggering the savepoint - final int numberOfCompletedCheckpoints = 10; - - // Temporary directory for file state backend - final File tmpDir = CommonTestUtils.createTempDirectory(); - - LOG.info("Created temporary directory: " + tmpDir + "."); - - ForkableFlinkMiniCluster flink = null; - - try { - // Flink configuration - final Configuration config = new Configuration(); - config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers); - config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTaskManager); - - final File checkpointDir = new File(tmpDir, "checkpoints"); - final File savepointDir = new File(tmpDir, "savepoints"); - - if (!checkpointDir.mkdir() || !savepointDir.mkdirs()) { - fail("Test setup failed: failed to create temporary directories."); - } - - LOG.info("Created temporary checkpoint directory: " + checkpointDir + "."); - LOG.info("Created temporary savepoint directory: " + savepointDir + "."); - - config.setString(ConfigConstants.STATE_BACKEND, "filesystem"); - config.setString(SavepointStoreFactory.SAVEPOINT_BACKEND_KEY, "filesystem"); - - config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY, - checkpointDir.toURI().toString()); - config.setString(SavepointStoreFactory.SAVEPOINT_DIRECTORY_KEY, - savepointDir.toURI().toString()); - - LOG.info("Flink configuration: " + config + "."); - - // Start Flink - flink = new ForkableFlinkMiniCluster(config); - LOG.info("Starting Flink cluster."); - flink.start(); - - // Retrieve the job manager - LOG.info("Retrieving JobManager."); - ActorGateway jobManager = 
Await.result( - flink.leaderGateway().future(), - deadline.timeLeft()); - LOG.info("JobManager: " + jobManager + "."); - - // Submit the job - final JobGraph jobGraph = createJobGraph(parallelism, 0, 1000, 1000); - final JobID jobId = jobGraph.getJobID(); - - // Wait for the source to be notified about the expected number - // of completed checkpoints - InfiniteTestSource.CheckpointCompleteLatch = new CountDownLatch( - numberOfCompletedCheckpoints); - - LOG.info("Submitting job " + jobGraph.getJobID() + " in detached mode."); - - flink.submitJobDetached(jobGraph); - - LOG.info("Waiting for " + numberOfCompletedCheckpoints + - " checkpoint complete notifications."); - - // Wait... - InfiniteTestSource.CheckpointCompleteLatch.await(); - - LOG.info("Received all " + numberOfCompletedCheckpoints + - " checkpoint complete notifications."); - - // ...and then trigger the savepoint - LOG.info("Triggering a savepoint."); - - Future<Object> savepointPathFuture = jobManager.ask( - new TriggerSavepoint(jobId), deadline.timeLeft()); - - final String savepointPath = ((TriggerSavepointSuccess) Await - .result(savepointPathFuture, deadline.timeLeft())).savepointPath(); - LOG.info("Retrieved savepoint path: " + savepointPath + "."); - - // Retrieve the savepoint from the testing job manager - LOG.info("Requesting the savepoint."); - Future<Object> savepointFuture = jobManager.ask( - new RequestSavepoint(savepointPath), - deadline.timeLeft()); - - Await.ready(savepointFuture, deadline.timeLeft()); - LOG.info("Retrieved savepoint: " + savepointPath + "."); - - // Shut down the Flink cluster (thereby canceling the job) - LOG.info("Shutting down Flink cluster."); - flink.shutdown(); - - // Remove the checkpoint files - try { - FileUtils.deleteDirectory(checkpointDir); - } catch (FileNotFoundException ignored) { - } - - // Restart the cluster - LOG.info("Restarting Flink cluster."); - flink.start(); - - // Set the savepoint path - jobGraph.setSavepointPath(savepointPath); - - 
LOG.info("Resubmitting job " + jobGraph.getJobID() + " with " + - "savepoint path " + savepointPath + " in detached mode."); - - try { - flink.submitJobAndWait(jobGraph, false, deadline.timeLeft()); - fail("Did not throw expected Exception because of missing checkpoint files"); - } - catch (Exception ignored) { - } - } - finally { - if (flink != null) { - flink.shutdown(); - } - - if (tmpDir != null) { - FileUtils.deleteDirectory(tmpDir); - } - } - } - - /** * Tests that a job manager backed savepoint is removed when the checkpoint * coordinator is shut down, because the associated checkpoints files will * linger around otherwise. @@ -645,40 +503,42 @@ public class SavepointITCase extends TestLogger { savepointFuture, deadline.timeLeft())).savepoint(); LOG.info("Retrieved savepoint: " + savepointPath + "."); - // Cancel the job - LOG.info("Cancelling job " + jobId + "."); - Future<Object> cancelRespFuture = jobManager.ask( - new CancelJob(jobId), deadline.timeLeft()); - assertTrue(Await.result(cancelRespFuture, deadline.timeLeft()) - instanceof CancellationSuccess); - - LOG.info("Waiting for job " + jobId + " to be removed."); - Future<Object> removedRespFuture = jobManager.ask( - new NotifyWhenJobRemoved(jobId), deadline.timeLeft()); - assertTrue((Boolean) Await.result(removedRespFuture, deadline.timeLeft())); - // Check that all checkpoint files have been removed for (TaskState stateForTaskGroup : savepoint.getTaskStates()) { for (SubtaskState subtaskState : stateForTaskGroup.getStates()) { StreamTaskStateList taskStateList = (StreamTaskStateList) subtaskState.getState() - .deserializeValue(ClassLoader.getSystemClassLoader()); + .deserializeValue(ClassLoader.getSystemClassLoader()); for (StreamTaskState taskState : taskStateList.getState( - ClassLoader.getSystemClassLoader())) { + ClassLoader.getSystemClassLoader())) { AbstractFileStateHandle fsState = (AbstractFileStateHandle) taskState.getFunctionState(); checkpointFiles.add(new 
File(fsState.getFilePath().toUri())); } } } + + // Cancel the job + LOG.info("Cancelling job " + jobId + "."); + Future<Object> cancelRespFuture = jobManager.ask( + new CancelJob(jobId), deadline.timeLeft()); + assertTrue(Await.result(cancelRespFuture, deadline.timeLeft()) + instanceof CancellationSuccess); + + LOG.info("Waiting for job " + jobId + " to be removed."); + Future<Object> removedRespFuture = jobManager.ask( + new NotifyWhenJobRemoved(jobId), deadline.timeLeft()); + assertTrue((Boolean) Await.result(removedRespFuture, deadline.timeLeft())); } finally { if (flink != null) { flink.shutdown(); } + Thread.sleep(1000); + // At least one checkpoint file - assertTrue(checkpointFiles.size() > 0); + assertTrue(checkpointFiles.toString(), checkpointFiles.size() > 0); // The checkpoint associated with the savepoint should have been // discarded after shutdown @@ -753,121 +613,6 @@ public class SavepointITCase extends TestLogger { } } - /** - * Tests that a restore failure is retried with the savepoint state. 
- */ - @Test - public void testRestoreFailure() throws Exception { - // Config - int numTaskManagers = 1; - int numSlotsPerTaskManager = 1; - int numExecutionRetries = 2; - int retryDelay = 500; - int checkpointingInterval = 100000000; - - // Test deadline - final Deadline deadline = new FiniteDuration(3, TimeUnit.MINUTES).fromNow(); - - ForkableFlinkMiniCluster flink = null; - - try { - // The job - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(1); - env.enableCheckpointing(checkpointingInterval); - env.setNumberOfExecutionRetries(numExecutionRetries); - env.getConfig().setExecutionRetryDelay(retryDelay); - - DataStream<Integer> stream = env - .addSource(new RestoreStateCountingAndFailingSource()); - - // Source configuration - RestoreStateCountingAndFailingSource.failOnRestoreStateCall = false; - RestoreStateCountingAndFailingSource.numRestoreStateCalls = 0; - RestoreStateCountingAndFailingSource.checkpointCompleteLatch = new CountDownLatch(1); - RestoreStateCountingAndFailingSource.emitted= 0; - - stream.addSink(new DiscardingSink<Integer>()); - - JobGraph jobGraph = env.getStreamGraph().getJobGraph(); - - // Flink configuration - final Configuration config = new Configuration(); - config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers); - config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTaskManager); - LOG.info("Flink configuration: " + config + "."); - - // Start Flink - flink = new ForkableFlinkMiniCluster(config); - LOG.info("Starting Flink cluster."); - flink.start(); - - // Retrieve the job manager - LOG.info("Retrieving JobManager."); - ActorGateway jobManager = flink.getLeaderGateway(deadline.timeLeft()); - LOG.info("JobManager: " + jobManager + "."); - - // Submit the job and wait for some checkpoints to complete - flink.submitJobDetached(jobGraph); - - while (deadline.hasTimeLeft() && RestoreStateCountingAndFailingSource.emitted < 100) { - 
Thread.sleep(100); - } - - assertTrue("No progress", RestoreStateCountingAndFailingSource.emitted >= 100); - - // Trigger the savepoint - Future<Object> savepointPathFuture = jobManager.ask( - new TriggerSavepoint(jobGraph.getJobID()), deadline.timeLeft()); - - Object resp = Await.result(savepointPathFuture, deadline.timeLeft()); - - String savepointPath = null; - if (resp instanceof TriggerSavepointSuccess) { - savepointPath = ((TriggerSavepointSuccess) resp).savepointPath(); - LOG.info("Retrieved savepoint path: " + savepointPath + "."); - } else if (resp instanceof TriggerSavepointFailure) { - fail("Received TriggerSavepointFailure: " + ((TriggerSavepointFailure) resp).cause().getMessage()); - } else { - fail("Unexpected response of type " + resp.getClass() + " " + resp); - } - - // Completed checkpoint - RestoreStateCountingAndFailingSource.checkpointCompleteLatch.await(); - - // Cancel the job - Future<?> cancelFuture = jobManager.ask(new CancelJob( - jobGraph.getJobID()), deadline.timeLeft()); - Await.ready(cancelFuture, deadline.timeLeft()); - - // Wait for the job to be removed - Future<?> removedFuture = jobManager.ask(new NotifyWhenJobRemoved( - jobGraph.getJobID()), deadline.timeLeft()); - Await.ready(removedFuture, deadline.timeLeft()); - - // Set source to fail on restore calls and try to recover from savepoint - RestoreStateCountingAndFailingSource.failOnRestoreStateCall = true; - jobGraph.setSavepointPath(savepointPath); - - try { - flink.submitJobAndWait(jobGraph, false, deadline.timeLeft()); - // If the savepoint state is not restored, we will wait here - // until the deadline times out. 
- fail("Did not throw expected Exception"); - } catch (Exception ignored) { - } finally { - // Expecting one restore for the initial submission from - // savepoint and one for the execution retries - assertEquals(1 + numExecutionRetries, RestoreStateCountingAndFailingSource.numRestoreStateCalls); - } - } - finally { - if (flink != null) { - flink.shutdown(); - } - } - } - // ------------------------------------------------------------------------ // Test program // ------------------------------------------------------------------------ @@ -982,6 +727,10 @@ public class SavepointITCase extends TestLogger { while (running) { ctx.collect(1); emitted++; + + if (failOnRestoreStateCall) { + throw new RuntimeException("Restore test failure"); + } } } @@ -998,10 +747,6 @@ public class SavepointITCase extends TestLogger { @Override public void restoreState(Serializable state) throws Exception { numRestoreStateCalls++; - - if (failOnRestoreStateCall) { - throw new RuntimeException("Restore test failure"); - } } @Override
