pnowojski commented on code in PR #27182:
URL: https://github.com/apache/flink/pull/27182#discussion_r2524099004


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java:
##########
@@ -153,6 +157,20 @@ public void assignStates() {
             }
         }
 
+        // distribute output channel states to downstream tasks if needed
+        // Note: it has to be called after assignAttemptState for all tasks since the
+        // redistributing of result subpartition states depend on the inputSubtaskMappings
+        // of downstream tasks.
+        if (unalignedAllowOnRecovery) {
+            for (TaskStateAssignment stateAssignment : vertexAssignments.values()) {
+                // If unalignedAllowOnRecovery is enabled, all upstream output buffers have to be
+                // distributed to downstream since the upstream task side doesn’t deserialize
+                // records generally. It is easy to filter records and re-upload records if
+                // recovering output buffers on downstream task side directly.
+                stateAssignment.distributeOutputBuffersToDownstream();
+            }
+        }

Review Comment:
   nit: this method is growing a bit too large, being separated via comments 
into distinct parts. Maybe we could refactor it and extract those parts into 
smaller methods? Like `repartitionState(...)`, `assignState(...)`?
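
One way the extraction could look, as a rough sketch only: `repartitionState` and `assignState` are the names floated in the comment, while `distributeOutputBuffers` and the `steps` log are hypothetical stand-ins, and none of the bodies reflect the real `StateAssignmentOperation` logic. The point is just that the ordering constraint from the code comment (distribution happens after assignment for all tasks) stays visible in a short top-level method.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for StateAssignmentOperation; step bodies are placeholders. */
final class AssignStatesSketch {
    private final boolean unalignedAllowOnRecovery;
    // Records which phases ran, purely so the sketch is observable.
    private final List<String> steps = new ArrayList<>();

    AssignStatesSketch(boolean unalignedAllowOnRecovery) {
        this.unalignedAllowOnRecovery = unalignedAllowOnRecovery;
    }

    /** Top-level method reduced to a sequence of named phases. */
    List<String> assignStates() {
        repartitionState();
        assignState();
        // Must run only after assignState() has covered all tasks.
        if (unalignedAllowOnRecovery) {
            distributeOutputBuffers();
        }
        return steps;
    }

    private void repartitionState() {
        steps.add("repartitionState");
    }

    private void assignState() {
        steps.add("assignState");
    }

    private void distributeOutputBuffers() {
        steps.add("distributeOutputBuffers");
    }

    public static void main(String[] args) {
        System.out.println(new AssignStatesSketch(true).assignStates());
    }
}
```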



##########
flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java:
##########
@@ -646,6 +646,15 @@ public class CheckpointingOptions {
                                     + "It can reduce the number of small files when enable unaligned checkpoint. "
                                     + "Each subtask will create a new channel state file when this is configured to 1.");
 
+    @Experimental
+    public static final ConfigOption<Boolean> UNALIGNED_ALLOW_ON_RECOVERY =
+            ConfigOptions.key("execution.checkpointing.unaligned.allow-on-recovery")

Review Comment:
   nit: `execution.checkpointing.unaligned.during-recovery.enabled`?
   
   Also, given that you want to provide this feature in two steps:
   - recovering output buffers on the input side
   - supporting checkpointing during recovery
   
   Would it be useful to have two separate feature flags for that? 🤔 (I'm not sure)



##########
flink-tests/src/test/java/org/apache/flink/test/state/ChangelogRecoveryCachingITCase.java:
##########
@@ -95,6 +96,9 @@ public void before() throws Exception {
 
         Configuration configuration = new Configuration();
         configuration.set(CACHE_IDLE_TIMEOUT, Duration.ofDays(365)); // cache forever
+        // Disable UNALIGNED_ALLOW_ON_RECOVERY since the output buffer states file may be opened
+        // from multiple subtasks
+        configuration.set(UNALIGNED_ALLOW_ON_RECOVERY, false);

Review Comment:
   Can you enable this for randomisation in other ITCases? `org.apache.flink.streaming.util.TestStreamEnvironment#randomizeConfiguration`
   
   And make sure that this flag/code path (with non-empty output data) is actually being used in our `UnalignedCheckpoint*****ITCase` tests (there are 6 files testing various scenarios).



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/RecoveredChannelStateHandler.java:
##########
@@ -99,7 +100,12 @@ public void recover(
         Buffer buffer = bufferWithContext.context;
         try {
             if (buffer.readableBytes() > 0) {
-                for (final RecoveredInputChannel channel : getMappedChannels(channelInfo)) {
+                List<RecoveredInputChannel> mappedChannels = getMappedChannels(channelInfo);
+                checkState(
+                        mappedChannels.size() == 1,
+                        "One buffer is only distributed to one target InputChannel since "
+                                + "one buffer is expected to be processed once by the same task.");
+                for (final RecoveredInputChannel channel : mappedChannels) {

Review Comment:
   I'm not following this change. 🤔 Has this invariant/limitation always been present in the code, or is it something new?
   
   Why do we assert that the list has exactly one element and then still have code that loops over all the elements? Shouldn't we use something like `Iterables.getOnlyElement`?
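
A minimal sketch of that suggestion, assuming the single-target invariant really holds. `getOnlyElement` below is a hand-written stand-in that mirrors the behaviour of Guava's `Iterables.getOnlyElement`, and the channels are plain strings rather than `RecoveredInputChannel`s; both are illustrative assumptions, not the PR's code.

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/** Hypothetical illustration: one call both asserts the invariant and yields the element. */
final class OnlyElementSketch {

    /** Returns the single element of {@code items}; throws if there are none or several. */
    static <T> T getOnlyElement(Iterable<T> items) {
        Iterator<T> it = items.iterator();
        if (!it.hasNext()) {
            throw new NoSuchElementException("expected exactly one element, found none");
        }
        T only = it.next();
        if (it.hasNext()) {
            throw new IllegalArgumentException("expected exactly one element, found several");
        }
        return only;
    }

    public static void main(String[] args) {
        // Stand-in for getMappedChannels(channelInfo).
        List<String> mappedChannels = List.of("channel-0");
        // No loop needed: the invariant check and the lookup are the same call.
        String channel = getOnlyElement(mappedChannels);
        System.out.println("recovering buffer into " + channel);
    }
}
```

Written this way, the check and the access cannot drift apart, which is the inconsistency the comment points at.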



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateAssignment.java:
##########
@@ -540,6 +580,75 @@ public boolean hasInFlightDataForResultPartition(int partitionIndex) {
         return false;
     }
 
+    public int findInputGateIdxForResultPartition(int partitionIndex) {

Review Comment:
   nit: move this method below `distributeOutputBuffersToDownstream`, as it is used only there. It is also a bit odd that this method is public while the higher-level method `distributeOutputBuffersToDownstream`, which is the one actually called from the outside, is package-private. Make it private?


