This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit cb5b955212b4eb5ec6d3fe1009746baf0ad8a5b5
Author: Rui Fan <[email protected]>
AuthorDate: Wed Nov 12 17:55:29 2025 +0100

    [hotfix][checkpoint] Limit that the one buffer is only distributed to one 
target InputChannel
---
 .../channel/RecoveredChannelStateHandler.java      | 48 ++++++++++------------
 .../InputChannelRecoveredStateHandlerTest.java     | 41 ++++++++++++++++++
 2 files changed, 62 insertions(+), 27 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/RecoveredChannelStateHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/RecoveredChannelStateHandler.java
index 7a06f52b145..85fc31db4bc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/RecoveredChannelStateHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/RecoveredChannelStateHandler.java
@@ -30,14 +30,15 @@ import 
org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import 
org.apache.flink.runtime.io.network.partition.consumer.RecoveredInputChannel;
 
+import javax.annotation.Nonnull;
+
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 
 import static 
org.apache.flink.runtime.checkpoint.channel.ChannelStateByteBuffer.wrap;
+import static org.apache.flink.util.Preconditions.checkState;
 
 interface RecoveredChannelStateHandler<Info, Context> extends AutoCloseable {
     class BufferWithContext<Context> {
@@ -71,8 +72,7 @@ class InputChannelRecoveredStateHandler
 
     private final InflightDataRescalingDescriptor channelMapping;
 
-    private final Map<InputChannelInfo, List<RecoveredInputChannel>> 
rescaledChannels =
-            new HashMap<>();
+    private final Map<InputChannelInfo, RecoveredInputChannel> 
rescaledChannels = new HashMap<>();
     private final Map<Integer, RescaleMappings> oldToNewMappings = new 
HashMap<>();
 
     InputChannelRecoveredStateHandler(
@@ -85,7 +85,7 @@ class InputChannelRecoveredStateHandler
     public BufferWithContext<Buffer> getBuffer(InputChannelInfo channelInfo)
             throws IOException, InterruptedException {
         // request the buffer from any mapped channel as they all will receive 
the same buffer
-        RecoveredInputChannel channel = getMappedChannels(channelInfo).get(0);
+        RecoveredInputChannel channel = getMappedChannels(channelInfo);
         Buffer buffer = channel.requestBufferBlocking();
         return new BufferWithContext<>(wrap(buffer), buffer);
     }
@@ -99,14 +99,13 @@ class InputChannelRecoveredStateHandler
         Buffer buffer = bufferWithContext.context;
         try {
             if (buffer.readableBytes() > 0) {
-                for (final RecoveredInputChannel channel : 
getMappedChannels(channelInfo)) {
-                    channel.onRecoveredStateBuffer(
-                            EventSerializer.toBuffer(
-                                    new SubtaskConnectionDescriptor(
-                                            oldSubtaskIndex, 
channelInfo.getInputChannelIdx()),
-                                    false));
-                    channel.onRecoveredStateBuffer(buffer.retainBuffer());
-                }
+                RecoveredInputChannel channel = getMappedChannels(channelInfo);
+                channel.onRecoveredStateBuffer(
+                        EventSerializer.toBuffer(
+                                new SubtaskConnectionDescriptor(
+                                        oldSubtaskIndex, 
channelInfo.getInputChannelIdx()),
+                                false));
+                channel.onRecoveredStateBuffer(buffer.retainBuffer());
             }
         } finally {
             buffer.recycleBuffer();
@@ -130,26 +129,21 @@ class InputChannelRecoveredStateHandler
         return (RecoveredInputChannel) inputChannel;
     }
 
-    private List<RecoveredInputChannel> getMappedChannels(InputChannelInfo 
channelInfo) {
+    private RecoveredInputChannel getMappedChannels(InputChannelInfo 
channelInfo) {
         return rescaledChannels.computeIfAbsent(channelInfo, 
this::calculateMapping);
     }
 
-    private List<RecoveredInputChannel> calculateMapping(InputChannelInfo 
info) {
+    @Nonnull
+    private RecoveredInputChannel calculateMapping(InputChannelInfo info) {
         final RescaleMappings oldToNewMapping =
                 oldToNewMappings.computeIfAbsent(
                         info.getGateIdx(), idx -> 
channelMapping.getChannelMapping(idx).invert());
-        final List<RecoveredInputChannel> channels =
-                
Arrays.stream(oldToNewMapping.getMappedIndexes(info.getInputChannelIdx()))
-                        .mapToObj(newChannelIndex -> 
getChannel(info.getGateIdx(), newChannelIndex))
-                        .collect(Collectors.toList());
-        if (channels.isEmpty()) {
-            throw new IllegalStateException(
-                    "Recovered a buffer from old "
-                            + info
-                            + " that has no mapping in "
-                            + 
channelMapping.getChannelMapping(info.getGateIdx()));
-        }
-        return channels;
+        int[] mappedIndexes = 
oldToNewMapping.getMappedIndexes(info.getInputChannelIdx());
+        checkState(
+                mappedIndexes.length == 1,
+                "One buffer is only distributed to one target InputChannel 
since "
+                        + "one buffer is expected to be processed once by the 
same task.");
+        return getChannel(info.getGateIdx(), mappedIndexes[0]);
     }
 }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/InputChannelRecoveredStateHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/InputChannelRecoveredStateHandlerTest.java
index 823183be99e..e2b1d69c56a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/InputChannelRecoveredStateHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/InputChannelRecoveredStateHandlerTest.java
@@ -32,7 +32,10 @@ import org.junit.jupiter.api.Test;
 
 import java.util.HashSet;
 
+import static 
org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptorUtil.mappings;
+import static 
org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptorUtil.to;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Test of different implementation of {@link 
InputChannelRecoveredStateHandler}. */
 class InputChannelRecoveredStateHandlerTest extends 
RecoveredChannelStateHandlerTest {
@@ -77,6 +80,44 @@ class InputChannelRecoveredStateHandlerTest extends 
RecoveredChannelStateHandler
                         }));
     }
 
+    private InputChannelRecoveredStateHandler buildMultiChannelHandler() {
+        // Setup multi-channel scenario to trigger distribution constraint 
validation
+        SingleInputGate multiChannelGate =
+                new SingleInputGateBuilder()
+                        .setNumberOfChannels(2)
+                        
.setChannelFactory(InputChannelBuilder::buildLocalRecoveredChannel)
+                        .setSegmentProvider(networkBufferPool)
+                        .build();
+
+        return new InputChannelRecoveredStateHandler(
+                new InputGate[] {multiChannelGate},
+                new InflightDataRescalingDescriptor(
+                        new InflightDataRescalingDescriptor
+                                        
.InflightDataGateOrPartitionRescalingDescriptor[] {
+                            new InflightDataRescalingDescriptor
+                                    
.InflightDataGateOrPartitionRescalingDescriptor(
+                                    new int[] {2},
+                                    // Force 1:many mapping after inversion
+                                    mappings(to(0), to(0)),
+                                    new HashSet<>(),
+                                    InflightDataRescalingDescriptor
+                                            
.InflightDataGateOrPartitionRescalingDescriptor
+                                            .MappingType.RESCALING)
+                        }));
+    }
+
+    @Test
+    void testBufferDistributedToMultipleInputChannelsThrowsException() throws 
Exception {
+        // Test constraint that prevents buffer distribution to multiple 
channels
+        try (InputChannelRecoveredStateHandler handler = 
buildMultiChannelHandler()) {
+            assertThatThrownBy(() -> handler.getBuffer(channelInfo))
+                    .isInstanceOf(IllegalStateException.class)
+                    .hasMessageContaining(
+                            "One buffer is only distributed to one target 
InputChannel since "
+                                    + "one buffer is expected to be processed 
once by the same task.");
+        }
+    }
+
     @Test
     void testRecycleBufferBeforeRecoverWasCalled() throws Exception {
         // when: Request the buffer.

Reply via email to