pnowojski commented on a change in pull request #13044:
URL: https://github.com/apache/flink/pull/13044#discussion_r464999207
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -530,15 +530,22 @@ private void
startTriggeringCheckpoint(CheckpointTriggerRequest request) {
request.getOnCompletionFuture()),
timer);
- final CompletableFuture<?> masterStatesComplete =
pendingCheckpointCompletableFuture
- .thenCompose(this::snapshotMasterState);
-
final CompletableFuture<?>
coordinatorCheckpointsComplete = pendingCheckpointCompletableFuture
.thenComposeAsync((pendingCheckpoint) ->
OperatorCoordinatorCheckpoints.triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion(
coordinatorsToCheckpoint, pendingCheckpoint, timer),
timer);
+ // We have to take the snapshot of the master hooks
after the coordinator checkpoints has completed.
+ // This is to ensure the tasks are checkpointed after
the OperatorCoordinators in case
+ // ExternallyInducedSource is used.
+ final CompletableFuture<?> masterStatesComplete =
coordinatorCheckpointsComplete
+ .thenComposeAsync(ignored -> {
+ PendingCheckpoint checkpoint =
+
FutureUtils.getWithoutException(pendingCheckpointCompletableFuture);
+ return
snapshotMasterState(checkpoint);
+ }, timer);
+
FutureUtils.assertNoException(
CompletableFuture.allOf(masterStatesComplete,
coordinatorCheckpointsComplete)
.handleAsync(
Review comment:
Can you extract the content of this `handleAsync(...)` call to a
separate method or methods? (It was already a bit too large and now you are
making it even bigger)
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -576,7 +583,22 @@ private void
startTriggeringCheckpoint(CheckpointTriggerRequest request) {
request.advanceToEndOfTime);
coordinatorsToCheckpoint.forEach((ctx) ->
ctx.afterSourceBarrierInjection(checkpointId));
-
+ // It
is possible that the tasks has finished checkpointing at this point.
+ // So
we need to complete this pending checkpoint.
Review comment:
In an essence, this is a race condition between sending out snapshot
state calls, and receiving ack's. And the race condition is caused by the fact,
that `CheckpointCoordinator#receiveAcknowledgeMessage` is executed in the
`ioExecutor`, while this logic here is happening on the timer thread?
If so, can we write this as a comment here?
Secondly, @ifndef-SleePy, would this problem go away with the last step of
your refactor? I guess the idea should be, that both of those things would be
executed in job manager's main thread executor, right?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -530,15 +530,22 @@ private void
startTriggeringCheckpoint(CheckpointTriggerRequest request) {
request.getOnCompletionFuture()),
timer);
- final CompletableFuture<?> masterStatesComplete =
pendingCheckpointCompletableFuture
- .thenCompose(this::snapshotMasterState);
-
final CompletableFuture<?>
coordinatorCheckpointsComplete = pendingCheckpointCompletableFuture
.thenComposeAsync((pendingCheckpoint) ->
OperatorCoordinatorCheckpoints.triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion(
coordinatorsToCheckpoint, pendingCheckpoint, timer),
timer);
+ // We have to take the snapshot of the master hooks
after the coordinator checkpoints has completed.
+ // This is to ensure the tasks are checkpointed after
the OperatorCoordinators in case
+ // ExternallyInducedSource is used.
+ final CompletableFuture<?> masterStatesComplete =
coordinatorCheckpointsComplete
+ .thenComposeAsync(ignored -> {
+ PendingCheckpoint checkpoint =
+
FutureUtils.getWithoutException(pendingCheckpointCompletableFuture);
Review comment:
Why are we ignoring the exception here? Can not it hide bugs?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -530,15 +530,22 @@ private void
startTriggeringCheckpoint(CheckpointTriggerRequest request) {
request.getOnCompletionFuture()),
Review comment:
Can you copy paste PR description into the commit message? You described
the issue pretty well there, would be nice to keep it in the git history :)
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
##########
@@ -2260,6 +2263,121 @@ public void testSavepointScheduledInUnalignedMode()
throws Exception {
}
}
+ /**
+ * Test that the checkpoint still behave correctly when the task
checkpoint is triggered by the
+ * master hooks and finished before the master checkpoint.
+ */
+ @Test
+ public void testTaskCheckpointTriggeredByMasterHooks() {
+ try {
+ final JobID jid = new JobID();
+
+ // create some mock Execution vertices that receive the
checkpoint trigger messages
+ final ExecutionAttemptID attemptID1 = new
ExecutionAttemptID();
+ final ExecutionAttemptID attemptID2 = new
ExecutionAttemptID();
+ ExecutionVertex vertex1 =
mockExecutionVertex(attemptID1,
+ (executionAttemptID, jobId, checkpointId,
timestamp, checkpointOptions, advanceToEndOfEventTime) -> {});
+ ExecutionVertex vertex2 =
mockExecutionVertex(attemptID2,
+ (executionAttemptID, jobId, checkpointId,
timestamp, checkpointOptions, advanceToEndOfEventTime) -> {});
+
+ // set up the coordinator and validate the initial state
+ CheckpointCoordinator coord =
getCheckpointCoordinator(jid, vertex1, vertex2);
+ AtomicReference<Long> checkpointIdRef = new
AtomicReference<>();
+
+ OperatorID opID1 =
OperatorID.fromJobVertexID(vertex1.getJobvertexId());
+ OperatorID opID2 =
OperatorID.fromJobVertexID(vertex2.getJobvertexId());
+ TaskStateSnapshot taskOperatorSubtaskStates1 =
mock(TaskStateSnapshot.class);
+ TaskStateSnapshot taskOperatorSubtaskStates2 =
mock(TaskStateSnapshot.class);
+ OperatorSubtaskState subtaskState1 =
mock(OperatorSubtaskState.class);
+ OperatorSubtaskState subtaskState2 =
mock(OperatorSubtaskState.class);
+
when(taskOperatorSubtaskStates1.getSubtaskStateByOperatorID(opID1)).thenReturn(subtaskState1);
+
when(taskOperatorSubtaskStates2.getSubtaskStateByOperatorID(opID2)).thenReturn(subtaskState2);
+
+ coord.addMasterHook(new
MasterTriggerRestoreHook<Integer>() {
+ @Override
+ public String getIdentifier() {
+ return "anything";
+ }
+
+ @Override
+ public CompletableFuture<Integer>
triggerCheckpoint(long checkpointId, long timestamp, Executor executor) throws
Exception {
+ // Acknowledge the checkpoint in the
master hooks so the task snapshots complete before
+ // the master state snapshot completes.
+ checkpointIdRef.set(checkpointId);
+ AcknowledgeCheckpoint
acknowledgeCheckpoint1 = new AcknowledgeCheckpoint(
+ jid, attemptID1, checkpointId,
new CheckpointMetrics(), taskOperatorSubtaskStates1);
+ AcknowledgeCheckpoint
acknowledgeCheckpoint2 = new AcknowledgeCheckpoint(
+ jid, attemptID2, checkpointId,
new CheckpointMetrics(), taskOperatorSubtaskStates2);
+
coord.receiveAcknowledgeMessage(acknowledgeCheckpoint1,
TASK_MANAGER_LOCATION_INFO);
+
coord.receiveAcknowledgeMessage(acknowledgeCheckpoint2,
TASK_MANAGER_LOCATION_INFO);
+ return null;
+ }
+
+ @Override
+ public void restoreCheckpoint(long
checkpointId, Integer checkpointData) throws Exception {
+
+ }
+
+ @Override
+ public SimpleVersionedSerializer<Integer>
createCheckpointDataSerializer() {
+ return new
SimpleVersionedSerializer<Integer>() {
+ @Override
+ public int getVersion() {
+ return 0;
+ }
+
+ @Override
+ public byte[] serialize(Integer
obj) throws IOException {
+ return new byte[0];
+ }
+
+ @Override
+ public Integer deserialize(int
version, byte[] serialized) throws IOException {
+ return null;
+ }
+ };
+ }
+ });
+
+ assertEquals(0, coord.getNumberOfPendingCheckpoints());
+ assertEquals(0,
coord.getNumberOfRetainedSuccessfulCheckpoints());
+ assertEquals(0,
manuallyTriggeredScheduledExecutor.getScheduledTasks().size());
+
+ // trigger the first checkpoint. this should succeed
+ final CompletableFuture<CompletedCheckpoint>
checkpointFuture = coord.triggerCheckpoint(false);
+ manuallyTriggeredScheduledExecutor.triggerAll();
+
assertFalse(checkpointFuture.isCompletedExceptionally());
+
+ // the now we should have a completed checkpoint
+ assertEquals(1,
coord.getNumberOfRetainedSuccessfulCheckpoints());
+ assertEquals(0, coord.getNumberOfPendingCheckpoints());
+
+ // the canceler should be removed now
+ assertEquals(0,
manuallyTriggeredScheduledExecutor.getScheduledTasks().size());
+
+ // validate that the subtasks states have registered
their shared states.
+ verify(subtaskState1,
times(1)).registerSharedStates(any(SharedStateRegistry.class));
+ verify(subtaskState2,
times(1)).registerSharedStates(any(SharedStateRegistry.class));
+
+ // validate that the relevant tasks got a confirmation
message
+ long checkpointId = checkpointIdRef.get();
+ verify(vertex1.getCurrentExecutionAttempt(),
+ times(1)).triggerCheckpoint(eq(checkpointId),
any(Long.class), any(CheckpointOptions.class));
+ verify(vertex2.getCurrentExecutionAttempt(),
+ times(1)).triggerCheckpoint(eq(checkpointId),
any(Long.class), any(CheckpointOptions.class));
+
+ CompletedCheckpoint success =
coord.getSuccessfulCheckpoints().get(0);
+ assertEquals(jid, success.getJobId());
+ assertEquals(2, success.getOperatorStates().size());
+
+ coord.shutdown(JobStatus.FINISHED);
+ }
+ catch (Throwable e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
Review comment:
This way of handling and catching of an exception is a left over of some
workaround of an old bug (JUnit? Travis? I have no idea). Nowadays there is no
need for this, so you could remove this try/catch block - it would also improve
error message, as you wouldn't be hiding the original stack trace.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -530,15 +530,22 @@ private void
startTriggeringCheckpoint(CheckpointTriggerRequest request) {
request.getOnCompletionFuture()),
timer);
- final CompletableFuture<?> masterStatesComplete =
pendingCheckpointCompletableFuture
- .thenCompose(this::snapshotMasterState);
-
final CompletableFuture<?>
coordinatorCheckpointsComplete = pendingCheckpointCompletableFuture
.thenComposeAsync((pendingCheckpoint) ->
OperatorCoordinatorCheckpoints.triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion(
coordinatorsToCheckpoint, pendingCheckpoint, timer),
timer);
+ // We have to take the snapshot of the master hooks
after the coordinator checkpoints has completed.
+ // This is to ensure the tasks are checkpointed after
the OperatorCoordinators in case
+ // ExternallyInducedSource is used.
+ final CompletableFuture<?> masterStatesComplete =
coordinatorCheckpointsComplete
+ .thenComposeAsync(ignored -> {
Review comment:
Question: as far as I can tell, the fact that you changed
`thenCompose(...)` to ` thenComposeAsync(..., timer)` shouldn't matter here in
this code and should change anything right? `coordinatorCheckpointsComplete` is
already executed on the `timer` thread after all?
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
##########
@@ -2260,6 +2263,121 @@ public void testSavepointScheduledInUnalignedMode()
throws Exception {
}
}
+ /**
+ * Test that the checkpoint still behave correctly when the task
checkpoint is triggered by the
+ * master hooks and finished before the master checkpoint.
+ */
Review comment:
So this test is covering for both bug fixes?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]