This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new efdb3b79a5 [feature] add zstd compression codec (#9747)
efdb3b79a5 is described below
commit efdb3b79a55c6cc27bfc5bb02e5c5e46894abde8
Author: Kang <[email protected]>
AuthorDate: Fri May 27 21:56:18 2022 +0800
[feature] add zstd compression codec (#9747)
ZSTD compression is fast and has a high compression ratio. It can be used to
achieve a higher compression ratio
than the default Lz4f codec when storing cost-sensitive data such as logs.
Compared to the Lz4f codec, in the following comparison test the zstd codec
produced 35% smaller compressed data, was 30% faster on the first read (without
OS page cache), and was 40% slower on the second read (with OS page cache).
test data: 25GB text log, 110 million rows
test table: test_table(ts varchar(30), log string)
test SQL: set enable_vectorized_engine=1; select sum(length(log)) from
test_table
be.conf: disable_storage_page_cache = true
This config disables the Doris page cache so that the data is not all cached in
memory, which lets the test measure the real decompression speed.
test result
master branch with lz4f codec result:
- compressed size 4.3G
- SQL first exec time(read data from disk + decompress + little
computation) : 18.3s
- SQL second exec time(read data from OS pagecache + decompress + little
computation) : 2.4s
this branch with zstd codec (hard-coded to be enabled) result:
- compressed size: 2.8G
- SQL first exec time: 12.8s
- SQL second exec time: 3.4s
---
be/src/util/block_compression.cpp | 149 ++++++++++++++++++++++++++++++++
be/test/util/block_compression_test.cpp | 2 +
2 files changed, 151 insertions(+)
diff --git a/be/src/util/block_compression.cpp
b/be/src/util/block_compression.cpp
index a1ee74f047..51f485f348 100644
--- a/be/src/util/block_compression.cpp
+++ b/be/src/util/block_compression.cpp
@@ -22,6 +22,8 @@
#include <snappy/snappy-sinksource.h>
#include <snappy/snappy.h>
#include <zlib.h>
+#include <zstd.h>
+#include <zstd_errors.h>
#include <limits>
@@ -375,6 +377,150 @@ public:
}
};
+// for ZSTD compression and decompression, with BOTH fast and high compression
ratio
+class ZstdBlockCompression : public BlockCompressionCodec {
+public:
+ // reenterable initialization for compress/decompress context
+ inline Status init() override {
+ if (!ctx_c) {
+ ctx_c = ZSTD_createCCtx();
+ if (!ctx_c) {
+ return Status::InvalidArgument("Fail to ZSTD_createCCtx");
+ }
+ }
+
+ if (!ctx_d) {
+ ctx_d = ZSTD_createDCtx();
+ if (!ctx_d) {
+ return Status::InvalidArgument("Fail to ZSTD_createDCtx");
+ }
+ }
+
+ return Status::OK();
+ }
+
+ ~ZstdBlockCompression() override {
+ if (ctx_c) ZSTD_freeCCtx(ctx_c);
+ if (ctx_d) ZSTD_freeDCtx(ctx_d);
+ }
+
+ size_t max_compressed_len(size_t len) const override {
+ if (len > std::numeric_limits<int32_t>::max()) {
+ return 0;
+ }
+ return ZSTD_compressBound(len);
+ }
+
+ Status compress(const Slice& input, Slice* output) const override {
+ std::vector<Slice> inputs {input};
+ return compress(inputs, output);
+ }
+
+ // follow ZSTD official example
+ //
https://github.com/facebook/zstd/blob/dev/examples/streaming_compression.c
+ Status compress(const std::vector<Slice>& inputs, Slice* output) const {
+ if (!ctx_c) return Status::InvalidArgument("compression context NOT
initialized");
+
+ // reset ctx to start new compress session
+ auto ret = ZSTD_CCtx_reset(ctx_c, ZSTD_reset_session_only);
+ if (ZSTD_isError(ret)) {
+ return Status::InvalidArgument(strings::Substitute(
+ "ZSTD_CCtx_reset error: $0",
ZSTD_getErrorString(ZSTD_getErrorCode(ret))));
+ }
+ // set compression level to default 3
+ ret = ZSTD_CCtx_setParameter(ctx_c, ZSTD_c_compressionLevel,
ZSTD_CLEVEL_DEFAULT);
+ if (ZSTD_isError(ret)) {
+ return Status::InvalidArgument(
+ strings::Substitute("ZSTD_CCtx_setParameter compression
level error: $0",
+
ZSTD_getErrorString(ZSTD_getErrorCode(ret))));
+ }
+ // set checksum flag to 1
+ ret = ZSTD_CCtx_setParameter(ctx_c, ZSTD_c_checksumFlag, 1);
+ if (ZSTD_isError(ret)) {
+ return Status::InvalidArgument(
+ strings::Substitute("ZSTD_CCtx_setParameter checksumFlag
error: $0",
+
ZSTD_getErrorString(ZSTD_getErrorCode(ret))));
+ }
+
+ ZSTD_outBuffer out_buf = {output->data, output->size, 0};
+
+ for (size_t i = 0; i < inputs.size(); i++) {
+ ZSTD_inBuffer in_buf = {inputs[i].data, inputs[i].size, 0};
+
+ bool last_input = (i == inputs.size() - 1);
+ auto mode = last_input ? ZSTD_e_end : ZSTD_e_continue;
+
+ bool finished = false;
+ do {
+ // do compress
+ auto ret = ZSTD_compressStream2(ctx_c, &out_buf, &in_buf,
mode);
+
+ if (ZSTD_isError(ret)) {
+ return Status::InvalidArgument(
+ strings::Substitute("ZSTD_compressStream2 error:
$0",
+
ZSTD_getErrorString(ZSTD_getErrorCode(ret))));
+ }
+
+ // ret is ZSTD hint for needed output buffer size
+ if (ret > 0 && out_buf.pos == out_buf.size) {
+ return Status::InvalidArgument(
+ strings::Substitute("ZSTD_compressStream2 output
buffer full"));
+ }
+
+ finished = last_input ? (ret == 0) : (in_buf.pos ==
inputs[i].size);
+ } while (!finished);
+ }
+
+ // set compressed size for caller
+ output->size = out_buf.pos;
+
+ return Status::OK();
+ }
+
+ // follow ZSTD official example
+ //
https://github.com/facebook/zstd/blob/dev/examples/streaming_decompression.c
+ Status decompress(const Slice& input, Slice* output) const {
+ if (!ctx_d) return Status::InvalidArgument("decompression context NOT
initialized");
+
+ // reset ctx to start a new decompress session
+ auto ret = ZSTD_DCtx_reset(ctx_d, ZSTD_reset_session_only);
+ if (ZSTD_isError(ret)) {
+ return Status::InvalidArgument(strings::Substitute(
+ "ZSTD_DCtx_reset error: $0",
ZSTD_getErrorString(ZSTD_getErrorCode(ret))));
+ }
+
+ ZSTD_inBuffer in_buf = {input.data, input.size, 0};
+ ZSTD_outBuffer out_buf = {output->data, output->size, 0};
+
+ while (in_buf.pos < in_buf.size) {
+ // do decompress
+ auto ret = ZSTD_decompressStream(ctx_d, &out_buf, &in_buf);
+
+ if (ZSTD_isError(ret)) {
+ return Status::InvalidArgument(
+ strings::Substitute("ZSTD_decompressStream error: $0",
+
ZSTD_getErrorString(ZSTD_getErrorCode(ret))));
+ }
+
+ // ret is ZSTD hint for needed output buffer size
+ if (ret > 0 && out_buf.pos == out_buf.size) {
+ return Status::InvalidArgument(
+ strings::Substitute("ZSTD_decompressStream output
buffer full"));
+ }
+ }
+
+ // set decompressed size for caller
+ output->size = out_buf.pos;
+
+ return Status::OK();
+ }
+
+private:
+ // will be reused by compress/decompress
+ ZSTD_CCtx* ctx_c = nullptr;
+ ZSTD_DCtx* ctx_d = nullptr;
+};
+
Status get_block_compression_codec(segment_v2::CompressionTypePB type,
std::unique_ptr<BlockCompressionCodec>&
codec) {
BlockCompressionCodec* ptr = nullptr;
@@ -394,6 +540,9 @@ Status
get_block_compression_codec(segment_v2::CompressionTypePB type,
case segment_v2::CompressionTypePB::ZLIB:
ptr = new ZlibBlockCompression();
break;
+ case segment_v2::CompressionTypePB::ZSTD:
+ ptr = new ZstdBlockCompression();
+ break;
default:
return Status::NotFound(strings::Substitute("unknown compression
type($0)", type));
}
diff --git a/be/test/util/block_compression_test.cpp
b/be/test/util/block_compression_test.cpp
index 0bf62e6d9d..b1ca291f78 100644
--- a/be/test/util/block_compression_test.cpp
+++ b/be/test/util/block_compression_test.cpp
@@ -101,6 +101,7 @@ TEST_F(BlockCompressionTest, single) {
test_single_slice(segment_v2::CompressionTypePB::ZLIB);
test_single_slice(segment_v2::CompressionTypePB::LZ4);
test_single_slice(segment_v2::CompressionTypePB::LZ4F);
+ test_single_slice(segment_v2::CompressionTypePB::ZSTD);
}
void test_multi_slices(segment_v2::CompressionTypePB type) {
@@ -156,6 +157,7 @@ TEST_F(BlockCompressionTest, multi) {
test_multi_slices(segment_v2::CompressionTypePB::ZLIB);
test_multi_slices(segment_v2::CompressionTypePB::LZ4);
test_multi_slices(segment_v2::CompressionTypePB::LZ4F);
+ test_multi_slices(segment_v2::CompressionTypePB::ZSTD);
}
} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]