This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c787fcf3a60a484b159799eb4f326f248f9a6dbd
Author: Rui Fan <[email protected]>
AuthorDate: Mon Nov 3 22:01:10 2025 +0100

    [hotfix][checkpoint] Refactor output buffer distribution logic via ResultSubpartitionDistributor
---
 .../channel/RecoveredChannelStateHandler.java      |  55 +++-----
 .../channel/ResultSubpartitionDistributor.java     |  73 +++++++++++
 .../channel/ResultSubpartitionDistributorTest.java | 139 +++++++++++++++++++++
 3 files changed, 228 insertions(+), 39 deletions(-)

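The refactoring extracts the old-to-new subpartition mapping (previously private to ResultSubpartitionRecoveredStateHandler) into a standalone, testable ResultSubpartitionDistributor. Below is a minimal usage sketch, assuming only the public constructor and getMappedSubpartitions method introduced in the diff that follows; the sketch class and its resolveTargets method are hypothetical and not part of the commit.

    // Hypothetical usage sketch, assumed to live in the same
    // org.apache.flink.runtime.checkpoint.channel package as the new class.
    package org.apache.flink.runtime.checkpoint.channel;

    import org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor;

    import java.util.List;

    class ResultSubpartitionDistributorUsageSketch {

        List<ResultSubpartitionInfo> resolveTargets(
                InflightDataRescalingDescriptor channelMapping,
                ResultSubpartitionInfo oldSubpartitionInfo) {
            // One distributor per handler; it caches the computed mappings internally.
            ResultSubpartitionDistributor distributor =
                    new ResultSubpartitionDistributor(channelMapping);
            // Returns the new subpartition(s) that should receive a buffer recovered
            // for the old subpartition; throws IllegalStateException if none exist.
            return distributor.getMappedSubpartitions(oldSubpartitionInfo);
        }
    }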
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/RecoveredChannelStateHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/RecoveredChannelStateHandler.java
index 3177bd5f9f4..7a06f52b145 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/RecoveredChannelStateHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/RecoveredChannelStateHandler.java
@@ -158,19 +158,27 @@ class ResultSubpartitionRecoveredStateHandler
 
     private final ResultPartitionWriter[] writers;
     private final boolean notifyAndBlockOnCompletion;
-
-    private final InflightDataRescalingDescriptor channelMapping;
-
-    private final Map<ResultSubpartitionInfo, List<ResultSubpartitionInfo>> rescaledChannels =
-            new HashMap<>();
-    private final Map<Integer, RescaleMappings> oldToNewMappings = new HashMap<>();
+    private final ResultSubpartitionDistributor resultSubpartitionDistributor;
 
     ResultSubpartitionRecoveredStateHandler(
             ResultPartitionWriter[] writers,
             boolean notifyAndBlockOnCompletion,
             InflightDataRescalingDescriptor channelMapping) {
         this.writers = writers;
-        this.channelMapping = channelMapping;
+        this.resultSubpartitionDistributor =
+                new ResultSubpartitionDistributor(channelMapping) {
+                    /**
+                     * Override the getSubpartitionInfo to perform type checking on the
+                     * ResultPartitionWriter.
+                     */
+                    @Override
+                    ResultSubpartitionInfo getSubpartitionInfo(
+                            int partitionIndex, int subPartitionIdx) {
+                        CheckpointedResultPartition writer =
+                                getCheckpointedResultPartition(partitionIndex);
+                        return writer.getCheckpointedSubpartitionInfo(subPartitionIdx);
+                    }
+                };
         this.notifyAndBlockOnCompletion = notifyAndBlockOnCompletion;
     }
 
@@ -197,7 +205,7 @@ class ResultSubpartitionRecoveredStateHandler
                 return;
             }
             final List<ResultSubpartitionInfo> mappedSubpartitions =
-                    getMappedSubpartitions(subpartitionInfo);
+                    resultSubpartitionDistributor.getMappedSubpartitions(subpartitionInfo);
             CheckpointedResultPartition checkpointedResultPartition =
                     getCheckpointedResultPartition(subpartitionInfo.getPartitionIdx());
             for (final ResultSubpartitionInfo mappedSubpartition : mappedSubpartitions) {
@@ -215,11 +223,6 @@ class ResultSubpartitionRecoveredStateHandler
         }
     }
 
-    private ResultSubpartitionInfo getSubpartitionInfo(int partitionIndex, int subPartitionIdx) {
-        CheckpointedResultPartition writer = getCheckpointedResultPartition(partitionIndex);
-        return writer.getCheckpointedSubpartitionInfo(subPartitionIdx);
-    }
-
     private CheckpointedResultPartition getCheckpointedResultPartition(int partitionIndex) {
         ResultPartitionWriter writer = writers[partitionIndex];
         if (!(writer instanceof CheckpointedResultPartition)) {
@@ -229,32 +232,6 @@ class ResultSubpartitionRecoveredStateHandler
         return (CheckpointedResultPartition) writer;
     }
 
-    private List<ResultSubpartitionInfo> getMappedSubpartitions(
-            ResultSubpartitionInfo subpartitionInfo) {
-        return rescaledChannels.computeIfAbsent(subpartitionInfo, this::calculateMapping);
-    }
-
-    private List<ResultSubpartitionInfo> calculateMapping(ResultSubpartitionInfo info) {
-        final RescaleMappings oldToNewMapping =
-                oldToNewMappings.computeIfAbsent(
-                        info.getPartitionIdx(),
-                        idx -> channelMapping.getChannelMapping(idx).invert());
-        final List<ResultSubpartitionInfo> subpartitions =
-                Arrays.stream(oldToNewMapping.getMappedIndexes(info.getSubPartitionIdx()))
-                        .mapToObj(
-                                newIndexes ->
-                                        getSubpartitionInfo(info.getPartitionIdx(), newIndexes))
-                        .collect(Collectors.toList());
-        if (subpartitions.isEmpty()) {
-            throw new IllegalStateException(
-                    "Recovered a buffer from old "
-                            + info
-                            + " that has no mapping in "
-                            + channelMapping.getChannelMapping(info.getPartitionIdx()));
-        }
-        return subpartitions;
-    }
-
     @Override
     public void close() throws IOException {
         for (ResultPartitionWriter writer : writers) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ResultSubpartitionDistributor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ResultSubpartitionDistributor.java
new file mode 100644
index 00000000000..5c7910d9c50
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ResultSubpartitionDistributor.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.channel;
+
+import org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor;
+import org.apache.flink.runtime.checkpoint.RescaleMappings;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/** The distributor for the channel state of result subpartitions. */
+public class ResultSubpartitionDistributor {
+
+    private final InflightDataRescalingDescriptor channelMapping;
+
+    private final Map<ResultSubpartitionInfo, List<ResultSubpartitionInfo>> rescaledChannels =
+            new HashMap<>();
+    private final Map<Integer, RescaleMappings> oldToNewMappings = new HashMap<>();
+
+    public ResultSubpartitionDistributor(InflightDataRescalingDescriptor channelMapping) {
+        this.channelMapping = channelMapping;
+    }
+
+    public List<ResultSubpartitionInfo> getMappedSubpartitions(
+            ResultSubpartitionInfo subpartitionInfo) {
+        return rescaledChannels.computeIfAbsent(subpartitionInfo, this::calculateMapping);
+    }
+
+    private List<ResultSubpartitionInfo> calculateMapping(ResultSubpartitionInfo info) {
+        final RescaleMappings oldToNewMapping =
+                oldToNewMappings.computeIfAbsent(
+                        info.getPartitionIdx(),
+                        idx -> channelMapping.getChannelMapping(idx).invert());
+        final List<ResultSubpartitionInfo> subpartitions =
+                Arrays.stream(oldToNewMapping.getMappedIndexes(info.getSubPartitionIdx()))
+                        .mapToObj(
+                                newIndexes ->
+                                        getSubpartitionInfo(info.getPartitionIdx(), newIndexes))
+                        .collect(Collectors.toList());
+        if (subpartitions.isEmpty()) {
+            throw new IllegalStateException(
+                    "Recovered a buffer from old "
+                            + info
+                            + " that has no mapping in "
+                            + channelMapping.getChannelMapping(info.getPartitionIdx()));
+        }
+        return subpartitions;
+    }
+
+    ResultSubpartitionInfo getSubpartitionInfo(int partitionIndex, int subPartitionIdx) {
+        return new ResultSubpartitionInfo(partitionIndex, subPartitionIdx);
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ResultSubpartitionDistributorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ResultSubpartitionDistributorTest.java
new file mode 100644
index 00000000000..d58ce647029
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ResultSubpartitionDistributorTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.channel;
+
+import org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor;
+import org.apache.flink.runtime.checkpoint.RescaleMappings;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.HashSet;
+import java.util.List;
+
+import static org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptorUtil.mappings;
+import static org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptorUtil.to;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link ResultSubpartitionDistributor}. */
+class ResultSubpartitionDistributorTest {
+
+    private static final int DEFAULT_PARTITION_INDEX = 0;
+    private static final int DEFAULT_SUBPARTITION_INDEX = 0;
+
+    private InflightDataRescalingDescriptor createIdentityMapping() {
+        return new InflightDataRescalingDescriptor(
+                new InflightDataRescalingDescriptor.InflightDataGateOrPartitionRescalingDescriptor
+                        [] {
+                    new InflightDataRescalingDescriptor
+                            .InflightDataGateOrPartitionRescalingDescriptor(
+                            new int[] {1},
+                            RescaleMappings.identity(1, 1),
+                            new HashSet<>(),
+                            InflightDataRescalingDescriptor
+                                    .InflightDataGateOrPartitionRescalingDescriptor.MappingType
+                                    .IDENTITY)
+                });
+    }
+
+    private InflightDataRescalingDescriptor createRescalingMapping(RescaleMappings mappings) {
+        return new InflightDataRescalingDescriptor(
+                new InflightDataRescalingDescriptor.InflightDataGateOrPartitionRescalingDescriptor
+                        [] {
+                    new InflightDataRescalingDescriptor
+                            .InflightDataGateOrPartitionRescalingDescriptor(
+                            new int[] {2},
+                            mappings,
+                            new HashSet<>(),
+                            InflightDataRescalingDescriptor
+                                    .InflightDataGateOrPartitionRescalingDescriptor.MappingType
+                                    .RESCALING)
+                });
+    }
+
+    @Test
+    void testGetMappedSubpartitionsIdentityMapping() {
+        InflightDataRescalingDescriptor identityMapping = createIdentityMapping();
+        ResultSubpartitionDistributor distributor =
+                new ResultSubpartitionDistributor(identityMapping);
+        ResultSubpartitionInfo inputInfo =
+                new ResultSubpartitionInfo(DEFAULT_PARTITION_INDEX, DEFAULT_SUBPARTITION_INDEX);
+
+        List<ResultSubpartitionInfo> mappedSubpartitions =
+                distributor.getMappedSubpartitions(inputInfo);
+
+        // Identity mapping should preserve original indices
+        assertThat(mappedSubpartitions).hasSize(1);
+        assertThat(mappedSubpartitions.get(0).getPartitionIdx()).isEqualTo(DEFAULT_PARTITION_INDEX);
+        assertThat(mappedSubpartitions.get(0).getSubPartitionIdx())
+                .isEqualTo(DEFAULT_SUBPARTITION_INDEX);
+    }
+
+    @Test
+    void testGetMappedSubpartitionsRescaling() {
+        // Test rescaling scenario where one input maps to multiple outputs
+        RescaleMappings rescaleMappings = mappings(to(0, 1));
+        InflightDataRescalingDescriptor rescalingMapping = createRescalingMapping(rescaleMappings);
+        ResultSubpartitionDistributor distributor =
+                new ResultSubpartitionDistributor(rescalingMapping);
+        ResultSubpartitionInfo inputInfo =
+                new ResultSubpartitionInfo(DEFAULT_PARTITION_INDEX, DEFAULT_SUBPARTITION_INDEX);
+
+        List<ResultSubpartitionInfo> mappedSubpartitions =
+                distributor.getMappedSubpartitions(inputInfo);
+
+        // Rescaling preserves partition index but may change subpartition mapping
+        assertThat(mappedSubpartitions).isNotEmpty();
+        assertThat(mappedSubpartitions)
+                .allMatch(info -> info.getPartitionIdx() == DEFAULT_PARTITION_INDEX);
+    }
+
+    @Test
+    void testMappingCacheConsistency() {
+        // Verify caching behavior to ensure performance optimization
+        InflightDataRescalingDescriptor identityMapping = createIdentityMapping();
+        ResultSubpartitionDistributor distributor =
+                new ResultSubpartitionDistributor(identityMapping);
+        ResultSubpartitionInfo inputInfo =
+                new ResultSubpartitionInfo(DEFAULT_PARTITION_INDEX, DEFAULT_SUBPARTITION_INDEX);
+
+        List<ResultSubpartitionInfo> firstCall = distributor.getMappedSubpartitions(inputInfo);
+        List<ResultSubpartitionInfo> secondCall = distributor.getMappedSubpartitions(inputInfo);
+
+        // Cache should return same instance
+        assertThat(firstCall).isEqualTo(secondCall).isSameAs(secondCall);
+    }
+
+    @Test
+    void testInvalidMappingThrowsException() {
+        // Test error handling when mapping configuration is inconsistent
+        RescaleMappings mappingsWithNoTarget = mappings();
+        InflightDataRescalingDescriptor invalidMapping =
+                createRescalingMapping(mappingsWithNoTarget);
+        ResultSubpartitionDistributor distributor =
+                new ResultSubpartitionDistributor(invalidMapping);
+        ResultSubpartitionInfo inputInfo =
+                new ResultSubpartitionInfo(DEFAULT_PARTITION_INDEX, DEFAULT_SUBPARTITION_INDEX);
+
+        assertThatThrownBy(() -> distributor.getMappedSubpartitions(inputInfo))
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessageContaining("Recovered a buffer from old")
+                .hasMessageContaining("that has no mapping in");
+    }
+}
