zhijiangW commented on a change in pull request #10375: [FLINK-14845][runtime]
Introduce data compression to reduce disk and network IO of shuffle.
URL: https://github.com/apache/flink/pull/10375#discussion_r353537910
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
##########
@@ -524,6 +533,15 @@ private BufferOrEvent transformToBufferOrEvent(
boolean moreAvailable,
InputChannel currentChannel) throws IOException,
InterruptedException {
if (buffer.isBuffer()) {
+ if (buffer.isCompressed()) {
Review comment:
As I previously concerned, I tried to make the compression property as final
in the constructor of `Buffer` instance. It makes sense to do that on producer
side, but seems a bit trouble for consumer side because some original requested
buffers are from `LocalBufferPool`.
So I have another option to avoid injecting the compression property in
`BufferResponse` and `NetworkBuffer` on consumer side, as long as we judge this
condition via `bufferDecompressor != null` instead here. I guess it makes sense
to do that because whether to generate `bufferDecompressor` in
`SingleInputGateFactory` also from the global configuration for enabling
compression or not.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services