This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.5 by this push:
     new d0f773e  Revert "Revert "[FLINK-10354] Revert "[FLINK-6328] [chkPts] Don't add savepoints to CompletedCheckpointStore""""
d0f773e is described below

commit d0f773ea2df0022fa62e7bfe391d94bbfe705f7d
Author: Till Rohrmann <[email protected]>
AuthorDate: Wed Oct 17 00:19:14 2018 +0200

    Revert "Revert "[FLINK-10354] Revert "[FLINK-6328] [chkPts] Don't add savepoints to CompletedCheckpointStore""""
    
    This reverts commit 9b5c9e8d3e968c67681e2703a181b3e6a6bb2ad1.
---
 .../runtime/checkpoint/CheckpointCoordinator.java  |  37 +++----
 .../checkpoint/CheckpointCoordinatorTest.java      | 115 ++-------------------
 2 files changed, 28 insertions(+), 124 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 82227cd..1ee2ece 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -838,28 +838,22 @@ public class CheckpointCoordinator {
                        // the pending checkpoint must be discarded after the 
finalization
                        
Preconditions.checkState(pendingCheckpoint.isDiscarded() && completedCheckpoint 
!= null);
 
-                       // TODO: add savepoints to completed checkpoint store 
once FLINK-4815 has been completed
-                       if (!completedCheckpoint.getProperties().isSavepoint()) 
{
-                               try {
-                                       
completedCheckpointStore.addCheckpoint(completedCheckpoint);
-                               } catch (Exception exception) {
-                                       // we failed to store the completed 
checkpoint. Let's clean up
-                                       executor.execute(new Runnable() {
-                                               @Override
-                                               public void run() {
-                                                       try {
-                                                               
completedCheckpoint.discardOnFailedStoring();
-                                                       } catch (Throwable t) {
-                                                               LOG.warn("Could 
not properly discard completed checkpoint {} of job {}.", 
completedCheckpoint.getCheckpointID(), job, t);
-                                                       }
+                       try {
+                               
completedCheckpointStore.addCheckpoint(completedCheckpoint);
+                       } catch (Exception exception) {
+                               // we failed to store the completed checkpoint. 
Let's clean up
+                               executor.execute(new Runnable() {
+                                       @Override
+                                       public void run() {
+                                               try {
+                                                       
completedCheckpoint.discardOnFailedStoring();
+                                               } catch (Throwable t) {
+                                                       LOG.warn("Could not 
properly discard completed checkpoint {}.", 
completedCheckpoint.getCheckpointID(), t);
                                                }
-                                       });
-
-                                       throw new CheckpointException("Could 
not complete the pending checkpoint " + checkpointId + '.', exception);
-                               }
+                                       }
+                               });
 
-                               // drop those pending checkpoints that are at 
prior to the completed one
-                               dropSubsumedCheckpoints(checkpointId);
+                               throw new CheckpointException("Could not 
complete the pending checkpoint " + checkpointId + '.', exception);
                        }
                } finally {
                        pendingCheckpoints.remove(checkpointId);
@@ -869,6 +863,9 @@ public class CheckpointCoordinator {
 
                rememberRecentCheckpointId(checkpointId);
 
+               // drop those pending checkpoints that are at prior to the 
completed one
+               dropSubsumedCheckpoints(checkpointId);
+
                // record the time when this was completed, to calculate
                // the 'min delay between checkpoints'
                lastCheckpointCompletionNanos = System.nanoTime();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 1b2062a..502b2bf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -1493,8 +1493,8 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                assertTrue(pending.isDiscarded());
                assertTrue(savepointFuture.isDone());
 
-               // the now the savepoint should be completed but not added to 
the completed checkpoint store
-               assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
+               // the now we should have a completed checkpoint
+               assertEquals(1, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
                assertEquals(0, coord.getNumberOfPendingCheckpoints());
 
                // validate that the relevant tasks got a confirmation message
@@ -1509,7 +1509,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                        verify(subtaskState2, 
times(1)).registerSharedStates(any(SharedStateRegistry.class));
                }
 
-               CompletedCheckpoint success = savepointFuture.get();
+               CompletedCheckpoint success = 
coord.getSuccessfulCheckpoints().get(0);
                assertEquals(jid, success.getJobId());
                assertEquals(timestamp, success.getTimestamp());
                assertEquals(pending.getCheckpointId(), 
success.getCheckpointID());
@@ -1527,9 +1527,9 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, 
attemptID2, checkpointIdNew));
 
                assertEquals(0, coord.getNumberOfPendingCheckpoints());
-               assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
+               assertEquals(1, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
 
-               CompletedCheckpoint successNew = savepointFuture.get();
+               CompletedCheckpoint successNew = 
coord.getSuccessfulCheckpoints().get(0);
                assertEquals(jid, successNew.getJobId());
                assertEquals(timestampNew, successNew.getTimestamp());
                assertEquals(checkpointIdNew, successNew.getCheckpointID());
@@ -1556,7 +1556,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
         * Triggers a savepoint and two checkpoints. The second checkpoint 
completes
         * and subsumes the first checkpoint, but not the first savepoint. Then 
we
         * trigger another checkpoint and savepoint. The 2nd savepoint 
completes and
-        * does neither subsume the last checkpoint nor the first savepoint.
+        * subsumes the last checkpoint, but not the first savepoint.
         */
        @Test
        public void testSavepointsAreNotSubsumed() throws Exception {
@@ -1613,19 +1613,18 @@ public class CheckpointCoordinatorTest extends 
TestLogger {
                assertFalse(savepointFuture1.isDone());
 
                assertTrue(coord.triggerCheckpoint(timestamp + 3, false));
-               long checkpointId3 = counter.getLast();
                assertEquals(2, coord.getNumberOfPendingCheckpoints());
 
                CompletableFuture<CompletedCheckpoint> savepointFuture2 = 
coord.triggerSavepoint(timestamp + 4, savepointDir);
                long savepointId2 = counter.getLast();
                assertEquals(3, coord.getNumberOfPendingCheckpoints());
 
-               // 2nd savepoint should not subsume the last checkpoint and the 
1st savepoint
+               // 2nd savepoint should subsume the last checkpoint, but not 
the 1st savepoint
                coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, 
attemptID1, savepointId2));
                coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, 
attemptID2, savepointId2));
 
-               assertEquals(2, coord.getNumberOfPendingCheckpoints());
-               assertEquals(1, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
+               assertEquals(1, coord.getNumberOfPendingCheckpoints());
+               assertEquals(2, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
                
assertFalse(coord.getPendingCheckpoints().get(savepointId1).isDiscarded());
 
                assertFalse(savepointFuture1.isDone());
@@ -1635,15 +1634,9 @@ public class CheckpointCoordinatorTest extends 
TestLogger {
                coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, 
attemptID1, savepointId1));
                coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, 
attemptID2, savepointId1));
 
-               assertEquals(1, coord.getNumberOfPendingCheckpoints());
-               assertEquals(1, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
-               assertTrue(savepointFuture1.isDone());
-
-               coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, 
attemptID1, checkpointId3));
-               coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, 
attemptID2, checkpointId3));
-
                assertEquals(0, coord.getNumberOfPendingCheckpoints());
-               assertEquals(2, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
+               assertEquals(3, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
+               assertTrue(savepointFuture1.isDone());
        }
 
        private void testMaxConcurrentAttempts(int maxConcurrentAttempts) {
@@ -3466,92 +3459,6 @@ public class CheckpointCoordinatorTest extends 
TestLogger {
                        
.reportRestoredCheckpoint(any(RestoredCheckpointStats.class));
        }
 
-       /**
-        * FLINK-6328
-        *
-        * Tests that savepoints are not added to the {@link 
CompletedCheckpointStore} and,
-        * thus, are not subject to job recovery. The reason that we don't want 
that (until
-        * FLINK-4815 has been finished) is that the lifecycle of savepoints is 
not controlled
-        * by the {@link CheckpointCoordinator}.
-        */
-       @Test
-       public void testSavepointsAreNotAddedToCompletedCheckpointStore() 
throws Exception {
-               final JobID jobId = new JobID();
-               final ExecutionAttemptID executionAttemptId = new 
ExecutionAttemptID();
-               final ExecutionVertex vertex1 = 
mockExecutionVertex(executionAttemptId);
-               final CompletedCheckpointStore completedCheckpointStore = new 
StandaloneCompletedCheckpointStore(1);
-               final long checkpointTimestamp1 = 1L;
-               final long savepointTimestamp = 2L;
-               final long checkpointTimestamp2 = 3L;
-               final String savepointDir = 
tmpFolder.newFolder().getAbsolutePath();
-
-               final StandaloneCheckpointIDCounter checkpointIDCounter = new 
StandaloneCheckpointIDCounter();
-
-               CheckpointCoordinator checkpointCoordinator = new 
CheckpointCoordinator(
-                       jobId,
-                       600000L,
-                       600000L,
-                       0L,
-                       Integer.MAX_VALUE,
-                       
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
-                       new ExecutionVertex[]{vertex1},
-                       new ExecutionVertex[]{vertex1},
-                       new ExecutionVertex[]{vertex1},
-                       checkpointIDCounter,
-                       completedCheckpointStore,
-                       new MemoryStateBackend(),
-                       Executors.directExecutor(),
-                       SharedStateRegistry.DEFAULT_FACTORY);
-
-               // trigger a first checkpoint
-               assertTrue(
-                       "Triggering of a checkpoint should work.",
-                       
checkpointCoordinator.triggerCheckpoint(checkpointTimestamp1, false));
-
-               assertTrue(0 == 
completedCheckpointStore.getNumberOfRetainedCheckpoints());
-
-               // complete the 1st checkpoint
-               checkpointCoordinator.receiveAcknowledgeMessage(
-                       new AcknowledgeCheckpoint(
-                               jobId,
-                               executionAttemptId,
-                               checkpointIDCounter.getLast()));
-
-               // check that the checkpoint has been completed
-               assertTrue(1 == 
completedCheckpointStore.getNumberOfRetainedCheckpoints());
-
-               // trigger a savepoint --> this should not have any effect on 
the CompletedCheckpointStore
-               CompletableFuture<CompletedCheckpoint> savepointFuture = 
checkpointCoordinator.triggerSavepoint(savepointTimestamp, savepointDir);
-
-               checkpointCoordinator.receiveAcknowledgeMessage(
-                       new AcknowledgeCheckpoint(
-                               jobId,
-                               executionAttemptId,
-                               checkpointIDCounter.getLast()));
-
-               // check that no errors occurred
-               final CompletedCheckpoint savepoint = savepointFuture.get();
-
-               assertFalse(
-                       "The savepoint should not have been added to the 
completed checkpoint store",
-                       savepoint.getCheckpointID() == 
completedCheckpointStore.getLatestCheckpoint().getCheckpointID());
-
-               assertTrue(
-                       "Triggering of a checkpoint should work.",
-                       
checkpointCoordinator.triggerCheckpoint(checkpointTimestamp2, false));
-
-               // complete the 2nd checkpoint
-               checkpointCoordinator.receiveAcknowledgeMessage(
-                       new AcknowledgeCheckpoint(
-                               jobId,
-                               executionAttemptId,
-                               checkpointIDCounter.getLast()));
-
-               assertTrue(
-                       "The latest completed (proper) checkpoint should have 
been added to the completed checkpoint store.",
-                       
completedCheckpointStore.getLatestCheckpoint().getCheckpointID() == 
checkpointIDCounter.getLast());
-       }
-
        @Test
        public void testSharedStateRegistrationOnRestore() throws Exception {
 

Reply via email to