[ 
https://issues.apache.org/jira/browse/FLINK-14845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16978243#comment-16978243
 ] 

Piotr Nowojski edited comment on FLINK-14845 at 11/20/19 9:23 AM:
------------------------------------------------------------------

+1 for the feature. It could also be useful for {{PipelinedSubpartition}} in some 
rare scenarios of network-bound clusters.

Compressing the data in the Task thread in the presence of an 
{{OutputFlusher}} would be very difficult, maybe impossible without adding 
extra synchronisation. That is because currently a buffer 
({{BufferConsumer}}) can be handed over to Netty for consumption while more 
records are still being appended to it from the task thread. To deal with that, 
we would have to either compress the data per record only, or add extra 
synchronisation between the Netty and Task threads.

I think a better idea might be to do the compression on the boundary between 
{{ResultSubpartition}} and Netty (expand 
{{CreditBasedSequenceNumberingViewReader}}? create a new alternative 
{{NetworkSequenceViewReader}}?), and perform the compression in the Netty 
thread. I don't think this should be an issue, as it would be a non-blocking 
operation that's relatively fast (the same order of magnitude as copying the 
memory).
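
To illustrate the idea, a minimal sketch (just a sketch: {{SimpleBuffer}} and {{SimpleViewReader}} below are simplified stand-ins for the real {{Buffer}} and {{NetworkSequenceViewReader}} types, and {{java.util.zip}} is only a placeholder codec):

{code:java}
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;

/** Simplified stand-in for a network buffer: payload plus an event marker. */
final class SimpleBuffer {
    final byte[] data;
    final boolean isEvent;
    SimpleBuffer(byte[] data, boolean isEvent) { this.data = data; this.isEvent = isEvent; }
}

/** Simplified stand-in for a subpartition view reader that is polled by Netty. */
interface SimpleViewReader {
    SimpleBuffer getNextBuffer() throws IOException;
}

/**
 * Hypothetical decorator placed between the ResultSubpartition view and Netty:
 * every data buffer is compressed in the Netty thread, right before it is
 * handed to Netty. At this point the buffer is already sealed, so no extra
 * synchronisation with the task thread is needed.
 */
final class CompressingViewReader implements SimpleViewReader {

    private final SimpleViewReader delegate;

    CompressingViewReader(SimpleViewReader delegate) {
        this.delegate = delegate;
    }

    @Override
    public SimpleBuffer getNextBuffer() throws IOException {
        SimpleBuffer buffer = delegate.getNextBuffer();
        if (buffer == null || buffer.isEvent) {
            return buffer; // events pass through uncompressed
        }
        // Compress the sealed buffer in the calling (Netty) thread.
        ByteArrayOutputStream bos = new ByteArrayOutputStream(buffer.data.length);
        try (DeflaterOutputStream out =
                new DeflaterOutputStream(bos, new Deflater(Deflater.BEST_SPEED))) {
            out.write(buffer.data);
        }
        return new SimpleBuffer(bos.toByteArray(), false);
    }
}
{code}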

To play devil's advocate here: wouldn't Blink benefit more from a specific, 
columnar compression/decompression compared to a generic 
{{Buffer}}/{{MemorySegment}}-based one? Compressing each column independently 
should give better compression ratios.
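
Just to illustrate that point with a toy example (this is not Blink's actual layout; it only runs the same generic codec over a row-wise and a column-wise arrangement of the same values):

{code:java}
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Random;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;

/**
 * Toy comparison: the same rows (a slowly increasing timestamp column and a
 * random payload column), serialised once row by row and once column by column,
 * then pushed through the same generic codec. The columnar layout keeps similar
 * values adjacent, so it typically compresses better.
 */
public class ColumnarVsRowCompression {

    public static void main(String[] args) throws IOException {
        int rows = 100_000;
        Random random = new Random(42);

        ByteBuffer rowWise = ByteBuffer.allocate(rows * 16);
        ByteBuffer tsColumn = ByteBuffer.allocate(rows * 8);
        ByteBuffer payloadColumn = ByteBuffer.allocate(rows * 8);

        for (int i = 0; i < rows; i++) {
            long timestamp = 1_574_000_000_000L + i; // compressible column
            double payload = random.nextDouble();    // incompressible column
            rowWise.putLong(timestamp).putDouble(payload);
            tsColumn.putLong(timestamp);
            payloadColumn.putDouble(payload);
        }

        byte[] columnar = concat(tsColumn.array(), payloadColumn.array());
        System.out.println("row-wise : " + deflate(rowWise.array()).length + " bytes");
        System.out.println("columnar : " + deflate(columnar).length + " bytes");
    }

    private static byte[] deflate(byte[] data) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (DeflaterOutputStream out =
                new DeflaterOutputStream(bos, new Deflater(Deflater.BEST_SPEED))) {
            out.write(data);
        }
        return bos.toByteArray();
    }

    private static byte[] concat(byte[] a, byte[] b) {
        byte[] result = new byte[a.length + b.length];
        System.arraycopy(a, 0, result, 0, a.length);
        System.arraycopy(b, 0, result, a.length, b.length);
        return result;
    }
}
{code}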

edit:
{quote}
If we don't touch the Pipeline part and only introduce compression to 
BoundedBlockingSubpartition
{quote}
Is there a benefit to doing this just for {{BoundedBlockingSubpartition}}? It 
would make the system less complete, with some features working only in 
combination with others, etc.


> Introduce data compression to blocking shuffle.
> -----------------------------------------------
>
>                 Key: FLINK-14845
>                 URL: https://issues.apache.org/jira/browse/FLINK-14845
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Network
>            Reporter: Yingjie Cao
>            Priority: Major
>
> Currently, the blocking shuffle writer writes raw output data to disk without 
> compression. For IO-bound scenarios, this can be optimized by compressing 
> the output data. It is better to introduce a compression mechanism and offer 
> users a config option to decide whether to compress the shuffle 
> data. Actually, we have already implemented compression in our internal Flink 
> version, and here are some key points:
> 1. Where to compress/decompress?
> Compress at the upstream and decompress at the downstream.
> 2. Which thread does the compression/decompression?
> The task threads do the compression/decompression.
> 3. Data compression granularity?
> Per buffer.
> 4. How to handle the case when the data size becomes even bigger after compression?
> Give up compression in this case and introduce an extra flag to identify whether 
> the data was compressed, that is, the output may be a mixture of compressed 
> and uncompressed data.
>  
> We'd like to introduce blocking shuffle data compression to Flink if there 
> is interest.
>  
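
Regarding points 3 and 4 above, a minimal sketch of such a per-buffer scheme (the one-byte flag and the {{java.util.zip}} codec are illustrative assumptions, not the actual implementation):

{code:java}
import java.util.Arrays;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

/**
 * Sketch of per-buffer compression with a fallback: each buffer is prefixed
 * with a one-byte flag so that compressed and uncompressed buffers can be
 * mixed in the same output.
 */
public class PerBufferCompression {

    private static final byte UNCOMPRESSED = 0;
    private static final byte COMPRESSED = 1;

    /** Upstream: compress a buffer, or keep it as-is if compression does not help. */
    public static byte[] writeBuffer(byte[] buffer) {
        Deflater deflater = new Deflater(Deflater.BEST_SPEED);
        deflater.setInput(buffer);
        deflater.finish();
        byte[] compressed = new byte[buffer.length]; // only worth it if strictly smaller
        int len = deflater.deflate(compressed);
        boolean helped = deflater.finished() && len < buffer.length;
        deflater.end();
        if (!helped) {
            return withFlag(UNCOMPRESSED, buffer, buffer.length); // give up compression
        }
        return withFlag(COMPRESSED, compressed, len);
    }

    /** Downstream: inspect the flag and decompress only if needed. */
    public static byte[] readBuffer(byte[] data, int originalSize) throws DataFormatException {
        byte flag = data[0];
        byte[] payload = Arrays.copyOfRange(data, 1, data.length);
        if (flag == UNCOMPRESSED) {
            return payload;
        }
        Inflater inflater = new Inflater();
        inflater.setInput(payload);
        // originalSize is an upper bound, e.g. the network buffer size.
        byte[] restored = new byte[originalSize];
        int n = inflater.inflate(restored);
        inflater.end();
        return Arrays.copyOf(restored, n);
    }

    private static byte[] withFlag(byte flag, byte[] payload, int len) {
        byte[] result = new byte[len + 1];
        result[0] = flag;
        System.arraycopy(payload, 0, result, 1, len);
        return result;
    }
}
{code}

A round trip would then be {{readBuffer(writeBuffer(buf), buf.length)}}, with the uncompressed size known on the receiving side (e.g. bounded by the network buffer size).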


