This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
commit 5bfc8e9cb1b4cb98168cd979f07d9c4e1650e5eb
Author: Kang <[email protected]>
AuthorDate: Fri May 27 21:56:18 2022 +0800

    [feature] add zstd compression codec (#9747)

    ZSTD compression is fast and has a high compression ratio. It can be used to
    achieve a higher compression ratio than the default LZ4F codec when storing
    cost-sensitive data such as logs. In the comparison test below, the ZSTD codec
    produces about 35% smaller compressed data than LZ4F, is about 30% faster on
    the first read (without OS page cache), and about 40% slower on the second
    read (with OS page cache).

    test data: 25GB text log, 110 million rows
    test table: test_table(ts varchar(30), log string)
    test SQL:
        set enable_vectorized_engine=1;
        select sum(length(log)) from test_table
    be.conf:
        disable_storage_page_cache = true
        (set to disable the Doris page cache, so the test measures real
        decompression speed instead of reading all data from memory)

    test result:

    master branch with the lz4f codec:
    - compressed size: 4.3G
    - first SQL exec time (read data from disk + decompress + a little computation): 18.3s
    - second SQL exec time (read data from OS page cache + decompress + a little computation): 2.4s

    this branch with the zstd codec (hardcoded on):
    - compressed size: 2.8G
    - first SQL exec time: 12.8s
    - second SQL exec time: 3.4s
---
 be/src/util/block_compression.cpp       | 149 ++++++++++++++++++++++++++++++++
 be/test/util/block_compression_test.cpp |   2 +
 2 files changed, 151 insertions(+)

diff --git a/be/src/util/block_compression.cpp b/be/src/util/block_compression.cpp
index a1ee74f047..51f485f348 100644
--- a/be/src/util/block_compression.cpp
+++ b/be/src/util/block_compression.cpp
@@ -22,6 +22,8 @@
 #include <snappy/snappy-sinksource.h>
 #include <snappy/snappy.h>
 #include <zlib.h>
+#include <zstd.h>
+#include <zstd_errors.h>
 
 #include <limits>
 
@@ -375,6 +377,150 @@ public:
     }
 };
 
+// for ZSTD compression and decompression, with BOTH fast and high compression ratio
+class ZstdBlockCompression : public BlockCompressionCodec {
+public:
+    // reenterable initialization for compress/decompress context
+    inline Status init() override {
+        if (!ctx_c) {
+            ctx_c = ZSTD_createCCtx();
+            if (!ctx_c) {
+                return Status::InvalidArgument("Fail to ZSTD_createCCtx");
+            }
+        }
+
+        if (!ctx_d) {
+            ctx_d = ZSTD_createDCtx();
+            if (!ctx_d) {
+                return Status::InvalidArgument("Fail to ZSTD_createDCtx");
+            }
+        }
+
+        return Status::OK();
+    }
+
+    ~ZstdBlockCompression() override {
+        if (ctx_c) ZSTD_freeCCtx(ctx_c);
+        if (ctx_d) ZSTD_freeDCtx(ctx_d);
+    }
+
+    size_t max_compressed_len(size_t len) const override {
+        if (len > std::numeric_limits<int32_t>::max()) {
+            return 0;
+        }
+        return ZSTD_compressBound(len);
+    }
+
+    Status compress(const Slice& input, Slice* output) const override {
+        std::vector<Slice> inputs {input};
+        return compress(inputs, output);
+    }
+
+    // follow ZSTD official example
+    // https://github.com/facebook/zstd/blob/dev/examples/streaming_compression.c
+    Status compress(const std::vector<Slice>& inputs, Slice* output) const {
+        if (!ctx_c) return Status::InvalidArgument("compression context NOT initialized");
+
+        // reset ctx to start new compress session
+        auto ret = ZSTD_CCtx_reset(ctx_c, ZSTD_reset_session_only);
+        if (ZSTD_isError(ret)) {
+            return Status::InvalidArgument(strings::Substitute(
+                    "ZSTD_CCtx_reset error: $0", ZSTD_getErrorString(ZSTD_getErrorCode(ret))));
+        }
+        // set compression level to default 3
+        ret = ZSTD_CCtx_setParameter(ctx_c, ZSTD_c_compressionLevel, ZSTD_CLEVEL_DEFAULT);
+        if (ZSTD_isError(ret)) {
+            return Status::InvalidArgument(
+                    strings::Substitute("ZSTD_CCtx_setParameter compression level error: $0",
+                                        ZSTD_getErrorString(ZSTD_getErrorCode(ret))));
+        }
+        // set checksum flag to 1
+        ret = ZSTD_CCtx_setParameter(ctx_c, ZSTD_c_checksumFlag, 1);
+        if (ZSTD_isError(ret)) {
+            return Status::InvalidArgument(
+                    strings::Substitute("ZSTD_CCtx_setParameter checksumFlag error: $0",
+                                        ZSTD_getErrorString(ZSTD_getErrorCode(ret))));
+        }
+
+        ZSTD_outBuffer out_buf = {output->data, output->size, 0};
+
+        for (size_t i = 0; i < inputs.size(); i++) {
+            ZSTD_inBuffer in_buf = {inputs[i].data, inputs[i].size, 0};
+
+            bool last_input = (i == inputs.size() - 1);
+            auto mode = last_input ? ZSTD_e_end : ZSTD_e_continue;
+
+            bool finished = false;
+            do {
+                // do compress
+                auto ret = ZSTD_compressStream2(ctx_c, &out_buf, &in_buf, mode);
+
+                if (ZSTD_isError(ret)) {
+                    return Status::InvalidArgument(
+                            strings::Substitute("ZSTD_compressStream2 error: $0",
+                                                ZSTD_getErrorString(ZSTD_getErrorCode(ret))));
+                }
+
+                // ret is ZSTD hint for needed output buffer size
+                if (ret > 0 && out_buf.pos == out_buf.size) {
+                    return Status::InvalidArgument(
+                            strings::Substitute("ZSTD_compressStream2 output buffer full"));
+                }
+
+                finished = last_input ? (ret == 0) : (in_buf.pos == inputs[i].size);
+            } while (!finished);
+        }
+
+        // set compressed size for caller
+        output->size = out_buf.pos;
+
+        return Status::OK();
+    }
+
+    // follow ZSTD official example
+    // https://github.com/facebook/zstd/blob/dev/examples/streaming_decompression.c
+    Status decompress(const Slice& input, Slice* output) const {
+        if (!ctx_d) return Status::InvalidArgument("decompression context NOT initialized");
+
+        // reset ctx to start a new decompress session
+        auto ret = ZSTD_DCtx_reset(ctx_d, ZSTD_reset_session_only);
+        if (ZSTD_isError(ret)) {
+            return Status::InvalidArgument(strings::Substitute(
+                    "ZSTD_DCtx_reset error: $0", ZSTD_getErrorString(ZSTD_getErrorCode(ret))));
+        }
+
+        ZSTD_inBuffer in_buf = {input.data, input.size, 0};
+        ZSTD_outBuffer out_buf = {output->data, output->size, 0};
+
+        while (in_buf.pos < in_buf.size) {
+            // do decompress
+            auto ret = ZSTD_decompressStream(ctx_d, &out_buf, &in_buf);
+
+            if (ZSTD_isError(ret)) {
+                return Status::InvalidArgument(
+                        strings::Substitute("ZSTD_decompressStream error: $0",
+                                            ZSTD_getErrorString(ZSTD_getErrorCode(ret))));
+            }
+
+            // ret is ZSTD hint for needed output buffer size
+            if (ret > 0 && out_buf.pos == out_buf.size) {
+                return Status::InvalidArgument(
+                        strings::Substitute("ZSTD_decompressStream output buffer full"));
+            }
+        }
+
+        // set decompressed size for caller
+        output->size = out_buf.pos;
+
+        return Status::OK();
+    }
+
+private:
+    // will be reused by compress/decompress
+    ZSTD_CCtx* ctx_c = nullptr;
+    ZSTD_DCtx* ctx_d = nullptr;
+};
+
 Status get_block_compression_codec(segment_v2::CompressionTypePB type,
                                    std::unique_ptr<BlockCompressionCodec>& codec) {
     BlockCompressionCodec* ptr = nullptr;
@@ -394,6 +540,9 @@ Status get_block_compression_codec(segment_v2::CompressionTypePB type,
     case segment_v2::CompressionTypePB::ZLIB:
         ptr = new ZlibBlockCompression();
         break;
+    case segment_v2::CompressionTypePB::ZSTD:
+        ptr = new ZstdBlockCompression();
+        break;
     default:
         return Status::NotFound(strings::Substitute("unknown compression type($0)", type));
     }
diff --git a/be/test/util/block_compression_test.cpp b/be/test/util/block_compression_test.cpp
index a339d54409..7b2da38703 100644
--- a/be/test/util/block_compression_test.cpp
+++ b/be/test/util/block_compression_test.cpp
@@ -101,6 +101,7 @@ TEST_F(BlockCompressionTest, single) {
     test_single_slice(segment_v2::CompressionTypePB::ZLIB);
     test_single_slice(segment_v2::CompressionTypePB::LZ4);
     test_single_slice(segment_v2::CompressionTypePB::LZ4F);
+    test_single_slice(segment_v2::CompressionTypePB::ZSTD);
 }
 
 void test_multi_slices(segment_v2::CompressionTypePB type) {
@@ -156,6 +157,7 @@ TEST_F(BlockCompressionTest, multi) {
     test_multi_slices(segment_v2::CompressionTypePB::ZLIB);
     test_multi_slices(segment_v2::CompressionTypePB::LZ4);
     test_multi_slices(segment_v2::CompressionTypePB::LZ4F);
+    test_multi_slices(segment_v2::CompressionTypePB::ZSTD);
 }
 
 } // namespace doris
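
For readers unfamiliar with the streaming API the codec wraps, here is a minimal standalone
round trip through ZSTD_compressStream2 / ZSTD_decompressStream, the same libzstd calls used
above. It is not part of the patch; the sample data, buffer sizing, and reduced error handling
are illustrative assumptions.

// minimal sketch: one-shot streaming compress + decompress with libzstd
#include <cstdio>
#include <cstring>
#include <string>
#include <vector>

#include <zstd.h>

int main() {
    const std::string input(1 << 20, 'a'); // highly compressible sample data (assumption)

    // --- compress ---
    ZSTD_CCtx* cctx = ZSTD_createCCtx();
    // mirror the codec's parameters; setParameter errors ignored here for brevity
    ZSTD_CCtx_setParameter(cctx, ZSTD_c_compressionLevel, ZSTD_CLEVEL_DEFAULT);
    ZSTD_CCtx_setParameter(cctx, ZSTD_c_checksumFlag, 1);

    std::vector<char> compressed(ZSTD_compressBound(input.size()));
    ZSTD_inBuffer in = {input.data(), input.size(), 0};
    ZSTD_outBuffer out = {compressed.data(), compressed.size(), 0};

    size_t remaining;
    do {
        // ZSTD_e_end flushes and writes the frame epilogue; 0 means the frame is complete
        remaining = ZSTD_compressStream2(cctx, &out, &in, ZSTD_e_end);
        if (ZSTD_isError(remaining)) {
            std::fprintf(stderr, "compress error: %s\n", ZSTD_getErrorName(remaining));
            return 1;
        }
    } while (remaining != 0);
    compressed.resize(out.pos);
    ZSTD_freeCCtx(cctx);

    // --- decompress ---
    ZSTD_DCtx* dctx = ZSTD_createDCtx();
    std::vector<char> decompressed(input.size()); // caller must know or bound the original size
    ZSTD_inBuffer din = {compressed.data(), compressed.size(), 0};
    ZSTD_outBuffer dout = {decompressed.data(), decompressed.size(), 0};

    while (din.pos < din.size) {
        size_t ret = ZSTD_decompressStream(dctx, &dout, &din);
        if (ZSTD_isError(ret)) {
            std::fprintf(stderr, "decompress error: %s\n", ZSTD_getErrorName(ret));
            return 1;
        }
    }
    ZSTD_freeDCtx(dctx);

    std::printf("original=%zu compressed=%zu roundtrip_ok=%d\n", input.size(), out.pos,
                std::memcmp(input.data(), decompressed.data(), input.size()) == 0);
    return 0;
}

Build it with something like: g++ -std=c++11 zstd_sketch.cpp -lzstd (file name is arbitrary).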
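
And a hedged sketch of how BE code could exercise the new codec through
get_block_compression_codec, mirroring what the added test cases do. The helper name
zstd_roundtrip is hypothetical; it assumes the Slice(std::string) constructor, the
RETURN_IF_ERROR macro, and the header paths already used elsewhere under be/src, and that
the caller knows the uncompressed size when sizing the decompression buffer, as the test does.

#include <memory>
#include <string>

#include "common/status.h"
#include "util/block_compression.h"
#include "util/slice.h"

namespace doris {

// hypothetical helper (not from the commit): round-trip a buffer through the ZSTD codec
Status zstd_roundtrip(const std::string& raw, std::string* result) {
    std::unique_ptr<BlockCompressionCodec> codec;
    RETURN_IF_ERROR(get_block_compression_codec(segment_v2::CompressionTypePB::ZSTD, codec));
    RETURN_IF_ERROR(codec->init()); // reenterable; creates the reusable ZSTD_CCtx / ZSTD_DCtx

    // compress: size the output from the codec's upper bound, then shrink to the
    // real compressed size reported back through the output Slice
    std::string compressed;
    compressed.resize(codec->max_compressed_len(raw.size()));
    Slice compressed_slice(compressed);
    RETURN_IF_ERROR(codec->compress(Slice(raw), &compressed_slice));
    compressed.resize(compressed_slice.size);

    // decompress: the caller supplies a buffer of the known uncompressed size
    result->resize(raw.size());
    Slice result_slice(*result);
    RETURN_IF_ERROR(codec->decompress(Slice(compressed), &result_slice));
    result->resize(result_slice.size);
    return Status::OK();
}

} // namespace doris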
