zhijiangW commented on a change in pull request #10492: [FLINK-15140][runtime]
Fix shuffle data compression doesn't work with BroadcastRecordWriter.
URL: https://github.com/apache/flink/pull/10492#discussion_r355344524
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java
##########
@@ -152,7 +152,10 @@ private void writeAndCloseBufferConsumer(BufferConsumer bufferConsumer) throws I
 		if (canBeCompressed(buffer)) {
 			final Buffer compressedBuffer = parent.bufferCompressor.compressToIntermediateBuffer(buffer);
 			data.writeBuffer(compressedBuffer);
-			compressedBuffer.recycleBuffer();
+			// release the intermediate buffer
+			if (compressedBuffer != buffer) {
+				compressedBuffer.recycleBuffer();
Review comment:
This is mainly because, before `BufferConsumer#close` is called, the result
is correct whether `recycleBuffer()` was called once or twice:
- If `recycleBuffer()` was called once, `close()` also recycles the buffer
internally, so it is released.
- If `recycleBuffer()` was called twice, `close()` does nothing because the
buffer has already been released.
Before the change, `recycleBuffer()` was called only once; after the change it
was called twice. So the potentially inconsistent logic stays hidden. That is
my concern about the loose semantics of the `close` method: it allows the
caller to make mistakes in the middle of the process.
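
To make the concern concrete, here is a toy sketch of how a lenient `close()`
can mask the difference between one and two `recycleBuffer()` calls. It assumes
a simple reference-counted buffer; `ToyBuffer`, `ToyConsumer` and
`LenientCloseDemo` are hypothetical names for illustration only and are not
Flink's actual `Buffer` or `BufferConsumer` implementations.

```java
/** Demo entry point; all classes here are hypothetical, not Flink code. */
final class LenientCloseDemo {

    /** Builds one view, recycles it the given number of times, then closes. */
    static boolean releasedAfter(int explicitRecycles) {
        ToyBuffer memory = new ToyBuffer();
        ToyConsumer consumer = new ToyConsumer(memory);
        ToyBuffer view = consumer.build();
        for (int i = 0; i < explicitRecycles; i++) {
            view.recycleBuffer();
        }
        consumer.close();
        return memory.isRecycled();
    }

    public static void main(String[] args) {
        // Both cases end in the same "released" state, so neither looks wrong.
        System.out.println(releasedAfter(1)); // prints "true"
        System.out.println(releasedAfter(2)); // prints "true"
    }
}

/** Toy reference-counted buffer (assumption: starts with one reference). */
final class ToyBuffer {
    private int refCount = 1;

    ToyBuffer retain() {
        refCount++;
        return this;
    }

    void recycleBuffer() {
        if (refCount == 0) {
            throw new IllegalStateException("buffer already released");
        }
        refCount--;
    }

    boolean isRecycled() {
        return refCount == 0;
    }
}

/** Toy consumer whose close() is lenient about the buffer's state. */
final class ToyConsumer {
    private final ToyBuffer buffer;

    ToyConsumer(ToyBuffer buffer) {
        this.buffer = buffer;
    }

    /** Hands out a view that holds its own reference to the same memory. */
    ToyBuffer build() {
        return buffer.retain();
    }

    /** Lenient close: recycles only if the buffer is not yet released. */
    void close() {
        if (!buffer.isRecycled()) {
            buffer.recycleBuffer();
        }
    }
}
```

In this sketch the second explicit `recycleBuffer()` call consumes the
consumer's own reference, and `close()` silently accepts that, which is the
kind of hidden inconsistency described above.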