pnowojski commented on a change in pull request #10492: [FLINK-15140][runtime]
Fix shuffle data compression doesn't work with BroadcastRecordWriter.
URL: https://github.com/apache/flink/pull/10492#discussion_r355332444
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferConsumer.java
##########
@@ -44,6 +44,9 @@
private int currentReaderPosition;
+ /** Whether this BufferConsumer is copied from another BufferConsumer instance. */
+ private final boolean isCopied;
Review comment:
I think I'm not that concerned about the issues you mentioned. I don't
think this property is tightly coupled with anything else; it just tells
whether this `BufferConsumer` was copied or not. If anything, the issue is that
compression happens "in place", modifying the data, which is the source of the
problem. Since we already agreed to such a "hack" (for valid reasons), I think it is
OK for the compressor to check whether the `BufferConsumer` was copied or not.
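To make the idea concrete, here is a minimal sketch of such a guard. All names are illustrative, not the real Flink compressor API, and the "compression" is a stand-in byte reversal; the point is only that a copied consumer's buffer is shared, so the compressor must mutate a private clone rather than the shared array:

```java
import java.util.Arrays;

public class CompressorGuardSketch {
    /** Stand-in "compression" (byte reversal), done in place or on a private clone. */
    static byte[] compress(byte[] data, boolean isCopied) {
        // If the consumer was copied, its underlying buffer is shared with
        // other channels, so mutate a clone instead of the shared array.
        byte[] target = isCopied ? data.clone() : data;
        for (int i = 0, j = target.length - 1; i < j; i++, j--) {
            byte tmp = target[i];
            target[i] = target[j];
            target[j] = tmp;
        }
        return target;
    }

    public static void main(String[] args) {
        byte[] shared = {1, 2, 3};
        byte[] out = compress(shared, true);
        // The shared buffer is left intact; only the clone was mutated.
        System.out.println(Arrays.toString(shared)); // [1, 2, 3]
        System.out.println(Arrays.toString(out));    // [3, 2, 1]
    }
}
```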
However, I have a different concern. If we are copying the `BufferConsumer`,
the first one will not have this flag set. Is this an issue? Currently it might
work, because of the following loop:
```
try (BufferConsumer bufferConsumer = builder.createBufferConsumer()) {
    for (int channel = 0; channel < numberOfChannels; channel++) {
        targetPartition.addBufferConsumer(bufferConsumer.copy(), channel);
    }
}
```
but it could also be implemented as follows:
```
for (int channel = 0; channel < numberOfChannels - 1; channel++) {
    targetPartition.addBufferConsumer(bufferConsumer.copy(), channel);
}
targetPartition.addBufferConsumer(bufferConsumer, numberOfChannels - 1);
```