pnowojski commented on code in PR #22392:
URL: https://github.com/apache/flink/pull/22392#discussion_r1165593019


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java:
##########
@@ -381,7 +383,7 @@ private void registerAlignmentTimer(
                 registerTimer.registerTask(
                         () -> {
                             try {
-                                operatorChain.alignedBarrierTimeout(checkpointId);
+                                operatorChain.alignedBarrierTimeout(checkpointId, metrics);

Review Comment:
   Maybe, instead of adding a dependency on `CheckpointMetrics` to the whole
call stack down to the subpartition, `alignedBarrierTimeout` could return `true`
or `false` depending on whether the barrier has timed out or not? 🤔 
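A minimal sketch of the return-value alternative, using hypothetical, heavily simplified `Subpartition` and `OperatorChainSketch` classes rather than Flink's real types or signatures. Each layer only reports whether it actually timed out the pending aligned barrier, and the caller that owns the metrics decides what to record.

```java
import java.util.List;

// Hypothetical, heavily simplified stand-ins for the real call stack.
// None of these classes or signatures are Flink's actual API.
final class Subpartition {
    private final long pendingBarrierId;
    private boolean timedOut;

    Subpartition(long pendingBarrierId) {
        this.pendingBarrierId = pendingBarrierId;
    }

    /** Returns true only if this call converted the pending aligned barrier. */
    boolean alignedBarrierTimeout(long checkpointId) {
        if (pendingBarrierId != checkpointId || timedOut) {
            return false;
        }
        timedOut = true;
        return true;
    }
}

final class OperatorChainSketch {
    private final List<Subpartition> subpartitions;

    OperatorChainSketch(List<Subpartition> subpartitions) {
        this.subpartitions = subpartitions;
    }

    /** Aggregates per-subpartition results without touching any metrics object. */
    boolean alignedBarrierTimeout(long checkpointId) {
        boolean anyTimedOut = false;
        for (Subpartition subpartition : subpartitions) {
            // Non-short-circuit OR, so every subpartition is still visited.
            anyTimedOut |= subpartition.alignedBarrierTimeout(checkpointId);
        }
        return anyTimedOut;
    }
}
```

With this shape, the timer callback would become something like `boolean timedOut = operatorChain.alignedBarrierTimeout(checkpointId);`, and none of the layers below the coordinator need to know that checkpoint metrics exist.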



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java:
##########
@@ -381,7 +383,7 @@ private void registerAlignmentTimer(
                 registerTimer.registerTask(
                         () -> {
                             try {
-                                operatorChain.alignedBarrierTimeout(checkpointId);
+                                operatorChain.alignedBarrierTimeout(checkpointId, metrics);

Review Comment:
   Can we safely pass the `CheckpointMetricsBuilder` to the timer thread? Maybe
I'm misremembering something, but the ownership of and full responsibility for
`CheckpointMetricsBuilder` seem to be passed from the
`SubtaskCheckpointCoordinatorImpl` to the `AsyncCheckpointRunnable`, which uses
it to build the metrics. `AsyncCheckpointRunnable` and the alignment timer are
running in different threads, so wouldn't this create both memory-visibility
problems AND race conditions?
   
   Shouldn't this be set in the `AsyncCheckpointRunnable` thread via a code 
path similar to 
`org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.SnapshotsFinalizeResult#bytesPersistedDuringAlignment`?
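One way to follow the single-owner rule described above, sketched with hypothetical `MetricsBuilderSketch` and `TimeoutHandOffSketch` classes (not Flink's `CheckpointMetricsBuilder` or `AsyncCheckpointRunnable`). The timer thread only publishes a flag through an `AtomicBoolean`, and the builder is written exclusively by the thread that finalizes the snapshot.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for a metrics builder. Deliberately NOT thread-safe,
// so it must only ever be touched by the single thread that owns it.
final class MetricsBuilderSketch {
    private boolean alignmentTimedOut;

    MetricsBuilderSketch setAlignmentTimedOut(boolean value) {
        this.alignmentTimedOut = value;
        return this;
    }

    boolean isAlignmentTimedOut() {
        return alignmentTimedOut;
    }
}

// Hypothetical hand-off object shared between the timer thread and the
// thread that finalizes the checkpoint (the AsyncCheckpointRunnable's role).
final class TimeoutHandOffSketch {
    // Written by the timer thread, read by the finalizing thread. The atomic
    // gives cross-thread visibility that a plain field would not guarantee.
    private final AtomicBoolean alignmentTimedOut = new AtomicBoolean(false);

    /** Called on the timer thread; never touches the metrics builder. */
    void onAlignmentTimeout(boolean timedOut) {
        if (timedOut) {
            alignmentTimedOut.set(true);
        }
    }

    /** Called on the finalizing thread, which exclusively owns {@code builder}. */
    MetricsBuilderSketch finalizeMetrics(MetricsBuilderSketch builder) {
        return builder.setAlignmentTimedOut(alignmentTimedOut.get());
    }
}
```

This removes the data race on the builder and makes the timer's write visible to the finalizing thread, but it does not by itself order the two threads: if the timer fires after `finalizeMetrics` has already run, the flag is simply not reflected in that checkpoint's metrics.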
 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
