This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit 5bfc8e9cb1b4cb98168cd979f07d9c4e1650e5eb
Author: Kang <[email protected]>
AuthorDate: Fri May 27 21:56:18 2022 +0800

    [feature] add zstd compression codec (#9747)
    
    ZSTD compression is fast and has a high compression ratio. It can be used to 
achieve a higher compression ratio
    than the default Lz4f codec when storing cost-sensitive data such as logs.
    
    Compared to the Lz4f codec, the zstd codec produces a 35% smaller compressed size, is 30% 
faster on the first read (without OS page
    cache), and is 40% slower on the second read (with OS page cache) in the following 
comparison test.
    
    test data: 25GB text log, 110 million rows
    test table: test_table(ts varchar(30), log string)
    test SQL: set enable_vectorized_engine=1; select sum(length(log)) from 
test_table
    be.conf: disable_storage_page_cache = true
    Set this config to disable the Doris page cache, so that the data is not all cached in 
memory and the test measures the real decompression speed.
    test result
    
    master branch with lz4f codec result:
    - compressed size 4.3G
    - SQL first exec time(read data from disk + decompress + little 
computation) : 18.3s
    - SQL second exec time(read data from OS pagecache + decompress + little 
computation) : 2.4s
    
    this branch with zstd codec (hard-coded to be enabled) result:
    - compressed size: 2.8G
    - SQL first exec time: 12.8s
    - SQL second exec time: 3.4s
---
 be/src/util/block_compression.cpp       | 149 ++++++++++++++++++++++++++++++++
 be/test/util/block_compression_test.cpp |   2 +
 2 files changed, 151 insertions(+)

diff --git a/be/src/util/block_compression.cpp 
b/be/src/util/block_compression.cpp
index a1ee74f047..51f485f348 100644
--- a/be/src/util/block_compression.cpp
+++ b/be/src/util/block_compression.cpp
@@ -22,6 +22,8 @@
 #include <snappy/snappy-sinksource.h>
 #include <snappy/snappy.h>
 #include <zlib.h>
+#include <zstd.h>
+#include <zstd_errors.h>
 
 #include <limits>
 
@@ -375,6 +377,150 @@ public:
     }
 };
 
+// ZSTD-backed codec for block compression and decompression: fast, with a high compression 
ratio
+class ZstdBlockCompression : public BlockCompressionCodec {
+public:
+    // Create whichever of the compress/decompress contexts is still missing; safe to call again.
+    inline Status init() override {
+        if (!ctx_c) {
+            ctx_c = ZSTD_createCCtx();
+            if (!ctx_c) {
+                return Status::InvalidArgument("Fail to ZSTD_createCCtx");
+            }
+        }
+
+        if (!ctx_d) {
+            ctx_d = ZSTD_createDCtx();
+            if (!ctx_d) {
+                return Status::InvalidArgument("Fail to ZSTD_createDCtx");
+            }
+        }
+
+        return Status::OK();
+    }
+
+    ~ZstdBlockCompression() override {
+        if (ctx_c) ZSTD_freeCCtx(ctx_c);
+        if (ctx_d) ZSTD_freeDCtx(ctx_d);
+    }
+
+    size_t max_compressed_len(size_t len) const override {
+        if (len > std::numeric_limits<int32_t>::max()) {
+            return 0; // lengths above INT32_MAX are reported as 0
+        }
+        return ZSTD_compressBound(len);
+    }
+
+    Status compress(const Slice& input, Slice* output) const override {
+        std::vector<Slice> inputs {input};
+        return compress(inputs, output);
+    }
+
+    // Streams every slice through one compress session (ZSTD_e_end only on the last slice); see
+    //  
https://github.com/facebook/zstd/blob/dev/examples/streaming_compression.c
+    Status compress(const std::vector<Slice>& inputs, Slice* output) const {
+        if (!ctx_c) return Status::InvalidArgument("compression context NOT 
initialized");
+
+        // reset only the session state so the same ctx can be reused for a new compress session
+        auto ret = ZSTD_CCtx_reset(ctx_c, ZSTD_reset_session_only);
+        if (ZSTD_isError(ret)) {
+            return Status::InvalidArgument(strings::Substitute(
+                    "ZSTD_CCtx_reset error: $0", 
ZSTD_getErrorString(ZSTD_getErrorCode(ret))));
+        }
+        // use the library default compression level (ZSTD_CLEVEL_DEFAULT)
+        ret = ZSTD_CCtx_setParameter(ctx_c, ZSTD_c_compressionLevel, 
ZSTD_CLEVEL_DEFAULT);
+        if (ZSTD_isError(ret)) {
+            return Status::InvalidArgument(
+                    strings::Substitute("ZSTD_CCtx_setParameter compression 
level error: $0",
+                                        
ZSTD_getErrorString(ZSTD_getErrorCode(ret))));
+        }
+        // set checksum flag to 1 (presumably appends a content checksum to the frame)
+        ret = ZSTD_CCtx_setParameter(ctx_c, ZSTD_c_checksumFlag, 1);
+        if (ZSTD_isError(ret)) {
+            return Status::InvalidArgument(
+                    strings::Substitute("ZSTD_CCtx_setParameter checksumFlag 
error: $0",
+                                        
ZSTD_getErrorString(ZSTD_getErrorCode(ret))));
+        }
+
+        ZSTD_outBuffer out_buf = {output->data, output->size, 0};
+
+        for (size_t i = 0; i < inputs.size(); i++) {
+            ZSTD_inBuffer in_buf = {inputs[i].data, inputs[i].size, 0};
+
+            bool last_input = (i == inputs.size() - 1);
+            auto mode = last_input ? ZSTD_e_end : ZSTD_e_continue;
+
+            bool finished = false;
+            do {
+                // consume from in_buf and append compressed bytes to out_buf
+                auto ret = ZSTD_compressStream2(ctx_c, &out_buf, &in_buf, 
mode);
+
+                if (ZSTD_isError(ret)) {
+                    return Status::InvalidArgument(
+                            strings::Substitute("ZSTD_compressStream2 error: 
$0",
+                                                
ZSTD_getErrorString(ZSTD_getErrorCode(ret))));
+                }
+
+                // ret > 0 means data is still pending; with out_buf full it can never be flushed
+                if (ret > 0 && out_buf.pos == out_buf.size) {
+                    return Status::InvalidArgument(
+                            strings::Substitute("ZSTD_compressStream2 output 
buffer full"));
+                }
+
+                finished = last_input ? (ret == 0) : (in_buf.pos == 
inputs[i].size);
+            } while (!finished);
+        }
+
+        // report the number of compressed bytes written back to the caller
+        output->size = out_buf.pos;
+
+        return Status::OK();
+    }
+
+    // Decompresses input into output until all input bytes are consumed; follows the example at
+    //  
https://github.com/facebook/zstd/blob/dev/examples/streaming_decompression.c
+    Status decompress(const Slice& input, Slice* output) const {
+        if (!ctx_d) return Status::InvalidArgument("decompression context NOT 
initialized");
+
+        // reset only the session state so the same ctx can be reused for a new decompress session
+        auto ret = ZSTD_DCtx_reset(ctx_d, ZSTD_reset_session_only);
+        if (ZSTD_isError(ret)) {
+            return Status::InvalidArgument(strings::Substitute(
+                    "ZSTD_DCtx_reset error: $0", 
ZSTD_getErrorString(ZSTD_getErrorCode(ret))));
+        }
+
+        ZSTD_inBuffer in_buf = {input.data, input.size, 0};
+        ZSTD_outBuffer out_buf = {output->data, output->size, 0};
+
+        while (in_buf.pos < in_buf.size) { // loop until every input byte has been consumed
+            // consume from in_buf and append decompressed bytes to out_buf
+            auto ret = ZSTD_decompressStream(ctx_d, &out_buf, &in_buf);
+
+            if (ZSTD_isError(ret)) {
+                return Status::InvalidArgument(
+                        strings::Substitute("ZSTD_decompressStream error: $0",
+                                            
ZSTD_getErrorString(ZSTD_getErrorCode(ret))));
+            }
+
+            // ret > 0 means the frame is not finished; with out_buf full no progress is possible
+            if (ret > 0 && out_buf.pos == out_buf.size) {
+                return Status::InvalidArgument(
+                        strings::Substitute("ZSTD_decompressStream output 
buffer full"));
+            }
+        }
+
+        // report decompressed size; NOTE(review): a final ret > 0 (incomplete frame) is not checked
+        output->size = out_buf.pos;
+
+        return Status::OK();
+    }
+
+private:
+    // ZSTD contexts created by init(), reused by compress/decompress, freed in the destructor
+    ZSTD_CCtx* ctx_c = nullptr;
+    ZSTD_DCtx* ctx_d = nullptr;
+};
+
 Status get_block_compression_codec(segment_v2::CompressionTypePB type,
                                    std::unique_ptr<BlockCompressionCodec>& 
codec) {
     BlockCompressionCodec* ptr = nullptr;
@@ -394,6 +540,9 @@ Status 
get_block_compression_codec(segment_v2::CompressionTypePB type,
     case segment_v2::CompressionTypePB::ZLIB:
         ptr = new ZlibBlockCompression();
         break;
+    case segment_v2::CompressionTypePB::ZSTD:
+        ptr = new ZstdBlockCompression();
+        break;
     default:
         return Status::NotFound(strings::Substitute("unknown compression 
type($0)", type));
     }
diff --git a/be/test/util/block_compression_test.cpp 
b/be/test/util/block_compression_test.cpp
index a339d54409..7b2da38703 100644
--- a/be/test/util/block_compression_test.cpp
+++ b/be/test/util/block_compression_test.cpp
@@ -101,6 +101,7 @@ TEST_F(BlockCompressionTest, single) {
     test_single_slice(segment_v2::CompressionTypePB::ZLIB);
     test_single_slice(segment_v2::CompressionTypePB::LZ4);
     test_single_slice(segment_v2::CompressionTypePB::LZ4F);
+    test_single_slice(segment_v2::CompressionTypePB::ZSTD);
 }
 
 void test_multi_slices(segment_v2::CompressionTypePB type) {
@@ -156,6 +157,7 @@ TEST_F(BlockCompressionTest, multi) {
     test_multi_slices(segment_v2::CompressionTypePB::ZLIB);
     test_multi_slices(segment_v2::CompressionTypePB::LZ4);
     test_multi_slices(segment_v2::CompressionTypePB::LZ4F);
+    test_multi_slices(segment_v2::CompressionTypePB::ZSTD);
 }
 
 } // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to