This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.5 by this push:
new d0f773e Revert "Revert "[FLINK-10354] Revert "[FLINK-6328] [chkPts]
Don't add savepoints to CompletedCheckpointStore"""
d0f773e is described below
commit d0f773ea2df0022fa62e7bfe391d94bbfe705f7d
Author: Till Rohrmann <[email protected]>
AuthorDate: Wed Oct 17 00:19:14 2018 +0200
Revert "Revert "[FLINK-10354] Revert "[FLINK-6328] [chkPts] Don't add
savepoints to CompletedCheckpointStore"""
This reverts commit 9b5c9e8d3e968c67681e2703a181b3e6a6bb2ad1.
---
.../runtime/checkpoint/CheckpointCoordinator.java | 37 +++----
.../checkpoint/CheckpointCoordinatorTest.java | 115 ++-------------------
2 files changed, 28 insertions(+), 124 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 82227cd..1ee2ece 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -838,28 +838,22 @@ public class CheckpointCoordinator {
// the pending checkpoint must be discarded after the
finalization
Preconditions.checkState(pendingCheckpoint.isDiscarded() && completedCheckpoint
!= null);
- // TODO: add savepoints to completed checkpoint store
once FLINK-4815 has been completed
- if (!completedCheckpoint.getProperties().isSavepoint())
{
- try {
-
completedCheckpointStore.addCheckpoint(completedCheckpoint);
- } catch (Exception exception) {
- // we failed to store the completed
checkpoint. Let's clean up
- executor.execute(new Runnable() {
- @Override
- public void run() {
- try {
-
completedCheckpoint.discardOnFailedStoring();
- } catch (Throwable t) {
- LOG.warn("Could
not properly discard completed checkpoint {} of job {}.",
completedCheckpoint.getCheckpointID(), job, t);
- }
+ try {
+
completedCheckpointStore.addCheckpoint(completedCheckpoint);
+ } catch (Exception exception) {
+ // we failed to store the completed checkpoint.
Let's clean up
+ executor.execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+
completedCheckpoint.discardOnFailedStoring();
+ } catch (Throwable t) {
+ LOG.warn("Could not
properly discard completed checkpoint {}.",
completedCheckpoint.getCheckpointID(), t);
}
- });
-
- throw new CheckpointException("Could
not complete the pending checkpoint " + checkpointId + '.', exception);
- }
+ }
+ });
- // drop those pending checkpoints that are at
prior to the completed one
- dropSubsumedCheckpoints(checkpointId);
+ throw new CheckpointException("Could not
complete the pending checkpoint " + checkpointId + '.', exception);
}
} finally {
pendingCheckpoints.remove(checkpointId);
@@ -869,6 +863,9 @@ public class CheckpointCoordinator {
rememberRecentCheckpointId(checkpointId);
+ // drop those pending checkpoints that are prior to the
completed one
+ dropSubsumedCheckpoints(checkpointId);
+
// record the time when this was completed, to calculate
// the 'min delay between checkpoints'
lastCheckpointCompletionNanos = System.nanoTime();
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 1b2062a..502b2bf 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -1493,8 +1493,8 @@ public class CheckpointCoordinatorTest extends TestLogger
{
assertTrue(pending.isDiscarded());
assertTrue(savepointFuture.isDone());
- // the now the savepoint should be completed but not added to
the completed checkpoint store
- assertEquals(0,
coord.getNumberOfRetainedSuccessfulCheckpoints());
+ // now we should have a completed checkpoint
+ assertEquals(1,
coord.getNumberOfRetainedSuccessfulCheckpoints());
assertEquals(0, coord.getNumberOfPendingCheckpoints());
// validate that the relevant tasks got a confirmation message
@@ -1509,7 +1509,7 @@ public class CheckpointCoordinatorTest extends TestLogger
{
verify(subtaskState2,
times(1)).registerSharedStates(any(SharedStateRegistry.class));
}
- CompletedCheckpoint success = savepointFuture.get();
+ CompletedCheckpoint success =
coord.getSuccessfulCheckpoints().get(0);
assertEquals(jid, success.getJobId());
assertEquals(timestamp, success.getTimestamp());
assertEquals(pending.getCheckpointId(),
success.getCheckpointID());
@@ -1527,9 +1527,9 @@ public class CheckpointCoordinatorTest extends TestLogger
{
coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid,
attemptID2, checkpointIdNew));
assertEquals(0, coord.getNumberOfPendingCheckpoints());
- assertEquals(0,
coord.getNumberOfRetainedSuccessfulCheckpoints());
+ assertEquals(1,
coord.getNumberOfRetainedSuccessfulCheckpoints());
- CompletedCheckpoint successNew = savepointFuture.get();
+ CompletedCheckpoint successNew =
coord.getSuccessfulCheckpoints().get(0);
assertEquals(jid, successNew.getJobId());
assertEquals(timestampNew, successNew.getTimestamp());
assertEquals(checkpointIdNew, successNew.getCheckpointID());
@@ -1556,7 +1556,7 @@ public class CheckpointCoordinatorTest extends TestLogger
{
* Triggers a savepoint and two checkpoints. The second checkpoint
completes
* and subsumes the first checkpoint, but not the first savepoint. Then
we
* trigger another checkpoint and savepoint. The 2nd savepoint
completes and
- * does neither subsume the last checkpoint nor the first savepoint.
+ * subsumes the last checkpoint, but not the first savepoint.
*/
@Test
public void testSavepointsAreNotSubsumed() throws Exception {
@@ -1613,19 +1613,18 @@ public class CheckpointCoordinatorTest extends
TestLogger {
assertFalse(savepointFuture1.isDone());
assertTrue(coord.triggerCheckpoint(timestamp + 3, false));
- long checkpointId3 = counter.getLast();
assertEquals(2, coord.getNumberOfPendingCheckpoints());
CompletableFuture<CompletedCheckpoint> savepointFuture2 =
coord.triggerSavepoint(timestamp + 4, savepointDir);
long savepointId2 = counter.getLast();
assertEquals(3, coord.getNumberOfPendingCheckpoints());
- // 2nd savepoint should not subsume the last checkpoint and the
1st savepoint
+ // 2nd savepoint should subsume the last checkpoint, but not
the 1st savepoint
coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid,
attemptID1, savepointId2));
coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid,
attemptID2, savepointId2));
- assertEquals(2, coord.getNumberOfPendingCheckpoints());
- assertEquals(1,
coord.getNumberOfRetainedSuccessfulCheckpoints());
+ assertEquals(1, coord.getNumberOfPendingCheckpoints());
+ assertEquals(2,
coord.getNumberOfRetainedSuccessfulCheckpoints());
assertFalse(coord.getPendingCheckpoints().get(savepointId1).isDiscarded());
assertFalse(savepointFuture1.isDone());
@@ -1635,15 +1634,9 @@ public class CheckpointCoordinatorTest extends
TestLogger {
coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid,
attemptID1, savepointId1));
coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid,
attemptID2, savepointId1));
- assertEquals(1, coord.getNumberOfPendingCheckpoints());
- assertEquals(1,
coord.getNumberOfRetainedSuccessfulCheckpoints());
- assertTrue(savepointFuture1.isDone());
-
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid,
attemptID1, checkpointId3));
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid,
attemptID2, checkpointId3));
-
assertEquals(0, coord.getNumberOfPendingCheckpoints());
- assertEquals(2,
coord.getNumberOfRetainedSuccessfulCheckpoints());
+ assertEquals(3,
coord.getNumberOfRetainedSuccessfulCheckpoints());
+ assertTrue(savepointFuture1.isDone());
}
private void testMaxConcurrentAttempts(int maxConcurrentAttempts) {
@@ -3466,92 +3459,6 @@ public class CheckpointCoordinatorTest extends
TestLogger {
.reportRestoredCheckpoint(any(RestoredCheckpointStats.class));
}
- /**
- * FLINK-6328
- *
- * Tests that savepoints are not added to the {@link
CompletedCheckpointStore} and,
- * thus, are not subject to job recovery. The reason that we don't want
that (until
- * FLINK-4815 has been finished) is that the lifecycle of savepoints is
not controlled
- * by the {@link CheckpointCoordinator}.
- */
- @Test
- public void testSavepointsAreNotAddedToCompletedCheckpointStore()
throws Exception {
- final JobID jobId = new JobID();
- final ExecutionAttemptID executionAttemptId = new
ExecutionAttemptID();
- final ExecutionVertex vertex1 =
mockExecutionVertex(executionAttemptId);
- final CompletedCheckpointStore completedCheckpointStore = new
StandaloneCompletedCheckpointStore(1);
- final long checkpointTimestamp1 = 1L;
- final long savepointTimestamp = 2L;
- final long checkpointTimestamp2 = 3L;
- final String savepointDir =
tmpFolder.newFolder().getAbsolutePath();
-
- final StandaloneCheckpointIDCounter checkpointIDCounter = new
StandaloneCheckpointIDCounter();
-
- CheckpointCoordinator checkpointCoordinator = new
CheckpointCoordinator(
- jobId,
- 600000L,
- 600000L,
- 0L,
- Integer.MAX_VALUE,
-
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
- new ExecutionVertex[]{vertex1},
- new ExecutionVertex[]{vertex1},
- new ExecutionVertex[]{vertex1},
- checkpointIDCounter,
- completedCheckpointStore,
- new MemoryStateBackend(),
- Executors.directExecutor(),
- SharedStateRegistry.DEFAULT_FACTORY);
-
- // trigger a first checkpoint
- assertTrue(
- "Triggering of a checkpoint should work.",
-
checkpointCoordinator.triggerCheckpoint(checkpointTimestamp1, false));
-
- assertTrue(0 ==
completedCheckpointStore.getNumberOfRetainedCheckpoints());
-
- // complete the 1st checkpoint
- checkpointCoordinator.receiveAcknowledgeMessage(
- new AcknowledgeCheckpoint(
- jobId,
- executionAttemptId,
- checkpointIDCounter.getLast()));
-
- // check that the checkpoint has been completed
- assertTrue(1 ==
completedCheckpointStore.getNumberOfRetainedCheckpoints());
-
- // trigger a savepoint --> this should not have any effect on
the CompletedCheckpointStore
- CompletableFuture<CompletedCheckpoint> savepointFuture =
checkpointCoordinator.triggerSavepoint(savepointTimestamp, savepointDir);
-
- checkpointCoordinator.receiveAcknowledgeMessage(
- new AcknowledgeCheckpoint(
- jobId,
- executionAttemptId,
- checkpointIDCounter.getLast()));
-
- // check that no errors occurred
- final CompletedCheckpoint savepoint = savepointFuture.get();
-
- assertFalse(
- "The savepoint should not have been added to the
completed checkpoint store",
- savepoint.getCheckpointID() ==
completedCheckpointStore.getLatestCheckpoint().getCheckpointID());
-
- assertTrue(
- "Triggering of a checkpoint should work.",
-
checkpointCoordinator.triggerCheckpoint(checkpointTimestamp2, false));
-
- // complete the 2nd checkpoint
- checkpointCoordinator.receiveAcknowledgeMessage(
- new AcknowledgeCheckpoint(
- jobId,
- executionAttemptId,
- checkpointIDCounter.getLast()));
-
- assertTrue(
- "The latest completed (proper) checkpoint should have
been added to the completed checkpoint store.",
-
completedCheckpointStore.getLatestCheckpoint().getCheckpointID() ==
checkpointIDCounter.getLast());
- }
-
@Test
public void testSharedStateRegistrationOnRestore() throws Exception {