akalash commented on code in PR #22761:
URL: https://github.com/apache/flink/pull/22761#discussion_r1262673283
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java:
##########
@@ -65,7 +65,7 @@ public abstract class BufferWritingResultPartition extends
ResultPartition {
private TimerGauge hardBackPressuredTimeMsPerSecond = new TimerGauge();
- private long totalWrittenBytes;
+ protected long totalWrittenBytes;
Review Comment:
Ideally, `totalWrittenBytes` should be changed only in one class
`BufferWritingResultPartition`. So maybe we should forbid `inherited classes`
to write to `subpartitions` directly. But honestly, I don't know how to do it
easily since the current implementation of `inherited classes` use
`subpartitions` a lot and we can not change `subpartitions` visibility from
`protected` to `private` . I mean we can create `addToSubpartition` method in
`BufferWritingResultPartition` and use it everywhere(and maybe it will be
better than now) but it still doesn't protect us from possible future bugs if
somebody decides to use `subpartitions#add` directly in `inherited classes`.
Anyway, if you don't think that it will make things worse, maybe we indeed
for now create the `add`(or `addToSubpartition`) method in
`BufferWritingResultPartition` and will use it everywhere. At least it will
reduce the number of places where we update `totalWrittenBytes`.
Unfortunatelly, I didn't come up with any other solution until now.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/RecoveredChannelStateHandler.java:
##########
@@ -190,51 +195,60 @@ public void recover(
int oldSubtaskIndex,
BufferWithContext<BufferBuilder> bufferWithContext)
throws IOException {
- try (BufferBuilder bufferBuilder = bufferWithContext.context) {
- try (BufferConsumer bufferConsumer =
- bufferBuilder.createBufferConsumerFromBeginning()) {
- bufferBuilder.finish();
- if (bufferConsumer.isDataAvailable()) {
- final List<CheckpointedResultSubpartition> channels =
- getMappedChannels(subpartitionInfo);
- for (final CheckpointedResultSubpartition channel :
channels) {
- // channel selector is created from the downstream's
point of view: the
- // subtask of downstream = subpartition index of
recovered buffer
- final SubtaskConnectionDescriptor channelSelector =
- new SubtaskConnectionDescriptor(
- subpartitionInfo.getSubPartitionIdx(),
oldSubtaskIndex);
- channel.addRecovered(
-
EventSerializer.toBufferConsumer(channelSelector, false));
- channel.addRecovered(bufferConsumer.copy());
- }
- }
+ try (BufferBuilder bufferBuilder = bufferWithContext.context;
+ BufferConsumer bufferConsumer =
bufferBuilder.createBufferConsumerFromBeginning()) {
+ bufferBuilder.finish();
+ if (!bufferConsumer.isDataAvailable()) {
+ return;
+ }
+ final List<ResultSubpartitionInfo> mappedSubpartitions =
+ getMappedSubpartitions(subpartitionInfo);
+ for (final ResultSubpartitionInfo mappedSubpartition :
mappedSubpartitions) {
+ // channel selector is created from the downstream's point of
view: the
+ // subtask of downstream = subpartition index of recovered
buffer
+ final SubtaskConnectionDescriptor channelSelector =
+ new SubtaskConnectionDescriptor(
+ subpartitionInfo.getSubPartitionIdx(),
oldSubtaskIndex);
+ CheckpointedResultPartition checkpointedResultPartition =
+
getCheckpointedResultPartition(mappedSubpartition.getPartitionIdx());
+ checkpointedResultPartition.addRecovered(
Review Comment:
good. It's better now
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]