dawidwys commented on a change in pull request #14857:
URL: https://github.com/apache/flink/pull/14857#discussion_r665235501
##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
##########
@@ -406,6 +406,31 @@ public void initInputsCheckpoint(long id, CheckpointOptions checkpointOptions)
}
}
+ public void waitForPendingCheckpoints() throws Exception {
+ if (!enableCheckpointAfterTasksFinished) {
+ return;
+ }
+
+ List<AsyncCheckpointRunnable> asyncCheckpointRunnables;
+ synchronized (lock) {
+ asyncCheckpointRunnables = new ArrayList<>(checkpoints.values());
+ }
+
+ // Waits for each checkpoint independently.
+ asyncCheckpointRunnables.forEach(
+ ar -> {
+ try {
+ ar.getFinishedFuture().get();
+ } catch (Exception e) {
+ LOG.error(
Review comment:
Does it make sense to log this exception? As far as I can tell, we can only
end up here with an `InterruptedException` from the `get`. Exceptions from
the runnable are not forwarded to the `Future`, are they? We only ever complete
the `Future` with `complete(null);`.
I'd suggest:
```
final CompletableFuture<Void> combinedFuture;
// Only collect the futures under the lock; wait for them outside of it.
synchronized (lock) {
    combinedFuture =
            FutureUtils.waitForAll(
                    checkpoints.values().stream()
                            .map(AsyncCheckpointRunnable::getFinishedFuture)
                            .collect(Collectors.toList()));
}
combinedFuture.get();
```
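For illustration, here is a minimal, Flink-free sketch of the behaviour described above, under stated assumptions. `PendingCheckpointsSketch`, `AsyncTask` and `register` are hypothetical names, `AsyncTask` only mimics how `AsyncCheckpointRunnable` completes its finished future, and the JDK's `CompletableFuture.allOf` is swapped in for Flink's `FutureUtils.waitForAll`. It shows that a failure inside `run()` does not reach the finished future, so waiting on the combined future returns normally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Hypothetical, Flink-free model of waitForPendingCheckpoints(). AsyncTask stands in
 * for AsyncCheckpointRunnable, and CompletableFuture.allOf stands in for
 * FutureUtils.waitForAll.
 */
public class PendingCheckpointsSketch {

    /** Stand-in for AsyncCheckpointRunnable: its finished future is only completed normally. */
    static final class AsyncTask implements Runnable {
        private final CompletableFuture<Void> finishedFuture = new CompletableFuture<>();
        private final boolean fail;

        AsyncTask(boolean fail) {
            this.fail = fail;
        }

        @Override
        public void run() {
            try {
                if (fail) {
                    throw new IllegalStateException("simulated async checkpoint failure");
                }
            } catch (IllegalStateException e) {
                // Failures are handled inside run() (here simply dropped);
                // they are never forwarded to finishedFuture.
            } finally {
                finishedFuture.complete(null);
            }
        }

        CompletableFuture<Void> getFinishedFuture() {
            return finishedFuture;
        }
    }

    private final Object lock = new Object();
    // Unlike the real coordinator, finished tasks are never removed from this list.
    private final List<AsyncTask> pending = new ArrayList<>();

    void register(AsyncTask task) {
        synchronized (lock) {
            pending.add(task);
        }
    }

    void waitForPendingCheckpoints() throws Exception {
        final CompletableFuture<Void> combinedFuture;
        // Build the combined future under the lock, but block outside of it.
        synchronized (lock) {
            combinedFuture =
                    CompletableFuture.allOf(
                            pending.stream()
                                    .map(AsyncTask::getFinishedFuture)
                                    .toArray(CompletableFuture[]::new));
        }
        // No finished future is completed exceptionally, so get() cannot fail with an
        // ExecutionException here; an InterruptedException is still possible.
        combinedFuture.get();
    }

    public static void main(String[] args) throws Exception {
        PendingCheckpointsSketch coordinator = new PendingCheckpointsSketch();
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            AsyncTask ok = new AsyncTask(false);
            AsyncTask failing = new AsyncTask(true);
            coordinator.register(ok);
            coordinator.register(failing);
            executor.execute(ok);
            executor.execute(failing);
            // Returns normally even though one task failed internally.
            coordinator.waitForPendingCheckpoints();
            System.out.println("all pending checkpoints finished");
        } finally {
            executor.shutdown();
        }
    }
}
```

Like `FutureUtils.waitForAll`, `CompletableFuture.allOf` waits for every input future to complete, so holding the lock only while collecting the futures keeps the blocking `get()` outside the critical section.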
##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java
##########
@@ -110,8 +109,6 @@ public void run() {
FileSystemSafetyNet.initializeSafetyNetForThread();
try {
- registerConsumer.accept(this);
Review comment:
Makes total sense! I did not spend enough time analyzing it before.
##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
##########
@@ -1864,6 +1866,61 @@ static void processMailTillCheckpointSucceeds(
testHarness.getTaskStateManager().getWaitForReportLatch().await();
}
+ @Test
+ public void testWaitingForPendingCheckpointsOnFinished() throws Exception {
Review comment:
Currently, the test passes both with and without waiting for the pending
checkpoints.
Moreover, I think there are a lot of timing assumptions that need to hold
for the test to verify the intended behaviour. I could not think of a good way
to fix all of them (ideally we would remove the remaining `Thread#sleep()`),
but what do you think about a test like this:
```
@Test
public void testWaitingForPendingCheckpointsOnFinished() throws Exception {
    OneShotLatch asyncCheckpointExecuted = new OneShotLatch();
    OneShotLatch canCheckpointBeAcknowledged = new OneShotLatch();
    OneShotLatch invokeCompleted = new OneShotLatch();
    TestCheckpointResponder responder =
            new TestCheckpointResponder() {
                @Override
                public void acknowledgeCheckpoint(
                        JobID jobID,
                        ExecutionAttemptID executionAttemptID,
                        long checkpointId,
                        CheckpointMetrics checkpointMetrics,
                        TaskStateSnapshot subtaskState) {
                    try {
                        // Signal that the async part has run, then hold back the
                        // acknowledgement until the test allows it.
                        asyncCheckpointExecuted.trigger();
                        canCheckpointBeAcknowledged.await();
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                }
            };

    CompletableFuture<Void> taskClosed =
            CompletableFuture.runAsync(
                    () -> {
                        try (StreamTaskMailboxTestHarness<String> harness =
                                new StreamTaskMailboxTestHarnessBuilder<>(
                                                OneInputStreamTask::new,
                                                BasicTypeInfo.STRING_TYPE_INFO)
                                        .addInput(BasicTypeInfo.STRING_TYPE_INFO, 3)
                                        .modifyStreamConfig(
                                                config -> config.setCheckpointingEnabled(true))
                                        .setCheckpointResponder(responder)
                                        .setupOperatorChain(new EmptyOperator())
                                        .finishForSingletonOperatorChain(
                                                StringSerializer.INSTANCE)
                                        .build()) {
                            harness.streamTask
                                    .getCheckpointCoordinator()
                                    .setEnableCheckpointAfterTasksFinished(true);
                            harness.streamTask.triggerCheckpointOnBarrier(
                                    new CheckpointMetaData(1, 101),
                                    CheckpointOptions.forCheckpointWithDefaultLocation(),
                                    new CheckpointMetricsBuilder()
                                            .setBytesProcessedDuringAlignment(0L)
                                            .setAlignmentDurationNanos(0L));
                            harness.waitForTaskCompletion();
                            invokeCompleted.trigger();
                            harness.finishProcessing();
                        } catch (Exception e) {
                            // Propagate failures so that taskClosed.get() fails the test.
                            throw new CompletionException(e);
                        }
                    },
                    Executors.newSingleThreadExecutor());

    asyncCheckpointExecuted.await();
    invokeCompleted.await();
    // Give the task some time to potentially finish before the checkpoint is acknowledged.
    Thread.sleep(500);
    assertFalse(taskClosed.isDone());
    canCheckpointBeAcknowledged.trigger();
    taskClosed.get();
}
```
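To make the latch handshake in the test above easier to follow, here is a Flink-free model of it, offered as a sketch under assumptions rather than a replacement for the harness-based test. `CountDownLatch` replaces Flink's `OneShotLatch`, and `LatchHandshakeSketch` and `runTask(...)` are hypothetical stand-ins for the `StreamTask` lifecycle (invoke completes, then optionally wait for the pending checkpoint, then close). It keeps the same `Thread.sleep` timing assumption.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical model of the latch handshake; not Flink code. */
public class LatchHandshakeSketch {

    final CountDownLatch asyncCheckpointExecuted = new CountDownLatch(1);
    final CountDownLatch canCheckpointBeAcknowledged = new CountDownLatch(1);
    final CountDownLatch invokeCompleted = new CountDownLatch(1);

    /** Starts the "task"; the returned future completes once the task is closed. */
    CompletableFuture<Void> runTask(boolean waitForPendingCheckpoints, ExecutorService executor) {
        CompletableFuture<Void> pendingCheckpoint =
                CompletableFuture.runAsync(
                        () -> {
                            // Async part of the checkpoint: the acknowledgement is held back.
                            asyncCheckpointExecuted.countDown();
                            awaitOrFail(canCheckpointBeAcknowledged);
                        },
                        executor);
        return CompletableFuture.runAsync(
                () -> {
                    invokeCompleted.countDown(); // "invoke" has finished
                    if (waitForPendingCheckpoints) {
                        pendingCheckpoint.join(); // the behaviour under test
                    }
                    // Closing the task would happen here.
                },
                executor);
    }

    private static void awaitOrFail(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            LatchHandshakeSketch sketch = new LatchHandshakeSketch();
            CompletableFuture<Void> taskClosed = sketch.runTask(true, executor);
            sketch.asyncCheckpointExecuted.await();
            sketch.invokeCompleted.await();
            Thread.sleep(500); // same timing assumption as in the proposed test
            if (taskClosed.isDone()) {
                throw new AssertionError("task closed before the checkpoint was acknowledged");
            }
            sketch.canCheckpointBeAcknowledged.countDown();
            taskClosed.get();
            System.out.println("task closed after the acknowledgement");
        } finally {
            executor.shutdown();
        }
    }
}
```

Passing `false` for `waitForPendingCheckpoints` lets `taskClosed` complete while the acknowledgement is still blocked, which is the failure mode the proposed test is meant to detect.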
--
This is an automated message from the Apache Git Service.