This is an automated email from the ASF dual-hosted git repository. fanrui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit cb5b955212b4eb5ec6d3fe1009746baf0ad8a5b5 Author: Rui Fan <[email protected]> AuthorDate: Wed Nov 12 17:55:29 2025 +0100 [hotfix][checkpoint] Ensure that each buffer is distributed to only one target InputChannel --- .../channel/RecoveredChannelStateHandler.java | 48 ++++++++++------------ .../InputChannelRecoveredStateHandlerTest.java | 41 ++++++++++++++++++ 2 files changed, 62 insertions(+), 27 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/RecoveredChannelStateHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/RecoveredChannelStateHandler.java index 7a06f52b145..85fc31db4bc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/RecoveredChannelStateHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/RecoveredChannelStateHandler.java @@ -30,14 +30,15 @@ import org.apache.flink.runtime.io.network.partition.consumer.InputChannel; import org.apache.flink.runtime.io.network.partition.consumer.InputGate; import org.apache.flink.runtime.io.network.partition.consumer.RecoveredInputChannel; +import javax.annotation.Nonnull; + import java.io.IOException; -import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; import static org.apache.flink.runtime.checkpoint.channel.ChannelStateByteBuffer.wrap; +import static org.apache.flink.util.Preconditions.checkState; interface RecoveredChannelStateHandler<Info, Context> extends AutoCloseable { class BufferWithContext<Context> { @@ -71,8 +72,7 @@ class InputChannelRecoveredStateHandler private final InflightDataRescalingDescriptor channelMapping; - private final Map<InputChannelInfo, List<RecoveredInputChannel>> rescaledChannels = - new HashMap<>(); + private final Map<InputChannelInfo, RecoveredInputChannel> rescaledChannels = new HashMap<>(); private final Map<Integer, RescaleMappings> 
oldToNewMappings = new HashMap<>(); InputChannelRecoveredStateHandler( @@ -85,7 +85,7 @@ class InputChannelRecoveredStateHandler public BufferWithContext<Buffer> getBuffer(InputChannelInfo channelInfo) throws IOException, InterruptedException { // request the buffer from any mapped channel as they all will receive the same buffer - RecoveredInputChannel channel = getMappedChannels(channelInfo).get(0); + RecoveredInputChannel channel = getMappedChannels(channelInfo); Buffer buffer = channel.requestBufferBlocking(); return new BufferWithContext<>(wrap(buffer), buffer); } @@ -99,14 +99,13 @@ class InputChannelRecoveredStateHandler Buffer buffer = bufferWithContext.context; try { if (buffer.readableBytes() > 0) { - for (final RecoveredInputChannel channel : getMappedChannels(channelInfo)) { - channel.onRecoveredStateBuffer( - EventSerializer.toBuffer( - new SubtaskConnectionDescriptor( - oldSubtaskIndex, channelInfo.getInputChannelIdx()), - false)); - channel.onRecoveredStateBuffer(buffer.retainBuffer()); - } + RecoveredInputChannel channel = getMappedChannels(channelInfo); + channel.onRecoveredStateBuffer( + EventSerializer.toBuffer( + new SubtaskConnectionDescriptor( + oldSubtaskIndex, channelInfo.getInputChannelIdx()), + false)); + channel.onRecoveredStateBuffer(buffer.retainBuffer()); } } finally { buffer.recycleBuffer(); @@ -130,26 +129,21 @@ class InputChannelRecoveredStateHandler return (RecoveredInputChannel) inputChannel; } - private List<RecoveredInputChannel> getMappedChannels(InputChannelInfo channelInfo) { + private RecoveredInputChannel getMappedChannels(InputChannelInfo channelInfo) { return rescaledChannels.computeIfAbsent(channelInfo, this::calculateMapping); } - private List<RecoveredInputChannel> calculateMapping(InputChannelInfo info) { + @Nonnull + private RecoveredInputChannel calculateMapping(InputChannelInfo info) { final RescaleMappings oldToNewMapping = oldToNewMappings.computeIfAbsent( info.getGateIdx(), idx -> 
channelMapping.getChannelMapping(idx).invert()); - final List<RecoveredInputChannel> channels = - Arrays.stream(oldToNewMapping.getMappedIndexes(info.getInputChannelIdx())) - .mapToObj(newChannelIndex -> getChannel(info.getGateIdx(), newChannelIndex)) - .collect(Collectors.toList()); - if (channels.isEmpty()) { - throw new IllegalStateException( - "Recovered a buffer from old " - + info - + " that has no mapping in " - + channelMapping.getChannelMapping(info.getGateIdx())); - } - return channels; + int[] mappedIndexes = oldToNewMapping.getMappedIndexes(info.getInputChannelIdx()); + checkState( + mappedIndexes.length == 1, + "One buffer is only distributed to one target InputChannel since " + + "one buffer is expected to be processed once by the same task."); + return getChannel(info.getGateIdx(), mappedIndexes[0]); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/InputChannelRecoveredStateHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/InputChannelRecoveredStateHandlerTest.java index 823183be99e..e2b1d69c56a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/InputChannelRecoveredStateHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/InputChannelRecoveredStateHandlerTest.java @@ -32,7 +32,10 @@ import org.junit.jupiter.api.Test; import java.util.HashSet; +import static org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptorUtil.mappings; +import static org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptorUtil.to; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Test of different implementation of {@link InputChannelRecoveredStateHandler}. 
*/ class InputChannelRecoveredStateHandlerTest extends RecoveredChannelStateHandlerTest { @@ -77,6 +80,44 @@ class InputChannelRecoveredStateHandlerTest extends RecoveredChannelStateHandler })); } + private InputChannelRecoveredStateHandler buildMultiChannelHandler() { + // Setup multi-channel scenario to trigger distribution constraint validation + SingleInputGate multiChannelGate = + new SingleInputGateBuilder() + .setNumberOfChannels(2) + .setChannelFactory(InputChannelBuilder::buildLocalRecoveredChannel) + .setSegmentProvider(networkBufferPool) + .build(); + + return new InputChannelRecoveredStateHandler( + new InputGate[] {multiChannelGate}, + new InflightDataRescalingDescriptor( + new InflightDataRescalingDescriptor + .InflightDataGateOrPartitionRescalingDescriptor[] { + new InflightDataRescalingDescriptor + .InflightDataGateOrPartitionRescalingDescriptor( + new int[] {2}, + // Force 1:many mapping after inversion + mappings(to(0), to(0)), + new HashSet<>(), + InflightDataRescalingDescriptor + .InflightDataGateOrPartitionRescalingDescriptor + .MappingType.RESCALING) + })); + } + + @Test + void testBufferDistributedToMultipleInputChannelsThrowsException() throws Exception { + // Test constraint that prevents buffer distribution to multiple channels + try (InputChannelRecoveredStateHandler handler = buildMultiChannelHandler()) { + assertThatThrownBy(() -> handler.getBuffer(channelInfo)) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining( + "One buffer is only distributed to one target InputChannel since " + + "one buffer is expected to be processed once by the same task."); + } + } + @Test void testRecycleBufferBeforeRecoverWasCalled() throws Exception { // when: Request the buffer.
