This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 41f29cf4cdb [fix](decompress)(review) context leaked in failure path (#33622) (#35364)
41f29cf4cdb is described below
commit 41f29cf4cdbf64c1cc14892ebb88d20c2dca5c0b
Author: Yongqiang YANG <[email protected]>
AuthorDate: Fri May 24 17:40:13 2024 +0800
[fix](decompress)(review) context leaked in failure path (#33622) (#35364)
* [fix](decompress)(review) context leaked in failure path
* [fix](decompress)(review) context leaked in failure path review fix
Co-authored-by: Vallish Pai <[email protected]>
---
be/src/util/block_compression.cpp | 297 ++++++++++++++++++--------------------
1 file changed, 143 insertions(+), 154 deletions(-)
diff --git a/be/src/util/block_compression.cpp b/be/src/util/block_compression.cpp
index b5e800a7d02..332a581f3ec 100644
--- a/be/src/util/block_compression.cpp
+++ b/be/src/util/block_compression.cpp
@@ -44,6 +44,7 @@
#include <ostream>
#include "common/config.h"
+#include "common/factory_creator.h"
#include "exec/decompressor.h"
#include "gutil/endian.h"
#include "gutil/strings/substitute.h"
@@ -90,10 +91,18 @@ bool BlockCompressionCodec::exceed_max_compress_len(size_t uncompressed_size) {
class Lz4BlockCompression : public BlockCompressionCodec {
private:
- struct Context {
+ class Context {
+ ENABLE_FACTORY_CREATOR(Context);
+
+ public:
Context() : ctx(nullptr) {}
LZ4_stream_t* ctx;
faststring buffer;
+ ~Context() {
+ if (ctx) {
+ LZ4_freeStream(ctx);
+ }
+ }
};
public:
@@ -101,11 +110,7 @@ public:
static Lz4BlockCompression s_instance;
return &s_instance;
}
- ~Lz4BlockCompression() {
- for (auto ctx : _ctx_pool) {
- _delete_compression_ctx(ctx);
- }
- }
+ ~Lz4BlockCompression() { _ctx_pool.clear(); }
Status compress(const Slice& input, faststring* output) override {
if (input.size > INT_MAX) {
@@ -115,14 +120,12 @@ public:
input.size);
}
- Context* context;
- RETURN_IF_ERROR(_acquire_compression_ctx(&context));
+ std::unique_ptr<Context> context;
+ RETURN_IF_ERROR(_acquire_compression_ctx(context));
bool compress_failed = false;
Defer defer {[&] {
- if (compress_failed) {
- _delete_compression_ctx(context);
- } else {
- _release_compression_ctx(context);
+ if (!compress_failed) {
+ _release_compression_ctx(std::move(context));
}
}};
Slice compressed_buf;
@@ -168,40 +171,34 @@ public:
private:
// reuse LZ4 compress stream
- Status _acquire_compression_ctx(Context** out) {
+ Status _acquire_compression_ctx(std::unique_ptr<Context>& out) {
std::lock_guard<std::mutex> l(_ctx_mutex);
if (_ctx_pool.empty()) {
- Context* context = new (std::nothrow) Context();
- if (context == nullptr) {
+ std::unique_ptr<Context> localCtx = Context::create_unique();
+ if (localCtx.get() == nullptr) {
return Status::InvalidArgument("new LZ4 context error");
}
- context->ctx = LZ4_createStream();
- if (context->ctx == nullptr) {
- delete context;
+ localCtx->ctx = LZ4_createStream();
+ if (localCtx->ctx == nullptr) {
return Status::InvalidArgument("LZ4_createStream error");
}
- *out = context;
+ out = std::move(localCtx);
return Status::OK();
}
- *out = _ctx_pool.back();
+ out = std::move(_ctx_pool.back());
_ctx_pool.pop_back();
return Status::OK();
}
- void _release_compression_ctx(Context* context) {
+ void _release_compression_ctx(std::unique_ptr<Context> context) {
DCHECK(context);
LZ4_resetStream(context->ctx);
std::lock_guard<std::mutex> l(_ctx_mutex);
- _ctx_pool.push_back(context);
- }
- void _delete_compression_ctx(Context* context) {
- DCHECK(context);
- LZ4_freeStream(context->ctx);
- delete context;
+ _ctx_pool.push_back(std::move(context));
}
private:
mutable std::mutex _ctx_mutex;
- mutable std::vector<Context*> _ctx_pool;
+ mutable std::vector<std::unique_ptr<Context>> _ctx_pool;
static const int32_t ACCELARATION = 1;
};
@@ -233,14 +230,30 @@ private:
// Used for LZ4 frame format, decompress speed is two times faster than LZ4.
class Lz4fBlockCompression : public BlockCompressionCodec {
private:
- struct CContext {
+ class CContext {
+ ENABLE_FACTORY_CREATOR(CContext);
+
+ public:
CContext() : ctx(nullptr) {}
LZ4F_compressionContext_t ctx;
faststring buffer;
+ ~CContext() {
+ if (ctx) {
+ LZ4F_freeCompressionContext(ctx);
+ }
+ }
};
- struct DContext {
+ class DContext {
+ ENABLE_FACTORY_CREATOR(DContext);
+
+ public:
DContext() : ctx(nullptr) {}
LZ4F_decompressionContext_t ctx;
+ ~DContext() {
+ if (ctx) {
+ LZ4F_freeDecompressionContext(ctx);
+ }
+ }
};
public:
@@ -249,12 +262,8 @@ public:
return &s_instance;
}
~Lz4fBlockCompression() {
- for (auto ctx : _ctx_c_pool) {
- _delete_compression_ctx(ctx);
- }
- for (auto ctx : _ctx_d_pool) {
- _delete_decompression_ctx(ctx);
- }
+ _ctx_c_pool.clear();
+ _ctx_d_pool.clear();
}
Status compress(const Slice& input, faststring* output) override {
@@ -279,14 +288,12 @@ public:
private:
Status _compress(const std::vector<Slice>& inputs, size_t
uncompressed_size,
faststring* output) {
- CContext* context = nullptr;
- RETURN_IF_ERROR(_acquire_compression_ctx(&context));
+ std::unique_ptr<CContext> context;
+ RETURN_IF_ERROR(_acquire_compression_ctx(context));
bool compress_failed = false;
Defer defer {[&] {
- if (compress_failed) {
- _delete_compression_ctx(context);
- } else {
- _release_compression_ctx(context);
+ if (!compress_failed) {
+ _release_compression_ctx(std::move(context));
}
}};
Slice compressed_buf;
@@ -340,13 +347,11 @@ private:
Status _decompress(const Slice& input, Slice* output) {
bool decompress_failed = false;
- DContext* context = nullptr;
- RETURN_IF_ERROR(_acquire_decompression_ctx(&context));
+ std::unique_ptr<DContext> context;
+ RETURN_IF_ERROR(_acquire_decompression_ctx(context));
Defer defer {[&] {
- if (decompress_failed) {
- _delete_decompression_ctx(context);
- } else {
- _release_decompression_ctx(context);
+ if (!decompress_failed) {
+ _release_decompression_ctx(std::move(context));
}
}};
size_t input_size = input.size;
@@ -373,66 +378,56 @@ private:
private:
// acquire a compression ctx from pool, release while finish compress,
// delete if compression failed
- Status _acquire_compression_ctx(CContext** out) {
+ Status _acquire_compression_ctx(std::unique_ptr<CContext>& out) {
std::lock_guard<std::mutex> l(_ctx_c_mutex);
if (_ctx_c_pool.empty()) {
- CContext* context = new (std::nothrow) CContext();
- if (context == nullptr) {
+ std::unique_ptr<CContext> localCtx = CContext::create_unique();
+ if (localCtx.get() == nullptr) {
return Status::InvalidArgument("failed to new LZ4F CContext");
}
- auto res = LZ4F_createCompressionContext(&context->ctx,
LZ4F_VERSION);
+ auto res = LZ4F_createCompressionContext(&localCtx->ctx,
LZ4F_VERSION);
if (LZ4F_isError(res) != 0) {
return Status::InvalidArgument(strings::Substitute(
"LZ4F_createCompressionContext error, res=$0",
LZ4F_getErrorName(res)));
}
- *out = context;
+ out = std::move(localCtx);
return Status::OK();
}
- *out = _ctx_c_pool.back();
+ out = std::move(_ctx_c_pool.back());
_ctx_c_pool.pop_back();
return Status::OK();
}
- void _release_compression_ctx(CContext* context) {
+ void _release_compression_ctx(std::unique_ptr<CContext> context) {
DCHECK(context);
std::lock_guard<std::mutex> l(_ctx_c_mutex);
- _ctx_c_pool.push_back(context);
- }
- void _delete_compression_ctx(CContext* context) {
- DCHECK(context);
- LZ4F_freeCompressionContext(context->ctx);
- delete context;
+ _ctx_c_pool.push_back(std::move(context));
}
- Status _acquire_decompression_ctx(DContext** out) {
+ Status _acquire_decompression_ctx(std::unique_ptr<DContext>& out) {
std::lock_guard<std::mutex> l(_ctx_d_mutex);
if (_ctx_d_pool.empty()) {
- DContext* context = new (std::nothrow) DContext();
- if (context == nullptr) {
+ std::unique_ptr<DContext> localCtx = DContext::create_unique();
+ if (localCtx.get() == nullptr) {
return Status::InvalidArgument("failed to new LZ4F DContext");
}
- auto res = LZ4F_createDecompressionContext(&context->ctx,
LZ4F_VERSION);
+ auto res = LZ4F_createDecompressionContext(&localCtx->ctx,
LZ4F_VERSION);
if (LZ4F_isError(res) != 0) {
return Status::InvalidArgument(strings::Substitute(
"LZ4F_createDeompressionContext error, res=$0",
LZ4F_getErrorName(res)));
}
- *out = context;
+ out = std::move(localCtx);
return Status::OK();
}
- *out = _ctx_d_pool.back();
+ out = std::move(_ctx_d_pool.back());
_ctx_d_pool.pop_back();
return Status::OK();
}
- void _release_decompression_ctx(DContext* context) {
+ void _release_decompression_ctx(std::unique_ptr<DContext> context) {
DCHECK(context);
// reset decompression context to avoid ERROR_maxBlockSize_invalid
LZ4F_resetDecompressionContext(context->ctx);
std::lock_guard<std::mutex> l(_ctx_d_mutex);
- _ctx_d_pool.push_back(context);
- }
- void _delete_decompression_ctx(DContext* context) {
- DCHECK(context);
- LZ4F_freeDecompressionContext(context->ctx);
- delete context;
+ _ctx_d_pool.push_back(std::move(context));
}
private:
@@ -440,10 +435,10 @@ private:
std::mutex _ctx_c_mutex;
// LZ4F_compressionContext_t is a pointer so no copy here
- std::vector<CContext*> _ctx_c_pool;
+ std::vector<std::unique_ptr<CContext>> _ctx_c_pool;
std::mutex _ctx_d_mutex;
- std::vector<DContext*> _ctx_d_pool;
+ std::vector<std::unique_ptr<DContext>> _ctx_d_pool;
};
LZ4F_preferences_t Lz4fBlockCompression::_s_preferences = {
@@ -456,10 +451,18 @@ LZ4F_preferences_t Lz4fBlockCompression::_s_preferences = {
class Lz4HCBlockCompression : public BlockCompressionCodec {
private:
- struct Context {
+ class Context {
+ ENABLE_FACTORY_CREATOR(Context);
+
+ public:
Context() : ctx(nullptr) {}
LZ4_streamHC_t* ctx;
faststring buffer;
+ ~Context() {
+ if (ctx) {
+ LZ4_freeStreamHC(ctx);
+ }
+ }
};
public:
@@ -467,21 +470,15 @@ public:
static Lz4HCBlockCompression s_instance;
return &s_instance;
}
- ~Lz4HCBlockCompression() {
- for (auto ctx : _ctx_pool) {
- _delete_compression_ctx(ctx);
- }
- }
+ ~Lz4HCBlockCompression() { _ctx_pool.clear(); }
Status compress(const Slice& input, faststring* output) override {
- Context* context;
- RETURN_IF_ERROR(_acquire_compression_ctx(&context));
+ std::unique_ptr<Context> context;
+ RETURN_IF_ERROR(_acquire_compression_ctx(context));
bool compress_failed = false;
Defer defer {[&] {
- if (compress_failed) {
- _delete_compression_ctx(context);
- } else {
- _release_compression_ctx(context);
+ if (!compress_failed) {
+ _release_compression_ctx(std::move(context));
}
}};
Slice compressed_buf;
@@ -525,41 +522,35 @@ public:
size_t max_compressed_len(size_t len) override { return
LZ4_compressBound(len); }
private:
- Status _acquire_compression_ctx(Context** out) {
+ Status _acquire_compression_ctx(std::unique_ptr<Context>& out) {
std::lock_guard<std::mutex> l(_ctx_mutex);
if (_ctx_pool.empty()) {
- Context* context = new (std::nothrow) Context();
- if (context == nullptr) {
+ std::unique_ptr<Context> localCtx = Context::create_unique();
+ if (localCtx.get() == nullptr) {
return Status::InvalidArgument("new LZ4HC context error");
}
- context->ctx = LZ4_createStreamHC();
- if (context->ctx == nullptr) {
- delete context;
+ localCtx->ctx = LZ4_createStreamHC();
+ if (localCtx->ctx == nullptr) {
return Status::InvalidArgument("LZ4_createStreamHC error");
}
- *out = context;
+ out = std::move(localCtx);
return Status::OK();
}
- *out = _ctx_pool.back();
+ out = std::move(_ctx_pool.back());
_ctx_pool.pop_back();
return Status::OK();
}
- void _release_compression_ctx(Context* context) {
+ void _release_compression_ctx(std::unique_ptr<Context> context) {
DCHECK(context);
LZ4_resetStreamHC_fast(context->ctx, _compression_level);
std::lock_guard<std::mutex> l(_ctx_mutex);
- _ctx_pool.push_back(context);
- }
- void _delete_compression_ctx(Context* context) {
- DCHECK(context);
- LZ4_freeStreamHC(context->ctx);
- delete context;
+ _ctx_pool.push_back(std::move(context));
}
private:
int64_t _compression_level = config::LZ4_HC_compression_level;
mutable std::mutex _ctx_mutex;
- mutable std::vector<Context*> _ctx_pool;
+ mutable std::vector<std::unique_ptr<Context>> _ctx_pool;
};
class SnappySlicesSource : public snappy::Source {
@@ -751,14 +742,30 @@ public:
// for ZSTD compression and decompression, with BOTH fast and high compression
ratio
class ZstdBlockCompression : public BlockCompressionCodec {
private:
- struct CContext {
+ class CContext {
+ ENABLE_FACTORY_CREATOR(CContext);
+
+ public:
CContext() : ctx(nullptr) {}
ZSTD_CCtx* ctx;
faststring buffer;
+ ~CContext() {
+ if (ctx) {
+ ZSTD_freeCCtx(ctx);
+ }
+ }
};
- struct DContext {
+ class DContext {
+ ENABLE_FACTORY_CREATOR(DContext);
+
+ public:
DContext() : ctx(nullptr) {}
ZSTD_DCtx* ctx;
+ ~DContext() {
+ if (ctx) {
+ ZSTD_freeDCtx(ctx);
+ }
+ }
};
public:
@@ -767,12 +774,8 @@ public:
return &s_instance;
}
~ZstdBlockCompression() {
- for (auto ctx : _ctx_c_pool) {
- _delete_compression_ctx(ctx);
- }
- for (auto ctx : _ctx_d_pool) {
- _delete_decompression_ctx(ctx);
- }
+ _ctx_c_pool.clear();
+ _ctx_d_pool.clear();
}
size_t max_compressed_len(size_t len) override { return
ZSTD_compressBound(len); }
@@ -786,14 +789,12 @@ public:
//
https://github.com/facebook/zstd/blob/dev/examples/streaming_compression.c
Status compress(const std::vector<Slice>& inputs, size_t uncompressed_size,
faststring* output) override {
- CContext* context;
- RETURN_IF_ERROR(_acquire_compression_ctx(&context));
+ std::unique_ptr<CContext> context;
+ RETURN_IF_ERROR(_acquire_compression_ctx(context));
bool compress_failed = false;
Defer defer {[&] {
- if (compress_failed) {
- _delete_compression_ctx(context);
- } else {
- _release_compression_ctx(context);
+ if (!compress_failed) {
+ _release_compression_ctx(std::move(context));
}
}};
@@ -864,14 +865,12 @@ public:
}
Status decompress(const Slice& input, Slice* output) override {
- DContext* context;
+ std::unique_ptr<DContext> context;
bool decompress_failed = false;
- RETURN_IF_ERROR(_acquire_decompression_ctx(&context));
+ RETURN_IF_ERROR(_acquire_decompression_ctx(context));
Defer defer {[&] {
- if (decompress_failed) {
- _delete_decompression_ctx(context);
- } else {
- _release_decompression_ctx(context);
+ if (!decompress_failed) {
+ _release_decompression_ctx(std::move(context));
}
}};
@@ -890,76 +889,66 @@ public:
}
private:
- Status _acquire_compression_ctx(CContext** out) {
+ Status _acquire_compression_ctx(std::unique_ptr<CContext>& out) {
std::lock_guard<std::mutex> l(_ctx_c_mutex);
if (_ctx_c_pool.empty()) {
- CContext* context = new (std::nothrow) CContext();
- if (context == nullptr) {
+ std::unique_ptr<CContext> localCtx = CContext::create_unique();
+ if (localCtx.get() == nullptr) {
return Status::InvalidArgument("failed to new ZSTD CContext");
}
//typedef LZ4F_cctx* LZ4F_compressionContext_t;
- context->ctx = ZSTD_createCCtx();
- if (context->ctx == nullptr) {
+ localCtx->ctx = ZSTD_createCCtx();
+ if (localCtx->ctx == nullptr) {
return Status::InvalidArgument("Failed to create ZSTD compress
ctx");
}
- *out = context;
+ out = std::move(localCtx);
return Status::OK();
}
- *out = _ctx_c_pool.back();
+ out = std::move(_ctx_c_pool.back());
_ctx_c_pool.pop_back();
return Status::OK();
}
- void _release_compression_ctx(CContext* context) {
+ void _release_compression_ctx(std::unique_ptr<CContext> context) {
DCHECK(context);
auto ret = ZSTD_CCtx_reset(context->ctx, ZSTD_reset_session_only);
DCHECK(!ZSTD_isError(ret));
std::lock_guard<std::mutex> l(_ctx_c_mutex);
- _ctx_c_pool.push_back(context);
- }
- void _delete_compression_ctx(CContext* context) {
- DCHECK(context);
- ZSTD_freeCCtx(context->ctx);
- delete context;
+ _ctx_c_pool.push_back(std::move(context));
}
- Status _acquire_decompression_ctx(DContext** out) {
+ Status _acquire_decompression_ctx(std::unique_ptr<DContext>& out) {
std::lock_guard<std::mutex> l(_ctx_d_mutex);
if (_ctx_d_pool.empty()) {
- DContext* context = new (std::nothrow) DContext();
- if (context == nullptr) {
+ std::unique_ptr<DContext> localCtx = DContext::create_unique();
+ if (localCtx.get() == nullptr) {
return Status::InvalidArgument("failed to new ZSTD DContext");
}
- context->ctx = ZSTD_createDCtx();
- if (context->ctx == nullptr) {
+ localCtx->ctx = ZSTD_createDCtx();
+ if (localCtx->ctx == nullptr) {
return Status::InvalidArgument("Fail to init ZSTD decompress
context");
}
- *out = context;
+ out = std::move(localCtx);
return Status::OK();
}
- *out = _ctx_d_pool.back();
+ out = std::move(_ctx_d_pool.back());
_ctx_d_pool.pop_back();
return Status::OK();
}
- void _release_decompression_ctx(DContext* context) {
+ void _release_decompression_ctx(std::unique_ptr<DContext> context) {
DCHECK(context);
// reset ctx to start a new decompress session
auto ret = ZSTD_DCtx_reset(context->ctx, ZSTD_reset_session_only);
DCHECK(!ZSTD_isError(ret));
std::lock_guard<std::mutex> l(_ctx_d_mutex);
- _ctx_d_pool.push_back(context);
- }
- void _delete_decompression_ctx(DContext* context) {
- DCHECK(context);
- ZSTD_freeDCtx(context->ctx);
- delete context;
+ _ctx_d_pool.push_back(std::move(context));
}
private:
mutable std::mutex _ctx_c_mutex;
- mutable std::vector<CContext*> _ctx_c_pool;
+ mutable std::vector<std::unique_ptr<CContext>> _ctx_c_pool;
mutable std::mutex _ctx_d_mutex;
- mutable std::vector<DContext*> _ctx_d_pool;
+ mutable std::vector<std::unique_ptr<DContext>> _ctx_d_pool;
};
class GzipBlockCompression : public ZlibBlockCompression {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]