This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 0f550aeda7c [fix](compression) handle exception to reuse compression
context (#35338) (#35380)
0f550aeda7c is described below
commit 0f550aeda7c1798673512e0243dec81a51ffab80
Author: Yongqiang YANG <[email protected]>
AuthorDate: Fri May 24 19:56:27 2024 +0800
[fix](compression) handle exception to reuse compression context (#35338)
(#35380)
* [fix](compression) handle exception to reuse compression context
Otherwise, the context is leaked and a new one has to be allocated, and the
resulting TLB flushes consume a lot of system CPU.
---
be/src/util/block_compression.cpp | 302 +++++++++++++++++++++-----------------
1 file changed, 165 insertions(+), 137 deletions(-)
diff --git a/be/src/util/block_compression.cpp
b/be/src/util/block_compression.cpp
index 332a581f3ec..3dd0d92c944 100644
--- a/be/src/util/block_compression.cpp
+++ b/be/src/util/block_compression.cpp
@@ -128,31 +128,39 @@ public:
_release_compression_ctx(std::move(context));
}
}};
- Slice compressed_buf;
- size_t max_len = max_compressed_len(input.size);
- if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
- // use output directly
- output->resize(max_len);
- compressed_buf.data = reinterpret_cast<char*>(output->data());
- compressed_buf.size = max_len;
- } else {
- // reuse context buffer if max_len <=
MAX_COMPRESSION_BUFFER_FOR_REUSE
- context->buffer.resize(max_len);
- compressed_buf.data =
reinterpret_cast<char*>(context->buffer.data());
- compressed_buf.size = max_len;
- }
- size_t compressed_len =
- LZ4_compress_fast_continue(context->ctx, input.data,
compressed_buf.data,
- input.size, compressed_buf.size,
ACCELARATION);
- if (compressed_len == 0) {
- compress_failed = true;
- return Status::InvalidArgument("Output buffer's capacity is not
enough, size={}",
- compressed_buf.size);
- }
- output->resize(compressed_len);
- if (max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
-
output->assign_copy(reinterpret_cast<uint8_t*>(compressed_buf.data),
compressed_len);
+ try {
+ Slice compressed_buf;
+ size_t max_len = max_compressed_len(input.size);
+ if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
+ // use output directly
+ output->resize(max_len);
+ compressed_buf.data = reinterpret_cast<char*>(output->data());
+ compressed_buf.size = max_len;
+ } else {
+ // reuse context buffer if max_len <=
MAX_COMPRESSION_BUFFER_FOR_REUSE
+ context->buffer->resize(max_len);
+ compressed_buf.data =
reinterpret_cast<char*>(context->buffer->data());
+ compressed_buf.size = max_len;
+ }
+
+ size_t compressed_len =
+ LZ4_compress_fast_continue(context->ctx, input.data,
compressed_buf.data,
+ input.size,
compressed_buf.size, ACCELARATION);
+ if (compressed_len == 0) {
+ compress_failed = true;
+ return Status::InvalidArgument("Output buffer's capacity is
not enough, size={}",
+ compressed_buf.size);
+ }
+ output->resize(compressed_len);
+ if (max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
+
output->assign_copy(reinterpret_cast<uint8_t*>(compressed_buf.data),
+ compressed_len);
+ }
+ } catch (...) {
+ // Do not set compress_failed to release context
+ DCHECK(!compress_failed);
+ return Status::InternalError("Fail to do LZ4Block compress due to
exception");
}
return Status::OK();
}
@@ -296,50 +304,57 @@ private:
_release_compression_ctx(std::move(context));
}
}};
- Slice compressed_buf;
- size_t max_len = max_compressed_len(uncompressed_size);
- if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
- // use output directly
- output->resize(max_len);
- compressed_buf.data = reinterpret_cast<char*>(output->data());
- compressed_buf.size = max_len;
- } else {
- // reuse context buffer if max_len <=
MAX_COMPRESSION_BUFFER_FOR_REUSE
- context->buffer.resize(max_len);
- compressed_buf.data =
reinterpret_cast<char*>(context->buffer.data());
- compressed_buf.size = max_len;
- }
- auto wbytes = LZ4F_compressBegin(context->ctx, compressed_buf.data,
compressed_buf.size,
- &_s_preferences);
- if (LZ4F_isError(wbytes)) {
- compress_failed = true;
- return Status::InvalidArgument("Fail to do LZ4F compress begin,
res={}",
- LZ4F_getErrorName(wbytes));
- }
- size_t offset = wbytes;
- for (auto input : inputs) {
- wbytes = LZ4F_compressUpdate(context->ctx, compressed_buf.data +
offset,
- compressed_buf.size - offset,
input.data, input.size,
- nullptr);
+ try {
+ Slice compressed_buf;
+ size_t max_len = max_compressed_len(uncompressed_size);
+ if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
+ // use output directly
+ output->resize(max_len);
+ compressed_buf.data = reinterpret_cast<char*>(output->data());
+ compressed_buf.size = max_len;
+ } else {
+ // reuse context buffer if max_len <=
MAX_COMPRESSION_BUFFER_FOR_REUSE
+ context->buffer->resize(max_len);
+ compressed_buf.data =
reinterpret_cast<char*>(context->buffer->data());
+ compressed_buf.size = max_len;
+ }
+
+ auto wbytes = LZ4F_compressBegin(context->ctx,
compressed_buf.data, compressed_buf.size,
+ &_s_preferences);
if (LZ4F_isError(wbytes)) {
compress_failed = true;
- return Status::InvalidArgument("Fail to do LZ4F compress
update, res={}",
+ return Status::InvalidArgument("Fail to do LZ4F compress
begin, res={}",
+ LZ4F_getErrorName(wbytes));
+ }
+ size_t offset = wbytes;
+ for (auto input : inputs) {
+ wbytes = LZ4F_compressUpdate(context->ctx, compressed_buf.data
+ offset,
+ compressed_buf.size - offset,
input.data, input.size,
+ nullptr);
+ if (LZ4F_isError(wbytes)) {
+ compress_failed = true;
+ return Status::InvalidArgument("Fail to do LZ4F compress
update, res={}",
+ LZ4F_getErrorName(wbytes));
+ }
+ offset += wbytes;
+ }
+ wbytes = LZ4F_compressEnd(context->ctx, compressed_buf.data +
offset,
+ compressed_buf.size - offset, nullptr);
+ if (LZ4F_isError(wbytes)) {
+ compress_failed = true;
+ return Status::InvalidArgument("Fail to do LZ4F compress end,
res={}",
LZ4F_getErrorName(wbytes));
}
offset += wbytes;
- }
- wbytes = LZ4F_compressEnd(context->ctx, compressed_buf.data + offset,
- compressed_buf.size - offset, nullptr);
- if (LZ4F_isError(wbytes)) {
- compress_failed = true;
- return Status::InvalidArgument("Fail to do LZ4F compress end,
res={}",
- LZ4F_getErrorName(wbytes));
- }
- offset += wbytes;
- output->resize(offset);
- if (max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
-
output->assign_copy(reinterpret_cast<uint8_t*>(compressed_buf.data), offset);
+ output->resize(offset);
+ if (max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
+
output->assign_copy(reinterpret_cast<uint8_t*>(compressed_buf.data), offset);
+ }
+ } catch (...) {
+ // Do not set compress_failed to release context
+ DCHECK(!compress_failed);
+ return Status::InternalError("Fail to do LZ4F compress due to
exception");
}
return Status::OK();
@@ -481,30 +496,37 @@ public:
_release_compression_ctx(std::move(context));
}
}};
- Slice compressed_buf;
- size_t max_len = max_compressed_len(input.size);
- if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
- // use output directly
- output->resize(max_len);
- compressed_buf.data = reinterpret_cast<char*>(output->data());
- compressed_buf.size = max_len;
- } else {
- // reuse context buffer if max_len <=
MAX_COMPRESSION_BUFFER_FOR_REUSE
- context->buffer.resize(max_len);
- compressed_buf.data =
reinterpret_cast<char*>(context->buffer.data());
- compressed_buf.size = max_len;
- }
- size_t compressed_len = LZ4_compress_HC_continue(
- context->ctx, input.data, compressed_buf.data, input.size,
compressed_buf.size);
- if (compressed_len == 0) {
- compress_failed = true;
- return Status::InvalidArgument("Output buffer's capacity is not
enough, size={}",
- compressed_buf.size);
- }
- output->resize(compressed_len);
- if (max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
-
output->assign_copy(reinterpret_cast<uint8_t*>(compressed_buf.data),
compressed_len);
+ try {
+ Slice compressed_buf;
+ size_t max_len = max_compressed_len(input.size);
+ if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
+ // use output directly
+ output->resize(max_len);
+ compressed_buf.data = reinterpret_cast<char*>(output->data());
+ compressed_buf.size = max_len;
+ } else {
+ context->buffer->resize(max_len);
+ compressed_buf.data =
reinterpret_cast<char*>(context->buffer->data());
+ compressed_buf.size = max_len;
+ }
+
+ size_t compressed_len = LZ4_compress_HC_continue(
+ context->ctx, input.data, compressed_buf.data, input.size,
compressed_buf.size);
+ if (compressed_len == 0) {
+ compress_failed = true;
+ return Status::InvalidArgument("Output buffer's capacity is
not enough, size={}",
+ compressed_buf.size);
+ }
+ output->resize(compressed_len);
+ if (max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
+
output->assign_copy(reinterpret_cast<uint8_t*>(compressed_buf.data),
+ compressed_len);
+ }
+ } catch (...) {
+ // Do not set compress_failed to release context
+ DCHECK(!compress_failed);
+ return Status::InternalError("Fail to do LZ4HC compress due to
exception");
}
return Status::OK();
}
@@ -798,67 +820,73 @@ public:
}
}};
- size_t max_len = max_compressed_len(uncompressed_size);
- Slice compressed_buf;
- if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
- // use output directly
- output->resize(max_len);
- compressed_buf.data = reinterpret_cast<char*>(output->data());
- compressed_buf.size = max_len;
- } else {
- // reuse context buffer if max_len <=
MAX_COMPRESSION_BUFFER_FOR_REUSE
- context->buffer.resize(max_len);
- compressed_buf.data =
reinterpret_cast<char*>(context->buffer.data());
- compressed_buf.size = max_len;
- }
+ try {
+ size_t max_len = max_compressed_len(uncompressed_size);
+ Slice compressed_buf;
+ if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
+ // use output directly
+ output->resize(max_len);
+ compressed_buf.data = reinterpret_cast<char*>(output->data());
+ compressed_buf.size = max_len;
+ } else {
+ // reuse context buffer if max_len <=
MAX_COMPRESSION_BUFFER_FOR_REUSE
+ context->buffer->resize(max_len);
+ compressed_buf.data =
reinterpret_cast<char*>(context->buffer->data());
+ compressed_buf.size = max_len;
+ }
- // set compression level to default 3
- auto ret =
- ZSTD_CCtx_setParameter(context->ctx, ZSTD_c_compressionLevel,
ZSTD_CLEVEL_DEFAULT);
- if (ZSTD_isError(ret)) {
- return Status::InvalidArgument("ZSTD_CCtx_setParameter compression
level error: {}",
-
ZSTD_getErrorString(ZSTD_getErrorCode(ret)));
- }
- // set checksum flag to 1
- ret = ZSTD_CCtx_setParameter(context->ctx, ZSTD_c_checksumFlag, 1);
- if (ZSTD_isError(ret)) {
- return Status::InvalidArgument("ZSTD_CCtx_setParameter
checksumFlag error: {}",
-
ZSTD_getErrorString(ZSTD_getErrorCode(ret)));
- }
+ // set compression level to default 3
+ auto ret = ZSTD_CCtx_setParameter(context->ctx,
ZSTD_c_compressionLevel,
+ ZSTD_CLEVEL_DEFAULT);
+ if (ZSTD_isError(ret)) {
+ return Status::InvalidArgument("ZSTD_CCtx_setParameter
compression level error: {}",
+
ZSTD_getErrorString(ZSTD_getErrorCode(ret)));
+ }
+ // set checksum flag to 1
+ ret = ZSTD_CCtx_setParameter(context->ctx, ZSTD_c_checksumFlag, 1);
+ if (ZSTD_isError(ret)) {
+ return Status::InvalidArgument("ZSTD_CCtx_setParameter
checksumFlag error: {}",
+
ZSTD_getErrorString(ZSTD_getErrorCode(ret)));
+ }
- ZSTD_outBuffer out_buf = {compressed_buf.data, compressed_buf.size, 0};
+ ZSTD_outBuffer out_buf = {compressed_buf.data,
compressed_buf.size, 0};
- for (size_t i = 0; i < inputs.size(); i++) {
- ZSTD_inBuffer in_buf = {inputs[i].data, inputs[i].size, 0};
+ for (size_t i = 0; i < inputs.size(); i++) {
+ ZSTD_inBuffer in_buf = {inputs[i].data, inputs[i].size, 0};
- bool last_input = (i == inputs.size() - 1);
- auto mode = last_input ? ZSTD_e_end : ZSTD_e_continue;
+ bool last_input = (i == inputs.size() - 1);
+ auto mode = last_input ? ZSTD_e_end : ZSTD_e_continue;
- bool finished = false;
- do {
- // do compress
- auto ret = ZSTD_compressStream2(context->ctx, &out_buf,
&in_buf, mode);
+ bool finished = false;
+ do {
+ // do compress
+ auto ret = ZSTD_compressStream2(context->ctx, &out_buf,
&in_buf, mode);
- if (ZSTD_isError(ret)) {
- compress_failed = true;
- return Status::InvalidArgument("ZSTD_compressStream2
error: {}",
-
ZSTD_getErrorString(ZSTD_getErrorCode(ret)));
- }
+ if (ZSTD_isError(ret)) {
+ compress_failed = true;
+ return Status::InvalidArgument("ZSTD_compressStream2
error: {}",
+
ZSTD_getErrorString(ZSTD_getErrorCode(ret)));
+ }
- // ret is ZSTD hint for needed output buffer size
- if (ret > 0 && out_buf.pos == out_buf.size) {
- compress_failed = true;
- return Status::InvalidArgument("ZSTD_compressStream2
output buffer full");
- }
+ // ret is ZSTD hint for needed output buffer size
+ if (ret > 0 && out_buf.pos == out_buf.size) {
+ compress_failed = true;
+ return Status::InvalidArgument("ZSTD_compressStream2
output buffer full");
+ }
- finished = last_input ? (ret == 0) : (in_buf.pos ==
inputs[i].size);
- } while (!finished);
- }
+ finished = last_input ? (ret == 0) : (in_buf.pos ==
inputs[i].size);
+ } while (!finished);
+ }
- // set compressed size for caller
- output->resize(out_buf.pos);
- if (max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
-
output->assign_copy(reinterpret_cast<uint8_t*>(compressed_buf.data),
out_buf.pos);
+ // set compressed size for caller
+ output->resize(out_buf.pos);
+ if (max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
+
output->assign_copy(reinterpret_cast<uint8_t*>(compressed_buf.data),
out_buf.pos);
+ }
+ } catch (...) {
+ // Do not set compress_failed to release context
+ DCHECK(!compress_failed);
+ return Status::InternalError("Fail to do ZSTD compress due to
exception");
}
return Status::OK();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]