rkhachatryan commented on a change in pull request #13845:
URL: https://github.com/apache/flink/pull/13845#discussion_r583491820
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/RecoveredChannelStateHandler.java
##########
@@ -94,17 +145,25 @@ private RecoveredInputChannel getChannel(InputChannelInfo info) {
private final ResultPartitionWriter[] writers;
private final boolean notifyAndBlockOnCompletion;
+ private final InflightDataRescalingDescriptor channelMapping;
+
+ private final Map<ResultSubpartitionInfo, List<CheckpointedResultSubpartition>>
+ rescaledChannels = new HashMap<>();
+
ResultSubpartitionRecoveredStateHandler(
- ResultPartitionWriter[] writers, boolean notifyAndBlockOnCompletion) {
+ ResultPartitionWriter[] writers,
+ boolean notifyAndBlockOnCompletion,
+ InflightDataRescalingDescriptor channelMapping) {
this.writers = writers;
+ this.channelMapping = channelMapping;
this.notifyAndBlockOnCompletion = notifyAndBlockOnCompletion;
}
@Override
public BufferWithContext<Tuple2<BufferBuilder, BufferConsumer>> getBuffer(
ResultSubpartitionInfo subpartitionInfo) throws IOException, InterruptedException {
- BufferBuilder bufferBuilder =
- getSubpartition(subpartitionInfo).requestBufferBuilderBlocking();
+ final List<CheckpointedResultSubpartition> channels = getMappedChannels(subpartitionInfo);
+ BufferBuilder bufferBuilder = channels.get(0).requestBufferBuilderBlocking();
Review comment:
I guess `get(0)` is used here because it doesn't matter which of the mapped
subpartitions we request the buffer from.
If so, could you please add a comment in the code?
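
For illustration, a minimal self-contained sketch of the kind of comment meant here. It is not Flink code: `BufferPool`, `Subpartition` and `requestFromAnyChannel` are hypothetical stand-ins, and the sketch assumes that all mapped subpartitions draw their buffers from one shared pool, which would need to be confirmed for the real `CheckpointedResultSubpartition`.

```java
import java.util.Arrays;
import java.util.List;

public class FirstChannelBufferSketch {

    /** Hypothetical stand-in for a buffer pool shared by several subpartitions. */
    static final class BufferPool {
        private int requested;

        String requestBuffer() {
            requested++;
            return "buffer-" + requested;
        }

        int requestedCount() {
            return requested;
        }
    }

    /** Hypothetical stand-in for CheckpointedResultSubpartition. */
    static final class Subpartition {
        private final BufferPool pool;

        Subpartition(BufferPool pool) {
            this.pool = pool;
        }

        String requestBufferBuilderBlocking() {
            // Delegates to the shared pool (assumption of this sketch).
            return pool.requestBuffer();
        }
    }

    static String requestFromAnyChannel(List<Subpartition> channels) {
        // All mapped channels are assumed to request buffers from the same pool,
        // so it does not matter which one we ask; the first one is used for simplicity.
        return channels.get(0).requestBufferBuilderBlocking();
    }

    public static void main(String[] args) {
        BufferPool pool = new BufferPool();
        List<Subpartition> channels =
                Arrays.asList(new Subpartition(pool), new Subpartition(pool));
        System.out.println(requestFromAnyChannel(channels));
    }
}
```

If the shared-pool assumption does not hold in the real code, the comment should state whatever invariant actually makes the choice of index irrelevant.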