BiteTheDDDDt commented on code in PR #12573:
URL: https://github.com/apache/doris/pull/12573#discussion_r970782866


##########
be/src/vec/core/block.cpp:
##########
@@ -689,15 +690,16 @@ Status Block::filter_block(Block* block, int 
filter_column_id, int column_to_kee
     return Status::OK();
 }
 
-Status Block::serialize(PBlock* pblock, size_t* uncompressed_bytes, size_t* 
compressed_bytes,
+/*Status Block::serialize(PBlock* pblock, size_t* uncompressed_bytes, size_t* 
compressed_bytes,

Review Comment:
   Maybe we can delete this code directly.



##########
be/src/util/block_compression.cpp:
##########
@@ -78,122 +123,260 @@ class Lz4BlockCompression : public BlockCompressionCodec 
{
         return Status::OK();
     }
 
-    size_t max_compressed_len(size_t len) const override {
-        if (len > std::numeric_limits<int32_t>::max()) {
-            return 0;
+    size_t max_compressed_len(size_t len) override { return 
LZ4_compressBound(len); }
+
+private:
+    // reuse LZ4 compress stream
+    Status _acquire_compression_ctx(Context** out) {
+        std::lock_guard<std::mutex> l(_ctx_mutex);
+        if (_ctx_pool.empty()) {
+            Context* context = new (std::nothrow) Context();
+            if (context == nullptr) {
+                return Status::InvalidArgument("new LZ4 context error");
+            }
+            context->ctx = LZ4_createStream();
+            if (context->ctx == nullptr) {
+                delete context;
+                return Status::InvalidArgument("LZ4_createStream error");
+            }
+            *out = context;
+            return Status::OK();
         }
-        return LZ4_compressBound(len);
+        *out = _ctx_pool.back();

Review Comment:
   Can we move the lock to this line?



##########
be/src/vec/core/block.cpp:
##########
@@ -711,10 +713,10 @@ Status Block::serialize(PBlock* pblock, std::string* 
compressed_buffer, size_t*
 
     // serialize data values
     // when data type is HLL, content_uncompressed_size maybe larger than real 
size.
-    std::string* column_values = nullptr;
+    std::string column_values;
     try {
-        column_values = pblock->mutable_column_values();
-        column_values->resize(content_uncompressed_size);
+        //column_values = pblock->mutable_column_values();

Review Comment:
   ditto



##########
be/src/util/block_compression.cpp:
##########
@@ -28,47 +28,92 @@
 #include <limits>
 
 #include "gutil/strings/substitute.h"
+#include "util/defer_op.h"
 #include "util/faststring.h"
 
 namespace doris {
 
 using strings::Substitute;
 
-Status BlockCompressionCodec::compress(const std::vector<Slice>& inputs, 
Slice* output) const {
+Status BlockCompressionCodec::compress(const std::vector<Slice>& inputs, 
size_t uncompressed_size,
+                                       faststring* output) {
     faststring buf;
     // we compute total size to avoid more memory copy
-    size_t total_size = Slice::compute_total_size(inputs);
-    buf.reserve(total_size);
+    buf.reserve(uncompressed_size);
     for (auto& input : inputs) {
         buf.append(input.data, input.size);
     }
     return compress(buf, output);
 }
 
+bool BlockCompressionCodec::exceed_max_compress_len(size_t uncompressed_size) {
+    if (uncompressed_size > std::numeric_limits<int32_t>::max()) {
+        return true;
+    }
+    return false;
+}
+
 class Lz4BlockCompression : public BlockCompressionCodec {
+private:
+    struct Context {
+        Context() : ctx(nullptr) {}
+        LZ4_stream_t* ctx;
+        faststring buffer;
+    };
+
 public:
-    static const Lz4BlockCompression* instance() {
+    static Lz4BlockCompression* instance() {
         static Lz4BlockCompression s_instance;
         return &s_instance;
     }
-    ~Lz4BlockCompression() override {}
+    ~Lz4BlockCompression() {
+        for (auto ctx : _ctx_pool) {
+            _delete_compression_ctx(ctx);
+        }
+    }
 
-    Status compress(const Slice& input, Slice* output) const override {
-        if (input.size > std::numeric_limits<int32_t>::max() ||
-            output->size > std::numeric_limits<int32_t>::max()) {
-            return Status::InvalidArgument("LZ4 cannot handle data large than 
2G");
+    Status compress(const Slice& input, faststring* output) override {
+        Context* context;
+        RETURN_IF_ERROR(_acquire_compression_ctx(&context));
+        bool compress_failed = false;
+        Defer defer {[&] {
+            if (compress_failed) {
+                _delete_compression_ctx(context);
+            } else {
+                _release_compression_ctx(context);
+            }
+        }};
+        Slice compressed_buf;
+        size_t max_len = max_compressed_len(input.size);
+        if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
+            // use output directly
+            output->resize(max_len);
+            compressed_buf.data = reinterpret_cast<char*>(output->data());
+            compressed_buf.size = max_len;
+        } else {
+            // reuse context buffer if max_len < 
MAX_COMPRESSION_BUFFER_FOR_REUSE
+            context->buffer.resize(max_len);
+            compressed_buf.data = 
reinterpret_cast<char*>(context->buffer.data());
+            compressed_buf.size = max_len;
         }
-        auto compressed_len =
-                LZ4_compress_default(input.data, output->data, input.size, 
output->size);
+
+        int32_t acc = 1;

Review Comment:
   Should we define `acc` as a constexpr static variable?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to