wsry opened a new pull request #10375: [Flink-14845][runtime] Introduce data compression to reduce disk and network IO of shuffle.
URL: https://github.com/apache/flink/pull/10375

## What is the purpose of the change

This PR introduces data compression to the shuffle to reduce disk IO and network IO. TPC-DS benchmark results show that data compression can reduce the running time of a single query by up to 40%.

## Brief change log

- Copies the block compression utils from the flink-table module to the flink-runtime module.
- Implements a `BufferCompressor` and a `BufferDecompressor`, which take a `Buffer` and return a `Buffer`, based on the compression utils copied from the table module.
- Adds a flag to `Buffer` to indicate whether it is compressed.
- Each `ResultPartition` contains a `BufferCompressor` and each `SingleInputGate` contains a `BufferDecompressor`.
- In `BoundedBlockingSubpartition`, a buffer is compressed (by calling `BufferCompressor#compress`) before it is written out, if compression is enabled.
- When a buffer is written to disk, its compression flag is written out as well; when the buffer is read back from disk, the flag is set accordingly.
- In `PipelinedSubpartition`, a buffer is compressed when it is polled, if compression is enabled and there is readable data to compress.
- In `SingleInputGate`, a received `Buffer` that is compressed (identified by the compression flag) is decompressed by calling `BufferDecompressor#decompress`.
- Adds a compression flag to `BufferResponse` to let the receiver know whether the received buffer is compressed.
- Adds a `boolean isLocal` parameter to `ResultSubpartitionView#getNextBuffer` to indicate whether the buffer is being polled by a local input channel, so that compression is skipped for local channels.

Note: a buffer remains uncompressed if its size would grow after compression.
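To illustrate the compress/decompress contract described above (a flagged buffer in, a flagged buffer out, and the fallback to the uncompressed buffer when compression does not shrink the data), here is a minimal sketch. It assumes hypothetical classes `SimpleBuffer`, `SketchBufferCompressor` and `SketchBufferDecompressor`, and it swaps in `java.util.zip.Deflater`/`Inflater` in place of the block compression utils copied from flink-table; it is not the PR's actual `BufferCompressor`/`BufferDecompressor` API.

```java
import java.util.Arrays;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

/** Hypothetical stand-in for a network buffer: payload bytes plus a compression flag. */
final class SimpleBuffer {
    final byte[] data;
    final boolean compressed;

    SimpleBuffer(byte[] data, boolean compressed) {
        this.data = data;
        this.compressed = compressed;
    }
}

/** Hypothetical compressor; java.util.zip.Deflater stands in for the block compression utils. */
final class SketchBufferCompressor {
    /** Returns a compressed copy, or the input unchanged if compression would not make it smaller. */
    SimpleBuffer compress(SimpleBuffer buffer) {
        if (buffer.compressed || buffer.data.length == 0) {
            return buffer; // already compressed, or no readable data to compress
        }
        Deflater deflater = new Deflater();
        try {
            deflater.setInput(buffer.data);
            deflater.finish();
            // The output array is exactly as large as the input, so any result that
            // does not fit (or only just fits) means compression does not pay off.
            byte[] out = new byte[buffer.data.length];
            int n = deflater.deflate(out);
            if (!deflater.finished() || n >= buffer.data.length) {
                return buffer; // keep the original, uncompressed buffer
            }
            return new SimpleBuffer(Arrays.copyOf(out, n), true);
        } finally {
            deflater.end();
        }
    }
}

/** Hypothetical decompressor; each buffer decompresses into at most bufferSize bytes. */
final class SketchBufferDecompressor {
    private final int bufferSize;

    SketchBufferDecompressor(int bufferSize) {
        this.bufferSize = bufferSize;
    }

    SimpleBuffer decompress(SimpleBuffer buffer) {
        if (!buffer.compressed) {
            return buffer; // the flag says the sender kept this buffer uncompressed
        }
        Inflater inflater = new Inflater();
        try {
            inflater.setInput(buffer.data);
            byte[] out = new byte[bufferSize];
            int n = 0;
            while (!inflater.finished()) {
                if (n == out.length) {
                    throw new IllegalStateException("Decompressed data exceeds the buffer size");
                }
                int read = inflater.inflate(out, n, out.length - n);
                if (read == 0 && (inflater.needsInput() || inflater.needsDictionary())) {
                    throw new IllegalStateException("Truncated or unsupported compressed buffer");
                }
                n += read;
            }
            return new SimpleBuffer(Arrays.copyOf(out, n), false);
        } catch (DataFormatException e) {
            throw new IllegalStateException("Corrupt compressed buffer", e);
        } finally {
            inflater.end();
        }
    }
}
```

In this sketch the receiving side only needs the flag to decide whether to call `decompress`, which mirrors how the change log describes `SingleInputGate` handling received buffers.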
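The change log states that the compression flag travels with the buffer both to disk (`BoundedBlockingSubpartition`) and over the network (`BufferResponse`). A minimal sketch of such framing, assuming a hypothetical layout of a 1-byte flag, a 4-byte length and then the payload; this is not the actual on-disk format or `BufferResponse` encoding, and `FramedBufferCodec` and `FramedBuffer` are hypothetical names.

```java
import java.nio.ByteBuffer;

/** Hypothetical decoded frame: payload bytes plus the compression flag read from the header. */
final class FramedBuffer {
    final byte[] payload;
    final boolean compressed;

    FramedBuffer(byte[] payload, boolean compressed) {
        this.payload = payload;
        this.compressed = compressed;
    }
}

/** Hypothetical framing: [1-byte compressed flag][4-byte payload length][payload]. */
final class FramedBufferCodec {
    static final int HEADER_SIZE = 1 + Integer.BYTES;

    /** Writes the flag before the data so that a reader can restore it. */
    static void write(ByteBuffer target, byte[] payload, boolean compressed) {
        target.put(compressed ? (byte) 1 : (byte) 0);
        target.putInt(payload.length);
        target.put(payload);
    }

    /** Reads one frame back and sets the compression flag accordingly. */
    static FramedBuffer read(ByteBuffer source) {
        boolean compressed = source.get() != 0;
        int length = source.getInt();
        byte[] payload = new byte[length];
        source.get(payload);
        return new FramedBuffer(payload, compressed);
    }
}
```

Writing the flag alongside the length means a reader never has to guess whether the bytes are compressed, which matters because the fallback rule lets compressed and uncompressed buffers appear in the same stream.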
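The `isLocal` parameter exists so that compression is skipped when a buffer never leaves the process. A minimal sketch of that decision at poll time, assuming a hypothetical `SketchSubpartitionView` whose `getNextBuffer(boolean isLocal)` only loosely mirrors `ResultSubpartitionView#getNextBuffer`; the compressor is modeled as a plain `UnaryOperator` so the example stays self-contained, and `PolledBuffer` is also hypothetical.

```java
import java.util.ArrayDeque;
import java.util.function.UnaryOperator;

/** Hypothetical buffer type for this sketch: payload bytes plus a compression flag. */
final class PolledBuffer {
    final byte[] data;
    final boolean compressed;

    PolledBuffer(byte[] data, boolean compressed) {
        this.data = data;
        this.compressed = compressed;
    }
}

/** Hypothetical, simplified pipelined subpartition view that compresses lazily on poll. */
final class SketchSubpartitionView {
    private final ArrayDeque<PolledBuffer> queue = new ArrayDeque<>();
    private final UnaryOperator<PolledBuffer> compressor; // null means compression is disabled

    SketchSubpartitionView(UnaryOperator<PolledBuffer> compressor) {
        this.compressor = compressor;
    }

    void add(PolledBuffer buffer) {
        queue.add(buffer);
    }

    /**
     * Local consumers read the buffer straight from memory, so compressing for them
     * would spend CPU without saving any disk or network IO.
     */
    PolledBuffer getNextBuffer(boolean isLocal) {
        PolledBuffer next = queue.poll();
        if (next == null) {
            return null;
        }
        boolean shouldCompress =
                compressor != null && !isLocal && !next.compressed && next.data.length > 0;
        return shouldCompress ? compressor.apply(next) : next;
    }
}
```

Deciding at poll time rather than at write time lets the same queued buffer be served uncompressed to a local channel and compressed to a remote one.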
## Verifying this change

Several new test cases are added to verify the change, including:

- `BufferCompressionTest`
- `CreditBasedPartitionRequestClientHandlerTest#testReceiveCompressedBuffer`
- `PartitionRequestClientHandlerTest#testReceiveCompressedBuffer`
- `NettyMessageSerializationTest#testEncodeDecode` (compressed buffer)
- Reading and writing compressed buffers in `BoundedBlockingSubpartitionWriteReadTest`
- `PipelinedSubpartitionWithReadViewTest#testBufferCompression`
- `SingleInputGateTest#testGetCompressedBuffer`
- `ShuffleCompressionItCase`

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (**yes** / no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
- The serializers: (yes / **no** / don't know)
- The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
- The S3 file system connector: (yes / **no** / don't know)

## Documentation

- Does this pull request introduce a new feature? (**yes** / no)
- If yes, how is the feature documented? (not applicable / **docs** / JavaDocs / not documented)
