AHeise commented on a change in pull request #13845:
URL: https://github.com/apache/flink/pull/13845#discussion_r519225944
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/RecoveredChannelStateHandler.java
##########
@@ -88,38 +127,66 @@ private RecoveredInputChannel getChannel(InputChannelInfo info) {
private final ResultPartitionWriter[] writers;
private final boolean notifyAndBlockOnCompletion;
- ResultSubpartitionRecoveredStateHandler(ResultPartitionWriter[] writers, boolean notifyAndBlockOnCompletion) {
+ private final InflightDataRescalingDescriptor channelMapping;
+
+ private final Map<ResultSubpartitionInfo, List<CheckpointedResultSubpartition>> rescaledChannels = new HashMap<>();
+
+ ResultSubpartitionRecoveredStateHandler(ResultPartitionWriter[] writers, boolean notifyAndBlockOnCompletion, InflightDataRescalingDescriptor channelMapping) {
this.writers = writers;
+ this.channelMapping = channelMapping;
this.notifyAndBlockOnCompletion = notifyAndBlockOnCompletion;
}
@Override
public BufferWithContext<Tuple2<BufferBuilder, BufferConsumer>> getBuffer(ResultSubpartitionInfo subpartitionInfo) throws IOException, InterruptedException {
- BufferBuilder bufferBuilder = getSubpartition(subpartitionInfo).requestBufferBuilderBlocking();
+ final List<CheckpointedResultSubpartition> channels = getMappedChannels(subpartitionInfo);
+ BufferBuilder bufferBuilder = channels.get(0).requestBufferBuilderBlocking();
return new BufferWithContext<>(wrap(bufferBuilder), Tuple2.of(bufferBuilder, bufferBuilder.createBufferConsumer()));
}
@Override
- public void recover(ResultSubpartitionInfo subpartitionInfo, Tuple2<BufferBuilder, BufferConsumer> bufferBuilderAndConsumer) throws IOException {
+ public void recover(
+ ResultSubpartitionInfo subpartitionInfo,
+ int oldSubtaskIndex,
+ Tuple2<BufferBuilder, BufferConsumer> bufferBuilderAndConsumer) throws IOException {
bufferBuilderAndConsumer.f0.finish();
if (bufferBuilderAndConsumer.f1.isDataAvailable()) {
- boolean added = getSubpartition(subpartitionInfo).add(bufferBuilderAndConsumer.f1, Integer.MIN_VALUE);
- if (!added) {
- throw new IOException("Buffer consumer couldn't be added to ResultSubpartition");
+ final List<CheckpointedResultSubpartition> channels = getMappedChannels(subpartitionInfo);
+ for (final CheckpointedResultSubpartition channel : channels) {
+ // channel selector is created from the downstream's point of view: the subtask of downstream = subpartition index of recovered buffer
+ final VirtualChannelSelector channelSelector = new VirtualChannelSelector(subpartitionInfo.getSubPartitionIdx(), oldSubtaskIndex);
+ channel.add(EventSerializer.toBufferConsumer(channelSelector, false), Integer.MIN_VALUE);
+ boolean added = channel.add(bufferBuilderAndConsumer.f1.copy(), Integer.MIN_VALUE);
+ if (!added) {
+ throw new IOException("Buffer consumer couldn't be added to ResultSubpartition");
+ }
}
- } else {
- bufferBuilderAndConsumer.f1.close();
}
+ bufferBuilderAndConsumer.f1.close();
}
- private CheckpointedResultSubpartition getSubpartition(ResultSubpartitionInfo subpartitionInfo) {
- ResultPartitionWriter writer = writers[subpartitionInfo.getPartitionIdx()];
- if (writer instanceof CheckpointedResultPartition) {
- return ((CheckpointedResultPartition) writer).getCheckpointedSubpartition(subpartitionInfo.getSubPartitionIdx());
- } else {
- throw new IllegalStateException(
- "Cannot restore state to a non-checkpointable partition type: " + writer);
+ private CheckpointedResultSubpartition getSubpartition(int partitionIndex, int subPartitionIdx) {
+ ResultPartitionWriter writer = writers[partitionIndex];
+ if (!(writer instanceof CheckpointedResultPartition)) {
+ throw new IllegalStateException("Cannot restore state to a non-checkpointable partition type: " + writer);
}
+ return ((CheckpointedResultPartition) writer).getCheckpointedSubpartition(subPartitionIdx);
Review comment:
In the event of downscaling on the input side, the state on the output side is recovered exactly by the upstream subtasks that produced it.
However, as you pointed out, there might be fewer subpartitions. In this case, the `SubtaskStateMapper` of the input side creates a mapping that is also set in the `rescaledChannelsMappings` part of the output side's `InflightDataRescalingDescriptor`.
So the sequential reader recovers a buffer with its old `SubpartitionInfo`, and the mapping is used to find all new channels to which the buffer must be sent. The mapping guarantees that every old channel is remapped to at least one new channel.
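A minimal sketch of that fan-out, assuming the mapping is a plain `Map<Integer, List<Integer>>` from old to new subpartition index. `RecoveryFanOut`, `Queued`, `SelectorEvent` and `DataBuffer` are hypothetical stand-ins for `CheckpointedResultSubpartition`, `VirtualChannelSelector` and `BufferConsumer`, not Flink's API. Like `recover` in the diff, it queues a selector event before each copy of the buffer; unlike the diff, it creates the selector once per buffer and fails if the mapping does not cover the old subpartition. The mapping in `main` is the old-to-new mapping of the downscaling example in this comment (needs Java 17 for sealed interfaces and records).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the recovery fan-out; not Flink's API.
public class RecoveryFanOut {

    /** Anything that can be queued in a (new) subpartition. */
    sealed interface Queued permits SelectorEvent, DataBuffer {}

    /** Stand-in for VirtualChannelSelector: old subpartition index and old upstream subtask index. */
    record SelectorEvent(int oldSubpartition, int oldSubtask) implements Queued {}

    /** Stand-in for a recovered buffer; immutable, so queuing the same instance twice acts like a copy. */
    record DataBuffer(String data) implements Queued {}

    /**
     * Queues a selector event followed by the buffer in every new subpartition that the
     * old subpartition maps to; fails if the mapping does not cover the old subpartition.
     */
    static void recover(
            Map<Integer, List<Integer>> oldToNew,
            List<List<Queued>> newSubpartitions,
            int oldSubpartition,
            int oldSubtask,
            String data) {
        List<Integer> targets = oldToNew.get(oldSubpartition);
        if (targets == null || targets.isEmpty()) {
            throw new IllegalStateException("No new subpartition for old subpartition " + oldSubpartition);
        }
        SelectorEvent selector = new SelectorEvent(oldSubpartition, oldSubtask);
        DataBuffer buffer = new DataBuffer(data);
        for (int target : targets) {
            List<Queued> queue = newSubpartitions.get(target);
            queue.add(selector); // tells the consumer where the next buffer came from
            queue.add(buffer);
        }
    }

    public static void main(String[] args) {
        // old -> new mapping from the 3 -> 2 downscaling example
        Map<Integer, List<Integer>> oldToNew = Map.of(0, List.of(0), 1, List.of(0, 1), 2, List.of(1));
        List<List<Queued>> newSubpartitions = List.of(new ArrayList<>(), new ArrayList<>());
        recover(oldToNew, newSubpartitions, 1, 0, "from old subpartition 1");
        recover(oldToNew, newSubpartitions, 2, 0, "from old subpartition 2");
        System.out.println(newSubpartitions.get(0).size()); // 2: one selector + one buffer
        System.out.println(newSubpartitions.get(1).size()); // 4: two selectors + two buffers
    }
}
```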
For example, consider the following downscaling of a key range on the input side.
```
3 partitions: [0; 43) [43; 87) [87; 128)
2 partitions: [0; 64) [64; 128)
mapping: 0->[0; 1]; 1->[1; 2] (new to old)
reverse mapping used in sequential reader: 0->[0]; 1->[0; 1]; 2->[1] (old to new)
```
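Both mappings can be derived from the key ranges alone. The sketch below assumes that an old and a new subtask are linked whenever their half-open key ranges overlap, then inverts the new-to-old mapping into the old-to-new one used by the sequential reader. `RangeRescaleMapping` and `Range` are hypothetical names; this is not taken from `SubtaskStateMapper`, only a way to check the numbers of the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: derives rescaling mappings from overlapping key ranges.
public class RangeRescaleMapping {

    /** Half-open key range [start; end). */
    record Range(int start, int end) {
        boolean overlaps(Range other) {
            return start < other.end && other.start < end;
        }
    }

    /** new index -> old indices whose ranges overlap it (new to old). */
    static Map<Integer, List<Integer>> newToOld(List<Range> oldRanges, List<Range> newRanges) {
        Map<Integer, List<Integer>> mapping = new TreeMap<>();
        for (int n = 0; n < newRanges.size(); n++) {
            List<Integer> olds = new ArrayList<>();
            for (int o = 0; o < oldRanges.size(); o++) {
                if (newRanges.get(n).overlaps(oldRanges.get(o))) {
                    olds.add(o);
                }
            }
            mapping.put(n, olds);
        }
        return mapping;
    }

    /** Inverts a new-to-old mapping into old-to-new. */
    static Map<Integer, List<Integer>> invert(Map<Integer, List<Integer>> newToOld) {
        Map<Integer, List<Integer>> oldToNew = new TreeMap<>();
        newToOld.forEach((n, olds) -> {
            for (int o : olds) {
                oldToNew.computeIfAbsent(o, k -> new ArrayList<>()).add(n);
            }
        });
        return oldToNew;
    }

    public static void main(String[] args) {
        List<Range> oldRanges = List.of(new Range(0, 43), new Range(43, 87), new Range(87, 128));
        List<Range> newRanges = List.of(new Range(0, 64), new Range(64, 128));
        Map<Integer, List<Integer>> forward = newToOld(oldRanges, newRanges);
        System.out.println(forward);         // {0=[0, 1], 1=[1, 2]}
        System.out.println(invert(forward)); // {0=[0], 1=[0, 1], 2=[1]}
    }
}
```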
When a buffer from subpartition 1 of the old 3-partition layout is recovered, it needs to be sent to subpartitions 0 and 1. Similarly, a buffer recovered from old subpartition 2 needs to be sent to subpartition 1.
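Copying a whole buffer means that new subpartitions 0 and 1 both receive every record that old subpartition 1 contained. Assuming (this is not stated in the comment) that each downstream subtask then drops the records whose key group falls outside its new range, the split works out as in this sketch. `DuplicateFilterSketch`, `Rec` and the key groups 50, 70 and 80 are made up for illustration.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch: assumes the consumer filters duplicated records by key group.
public class DuplicateFilterSketch {

    /** A record tagged with the key group it belongs to. */
    record Rec(int keyGroup, String value) {}

    /** Keeps only the records whose key group lies in the half-open range [start; end). */
    static List<Rec> filter(List<Rec> recovered, int start, int end) {
        return recovered.stream()
                .filter(r -> r.keyGroup() >= start && r.keyGroup() < end)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Made-up content of one buffer recovered from old subpartition 1, range [43; 87).
        List<Rec> buffer = List.of(new Rec(50, "a"), new Rec(70, "b"), new Rec(80, "c"));
        // The same buffer reaches both new subpartitions; each keeps only its own key groups.
        List<Rec> keptByNew0 = filter(buffer, 0, 64);   // key group 50
        List<Rec> keptByNew1 = filter(buffer, 64, 128); // key groups 70 and 80
        System.out.println(keptByNew0.size() + " " + keptByNew1.size()); // 1 2
    }
}
```

Under this assumption, no record is lost or processed twice, because the new ranges are disjoint and together cover the old range.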