This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 0b494700ab2 [improvement](status) Change the return type for
block_compression (#47566)
0b494700ab2 is described below
commit 0b494700ab2f50d931a00c038ace996e93cad5f0
Author: lzyy2024 <[email protected]>
AuthorDate: Sat Feb 8 17:16:58 2025 +0800
[improvement](status) Change the return type for block_compression (#47566)
### What problem does this PR solve?
The Status types previously returned by block_compression.cpp did not reflect
the actual cause of each failure, so this PR changes them.
Issue Number: close #xxx
Related PR: #xxx
Problem Summary:
Previously, a function that could fail in several different ways often reported
all of them with the same Status type. This PR handles each error state
separately and reports a Status that matches the underlying error code.
### Release note
None
### Check List (For Author)
- Test <!-- At least one of them must be included. -->
- [ ] Regression test
- [ ] Unit Test
- [ ] Manual test (add detailed scripts or steps below)
- [x] No need to test or manual test. Explain why:
- [ ] This is a refactor/code format and no logic has been changed.
- [x] Previous test can cover this change.
- [ ] No code files have been changed.
- [ ] Other reason <!-- Add your reason? -->
- Behavior changed:
- [x] No.
- [ ] Yes. <!-- Explain the behavior change -->
- Does this need documentation?
- [x] No.
- [ ] Yes. <!-- Add document PR link here. eg:
https://github.com/apache/doris-website/pull/1214 -->
### Check List (For Reviewer who merge this PR)
- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label <!-- Add branch pick label that this PR
should merge into -->
---
be/src/util/block_compression.cpp | 130 ++++++++++++++++++++---------
be/src/vec/functions/function_compress.cpp | 2 +-
2 files changed, 93 insertions(+), 39 deletions(-)
diff --git a/be/src/util/block_compression.cpp
b/be/src/util/block_compression.cpp
index 7a0aacd4252..fcb2f3fae08 100644
--- a/be/src/util/block_compression.cpp
+++ b/be/src/util/block_compression.cpp
@@ -187,7 +187,7 @@ public:
auto decompressed_len =
LZ4_decompress_safe(input.data, output->data, input.size,
output->size);
if (decompressed_len < 0) {
- return Status::InvalidArgument("fail to do LZ4 decompress,
error={}", decompressed_len);
+ return Status::InternalError("fail to do LZ4 decompress,
error={}", decompressed_len);
}
output->size = decompressed_len;
return Status::OK();
@@ -458,8 +458,8 @@ private:
&input_size, nullptr);
if (LZ4F_isError(lres)) {
decompress_failed = true;
- return Status::InvalidArgument("Fail to do LZ4F decompress,
res={}",
- LZ4F_getErrorName(lres));
+ return Status::InternalError("Fail to do LZ4F decompress, res={}",
+ LZ4F_getErrorName(lres));
} else if (input_size != input.size) {
decompress_failed = true;
return Status::InvalidArgument(
@@ -635,7 +635,10 @@ public:
auto decompressed_len =
LZ4_decompress_safe(input.data, output->data, input.size,
output->size);
if (decompressed_len < 0) {
- return Status::InvalidArgument("fail to do LZ4 decompress,
error={}", decompressed_len);
+ return Status::InvalidArgument(
+ "destination buffer is not large enough or the source
stream is detected "
+ "malformed, fail to do LZ4 decompress, error={}",
+ decompressed_len);
}
output->size = decompressed_len;
return Status::OK();
@@ -854,8 +857,12 @@ public:
Slice s(*output);
auto zres = ::compress((Bytef*)s.data, &s.size, (Bytef*)input.data,
input.size);
- if (zres != Z_OK) {
- return Status::InvalidArgument("Fail to do ZLib compress,
error={}", zError(zres));
+ if (zres == Z_MEM_ERROR) {
+ throw Exception(Status::MemoryLimitExceeded(fmt::format(
+ "ZLib compression failed due to memory
allocationerror.error = {}, res = {} ",
+ zError(zres), zres)));
+ } else if (zres != Z_OK) {
+ return Status::InternalError("Fail to do Zlib compress, error={}",
zError(zres));
}
output->resize(s.size);
return Status::OK();
@@ -871,9 +878,12 @@ public:
zstrm.zfree = Z_NULL;
zstrm.opaque = Z_NULL;
auto zres = deflateInit(&zstrm, Z_DEFAULT_COMPRESSION);
- if (zres != Z_OK) {
- return Status::InvalidArgument("Fail to do ZLib stream compress,
error={}, res={}",
- zError(zres), zres);
+ if (zres == Z_MEM_ERROR) {
+ throw Exception(Status::MemoryLimitExceeded(
+ "Fail to do ZLib stream compress, error={}, res={}",
zError(zres), zres));
+ } else if (zres != Z_OK) {
+ return Status::InternalError("Fail to do ZLib stream compress,
error={}, res={}",
+ zError(zres), zres);
}
// we assume that output is e
zstrm.next_out = (Bytef*)output->data();
@@ -888,16 +898,19 @@ public:
zres = deflate(&zstrm, flush);
if (zres != Z_OK && zres != Z_STREAM_END) {
- return Status::InvalidArgument("Fail to do ZLib stream
compress, error={}, res={}",
- zError(zres), zres);
+ return Status::InternalError("Fail to do ZLib stream compress,
error={}, res={}",
+ zError(zres), zres);
}
}
output->resize(zstrm.total_out);
zres = deflateEnd(&zstrm);
- if (zres != Z_OK) {
- return Status::InvalidArgument("Fail to do deflateEnd on ZLib
stream, error={}, res={}",
- zError(zres), zres);
+ if (zres == Z_DATA_ERROR) {
+ return Status::InvalidArgument("Fail to do deflateEnd, error={},
res={}", zError(zres),
+ zres);
+ } else if (zres != Z_OK) {
+ return Status::InternalError("Fail to do deflateEnd on ZLib
stream, error={}, res={}",
+ zError(zres), zres);
}
return Status::OK();
}
@@ -906,8 +919,13 @@ public:
size_t input_size = input.size;
auto zres =
::uncompress2((Bytef*)output->data, &output->size,
(Bytef*)input.data, &input_size);
- if (zres != Z_OK) {
+ if (zres == Z_DATA_ERROR) {
return Status::InvalidArgument("Fail to do ZLib decompress,
error={}", zError(zres));
+ } else if (zres == Z_MEM_ERROR) {
+ throw Exception(Status::MemoryLimitExceeded("Fail to do ZLib
decompress, error={}",
+ zError(zres)));
+ } else if (zres != Z_OK) {
+ return Status::InternalError("Fail to do ZLib decompress,
error={}", zError(zres));
}
return Status::OK();
}
@@ -932,8 +950,14 @@ public:
uint32_t size = output->size();
auto bzres = BZ2_bzBuffToBuffCompress((char*)output->data(), &size,
(char*)input.data,
input.size, 9, 0, 0);
- if (bzres != BZ_OK) {
- return Status::InternalError("Fail to do Bzip2 compress, ret={}",
bzres);
+ if (bzres == BZ_MEM_ERROR) {
+ throw Exception(
+ Status::MemoryLimitExceeded("Fail to do Bzip2 compress,
ret={}", bzres));
+ } else if (bzres == BZ_PARAM_ERROR) {
+ return Status::InvalidArgument("Fail to do Bzip2 compress,
ret={}", bzres);
+ } else if (bzres != BZ_RUN_OK && bzres != BZ_FLUSH_OK && bzres !=
BZ_FINISH_OK &&
+ bzres != BZ_STREAM_END && bzres != BZ_OK) {
+ return Status::InternalError("Failed to init bz2. status code:
{}", bzres);
}
output->resize(size);
return Status::OK();
@@ -947,7 +971,12 @@ public:
bz_stream bzstrm;
bzero(&bzstrm, sizeof(bzstrm));
int bzres = BZ2_bzCompressInit(&bzstrm, 9, 0, 0);
- if (bzres != BZ_OK) {
+ if (bzres == BZ_PARAM_ERROR) {
+ return Status::InvalidArgument("Failed to init bz2. status code:
{}", bzres);
+ } else if (bzres == BZ_MEM_ERROR) {
+ throw Exception(
+ Status::MemoryLimitExceeded("Failed to init bz2. status
code: {}", bzres));
+ } else if (bzres != BZ_OK) {
return Status::InternalError("Failed to init bz2. status code:
{}", bzres);
}
// we assume that output is e
@@ -962,15 +991,20 @@ public:
int flush = (i == (inputs.size() - 1)) ? BZ_FINISH : BZ_RUN;
bzres = BZ2_bzCompress(&bzstrm, flush);
- if (bzres != BZ_OK && bzres != BZ_STREAM_END) {
- return Status::InternalError("Fail to do bzip2 stream
compress, res={}", bzres);
+ if (bzres == BZ_PARAM_ERROR) {
+ return Status::InvalidArgument("Failed to init bz2. status
code: {}", bzres);
+ } else if (bzres != BZ_RUN_OK && bzres != BZ_FLUSH_OK && bzres !=
BZ_FINISH_OK &&
+ bzres != BZ_STREAM_END && bzres != BZ_OK) {
+ return Status::InternalError("Failed to init bz2. status code:
{}", bzres);
}
}
size_t total_out = (size_t)bzstrm.total_out_hi32 << 32 |
(size_t)bzstrm.total_out_lo32;
output->resize(total_out);
bzres = BZ2_bzCompressEnd(&bzstrm);
- if (bzres != BZ_OK) {
+ if (bzres == BZ_PARAM_ERROR) {
+ return Status::InvalidArgument("Fail to do deflateEnd on bzip2
stream, res={}", bzres);
+ } else if (bzres != BZ_OK) {
return Status::InternalError("Fail to do deflateEnd on bzip2
stream, res={}", bzres);
}
return Status::OK();
@@ -1102,14 +1136,14 @@ public:
if (ZSTD_isError(ret)) {
compress_failed = true;
- return Status::InvalidArgument("ZSTD_compressStream2
error: {}",
-
ZSTD_getErrorString(ZSTD_getErrorCode(ret)));
+ return Status::InternalError("ZSTD_compressStream2
error: {}",
+
ZSTD_getErrorString(ZSTD_getErrorCode(ret)));
}
// ret is ZSTD hint for needed output buffer size
if (ret > 0 && out_buf.pos == out_buf.size) {
compress_failed = true;
- return Status::InvalidArgument("ZSTD_compressStream2
output buffer full");
+ return Status::InternalError("ZSTD_compressStream2
output buffer full");
}
finished = last_input ? (ret == 0) : (in_buf.pos ==
inputs[i].size);
@@ -1146,8 +1180,8 @@ public:
input.size);
if (ZSTD_isError(ret)) {
decompress_failed = true;
- return Status::InvalidArgument("ZSTD_decompressDCtx error: {}",
-
ZSTD_getErrorString(ZSTD_getErrorCode(ret)));
+ return Status::InternalError("ZSTD_decompressDCtx error: {}",
+
ZSTD_getErrorString(ZSTD_getErrorCode(ret)));
}
// set decompressed size for caller
@@ -1239,8 +1273,12 @@ public:
int zres = deflateInit2(&z_strm, Z_DEFAULT_COMPRESSION, Z_DEFLATED,
MAX_WBITS + GZIP_CODEC,
8, Z_DEFAULT_STRATEGY);
- if (zres != Z_OK) {
- return Status::InvalidArgument("Fail to init zlib compress");
+ if (zres == Z_MEM_ERROR) {
+ throw Exception(Status::MemoryLimitExceeded(
+ "Fail to init ZLib compress, error={}, res={}",
zError(zres), zres));
+ } else if (zres != Z_OK) {
+ return Status::InternalError("Fail to init ZLib compress,
error={}, res={}",
+ zError(zres), zres);
}
z_strm.next_in = (Bytef*)input.get_data();
@@ -1250,14 +1288,16 @@ public:
zres = deflate(&z_strm, Z_FINISH);
if (zres != Z_OK && zres != Z_STREAM_END) {
- return Status::InvalidArgument("Fail to do ZLib stream compress,
error={}, res={}",
- zError(zres), zres);
+ return Status::InternalError("Fail to do ZLib stream compress,
error={}, res={}",
+ zError(zres), zres);
}
output->resize(z_strm.total_out);
zres = deflateEnd(&z_strm);
- if (zres != Z_OK) {
+ if (zres == Z_DATA_ERROR) {
return Status::InvalidArgument("Fail to end zlib compress");
+ } else if (zres != Z_OK) {
+ return Status::InternalError("Fail to end zlib compress");
}
return Status::OK();
}
@@ -1273,10 +1313,14 @@ public:
zstrm.opaque = Z_NULL;
auto zres = deflateInit2(&zstrm, Z_DEFAULT_COMPRESSION, Z_DEFLATED,
MAX_WBITS + GZIP_CODEC,
8, Z_DEFAULT_STRATEGY);
- if (zres != Z_OK) {
- return Status::InvalidArgument("Fail to do ZLib stream compress,
error={}, res={}",
- zError(zres), zres);
+ if (zres == Z_MEM_ERROR) {
+ throw Exception(Status::MemoryLimitExceeded(
+ "Fail to init ZLib stream compress, error={}, res={}",
zError(zres), zres));
+ } else if (zres != Z_OK) {
+ return Status::InternalError("Fail to init ZLib stream compress,
error={}, res={}",
+ zError(zres), zres);
}
+
// we assume that output is e
zstrm.next_out = (Bytef*)output->data();
zstrm.avail_out = output->size();
@@ -1290,16 +1334,19 @@ public:
zres = deflate(&zstrm, flush);
if (zres != Z_OK && zres != Z_STREAM_END) {
- return Status::InvalidArgument("Fail to do ZLib stream
compress, error={}, res={}",
- zError(zres), zres);
+ return Status::InternalError("Fail to do ZLib stream compress,
error={}, res={}",
+ zError(zres), zres);
}
}
output->resize(zstrm.total_out);
zres = deflateEnd(&zstrm);
- if (zres != Z_OK) {
+ if (zres == Z_DATA_ERROR) {
return Status::InvalidArgument("Fail to do deflateEnd on ZLib
stream, error={}, res={}",
zError(zres), zres);
+ } else if (zres != Z_OK) {
+ return Status::InternalError("Fail to do deflateEnd on ZLib
stream, error={}, res={}",
+ zError(zres), zres);
}
return Status::OK();
}
@@ -1312,7 +1359,7 @@ public:
int ret = inflateInit2(&z_strm, MAX_WBITS + GZIP_CODEC);
if (ret != Z_OK) {
- return Status::InternalError("Fail to do ZLib stream compress,
error={}, res={}",
+ return Status::InternalError("Fail to init ZLib decompress,
error={}, res={}",
zError(ret), ret);
}
@@ -1327,6 +1374,13 @@ public:
ret = inflate(&z_strm, Z_FINISH);
if (ret != Z_OK && ret != Z_STREAM_END) {
(void)inflateEnd(&z_strm);
+ if (ret == Z_MEM_ERROR) {
+ throw Exception(Status::MemoryLimitExceeded(
+ "Fail to do ZLib stream compress, error={},
res={}", zError(ret), ret));
+ } else if (ret == Z_DATA_ERROR) {
+ return Status::InvalidArgument(
+ "Fail to do ZLib stream compress, error={},
res={}", zError(ret), ret);
+ }
return Status::InternalError("Fail to do ZLib stream compress,
error={}, res={}",
zError(ret), ret);
}
diff --git a/be/src/vec/functions/function_compress.cpp
b/be/src/vec/functions/function_compress.cpp
index 4c175a5fd44..b645e944bfe 100644
--- a/be/src/vec/functions/function_compress.cpp
+++ b/be/src/vec/functions/function_compress.cpp
@@ -102,7 +102,7 @@ public:
}
// Z_MEM_ERROR and Z_BUF_ERROR are already handled in compress,
making sure st is always Z_OK
- auto st = compression_codec->compress(data, &compressed_str);
+ RETURN_IF_ERROR(compression_codec->compress(data,
&compressed_str));
col_data.resize(col_data.size() + 4 + compressed_str.size());
std::memcpy(col_data.data() + idx, &length, sizeof(length));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]