AHeise commented on a change in pull request #13845:
URL: https://github.com/apache/flink/pull/13845#discussion_r519225944



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/RecoveredChannelStateHandler.java
##########
@@ -88,38 +127,66 @@ private RecoveredInputChannel getChannel(InputChannelInfo info) {
        private final ResultPartitionWriter[] writers;
        private final boolean notifyAndBlockOnCompletion;
 
-       ResultSubpartitionRecoveredStateHandler(ResultPartitionWriter[] writers, boolean notifyAndBlockOnCompletion) {
+       private final InflightDataRescalingDescriptor channelMapping;
+
+       private final Map<ResultSubpartitionInfo, List<CheckpointedResultSubpartition>> rescaledChannels = new HashMap<>();
+
+       ResultSubpartitionRecoveredStateHandler(ResultPartitionWriter[] writers, boolean notifyAndBlockOnCompletion, InflightDataRescalingDescriptor channelMapping) {
                this.writers = writers;
+               this.channelMapping = channelMapping;
                this.notifyAndBlockOnCompletion = notifyAndBlockOnCompletion;
        }
 
        @Override
        public BufferWithContext<Tuple2<BufferBuilder, BufferConsumer>> getBuffer(ResultSubpartitionInfo subpartitionInfo) throws IOException, InterruptedException {
-               BufferBuilder bufferBuilder = getSubpartition(subpartitionInfo).requestBufferBuilderBlocking();
+               final List<CheckpointedResultSubpartition> channels = getMappedChannels(subpartitionInfo);
+               BufferBuilder bufferBuilder = channels.get(0).requestBufferBuilderBlocking();
                return new BufferWithContext<>(wrap(bufferBuilder), Tuple2.of(bufferBuilder, bufferBuilder.createBufferConsumer()));
        }
 
        @Override
-       public void recover(ResultSubpartitionInfo subpartitionInfo, Tuple2<BufferBuilder, BufferConsumer> bufferBuilderAndConsumer) throws IOException {
+       public void recover(
+                       ResultSubpartitionInfo subpartitionInfo,
+                       int oldSubtaskIndex,
+                       Tuple2<BufferBuilder, BufferConsumer> bufferBuilderAndConsumer) throws IOException {
                bufferBuilderAndConsumer.f0.finish();
                if (bufferBuilderAndConsumer.f1.isDataAvailable()) {
-                       boolean added = getSubpartition(subpartitionInfo).add(bufferBuilderAndConsumer.f1, Integer.MIN_VALUE);
-                       if (!added) {
-                               throw new IOException("Buffer consumer couldn't be added to ResultSubpartition");
+                       final List<CheckpointedResultSubpartition> channels = getMappedChannels(subpartitionInfo);
+                       for (final CheckpointedResultSubpartition channel : channels) {
+                               // channel selector is created from the downstream's point of view: the subtask of downstream = subpartition index of recovered buffer
+                               final VirtualChannelSelector channelSelector = new VirtualChannelSelector(subpartitionInfo.getSubPartitionIdx(), oldSubtaskIndex);
+                               channel.add(EventSerializer.toBufferConsumer(channelSelector, false), Integer.MIN_VALUE);
+                               boolean added = channel.add(bufferBuilderAndConsumer.f1.copy(), Integer.MIN_VALUE);
+                               if (!added) {
+                                       throw new IOException("Buffer consumer couldn't be added to ResultSubpartition");
+                               }
                        }
-               } else {
-                       bufferBuilderAndConsumer.f1.close();
                }
+               bufferBuilderAndConsumer.f1.close();
        }
 
-       private CheckpointedResultSubpartition getSubpartition(ResultSubpartitionInfo subpartitionInfo) {
-               ResultPartitionWriter writer = writers[subpartitionInfo.getPartitionIdx()];
-               if (writer instanceof CheckpointedResultPartition) {
-                       return ((CheckpointedResultPartition) writer).getCheckpointedSubpartition(subpartitionInfo.getSubPartitionIdx());
-               } else {
-                       throw new IllegalStateException(
-                               "Cannot restore state to a non-checkpointable partition type: " + writer);
+       private CheckpointedResultSubpartition getSubpartition(int partitionIndex, int subPartitionIdx) {
+               ResultPartitionWriter writer = writers[partitionIndex];
+               if (!(writer instanceof CheckpointedResultPartition)) {
+                       throw new IllegalStateException("Cannot restore state to a non-checkpointable partition type: " + writer);
                }
+               return ((CheckpointedResultPartition) writer).getCheckpointedSubpartition(subPartitionIdx);

Review comment:
       In the event of downscaling on the input side, the state on the output side is exactly recovered by the upstream subtasks that produced it.
   
   However, as you pointed out, there might be fewer subpartitions. In this case, the `SubtaskStateMapper` of the input side creates a mapping that is also set on the `InflightDataRescalingDescriptor` of the output side, in the `rescaledChannelsMappings` part.
   
   So the sequential reader recovers a buffer with the old `SubpartitionInfo`, and the mapping is used to find all new channels to which the buffer must be sent. At this point, the mapping guarantees that every old channel is remapped to new channels.
   
   For example, consider the following downscaling of a key range on input side.
   ```
   3 partitions: [0; 43) [43; 87) [87; 128)
   2 partitions: [0; 64) [64; 128)
   mapping: 0->[0;1]; 1->[1;2] (new to old)
   reverse mapping used in sequential reader: 0->[0]; 1->[0;1]; 2->[1] (old to new)
   ```
   When a buffer from subpartition 1 is recovered (from 3 partitions), it needs to be sent to subpartitions 0 and 1. Similarly, if a buffer from subpartition 2 is recovered, it needs to be sent to subpartition 1.
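   The reverse mapping above can be derived purely from range overlaps. Here is a minimal, hypothetical sketch of that idea (this is *not* Flink's actual `SubtaskStateMapper` code, and the real range-boundary formula in `KeyGroupRangeAssignment` may round differently; the resulting mapping for this example is the same either way):
   ```java
   import java.util.ArrayList;
   import java.util.List;

   // Sketch: for each old key-range partition, find all new partitions whose
   // ranges overlap it, i.e. the "old to new" reverse mapping used above.
   public class RangeRemapSketch {
       // Each partition i of p covers [start, end) of the key space [0; max).
       static int start(int idx, int parallelism, int maxParallelism) {
           return idx * maxParallelism / parallelism;
       }

       static int end(int idx, int parallelism, int maxParallelism) {
           return (idx + 1) * maxParallelism / parallelism;
       }

       /** For each old partition, list the new partitions overlapping its range. */
       static List<List<Integer>> oldToNew(int oldP, int newP, int max) {
           List<List<Integer>> mapping = new ArrayList<>();
           for (int o = 0; o < oldP; o++) {
               List<Integer> targets = new ArrayList<>();
               for (int n = 0; n < newP; n++) {
                   // Half-open intervals [a, b) and [c, d) overlap iff c < b && a < d.
                   boolean overlaps = start(n, newP, max) < end(o, oldP, max)
                           && start(o, oldP, max) < end(n, newP, max);
                   if (overlaps) {
                       targets.add(n);
                   }
               }
               mapping.add(targets);
           }
           return mapping;
       }

       public static void main(String[] args) {
           // 3 old partitions downscaled to 2, key space [0; 128), as above.
           System.out.println(oldToNew(3, 2, 128)); // [[0], [0, 1], [1]]
       }
   }
   ```
   Recovering a buffer from old subpartition `i` then means adding it to every subpartition in `oldToNew(...).get(i)`, which is what the loop over `getMappedChannels` in the diff does.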




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]
