1996fanrui commented on code in PR #22392:
URL: https://github.com/apache/flink/pull/22392#discussion_r1166686196
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java:
##########
@@ -381,7 +383,7 @@ private void registerAlignmentTimer(
         registerTimer.registerTask(
                 () -> {
                     try {
-                        operatorChain.alignedBarrierTimeout(checkpointId);
+                        operatorChain.alignedBarrierTimeout(checkpointId, metrics);
Review Comment:
> AsyncCheckpointRunnable and the alignment timer, are running in different
threads, creating both problems with the actual memory visibility AND race
conditions?
Before this PR, `CheckpointMetricsBuilder#setUnalignedCheckpoint` is only
called on the task thread [1]. And IIUC, tasks registered via `registerTimer`
should be executed by the task thread as well. `AsyncCheckpointRunnable` never
calls `CheckpointMetricsBuilder#setUnalignedCheckpoint`; it only reads the
builder to build the metrics. So the flag cannot be modified concurrently.
After a detailed analysis, I think the change made for the first comment [2]
should be reverted. Depending on the order of execution, it may record the
wrong unaligned type. For example:
1. registerTimer thread: the aligned barrier times out and the checkpoint
switches to unaligned
2. registerTimer thread: `channelStateFuture.complete(inflightBuffers)`
3. Channel state writer thread: writes these buffers and completes the
`resultSubpartitionStateFuture`
4. AsyncCheckpointRunnable thread: all states are written, so it builds the
metrics
5. registerTimer thread: calls
`CheckpointMetricsBuilder#setUnalignedCheckpoint(true)`
If `inflightBuffers` is empty or very small, steps 3 and 4 may finish before
step 5, and then the recorded unaligned type will be wrong (aligned instead
of unaligned).
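A minimal, single-threaded Java replay of the steps above may make the problem concrete. It is a sketch, not Flink code: `MetricsBuilder` is a hypothetical stand-in for `CheckpointMetricsBuilder`, and steps 3 and 4 are collapsed into a non-async `thenApply` stage. With only one thread, that stage runs inside `complete()`, so the bad interleaving happens deterministically instead of depending on timing.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified replay of steps 1-5; not Flink's real classes.
public class UnalignedTypeRace {

    /** Simplified stand-in for CheckpointMetricsBuilder's unaligned flag. */
    static final class MetricsBuilder {
        private boolean unalignedCheckpoint;

        void setUnalignedCheckpoint(boolean unaligned) {
            this.unalignedCheckpoint = unaligned;
        }

        boolean isUnalignedCheckpoint() {
            return unalignedCheckpoint;
        }
    }

    /** Returns the unaligned flag as it was seen when the metrics were built. */
    static boolean replayCurrentOrder() {
        MetricsBuilder metrics = new MetricsBuilder();
        CompletableFuture<List<String>> channelStateFuture = new CompletableFuture<>();

        // Steps 3-4: as soon as the in-flight buffers are written, build the metrics.
        // This dependent stage is registered first, so complete() below runs it.
        CompletableFuture<Boolean> reportedUnaligned =
                channelStateFuture.thenApply(buffers -> metrics.isUnalignedCheckpoint());

        // Steps 1-2: the aligned barrier timed out; hand over the (empty) buffers.
        channelStateFuture.complete(List.of());
        // Step 5: the flag is set only after the metrics were already built.
        metrics.setUnalignedCheckpoint(true);

        return reportedUnaligned.join();
    }

    public static void main(String[] args) {
        System.out.println("reported unaligned: " + replayCurrentOrder()); // false
    }
}
```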
Based on this case, I think the solution is:
1. `CheckpointMetricsBuilder#setUnalignedCheckpoint(true)` should be
executed before `channelStateFuture.complete(inflightBuffers)`; that is,
`CheckpointMetricsBuilder` should be passed to
`PipelinedSubpartition#alignedBarrierTimeout`.
2. Make `CheckpointMetricsBuilder#unalignedCheckpoint` `volatile` to ensure
`AsyncCheckpointRunnable` reads the up-to-date value.
I've updated the solution here [3].
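A rough sketch of solution 1, assuming hypothetical simplified `MetricsBuilder` and `Subpartition` classes (not Flink's real `CheckpointMetricsBuilder` / `PipelinedSubpartition`): the builder is passed into `alignedBarrierTimeout`, the flag is written before `channelStateFuture` is completed, and a separate executor thread plays the role of `AsyncCheckpointRunnable`.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch of solution 1; simplified stand-ins, not Flink's real classes.
public class SetFlagBeforeComplete {

    static final class MetricsBuilder {
        // volatile: written by the timer (task) thread, read by the async thread.
        private volatile boolean unalignedCheckpoint;

        void setUnalignedCheckpoint(boolean unaligned) {
            this.unalignedCheckpoint = unaligned;
        }

        boolean isUnalignedCheckpoint() {
            return unalignedCheckpoint;
        }
    }

    static final class Subpartition {
        final CompletableFuture<List<String>> channelStateFuture = new CompletableFuture<>();

        /** The builder is passed in so the flag is set before the future completes. */
        void alignedBarrierTimeout(long checkpointId, MetricsBuilder metrics) {
            metrics.setUnalignedCheckpoint(true);   // record the type first ...
            channelStateFuture.complete(List.of()); // ... then release the buffers
        }
    }

    static boolean run() {
        MetricsBuilder metrics = new MetricsBuilder();
        Subpartition subpartition = new Subpartition();
        ExecutorService asyncThread = Executors.newSingleThreadExecutor();
        try {
            // Stand-in for AsyncCheckpointRunnable: wait for channel state, then build metrics.
            CompletableFuture<Boolean> reported = CompletableFuture.supplyAsync(
                    () -> {
                        subpartition.channelStateFuture.join();
                        return metrics.isUnalignedCheckpoint();
                    },
                    asyncThread);
            subpartition.alignedBarrierTimeout(42L, metrics);
            return reported.join();
        } finally {
            asyncThread.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("reported unaligned: " + run()); // true
    }
}
```

Because the flag is written before the future is completed, the reader cannot build the metrics before the flag is set, whatever the thread scheduling; the `volatile` field makes the cross-thread visibility explicit rather than relying only on the future's own ordering guarantees.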
> Shouldn't this be set in the AsyncCheckpointRunnable thread via a code
path similar to
org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.SnapshotsFinalizeResult#bytesPersistedDuringAlignment?
This solution can work. Actually, I have tried this solution, and I found
this code path is too complex: it involves many exception cases (in
`ChannelStateCheckpointWriter`), the completed futures (`dataFuture` and
`resultFuture`), and completing these futures after the channel state is
merged.
However, I implemented a POC version [4] using this solution. The core
process:
- Add a `CompletableFuture<Boolean> timeoutToUnaligned` inside
`PipelinedSubpartition`, and complete it when `channelStateFuture` is
completed.
- Add a `CompletableFuture<Boolean> resultSubpartitionTimeoutToUnaligned` to
`ChannelStateWriteResult` (which is at the subtask level). The future is
completed in the following cases:
  1. `true`: any subpartition is switched from aligned to unaligned
     checkpoint.
  2. `false`: the result is completed and no subpartition has switched to
     unaligned checkpoint.
  3. `false`: the result fails before any subpartition has switched to
     unaligned checkpoint.
- `ChannelStateWriteResult` passes the result to `OperatorSnapshotFutures`,
which then passes it to `AsyncCheckpointRunnable`.
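The three completion cases above could be combined roughly as follows. This is a hypothetical sketch, not the POC in [4]: the subtask-level future is completed with `true` by the first subpartition that reports a switch, and with `false` once the channel-state write result completes, normally or exceptionally. It relies on `CompletableFuture#complete` being a no-op once the future is already done, and it assumes a subpartition's switch is always reported before the write result completes.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of solution 2's subtask-level future; names follow the
// discussion above, but the code is illustrative only.
public class TimeoutToUnalignedAggregation {

    /** Combines per-subpartition futures with the overall channel-state write result. */
    static CompletableFuture<Boolean> aggregate(
            List<CompletableFuture<Boolean>> subpartitionTimeoutToUnaligned,
            CompletableFuture<Void> writeResult) {
        CompletableFuture<Boolean> resultSubpartitionTimeoutToUnaligned = new CompletableFuture<>();

        // Case 1: any subpartition switched from aligned to unaligned.
        for (CompletableFuture<Boolean> timeoutToUnaligned : subpartitionTimeoutToUnaligned) {
            timeoutToUnaligned.thenAccept(switched -> {
                if (switched) {
                    resultSubpartitionTimeoutToUnaligned.complete(true);
                }
            });
        }

        // Cases 2 and 3: the write result completed normally or failed without a
        // prior switch. complete(false) is a no-op if case 1 already fired.
        writeResult.whenComplete(
                (ignored, failure) -> resultSubpartitionTimeoutToUnaligned.complete(false));

        return resultSubpartitionTimeoutToUnaligned;
    }

    public static void main(String[] args) {
        CompletableFuture<Boolean> sub0 = new CompletableFuture<>();
        CompletableFuture<Boolean> sub1 = new CompletableFuture<>();
        CompletableFuture<Void> writeResult = new CompletableFuture<>();

        CompletableFuture<Boolean> switched = aggregate(List.of(sub0, sub1), writeResult);
        sub1.complete(true);        // subpartition 1 timed out to unaligned
        writeResult.complete(null); // channel state fully written afterwards
        System.out.println("switched to unaligned: " + switched.join()); // true
    }
}
```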
Solution 2 is more complex than solution 1, but it's more reasonable. Which
one do you prefer?
[1] https://github.com/apache/flink/blob/a81ffa6d019d9891bd3a54f50fb36ad847721daa/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java#L738
[2] https://github.com/apache/flink/pull/22392#discussion_r1165593019
[3] https://github.com/1996fanrui/flink/commit/c23c5dbf08567b84a8258b11539c1d4e06ab2dff
[4] https://github.com/1996fanrui/flink/commit/d5f2537bb02ea0248b75c9cdc467a60dea541cf3