AHeise commented on a change in pull request #13845:
URL: https://github.com/apache/flink/pull/13845#discussion_r519229083



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/RecoveredChannelStateHandler.java
##########
@@ -88,38 +127,66 @@ private RecoveredInputChannel getChannel(InputChannelInfo 
info) {
        private final ResultPartitionWriter[] writers;
        private final boolean notifyAndBlockOnCompletion;
 
-       ResultSubpartitionRecoveredStateHandler(ResultPartitionWriter[] 
writers, boolean notifyAndBlockOnCompletion) {
+       private final InflightDataRescalingDescriptor channelMapping;
+
+       private final Map<ResultSubpartitionInfo, 
List<CheckpointedResultSubpartition>> rescaledChannels = new HashMap<>();
+
+       ResultSubpartitionRecoveredStateHandler(ResultPartitionWriter[] 
writers, boolean notifyAndBlockOnCompletion, InflightDataRescalingDescriptor 
channelMapping) {
                this.writers = writers;
+               this.channelMapping = channelMapping;
                this.notifyAndBlockOnCompletion = notifyAndBlockOnCompletion;
        }
 
        @Override
        public BufferWithContext<Tuple2<BufferBuilder, BufferConsumer>> 
getBuffer(ResultSubpartitionInfo subpartitionInfo) throws IOException, 
InterruptedException {
-               BufferBuilder bufferBuilder = 
getSubpartition(subpartitionInfo).requestBufferBuilderBlocking();
+               final List<CheckpointedResultSubpartition> channels = 
getMappedChannels(subpartitionInfo);
+               BufferBuilder bufferBuilder = 
channels.get(0).requestBufferBuilderBlocking();
                return new BufferWithContext<>(wrap(bufferBuilder), 
Tuple2.of(bufferBuilder, bufferBuilder.createBufferConsumer()));
        }
 
        @Override
-       public void recover(ResultSubpartitionInfo subpartitionInfo, 
Tuple2<BufferBuilder, BufferConsumer> bufferBuilderAndConsumer) throws 
IOException {
+       public void recover(
+                       ResultSubpartitionInfo subpartitionInfo,
+                       int oldSubtaskIndex,
+                       Tuple2<BufferBuilder, BufferConsumer> 
bufferBuilderAndConsumer) throws IOException {
                bufferBuilderAndConsumer.f0.finish();
                if (bufferBuilderAndConsumer.f1.isDataAvailable()) {
-                       boolean added = 
getSubpartition(subpartitionInfo).add(bufferBuilderAndConsumer.f1, 
Integer.MIN_VALUE);
-                       if (!added) {
-                               throw new IOException("Buffer consumer couldn't 
be added to ResultSubpartition");
+                       final List<CheckpointedResultSubpartition> channels = 
getMappedChannels(subpartitionInfo);
+                       for (final CheckpointedResultSubpartition channel : 
channels) {
+                               // channel selector is created from the 
downstream's point of view: the subtask of downstream = subpartition index of 
recovered buffer
+                               final VirtualChannelSelector channelSelector = 
new VirtualChannelSelector(subpartitionInfo.getSubPartitionIdx(), 
oldSubtaskIndex);
+                               
channel.add(EventSerializer.toBufferConsumer(channelSelector, false), 
Integer.MIN_VALUE);
+                               boolean added = 
channel.add(bufferBuilderAndConsumer.f1.copy(), Integer.MIN_VALUE);
+                               if (!added) {
+                                       throw new IOException("Buffer consumer 
couldn't be added to ResultSubpartition");
+                               }
                        }
-               } else {
-                       bufferBuilderAndConsumer.f1.close();
                }
+               bufferBuilderAndConsumer.f1.close();
        }
 
-       private CheckpointedResultSubpartition 
getSubpartition(ResultSubpartitionInfo subpartitionInfo) {
-               ResultPartitionWriter writer = 
writers[subpartitionInfo.getPartitionIdx()];
-               if (writer instanceof CheckpointedResultPartition) {
-                       return ((CheckpointedResultPartition) 
writer).getCheckpointedSubpartition(subpartitionInfo.getSubPartitionIdx());
-               } else {
-                       throw new IllegalStateException(
-                               "Cannot restore state to a non-checkpointable 
partition type: " + writer);
+       private CheckpointedResultSubpartition getSubpartition(int 
partitionIndex, int subPartitionIdx) {
+               ResultPartitionWriter writer = writers[partitionIndex];
+               if (!(writer instanceof CheckpointedResultPartition)) {
+                       throw new IllegalStateException("Cannot restore state 
to a non-checkpointable partition type: " + writer);
                }
+               return ((CheckpointedResultPartition) 
writer).getCheckpointedSubpartition(subPartitionIdx);
+       }
+
+       private List<CheckpointedResultSubpartition> 
getMappedChannels(ResultSubpartitionInfo subpartitionInfo) {
+               return rescaledChannels.computeIfAbsent(subpartitionInfo, 
this::calculateMapping);
+       }
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(ResultSubpartitionRecoveredStateHandler.class);

Review comment:
       I'll remove the whole commit that adds this logger; it's just for debugging tests.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to