This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 0f550aeda7c [fix](compression) handle exception to reuse compression context (#35338) (#35380)
0f550aeda7c is described below

commit 0f550aeda7c1798673512e0243dec81a51ffab80
Author: Yongqiang YANG <[email protected]>
AuthorDate: Fri May 24 19:56:27 2024 +0800

    [fix](compression) handle exception to reuse compression context (#35338) (#35380)
    
    * [fix](compression) handle exception to reuse compression context
    
    Otherwise, there is a memory leak and a new context is allocated; the
    resulting TLB flushes then consume a lot of system CPU.
---
 be/src/util/block_compression.cpp | 302 +++++++++++++++++++++-----------------
 1 file changed, 165 insertions(+), 137 deletions(-)

diff --git a/be/src/util/block_compression.cpp 
b/be/src/util/block_compression.cpp
index 332a581f3ec..3dd0d92c944 100644
--- a/be/src/util/block_compression.cpp
+++ b/be/src/util/block_compression.cpp
@@ -128,31 +128,39 @@ public:
                 _release_compression_ctx(std::move(context));
             }
         }};
-        Slice compressed_buf;
-        size_t max_len = max_compressed_len(input.size);
-        if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
-            // use output directly
-            output->resize(max_len);
-            compressed_buf.data = reinterpret_cast<char*>(output->data());
-            compressed_buf.size = max_len;
-        } else {
-            // reuse context buffer if max_len <= 
MAX_COMPRESSION_BUFFER_FOR_REUSE
-            context->buffer.resize(max_len);
-            compressed_buf.data = 
reinterpret_cast<char*>(context->buffer.data());
-            compressed_buf.size = max_len;
-        }
 
-        size_t compressed_len =
-                LZ4_compress_fast_continue(context->ctx, input.data, 
compressed_buf.data,
-                                           input.size, compressed_buf.size, 
ACCELARATION);
-        if (compressed_len == 0) {
-            compress_failed = true;
-            return Status::InvalidArgument("Output buffer's capacity is not 
enough, size={}",
-                                           compressed_buf.size);
-        }
-        output->resize(compressed_len);
-        if (max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
-            
output->assign_copy(reinterpret_cast<uint8_t*>(compressed_buf.data), 
compressed_len);
+        try {
+            Slice compressed_buf;
+            size_t max_len = max_compressed_len(input.size);
+            if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
+                // use output directly
+                output->resize(max_len);
+                compressed_buf.data = reinterpret_cast<char*>(output->data());
+                compressed_buf.size = max_len;
+            } else {
+                // reuse context buffer if max_len <= 
MAX_COMPRESSION_BUFFER_FOR_REUSE
+                context->buffer->resize(max_len);
+                compressed_buf.data = 
reinterpret_cast<char*>(context->buffer->data());
+                compressed_buf.size = max_len;
+            }
+
+            size_t compressed_len =
+                    LZ4_compress_fast_continue(context->ctx, input.data, 
compressed_buf.data,
+                                               input.size, 
compressed_buf.size, ACCELARATION);
+            if (compressed_len == 0) {
+                compress_failed = true;
+                return Status::InvalidArgument("Output buffer's capacity is 
not enough, size={}",
+                                               compressed_buf.size);
+            }
+            output->resize(compressed_len);
+            if (max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
+                
output->assign_copy(reinterpret_cast<uint8_t*>(compressed_buf.data),
+                                    compressed_len);
+            }
+        } catch (...) {
+            // Do not set compress_failed to release context
+            DCHECK(!compress_failed);
+            return Status::InternalError("Fail to do LZ4Block compress due to 
exception");
         }
         return Status::OK();
     }
@@ -296,50 +304,57 @@ private:
                 _release_compression_ctx(std::move(context));
             }
         }};
-        Slice compressed_buf;
-        size_t max_len = max_compressed_len(uncompressed_size);
-        if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
-            // use output directly
-            output->resize(max_len);
-            compressed_buf.data = reinterpret_cast<char*>(output->data());
-            compressed_buf.size = max_len;
-        } else {
-            // reuse context buffer if max_len <= 
MAX_COMPRESSION_BUFFER_FOR_REUSE
-            context->buffer.resize(max_len);
-            compressed_buf.data = 
reinterpret_cast<char*>(context->buffer.data());
-            compressed_buf.size = max_len;
-        }
 
-        auto wbytes = LZ4F_compressBegin(context->ctx, compressed_buf.data, 
compressed_buf.size,
-                                         &_s_preferences);
-        if (LZ4F_isError(wbytes)) {
-            compress_failed = true;
-            return Status::InvalidArgument("Fail to do LZ4F compress begin, 
res={}",
-                                           LZ4F_getErrorName(wbytes));
-        }
-        size_t offset = wbytes;
-        for (auto input : inputs) {
-            wbytes = LZ4F_compressUpdate(context->ctx, compressed_buf.data + 
offset,
-                                         compressed_buf.size - offset, 
input.data, input.size,
-                                         nullptr);
+        try {
+            Slice compressed_buf;
+            size_t max_len = max_compressed_len(uncompressed_size);
+            if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
+                // use output directly
+                output->resize(max_len);
+                compressed_buf.data = reinterpret_cast<char*>(output->data());
+                compressed_buf.size = max_len;
+            } else {
+                // reuse context buffer if max_len <= 
MAX_COMPRESSION_BUFFER_FOR_REUSE
+                context->buffer->resize(max_len);
+                compressed_buf.data = 
reinterpret_cast<char*>(context->buffer->data());
+                compressed_buf.size = max_len;
+            }
+
+            auto wbytes = LZ4F_compressBegin(context->ctx, 
compressed_buf.data, compressed_buf.size,
+                                             &_s_preferences);
             if (LZ4F_isError(wbytes)) {
                 compress_failed = true;
-                return Status::InvalidArgument("Fail to do LZ4F compress 
update, res={}",
+                return Status::InvalidArgument("Fail to do LZ4F compress 
begin, res={}",
+                                               LZ4F_getErrorName(wbytes));
+            }
+            size_t offset = wbytes;
+            for (auto input : inputs) {
+                wbytes = LZ4F_compressUpdate(context->ctx, compressed_buf.data 
+ offset,
+                                             compressed_buf.size - offset, 
input.data, input.size,
+                                             nullptr);
+                if (LZ4F_isError(wbytes)) {
+                    compress_failed = true;
+                    return Status::InvalidArgument("Fail to do LZ4F compress 
update, res={}",
+                                                   LZ4F_getErrorName(wbytes));
+                }
+                offset += wbytes;
+            }
+            wbytes = LZ4F_compressEnd(context->ctx, compressed_buf.data + 
offset,
+                                      compressed_buf.size - offset, nullptr);
+            if (LZ4F_isError(wbytes)) {
+                compress_failed = true;
+                return Status::InvalidArgument("Fail to do LZ4F compress end, 
res={}",
                                                LZ4F_getErrorName(wbytes));
             }
             offset += wbytes;
-        }
-        wbytes = LZ4F_compressEnd(context->ctx, compressed_buf.data + offset,
-                                  compressed_buf.size - offset, nullptr);
-        if (LZ4F_isError(wbytes)) {
-            compress_failed = true;
-            return Status::InvalidArgument("Fail to do LZ4F compress end, 
res={}",
-                                           LZ4F_getErrorName(wbytes));
-        }
-        offset += wbytes;
-        output->resize(offset);
-        if (max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
-            
output->assign_copy(reinterpret_cast<uint8_t*>(compressed_buf.data), offset);
+            output->resize(offset);
+            if (max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
+                
output->assign_copy(reinterpret_cast<uint8_t*>(compressed_buf.data), offset);
+            }
+        } catch (...) {
+            // Do not set compress_failed to release context
+            DCHECK(!compress_failed);
+            return Status::InternalError("Fail to do LZ4F compress due to 
exception");
         }
 
         return Status::OK();
@@ -481,30 +496,37 @@ public:
                 _release_compression_ctx(std::move(context));
             }
         }};
-        Slice compressed_buf;
-        size_t max_len = max_compressed_len(input.size);
-        if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
-            // use output directly
-            output->resize(max_len);
-            compressed_buf.data = reinterpret_cast<char*>(output->data());
-            compressed_buf.size = max_len;
-        } else {
-            // reuse context buffer if max_len <= 
MAX_COMPRESSION_BUFFER_FOR_REUSE
-            context->buffer.resize(max_len);
-            compressed_buf.data = 
reinterpret_cast<char*>(context->buffer.data());
-            compressed_buf.size = max_len;
-        }
 
-        size_t compressed_len = LZ4_compress_HC_continue(
-                context->ctx, input.data, compressed_buf.data, input.size, 
compressed_buf.size);
-        if (compressed_len == 0) {
-            compress_failed = true;
-            return Status::InvalidArgument("Output buffer's capacity is not 
enough, size={}",
-                                           compressed_buf.size);
-        }
-        output->resize(compressed_len);
-        if (max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
-            
output->assign_copy(reinterpret_cast<uint8_t*>(compressed_buf.data), 
compressed_len);
+        try {
+            Slice compressed_buf;
+            size_t max_len = max_compressed_len(input.size);
+            if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
+                // use output directly
+                output->resize(max_len);
+                compressed_buf.data = reinterpret_cast<char*>(output->data());
+                compressed_buf.size = max_len;
+            } else {
+                context->buffer->resize(max_len);
+                compressed_buf.data = 
reinterpret_cast<char*>(context->buffer->data());
+                compressed_buf.size = max_len;
+            }
+
+            size_t compressed_len = LZ4_compress_HC_continue(
+                    context->ctx, input.data, compressed_buf.data, input.size, 
compressed_buf.size);
+            if (compressed_len == 0) {
+                compress_failed = true;
+                return Status::InvalidArgument("Output buffer's capacity is 
not enough, size={}",
+                                               compressed_buf.size);
+            }
+            output->resize(compressed_len);
+            if (max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
+                
output->assign_copy(reinterpret_cast<uint8_t*>(compressed_buf.data),
+                                    compressed_len);
+            }
+        } catch (...) {
+            // Do not set compress_failed to release context
+            DCHECK(!compress_failed);
+            return Status::InternalError("Fail to do LZ4HC compress due to 
exception");
         }
         return Status::OK();
     }
@@ -798,67 +820,73 @@ public:
             }
         }};
 
-        size_t max_len = max_compressed_len(uncompressed_size);
-        Slice compressed_buf;
-        if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
-            // use output directly
-            output->resize(max_len);
-            compressed_buf.data = reinterpret_cast<char*>(output->data());
-            compressed_buf.size = max_len;
-        } else {
-            // reuse context buffer if max_len <= 
MAX_COMPRESSION_BUFFER_FOR_REUSE
-            context->buffer.resize(max_len);
-            compressed_buf.data = 
reinterpret_cast<char*>(context->buffer.data());
-            compressed_buf.size = max_len;
-        }
+        try {
+            size_t max_len = max_compressed_len(uncompressed_size);
+            Slice compressed_buf;
+            if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
+                // use output directly
+                output->resize(max_len);
+                compressed_buf.data = reinterpret_cast<char*>(output->data());
+                compressed_buf.size = max_len;
+            } else {
+                // reuse context buffer if max_len <= 
MAX_COMPRESSION_BUFFER_FOR_REUSE
+                context->buffer->resize(max_len);
+                compressed_buf.data = 
reinterpret_cast<char*>(context->buffer->data());
+                compressed_buf.size = max_len;
+            }
 
-        // set compression level to default 3
-        auto ret =
-                ZSTD_CCtx_setParameter(context->ctx, ZSTD_c_compressionLevel, 
ZSTD_CLEVEL_DEFAULT);
-        if (ZSTD_isError(ret)) {
-            return Status::InvalidArgument("ZSTD_CCtx_setParameter compression 
level error: {}",
-                                           
ZSTD_getErrorString(ZSTD_getErrorCode(ret)));
-        }
-        // set checksum flag to 1
-        ret = ZSTD_CCtx_setParameter(context->ctx, ZSTD_c_checksumFlag, 1);
-        if (ZSTD_isError(ret)) {
-            return Status::InvalidArgument("ZSTD_CCtx_setParameter 
checksumFlag error: {}",
-                                           
ZSTD_getErrorString(ZSTD_getErrorCode(ret)));
-        }
+            // set compression level to default 3
+            auto ret = ZSTD_CCtx_setParameter(context->ctx, 
ZSTD_c_compressionLevel,
+                                              ZSTD_CLEVEL_DEFAULT);
+            if (ZSTD_isError(ret)) {
+                return Status::InvalidArgument("ZSTD_CCtx_setParameter 
compression level error: {}",
+                                               
ZSTD_getErrorString(ZSTD_getErrorCode(ret)));
+            }
+            // set checksum flag to 1
+            ret = ZSTD_CCtx_setParameter(context->ctx, ZSTD_c_checksumFlag, 1);
+            if (ZSTD_isError(ret)) {
+                return Status::InvalidArgument("ZSTD_CCtx_setParameter 
checksumFlag error: {}",
+                                               
ZSTD_getErrorString(ZSTD_getErrorCode(ret)));
+            }
 
-        ZSTD_outBuffer out_buf = {compressed_buf.data, compressed_buf.size, 0};
+            ZSTD_outBuffer out_buf = {compressed_buf.data, 
compressed_buf.size, 0};
 
-        for (size_t i = 0; i < inputs.size(); i++) {
-            ZSTD_inBuffer in_buf = {inputs[i].data, inputs[i].size, 0};
+            for (size_t i = 0; i < inputs.size(); i++) {
+                ZSTD_inBuffer in_buf = {inputs[i].data, inputs[i].size, 0};
 
-            bool last_input = (i == inputs.size() - 1);
-            auto mode = last_input ? ZSTD_e_end : ZSTD_e_continue;
+                bool last_input = (i == inputs.size() - 1);
+                auto mode = last_input ? ZSTD_e_end : ZSTD_e_continue;
 
-            bool finished = false;
-            do {
-                // do compress
-                auto ret = ZSTD_compressStream2(context->ctx, &out_buf, 
&in_buf, mode);
+                bool finished = false;
+                do {
+                    // do compress
+                    auto ret = ZSTD_compressStream2(context->ctx, &out_buf, 
&in_buf, mode);
 
-                if (ZSTD_isError(ret)) {
-                    compress_failed = true;
-                    return Status::InvalidArgument("ZSTD_compressStream2 
error: {}",
-                                                   
ZSTD_getErrorString(ZSTD_getErrorCode(ret)));
-                }
+                    if (ZSTD_isError(ret)) {
+                        compress_failed = true;
+                        return Status::InvalidArgument("ZSTD_compressStream2 
error: {}",
+                                                       
ZSTD_getErrorString(ZSTD_getErrorCode(ret)));
+                    }
 
-                // ret is ZSTD hint for needed output buffer size
-                if (ret > 0 && out_buf.pos == out_buf.size) {
-                    compress_failed = true;
-                    return Status::InvalidArgument("ZSTD_compressStream2 
output buffer full");
-                }
+                    // ret is ZSTD hint for needed output buffer size
+                    if (ret > 0 && out_buf.pos == out_buf.size) {
+                        compress_failed = true;
+                        return Status::InvalidArgument("ZSTD_compressStream2 
output buffer full");
+                    }
 
-                finished = last_input ? (ret == 0) : (in_buf.pos == 
inputs[i].size);
-            } while (!finished);
-        }
+                    finished = last_input ? (ret == 0) : (in_buf.pos == 
inputs[i].size);
+                } while (!finished);
+            }
 
-        // set compressed size for caller
-        output->resize(out_buf.pos);
-        if (max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
-            
output->assign_copy(reinterpret_cast<uint8_t*>(compressed_buf.data), 
out_buf.pos);
+            // set compressed size for caller
+            output->resize(out_buf.pos);
+            if (max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
+                
output->assign_copy(reinterpret_cast<uint8_t*>(compressed_buf.data), 
out_buf.pos);
+            }
+        } catch (...) {
+            // Do not set compress_failed to release context
+            DCHECK(!compress_failed);
+            return Status::InternalError("Fail to do ZSTD compress due to 
exception");
         }
 
         return Status::OK();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to