zhijiangW commented on a change in pull request #11534: [FLINK-16537][network]
Implement ResultPartition state recovery for unaligned checkpoint
URL: https://github.com/apache/flink/pull/11534#discussion_r400619759
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
##########
@@ -89,6 +91,25 @@
super(index, parent);
}
+ @Override
+ public void initializeState() throws IOException, InterruptedException {
+ ReadResult readResult = ReadResult.HAS_MORE_DATA;
+ while (readResult == ReadResult.HAS_MORE_DATA) {
+ BufferBuilder bufferBuilder =
parent.getBufferPool().requestBufferBuilderBlocking();
+ BufferConsumer bufferConsumer =
bufferBuilder.createBufferConsumer();
+ readResult =
parent.getChannelStateReader().readOutputData(subpartitionInfo, bufferBuilder);
Review comment:
I think it is not so necessary to decouple `ResultPartition` with
`ResultSubpartition`, because the `ResultSubpartition` is derived from
`ResultPartition` and it can get everything from parent by design. In the
constructor of `ResultSubpartition`, `ResultPartition` is already passed as
argument and defined as protected variable to be referenced directly by any
kind of subpartitions. So we do not destroy the existing rules or add any new
rules. Also for tests, the separate `PipelinedSubpartitionTest` still needs to
construct parent `ResultPartition` in practice.
But your comment makes me think of another possible option, to pass
`ChannelStateReader` into `ResultPartitionWriter#initializeState`, then we do
not need to pass it into constructor to touch many components.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services