[
https://issues.apache.org/jira/browse/FLINK-10354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16638099#comment-16638099
]
ASF GitHub Bot commented on FLINK-10354:
----------------------------------------
dawidwys closed pull request #6704: [FLINK-10354] Revert "[FLINK-6328] [chkPts]
Don't add savepoints to CompletedCheckpointStore"
URL: https://github.com/apache/flink/pull/6704
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/docs/ops/state/savepoints.md b/docs/ops/state/savepoints.md
index 6dd5154c5e6..d31063ee2c5 100644
--- a/docs/ops/state/savepoints.md
+++ b/docs/ops/state/savepoints.md
@@ -106,6 +106,11 @@ Please follow <a
href="https://issues.apache.org/jira/browse/FLINK-5778">FLINK-5
Note that if you use the `MemoryStateBackend`, metadata *and* savepoint state
will be stored in the `_metadata` file. Since it is self-contained, you may
move the file and restore from any location.
+<div class="alert alert-warning">
+ <strong>Attention:</strong> It is discouraged to move or delete last
savepoint of a running job, cause this might interfere with failure-recovery.
Savepoints have side-effects on exactly-once sinks, therefore
+ to ensure exactly-once semantics, if there is no checkpoint after the last
savepoint, the savepoint will be used for recovery.
+</div>
+
#### Trigger a Savepoint
{% highlight shell %}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index e936b246222..57337b6286f 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -839,28 +839,22 @@ private void completePendingCheckpoint(PendingCheckpoint
pendingCheckpoint) thro
// the pending checkpoint must be discarded after the
finalization
Preconditions.checkState(pendingCheckpoint.isDiscarded() && completedCheckpoint
!= null);
- // TODO: add savepoints to completed checkpoint store
once FLINK-4815 has been completed
- if (!completedCheckpoint.getProperties().isSavepoint())
{
- try {
-
completedCheckpointStore.addCheckpoint(completedCheckpoint);
- } catch (Exception exception) {
- // we failed to store the completed
checkpoint. Let's clean up
- executor.execute(new Runnable() {
- @Override
- public void run() {
- try {
-
completedCheckpoint.discardOnFailedStoring();
- } catch (Throwable t) {
- LOG.warn("Could
not properly discard completed checkpoint {} of job {}.",
completedCheckpoint.getCheckpointID(), job, t);
- }
+ try {
+
completedCheckpointStore.addCheckpoint(completedCheckpoint);
+ } catch (Exception exception) {
+ // we failed to store the completed checkpoint.
Let's clean up
+ executor.execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+
completedCheckpoint.discardOnFailedStoring();
+ } catch (Throwable t) {
+ LOG.warn("Could not
properly discard completed checkpoint {}.",
completedCheckpoint.getCheckpointID(), t);
}
- });
-
- throw new CheckpointException("Could
not complete the pending checkpoint " + checkpointId + '.', exception);
- }
+ }
+ });
- // drop those pending checkpoints that are at
prior to the completed one
- dropSubsumedCheckpoints(checkpointId);
+ throw new CheckpointException("Could not
complete the pending checkpoint " + checkpointId + '.', exception);
}
} finally {
pendingCheckpoints.remove(checkpointId);
@@ -870,6 +864,9 @@ public void run() {
rememberRecentCheckpointId(checkpointId);
+ // drop those pending checkpoints that are at prior to the
completed one
+ dropSubsumedCheckpoints(checkpointId);
+
// record the time when this was completed, to calculate
// the 'min delay between checkpoints'
lastCheckpointCompletionNanos = System.nanoTime();
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index b113e12ef69..3650f43066d 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -1494,8 +1494,8 @@ public void testTriggerAndConfirmSimpleSavepoint() throws
Exception {
assertTrue(pending.isDiscarded());
assertTrue(savepointFuture.isDone());
- // the now the savepoint should be completed but not added to
the completed checkpoint store
- assertEquals(0,
coord.getNumberOfRetainedSuccessfulCheckpoints());
+ // the now we should have a completed checkpoint
+ assertEquals(1,
coord.getNumberOfRetainedSuccessfulCheckpoints());
assertEquals(0, coord.getNumberOfPendingCheckpoints());
// validate that the relevant tasks got a confirmation message
@@ -1510,7 +1510,7 @@ public void testTriggerAndConfirmSimpleSavepoint() throws
Exception {
verify(subtaskState2,
times(1)).registerSharedStates(any(SharedStateRegistry.class));
}
- CompletedCheckpoint success = savepointFuture.get();
+ CompletedCheckpoint success =
coord.getSuccessfulCheckpoints().get(0);
assertEquals(jid, success.getJobId());
assertEquals(timestamp, success.getTimestamp());
assertEquals(pending.getCheckpointId(),
success.getCheckpointID());
@@ -1528,9 +1528,9 @@ public void testTriggerAndConfirmSimpleSavepoint() throws
Exception {
coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid,
attemptID2, checkpointIdNew));
assertEquals(0, coord.getNumberOfPendingCheckpoints());
- assertEquals(0,
coord.getNumberOfRetainedSuccessfulCheckpoints());
+ assertEquals(1,
coord.getNumberOfRetainedSuccessfulCheckpoints());
- CompletedCheckpoint successNew = savepointFuture.get();
+ CompletedCheckpoint successNew =
coord.getSuccessfulCheckpoints().get(0);
assertEquals(jid, successNew.getJobId());
assertEquals(timestampNew, successNew.getTimestamp());
assertEquals(checkpointIdNew, successNew.getCheckpointID());
@@ -1557,7 +1557,7 @@ public void testTriggerAndConfirmSimpleSavepoint() throws
Exception {
* Triggers a savepoint and two checkpoints. The second checkpoint
completes
* and subsumes the first checkpoint, but not the first savepoint. Then
we
* trigger another checkpoint and savepoint. The 2nd savepoint
completes and
- * does neither subsume the last checkpoint nor the first savepoint.
+ * subsumes the last checkpoint, but not the first savepoint.
*/
@Test
public void testSavepointsAreNotSubsumed() throws Exception {
@@ -1614,19 +1614,18 @@ public void testSavepointsAreNotSubsumed() throws
Exception {
assertFalse(savepointFuture1.isDone());
assertTrue(coord.triggerCheckpoint(timestamp + 3, false));
- long checkpointId3 = counter.getLast();
assertEquals(2, coord.getNumberOfPendingCheckpoints());
CompletableFuture<CompletedCheckpoint> savepointFuture2 =
coord.triggerSavepoint(timestamp + 4, savepointDir);
long savepointId2 = counter.getLast();
assertEquals(3, coord.getNumberOfPendingCheckpoints());
- // 2nd savepoint should not subsume the last checkpoint and the
1st savepoint
+ // 2nd savepoint should subsume the last checkpoint, but not
the 1st savepoint
coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid,
attemptID1, savepointId2));
coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid,
attemptID2, savepointId2));
- assertEquals(2, coord.getNumberOfPendingCheckpoints());
- assertEquals(1,
coord.getNumberOfRetainedSuccessfulCheckpoints());
+ assertEquals(1, coord.getNumberOfPendingCheckpoints());
+ assertEquals(2,
coord.getNumberOfRetainedSuccessfulCheckpoints());
assertFalse(coord.getPendingCheckpoints().get(savepointId1).isDiscarded());
assertFalse(savepointFuture1.isDone());
@@ -1636,15 +1635,9 @@ public void testSavepointsAreNotSubsumed() throws
Exception {
coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid,
attemptID1, savepointId1));
coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid,
attemptID2, savepointId1));
- assertEquals(1, coord.getNumberOfPendingCheckpoints());
- assertEquals(1,
coord.getNumberOfRetainedSuccessfulCheckpoints());
- assertTrue(savepointFuture1.isDone());
-
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid,
attemptID1, checkpointId3));
- coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid,
attemptID2, checkpointId3));
-
assertEquals(0, coord.getNumberOfPendingCheckpoints());
- assertEquals(2,
coord.getNumberOfRetainedSuccessfulCheckpoints());
+ assertEquals(3,
coord.getNumberOfRetainedSuccessfulCheckpoints());
+ assertTrue(savepointFuture1.isDone());
}
private void testMaxConcurrentAttempts(int maxConcurrentAttempts) {
@@ -3467,92 +3460,6 @@ public void testCheckpointStatsTrackerRestoreCallback()
throws Exception {
.reportRestoredCheckpoint(any(RestoredCheckpointStats.class));
}
- /**
- * FLINK-6328
- *
- * Tests that savepoints are not added to the {@link
CompletedCheckpointStore} and,
- * thus, are not subject to job recovery. The reason that we don't want
that (until
- * FLINK-4815 has been finished) is that the lifecycle of savepoints is
not controlled
- * by the {@link CheckpointCoordinator}.
- */
- @Test
- public void testSavepointsAreNotAddedToCompletedCheckpointStore()
throws Exception {
- final JobID jobId = new JobID();
- final ExecutionAttemptID executionAttemptId = new
ExecutionAttemptID();
- final ExecutionVertex vertex1 =
mockExecutionVertex(executionAttemptId);
- final CompletedCheckpointStore completedCheckpointStore = new
StandaloneCompletedCheckpointStore(1);
- final long checkpointTimestamp1 = 1L;
- final long savepointTimestamp = 2L;
- final long checkpointTimestamp2 = 3L;
- final String savepointDir =
tmpFolder.newFolder().getAbsolutePath();
-
- final StandaloneCheckpointIDCounter checkpointIDCounter = new
StandaloneCheckpointIDCounter();
-
- CheckpointCoordinator checkpointCoordinator = new
CheckpointCoordinator(
- jobId,
- 600000L,
- 600000L,
- 0L,
- Integer.MAX_VALUE,
-
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
- new ExecutionVertex[]{vertex1},
- new ExecutionVertex[]{vertex1},
- new ExecutionVertex[]{vertex1},
- checkpointIDCounter,
- completedCheckpointStore,
- new MemoryStateBackend(),
- Executors.directExecutor(),
- SharedStateRegistry.DEFAULT_FACTORY);
-
- // trigger a first checkpoint
- assertTrue(
- "Triggering of a checkpoint should work.",
-
checkpointCoordinator.triggerCheckpoint(checkpointTimestamp1, false));
-
- assertTrue(0 ==
completedCheckpointStore.getNumberOfRetainedCheckpoints());
-
- // complete the 1st checkpoint
- checkpointCoordinator.receiveAcknowledgeMessage(
- new AcknowledgeCheckpoint(
- jobId,
- executionAttemptId,
- checkpointIDCounter.getLast()));
-
- // check that the checkpoint has been completed
- assertTrue(1 ==
completedCheckpointStore.getNumberOfRetainedCheckpoints());
-
- // trigger a savepoint --> this should not have any effect on
the CompletedCheckpointStore
- CompletableFuture<CompletedCheckpoint> savepointFuture =
checkpointCoordinator.triggerSavepoint(savepointTimestamp, savepointDir);
-
- checkpointCoordinator.receiveAcknowledgeMessage(
- new AcknowledgeCheckpoint(
- jobId,
- executionAttemptId,
- checkpointIDCounter.getLast()));
-
- // check that no errors occurred
- final CompletedCheckpoint savepoint = savepointFuture.get();
-
- assertFalse(
- "The savepoint should not have been added to the
completed checkpoint store",
- savepoint.getCheckpointID() ==
completedCheckpointStore.getLatestCheckpoint().getCheckpointID());
-
- assertTrue(
- "Triggering of a checkpoint should work.",
-
checkpointCoordinator.triggerCheckpoint(checkpointTimestamp2, false));
-
- // complete the 2nd checkpoint
- checkpointCoordinator.receiveAcknowledgeMessage(
- new AcknowledgeCheckpoint(
- jobId,
- executionAttemptId,
- checkpointIDCounter.getLast()));
-
- assertTrue(
- "The latest completed (proper) checkpoint should have
been added to the completed checkpoint store.",
-
completedCheckpointStore.getLatestCheckpoint().getCheckpointID() ==
checkpointIDCounter.getLast());
- }
-
@Test
public void testSharedStateRegistrationOnRestore() throws Exception {
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
> Savepoints should be counted as retained checkpoints
> ----------------------------------------------------
>
> Key: FLINK-10354
> URL: https://issues.apache.org/jira/browse/FLINK-10354
> Project: Flink
> Issue Type: Bug
> Components: State Backends, Checkpointing
> Affects Versions: 1.6.0
> Reporter: Dawid Wysakowicz
> Assignee: Dawid Wysakowicz
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.7.0
>
>
> This task is about reverting [FLINK-6328].
> The problem is that exactly-once sinks can produce incorrect results if a
> failure occurs after a savepoint is taken but before the next checkpoint
> completes: the savepoint has already made its side effects visible in the
> sink, yet recovery restores from the older checkpoint and replays those
> records.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)