This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 41f29cf4cdb [fix](decompress)(review) context leaked in failure path (#33622) (#35364)
41f29cf4cdb is described below

commit 41f29cf4cdbf64c1cc14892ebb88d20c2dca5c0b
Author: Yongqiang YANG <[email protected]>
AuthorDate: Fri May 24 17:40:13 2024 +0800

    [fix](decompress)(review) context leaked in failure path (#33622) (#35364)
    
    * [fix](decompress)(review) context leaked in failure path
    
    * [fix](decompress)(review) context leaked in failure path review fix
    
    Co-authored-by: Vallish Pai <[email protected]>
---
 be/src/util/block_compression.cpp | 297 ++++++++++++++++++--------------------
 1 file changed, 143 insertions(+), 154 deletions(-)

diff --git a/be/src/util/block_compression.cpp b/be/src/util/block_compression.cpp
index b5e800a7d02..332a581f3ec 100644
--- a/be/src/util/block_compression.cpp
+++ b/be/src/util/block_compression.cpp
@@ -44,6 +44,7 @@
 #include <ostream>
 
 #include "common/config.h"
+#include "common/factory_creator.h"
 #include "exec/decompressor.h"
 #include "gutil/endian.h"
 #include "gutil/strings/substitute.h"
@@ -90,10 +91,18 @@ bool BlockCompressionCodec::exceed_max_compress_len(size_t uncompressed_size) {
 
 class Lz4BlockCompression : public BlockCompressionCodec {
 private:
-    struct Context {
+    class Context {
+        ENABLE_FACTORY_CREATOR(Context);
+
+    public:
         Context() : ctx(nullptr) {}
         LZ4_stream_t* ctx;
         faststring buffer;
+        ~Context() {
+            if (ctx) {
+                LZ4_freeStream(ctx);
+            }
+        }
     };
 
 public:
@@ -101,11 +110,7 @@ public:
         static Lz4BlockCompression s_instance;
         return &s_instance;
     }
-    ~Lz4BlockCompression() {
-        for (auto ctx : _ctx_pool) {
-            _delete_compression_ctx(ctx);
-        }
-    }
+    ~Lz4BlockCompression() { _ctx_pool.clear(); }
 
     Status compress(const Slice& input, faststring* output) override {
         if (input.size > INT_MAX) {
@@ -115,14 +120,12 @@ public:
                     input.size);
         }
 
-        Context* context;
-        RETURN_IF_ERROR(_acquire_compression_ctx(&context));
+        std::unique_ptr<Context> context;
+        RETURN_IF_ERROR(_acquire_compression_ctx(context));
         bool compress_failed = false;
         Defer defer {[&] {
-            if (compress_failed) {
-                _delete_compression_ctx(context);
-            } else {
-                _release_compression_ctx(context);
+            if (!compress_failed) {
+                _release_compression_ctx(std::move(context));
             }
         }};
         Slice compressed_buf;
@@ -168,40 +171,34 @@ public:
 
 private:
     // reuse LZ4 compress stream
-    Status _acquire_compression_ctx(Context** out) {
+    Status _acquire_compression_ctx(std::unique_ptr<Context>& out) {
         std::lock_guard<std::mutex> l(_ctx_mutex);
         if (_ctx_pool.empty()) {
-            Context* context = new (std::nothrow) Context();
-            if (context == nullptr) {
+            std::unique_ptr<Context> localCtx = Context::create_unique();
+            if (localCtx.get() == nullptr) {
                 return Status::InvalidArgument("new LZ4 context error");
             }
-            context->ctx = LZ4_createStream();
-            if (context->ctx == nullptr) {
-                delete context;
+            localCtx->ctx = LZ4_createStream();
+            if (localCtx->ctx == nullptr) {
                 return Status::InvalidArgument("LZ4_createStream error");
             }
-            *out = context;
+            out = std::move(localCtx);
             return Status::OK();
         }
-        *out = _ctx_pool.back();
+        out = std::move(_ctx_pool.back());
         _ctx_pool.pop_back();
         return Status::OK();
     }
-    void _release_compression_ctx(Context* context) {
+    void _release_compression_ctx(std::unique_ptr<Context> context) {
         DCHECK(context);
         LZ4_resetStream(context->ctx);
         std::lock_guard<std::mutex> l(_ctx_mutex);
-        _ctx_pool.push_back(context);
-    }
-    void _delete_compression_ctx(Context* context) {
-        DCHECK(context);
-        LZ4_freeStream(context->ctx);
-        delete context;
+        _ctx_pool.push_back(std::move(context));
     }
 
 private:
     mutable std::mutex _ctx_mutex;
-    mutable std::vector<Context*> _ctx_pool;
+    mutable std::vector<std::unique_ptr<Context>> _ctx_pool;
     static const int32_t ACCELARATION = 1;
 };
 
@@ -233,14 +230,30 @@ private:
 // Used for LZ4 frame format, decompress speed is two times faster than LZ4.
 class Lz4fBlockCompression : public BlockCompressionCodec {
 private:
-    struct CContext {
+    class CContext {
+        ENABLE_FACTORY_CREATOR(CContext);
+
+    public:
         CContext() : ctx(nullptr) {}
         LZ4F_compressionContext_t ctx;
         faststring buffer;
+        ~CContext() {
+            if (ctx) {
+                LZ4F_freeCompressionContext(ctx);
+            }
+        }
     };
-    struct DContext {
+    class DContext {
+        ENABLE_FACTORY_CREATOR(DContext);
+
+    public:
         DContext() : ctx(nullptr) {}
         LZ4F_decompressionContext_t ctx;
+        ~DContext() {
+            if (ctx) {
+                LZ4F_freeDecompressionContext(ctx);
+            }
+        }
     };
 
 public:
@@ -249,12 +262,8 @@ public:
         return &s_instance;
     }
     ~Lz4fBlockCompression() {
-        for (auto ctx : _ctx_c_pool) {
-            _delete_compression_ctx(ctx);
-        }
-        for (auto ctx : _ctx_d_pool) {
-            _delete_decompression_ctx(ctx);
-        }
+        _ctx_c_pool.clear();
+        _ctx_d_pool.clear();
     }
 
     Status compress(const Slice& input, faststring* output) override {
@@ -279,14 +288,12 @@ public:
 private:
     Status _compress(const std::vector<Slice>& inputs, size_t uncompressed_size,
                      faststring* output) {
-        CContext* context = nullptr;
-        RETURN_IF_ERROR(_acquire_compression_ctx(&context));
+        std::unique_ptr<CContext> context;
+        RETURN_IF_ERROR(_acquire_compression_ctx(context));
         bool compress_failed = false;
         Defer defer {[&] {
-            if (compress_failed) {
-                _delete_compression_ctx(context);
-            } else {
-                _release_compression_ctx(context);
+            if (!compress_failed) {
+                _release_compression_ctx(std::move(context));
             }
         }};
         Slice compressed_buf;
@@ -340,13 +347,11 @@ private:
 
     Status _decompress(const Slice& input, Slice* output) {
         bool decompress_failed = false;
-        DContext* context = nullptr;
-        RETURN_IF_ERROR(_acquire_decompression_ctx(&context));
+        std::unique_ptr<DContext> context;
+        RETURN_IF_ERROR(_acquire_decompression_ctx(context));
         Defer defer {[&] {
-            if (decompress_failed) {
-                _delete_decompression_ctx(context);
-            } else {
-                _release_decompression_ctx(context);
+            if (!decompress_failed) {
+                _release_decompression_ctx(std::move(context));
             }
         }};
         size_t input_size = input.size;
@@ -373,66 +378,56 @@ private:
 private:
     // acquire a compression ctx from pool, release while finish compress,
     // delete if compression failed
-    Status _acquire_compression_ctx(CContext** out) {
+    Status _acquire_compression_ctx(std::unique_ptr<CContext>& out) {
         std::lock_guard<std::mutex> l(_ctx_c_mutex);
         if (_ctx_c_pool.empty()) {
-            CContext* context = new (std::nothrow) CContext();
-            if (context == nullptr) {
+            std::unique_ptr<CContext> localCtx = CContext::create_unique();
+            if (localCtx.get() == nullptr) {
                 return Status::InvalidArgument("failed to new LZ4F CContext");
             }
-            auto res = LZ4F_createCompressionContext(&context->ctx, LZ4F_VERSION);
+            auto res = LZ4F_createCompressionContext(&localCtx->ctx, LZ4F_VERSION);
             if (LZ4F_isError(res) != 0) {
                 return Status::InvalidArgument(strings::Substitute(
                         "LZ4F_createCompressionContext error, res=$0", LZ4F_getErrorName(res)));
             }
-            *out = context;
+            out = std::move(localCtx);
             return Status::OK();
         }
-        *out = _ctx_c_pool.back();
+        out = std::move(_ctx_c_pool.back());
         _ctx_c_pool.pop_back();
         return Status::OK();
     }
-    void _release_compression_ctx(CContext* context) {
+    void _release_compression_ctx(std::unique_ptr<CContext> context) {
         DCHECK(context);
         std::lock_guard<std::mutex> l(_ctx_c_mutex);
-        _ctx_c_pool.push_back(context);
-    }
-    void _delete_compression_ctx(CContext* context) {
-        DCHECK(context);
-        LZ4F_freeCompressionContext(context->ctx);
-        delete context;
+        _ctx_c_pool.push_back(std::move(context));
     }
 
-    Status _acquire_decompression_ctx(DContext** out) {
+    Status _acquire_decompression_ctx(std::unique_ptr<DContext>& out) {
         std::lock_guard<std::mutex> l(_ctx_d_mutex);
         if (_ctx_d_pool.empty()) {
-            DContext* context = new (std::nothrow) DContext();
-            if (context == nullptr) {
+            std::unique_ptr<DContext> localCtx = DContext::create_unique();
+            if (localCtx.get() == nullptr) {
                 return Status::InvalidArgument("failed to new LZ4F DContext");
             }
-            auto res = LZ4F_createDecompressionContext(&context->ctx, LZ4F_VERSION);
+            auto res = LZ4F_createDecompressionContext(&localCtx->ctx, LZ4F_VERSION);
             if (LZ4F_isError(res) != 0) {
                 return Status::InvalidArgument(strings::Substitute(
                         "LZ4F_createDeompressionContext error, res=$0", LZ4F_getErrorName(res)));
             }
-            *out = context;
+            out = std::move(localCtx);
             return Status::OK();
         }
-        *out = _ctx_d_pool.back();
+        out = std::move(_ctx_d_pool.back());
         _ctx_d_pool.pop_back();
         return Status::OK();
     }
-    void _release_decompression_ctx(DContext* context) {
+    void _release_decompression_ctx(std::unique_ptr<DContext> context) {
         DCHECK(context);
         // reset decompression context to avoid ERROR_maxBlockSize_invalid
         LZ4F_resetDecompressionContext(context->ctx);
         std::lock_guard<std::mutex> l(_ctx_d_mutex);
-        _ctx_d_pool.push_back(context);
-    }
-    void _delete_decompression_ctx(DContext* context) {
-        DCHECK(context);
-        LZ4F_freeDecompressionContext(context->ctx);
-        delete context;
+        _ctx_d_pool.push_back(std::move(context));
     }
 
 private:
@@ -440,10 +435,10 @@ private:
 
     std::mutex _ctx_c_mutex;
     // LZ4F_compressionContext_t is a pointer so no copy here
-    std::vector<CContext*> _ctx_c_pool;
+    std::vector<std::unique_ptr<CContext>> _ctx_c_pool;
 
     std::mutex _ctx_d_mutex;
-    std::vector<DContext*> _ctx_d_pool;
+    std::vector<std::unique_ptr<DContext>> _ctx_d_pool;
 };
 
 LZ4F_preferences_t Lz4fBlockCompression::_s_preferences = {
@@ -456,10 +451,18 @@ LZ4F_preferences_t Lz4fBlockCompression::_s_preferences = {
 
 class Lz4HCBlockCompression : public BlockCompressionCodec {
 private:
-    struct Context {
+    class Context {
+        ENABLE_FACTORY_CREATOR(Context);
+
+    public:
         Context() : ctx(nullptr) {}
         LZ4_streamHC_t* ctx;
         faststring buffer;
+        ~Context() {
+            if (ctx) {
+                LZ4_freeStreamHC(ctx);
+            }
+        }
     };
 
 public:
@@ -467,21 +470,15 @@ public:
         static Lz4HCBlockCompression s_instance;
         return &s_instance;
     }
-    ~Lz4HCBlockCompression() {
-        for (auto ctx : _ctx_pool) {
-            _delete_compression_ctx(ctx);
-        }
-    }
+    ~Lz4HCBlockCompression() { _ctx_pool.clear(); }
 
     Status compress(const Slice& input, faststring* output) override {
-        Context* context;
-        RETURN_IF_ERROR(_acquire_compression_ctx(&context));
+        std::unique_ptr<Context> context;
+        RETURN_IF_ERROR(_acquire_compression_ctx(context));
         bool compress_failed = false;
         Defer defer {[&] {
-            if (compress_failed) {
-                _delete_compression_ctx(context);
-            } else {
-                _release_compression_ctx(context);
+            if (!compress_failed) {
+                _release_compression_ctx(std::move(context));
             }
         }};
         Slice compressed_buf;
@@ -525,41 +522,35 @@ public:
     size_t max_compressed_len(size_t len) override { return LZ4_compressBound(len); }
 
 private:
-    Status _acquire_compression_ctx(Context** out) {
+    Status _acquire_compression_ctx(std::unique_ptr<Context>& out) {
         std::lock_guard<std::mutex> l(_ctx_mutex);
         if (_ctx_pool.empty()) {
-            Context* context = new (std::nothrow) Context();
-            if (context == nullptr) {
+            std::unique_ptr<Context> localCtx = Context::create_unique();
+            if (localCtx.get() == nullptr) {
                 return Status::InvalidArgument("new LZ4HC context error");
             }
-            context->ctx = LZ4_createStreamHC();
-            if (context->ctx == nullptr) {
-                delete context;
+            localCtx->ctx = LZ4_createStreamHC();
+            if (localCtx->ctx == nullptr) {
                 return Status::InvalidArgument("LZ4_createStreamHC error");
             }
-            *out = context;
+            out = std::move(localCtx);
             return Status::OK();
         }
-        *out = _ctx_pool.back();
+        out = std::move(_ctx_pool.back());
         _ctx_pool.pop_back();
         return Status::OK();
     }
-    void _release_compression_ctx(Context* context) {
+    void _release_compression_ctx(std::unique_ptr<Context> context) {
         DCHECK(context);
         LZ4_resetStreamHC_fast(context->ctx, _compression_level);
         std::lock_guard<std::mutex> l(_ctx_mutex);
-        _ctx_pool.push_back(context);
-    }
-    void _delete_compression_ctx(Context* context) {
-        DCHECK(context);
-        LZ4_freeStreamHC(context->ctx);
-        delete context;
+        _ctx_pool.push_back(std::move(context));
     }
 
 private:
     int64_t _compression_level = config::LZ4_HC_compression_level;
     mutable std::mutex _ctx_mutex;
-    mutable std::vector<Context*> _ctx_pool;
+    mutable std::vector<std::unique_ptr<Context>> _ctx_pool;
 };
 
 class SnappySlicesSource : public snappy::Source {
@@ -751,14 +742,30 @@ public:
 // for ZSTD compression and decompression, with BOTH fast and high compression ratio
 class ZstdBlockCompression : public BlockCompressionCodec {
 private:
-    struct CContext {
+    class CContext {
+        ENABLE_FACTORY_CREATOR(CContext);
+
+    public:
         CContext() : ctx(nullptr) {}
         ZSTD_CCtx* ctx;
         faststring buffer;
+        ~CContext() {
+            if (ctx) {
+                ZSTD_freeCCtx(ctx);
+            }
+        }
     };
-    struct DContext {
+    class DContext {
+        ENABLE_FACTORY_CREATOR(DContext);
+
+    public:
         DContext() : ctx(nullptr) {}
         ZSTD_DCtx* ctx;
+        ~DContext() {
+            if (ctx) {
+                ZSTD_freeDCtx(ctx);
+            }
+        }
     };
 
 public:
@@ -767,12 +774,8 @@ public:
         return &s_instance;
     }
     ~ZstdBlockCompression() {
-        for (auto ctx : _ctx_c_pool) {
-            _delete_compression_ctx(ctx);
-        }
-        for (auto ctx : _ctx_d_pool) {
-            _delete_decompression_ctx(ctx);
-        }
+        _ctx_c_pool.clear();
+        _ctx_d_pool.clear();
     }
 
     size_t max_compressed_len(size_t len) override { return ZSTD_compressBound(len); }
@@ -786,14 +789,12 @@ public:
     //  https://github.com/facebook/zstd/blob/dev/examples/streaming_compression.c
     Status compress(const std::vector<Slice>& inputs, size_t uncompressed_size,
                     faststring* output) override {
-        CContext* context;
-        RETURN_IF_ERROR(_acquire_compression_ctx(&context));
+        std::unique_ptr<CContext> context;
+        RETURN_IF_ERROR(_acquire_compression_ctx(context));
         bool compress_failed = false;
         Defer defer {[&] {
-            if (compress_failed) {
-                _delete_compression_ctx(context);
-            } else {
-                _release_compression_ctx(context);
+            if (!compress_failed) {
+                _release_compression_ctx(std::move(context));
             }
         }};
 
@@ -864,14 +865,12 @@ public:
     }
 
     Status decompress(const Slice& input, Slice* output) override {
-        DContext* context;
+        std::unique_ptr<DContext> context;
         bool decompress_failed = false;
-        RETURN_IF_ERROR(_acquire_decompression_ctx(&context));
+        RETURN_IF_ERROR(_acquire_decompression_ctx(context));
         Defer defer {[&] {
-            if (decompress_failed) {
-                _delete_decompression_ctx(context);
-            } else {
-                _release_decompression_ctx(context);
+            if (!decompress_failed) {
+                _release_decompression_ctx(std::move(context));
             }
         }};
 
@@ -890,76 +889,66 @@ public:
     }
 
 private:
-    Status _acquire_compression_ctx(CContext** out) {
+    Status _acquire_compression_ctx(std::unique_ptr<CContext>& out) {
         std::lock_guard<std::mutex> l(_ctx_c_mutex);
         if (_ctx_c_pool.empty()) {
-            CContext* context = new (std::nothrow) CContext();
-            if (context == nullptr) {
+            std::unique_ptr<CContext> localCtx = CContext::create_unique();
+            if (localCtx.get() == nullptr) {
                 return Status::InvalidArgument("failed to new ZSTD CContext");
             }
             //typedef LZ4F_cctx* LZ4F_compressionContext_t;
-            context->ctx = ZSTD_createCCtx();
-            if (context->ctx == nullptr) {
+            localCtx->ctx = ZSTD_createCCtx();
+            if (localCtx->ctx == nullptr) {
                 return Status::InvalidArgument("Failed to create ZSTD compress ctx");
             }
-            *out = context;
+            out = std::move(localCtx);
             return Status::OK();
         }
-        *out = _ctx_c_pool.back();
+        out = std::move(_ctx_c_pool.back());
         _ctx_c_pool.pop_back();
         return Status::OK();
     }
-    void _release_compression_ctx(CContext* context) {
+    void _release_compression_ctx(std::unique_ptr<CContext> context) {
         DCHECK(context);
         auto ret = ZSTD_CCtx_reset(context->ctx, ZSTD_reset_session_only);
         DCHECK(!ZSTD_isError(ret));
         std::lock_guard<std::mutex> l(_ctx_c_mutex);
-        _ctx_c_pool.push_back(context);
-    }
-    void _delete_compression_ctx(CContext* context) {
-        DCHECK(context);
-        ZSTD_freeCCtx(context->ctx);
-        delete context;
+        _ctx_c_pool.push_back(std::move(context));
     }
 
-    Status _acquire_decompression_ctx(DContext** out) {
+    Status _acquire_decompression_ctx(std::unique_ptr<DContext>& out) {
         std::lock_guard<std::mutex> l(_ctx_d_mutex);
         if (_ctx_d_pool.empty()) {
-            DContext* context = new (std::nothrow) DContext();
-            if (context == nullptr) {
+            std::unique_ptr<DContext> localCtx = DContext::create_unique();
+            if (localCtx.get() == nullptr) {
                 return Status::InvalidArgument("failed to new ZSTD DContext");
             }
-            context->ctx = ZSTD_createDCtx();
-            if (context->ctx == nullptr) {
+            localCtx->ctx = ZSTD_createDCtx();
+            if (localCtx->ctx == nullptr) {
                 return Status::InvalidArgument("Fail to init ZSTD decompress context");
             }
-            *out = context;
+            out = std::move(localCtx);
             return Status::OK();
         }
-        *out = _ctx_d_pool.back();
+        out = std::move(_ctx_d_pool.back());
         _ctx_d_pool.pop_back();
         return Status::OK();
     }
-    void _release_decompression_ctx(DContext* context) {
+    void _release_decompression_ctx(std::unique_ptr<DContext> context) {
         DCHECK(context);
         // reset ctx to start a new decompress session
         auto ret = ZSTD_DCtx_reset(context->ctx, ZSTD_reset_session_only);
         DCHECK(!ZSTD_isError(ret));
         std::lock_guard<std::mutex> l(_ctx_d_mutex);
-        _ctx_d_pool.push_back(context);
-    }
-    void _delete_decompression_ctx(DContext* context) {
-        DCHECK(context);
-        ZSTD_freeDCtx(context->ctx);
-        delete context;
+        _ctx_d_pool.push_back(std::move(context));
     }
 
 private:
     mutable std::mutex _ctx_c_mutex;
-    mutable std::vector<CContext*> _ctx_c_pool;
+    mutable std::vector<std::unique_ptr<CContext>> _ctx_c_pool;
 
     mutable std::mutex _ctx_d_mutex;
-    mutable std::vector<DContext*> _ctx_d_pool;
+    mutable std::vector<std::unique_ptr<DContext>> _ctx_d_pool;
 };
 
 class GzipBlockCompression : public ZlibBlockCompression {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to