This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new e0fac66223e [branch-2.1](fix) fix snappy decompressor bug (#40862)
e0fac66223e is described below
commit e0fac66223eb9e01dfebd7fef76ec72beb8568f9
Author: Socrates <[email protected]>
AuthorDate: Fri Sep 20 11:57:14 2024 +0800
[branch-2.1](fix) fix snappy decompressor bug (#40862)
## Proposed changes
Hadoop snappycodec source :
https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/SnappyCodec.cc
Example:
OriginData (the original data will be divided into several large data
blocks):
large data block1 | large data block2 | large data block3 | ....
The large data block will be divided into several small data blocks.
Suppose a large data block is divided into three small blocks:
large data block1: | small block1 | small block2 | small block3 |
CompressData: <A [B1 compress(small block1) ] [B2 compress(small block2)
] [B3 compress(small block3)]>
A : original length of the current block of large data block.
sizeof(A) = 4 bytes.
A = length(small block1) + length(small block2) + length(small block3)
Bx : length of small data block bx.
sizeof(Bx) = 4 bytes.
Bx = length(compress(small blockx))
---
be/src/exec/decompressor.cpp | 162 ++++++++++++---------
.../tvf/compress/test_tvf.csv.snappy | Bin 107203 -> 100481 bytes
.../tvf/test_local_tvf_compression.out | 44 +++---
3 files changed, 114 insertions(+), 92 deletions(-)
diff --git a/be/src/exec/decompressor.cpp b/be/src/exec/decompressor.cpp
index d8d02c9cf9e..a830eb3cbe7 100644
--- a/be/src/exec/decompressor.cpp
+++ b/be/src/exec/decompressor.cpp
@@ -468,7 +468,7 @@ Status Lz4BlockDecompressor::decompress(uint8_t* input,
size_t input_len, size_t
}
std::size_t decompressed_large_block_len = 0;
- do {
+ while (remaining_decompressed_large_block_len > 0) {
// Check that input length should not be negative.
if (input_len < sizeof(uint32_t)) {
*more_input_bytes = sizeof(uint32_t) - input_len;
@@ -505,8 +505,7 @@ Status Lz4BlockDecompressor::decompress(uint8_t* input,
size_t input_len, size_t
output_ptr += decompressed_small_block_len;
remaining_decompressed_large_block_len -=
decompressed_small_block_len;
decompressed_large_block_len += decompressed_small_block_len;
-
- } while (remaining_decompressed_large_block_len > 0);
+ };
if (*more_input_bytes != 0) {
// Need more input buffer
@@ -535,90 +534,113 @@ Status SnappyBlockDecompressor::init() {
return Status::OK();
}
+// Hadoop snappycodec source :
+//
https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/SnappyCodec.cc
+// Example:
+// OriginData(The original data will be divided into several large data
block.) :
+// large data block1 | large data block2 | large data block3 | ....
+// The large data block will be divided into several small data block.
+// Suppose a large data block is divided into three small blocks:
+// large data block1: | small block1 | small block2 | small block3 |
+// CompressData: <A [B1 compress(small block1) ] [B2 compress(small block1)
] [B3 compress(small block1)]>
+//
+// A : original length of the current block of large data block.
+// sizeof(A) = 4 bytes.
+// A = length(small block1) + length(small block2) + length(small block3)
+// Bx : length of small data block bx.
+// sizeof(Bx) = 4 bytes.
+// Bx = length(compress(small blockx))
Status SnappyBlockDecompressor::decompress(uint8_t* input, size_t input_len,
size_t* input_bytes_read, uint8_t*
output,
size_t output_max_len, size_t*
decompressed_len,
bool* stream_end, size_t*
more_input_bytes,
size_t* more_output_bytes) {
- uint8_t* src = input;
- size_t remaining_input_size = input_len;
- int64_t uncompressed_total_len = 0;
- *input_bytes_read = 0;
+ auto* input_ptr = input;
+ auto* output_ptr = output;
- // The hadoop snappy codec is as:
- // <4 byte big endian uncompressed size>
- // <4 byte big endian compressed size>
- // <snappy compressed block>
- // ....
- // <4 byte big endian uncompressed size>
- // <4 byte big endian compressed size>
- // <snappy compressed block>
- //
- // See:
- //
https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/SnappyCodec.cc
- while (remaining_input_size > 0) {
- if (remaining_input_size < 4) {
- *more_input_bytes = 4 - remaining_input_size;
- break;
+ while (input_len > 0) {
+ //if faild , fall back to large block begin
+ auto* large_block_input_ptr = input_ptr;
+ auto* large_block_output_ptr = output_ptr;
+
+ if (input_len < sizeof(uint32_t)) {
+ return Status::InvalidArgument(strings::Substitute(
+ "fail to do hadoop-snappy decompress, input_len=$0",
input_len));
}
- // Read uncompressed size
- uint32_t uncompressed_block_len = Decompressor::_read_int32(src);
- int64_t remaining_output_len = output_max_len - uncompressed_total_len;
- if (remaining_output_len < uncompressed_block_len) {
+
+ uint32_t remaining_decompressed_large_block_len =
BigEndian::Load32(input_ptr);
+
+ input_ptr += sizeof(uint32_t);
+ input_len -= sizeof(uint32_t);
+
+ std::size_t remaining_output_len = output_max_len - *decompressed_len;
+
+ if (remaining_output_len < remaining_decompressed_large_block_len) {
// Need more output buffer
- *more_output_bytes = uncompressed_block_len - remaining_output_len;
- break;
- }
+ *more_output_bytes = remaining_decompressed_large_block_len -
remaining_output_len;
+ input_ptr = large_block_input_ptr;
+ output_ptr = large_block_output_ptr;
- if (uncompressed_block_len == 0) {
- remaining_input_size -= sizeof(uint32_t);
break;
}
- if (remaining_input_size <= 2 * sizeof(uint32_t)) {
- // The remaining input size should be larger then <uncompressed
size><compressed size><compressed data>
- // +1 means we need at least 1 bytes of compressed data.
- *more_input_bytes = 2 * sizeof(uint32_t) + 1 -
remaining_input_size;
- break;
- }
+ std::size_t decompressed_large_block_len = 0;
+ while (remaining_decompressed_large_block_len > 0) {
+ // Check that input length should not be negative.
+ if (input_len < sizeof(uint32_t)) {
+ *more_input_bytes = sizeof(uint32_t) - input_len;
+ break;
+ }
- // Read compressed size
- size_t tmp_remaining_size = remaining_input_size - 2 *
sizeof(uint32_t);
- size_t compressed_len = _read_int32(src + sizeof(uint32_t));
- if (compressed_len > tmp_remaining_size) {
- // Need more input data
- *more_input_bytes = compressed_len - tmp_remaining_size;
- break;
- }
+ // Read the length of the next snappy compressed block.
+ size_t compressed_small_block_len = BigEndian::Load32(input_ptr);
- src += 2 * sizeof(uint32_t);
- remaining_input_size -= 2 * sizeof(uint32_t);
-
- // ATTN: the uncompressed len from GetUncompressedLength() is same as
- // uncompressed_block_len, so I think it is unnecessary to get it
again.
- // Get uncompressed len from snappy
- // size_t uncompressed_len;
- // if (!snappy::GetUncompressedLength(reinterpret_cast<const
char*>(src),
- // compressed_len, &uncompressed_len)) {
- // return Status::InternalError("snappy block decompress failed to
get uncompressed len");
- // }
-
- // Decompress
- if (!snappy::RawUncompress(reinterpret_cast<const char*>(src),
compressed_len,
- reinterpret_cast<char*>(output))) {
- return Status::InternalError(
- "snappy block decompress failed. uncompressed_len: {},
compressed_len: {}",
- uncompressed_block_len, compressed_len);
+ input_ptr += sizeof(uint32_t);
+ input_len -= sizeof(uint32_t);
+
+ if (compressed_small_block_len == 0) {
+ continue;
+ }
+
+ if (compressed_small_block_len > input_len) {
+ // Need more input buffer
+ *more_input_bytes = compressed_small_block_len - input_len;
+ break;
+ }
+
+ // Decompress this block.
+ size_t decompressed_small_block_len;
+ if (!snappy::GetUncompressedLength(reinterpret_cast<const
char*>(input_ptr),
+ compressed_small_block_len,
+ &decompressed_small_block_len))
{
+ return Status::InternalError(
+ "snappy block decompress failed to get uncompressed
len");
+ }
+ if (!snappy::RawUncompress(reinterpret_cast<const
char*>(input_ptr),
+ compressed_small_block_len,
+ reinterpret_cast<char*>(output_ptr))) {
+ return Status::InternalError(
+ "snappy block decompress failed. uncompressed_len: {},
compressed_len: {}",
+ decompressed_small_block_len,
compressed_small_block_len);
+ }
+ input_ptr += compressed_small_block_len;
+ input_len -= compressed_small_block_len;
+
+ output_ptr += decompressed_small_block_len;
+ remaining_decompressed_large_block_len -=
decompressed_small_block_len;
+ decompressed_large_block_len += decompressed_small_block_len;
+ };
+
+ if (*more_input_bytes != 0) {
+ // Need more input buffer
+ input_ptr = large_block_input_ptr;
+ output_ptr = large_block_output_ptr;
+ break;
}
- output += uncompressed_block_len;
- src += compressed_len;
- remaining_input_size -= compressed_len;
- uncompressed_total_len += uncompressed_block_len;
+ *decompressed_len += decompressed_large_block_len;
}
-
- *input_bytes_read += (input_len - remaining_input_size);
- *decompressed_len = uncompressed_total_len;
+ *input_bytes_read += (input_ptr - input);
// If no more input and output need, means this is the end of a compressed
block
*stream_end = (*more_input_bytes == 0 && *more_output_bytes == 0);
diff --git
a/regression-test/data/external_table_p0/tvf/compress/test_tvf.csv.snappy
b/regression-test/data/external_table_p0/tvf/compress/test_tvf.csv.snappy
index 9ac2b7ae299..054613c5146 100644
Binary files
a/regression-test/data/external_table_p0/tvf/compress/test_tvf.csv.snappy and
b/regression-test/data/external_table_p0/tvf/compress/test_tvf.csv.snappy differ
diff --git
a/regression-test/data/external_table_p0/tvf/test_local_tvf_compression.out
b/regression-test/data/external_table_p0/tvf/test_local_tvf_compression.out
index 8120427ea6c..5f1a4f5d463 100644
--- a/regression-test/data/external_table_p0/tvf/test_local_tvf_compression.out
+++ b/regression-test/data/external_table_p0/tvf/test_local_tvf_compression.out
@@ -123,28 +123,28 @@
2023-09-18 7
-- !snappy_1 --
-1 694832 buHDwfGeNHfpRFdNaogneddi 2024-02-09
4.899588807225554
-10 218729 goZsLvvWFOIjlzSAitC 2024-06-10 4.137732740231178
-100 813423 zICskqgcdPc 2024-03-23 8.486529018746493
-1000 612650 RzOXeYpKOmuJOogUyeIEDNDmvq 2023-12-05
7.8741752707933435
-1001 29486 WoUAFJFuJNnwyqMnoDhX 2024-03-11 9.758244908785949
-1002 445363 OdTEeeWtxfcRwx 2024-08-01 0.3934945460194128
-1003 707035 JAYnKxusVpGzYueACf 2023-11-14 5.377110182643222
-1004 227858 JIFyjKzmbjkt 2024-03-24 5.748037621519263
-1005 539305 PlruLkSUSXZgaHafFriklrhCi 2023-11-08
4.122635188836725
-1006 145518 KCwqEcSCGuXrHerwn 2024-06-22 8.482290064407216
-1007 939028 KzXhEMelsKVLbDMsEKh 2024-01-01 8.144449761594585
-1008 913569 CHlqPKqkIdqwBCBUHreXbFAkCt 2024-05-25
1.5683842369495904
-1009 757881 AjcSyYMIMzS 2024-05-04 7.5674012939461255
-101 326164 QWLnalYNmYDt 2024-01-07 3.8159876011523854
-1010 427079 AlRUfmxfAuoLnPqUTvQVMtrS 2024-06-04
3.8087069699523313
-1011 252076 gHmFDhtytYzWETIxdpkpMUpnLd 2023-09-17
6.773606843056635
-1012 819615 rFfRHquexplDJvSeUK 2023-11-02 3.220639250504097
-1013 413456 uvNPelHXYjJKiOkwdNbmUkGzxiiqLo 2024-03-15
8.305048700108081
-1014 308042 vnzcsvHxnWFhvLwJkAtUqe 2024-06-15 1.5668867233009998
-1015 603837 VBEsRVGyhRNWQeKzDaBnJHmFDnXAOU 2024-08-17
3.8287482122289007
-1016 912679 eEjldPhxojSjTnE 2024-01-09 1.3717891874157961
-1017 630392 TcczYHXbwaCYzFSfXJlhsFjN 2023-10-07
4.733337480058437
+1 694832 buHDwfGeNHfpRFdNaogneddi 2024-02-09 4.8995886
+10 218729 goZsLvvWFOIjlzSAitC 2024-06-10 4.1377325
+100 813423 zICskqgcdPc 2024-03-23 8.4865294
+1000 612650 RzOXeYpKOmuJOogUyeIEDNDmvq 2023-12-05 7.8741751
+1001 29486 WoUAFJFuJNnwyqMnoDhX 2024-03-11 9.7582445
+1002 445363 OdTEeeWtxfcRwx 2024-08-01 0.39349455
+1003 707035 JAYnKxusVpGzYueACf 2023-11-14 5.37711
+1004 227858 JIFyjKzmbjkt 2024-03-24 5.7480378
+1005 539305 PlruLkSUSXZgaHafFriklrhCi 2023-11-08 4.1226354
+1006 145518 KCwqEcSCGuXrHerwn 2024-06-22 8.48229
+1007 939028 KzXhEMelsKVLbDMsEKh 2024-01-01 8.14445
+1008 913569 CHlqPKqkIdqwBCBUHreXbFAkCt 2024-05-25 1.5683843
+1009 757881 AjcSyYMIMzS 2024-05-04 7.5674014
+101 326164 QWLnalYNmYDt 2024-01-07 3.8159876
+1010 427079 AlRUfmxfAuoLnPqUTvQVMtrS 2024-06-04 3.808707
+1011 252076 gHmFDhtytYzWETIxdpkpMUpnLd 2023-09-17 6.7736068
+1012 819615 rFfRHquexplDJvSeUK 2023-11-02 3.2206392
+1013 413456 uvNPelHXYjJKiOkwdNbmUkGzxiiqLo 2024-03-15 8.3050489
+1014 308042 vnzcsvHxnWFhvLwJkAtUqe 2024-06-15 1.5668868
+1015 603837 VBEsRVGyhRNWQeKzDaBnJHmFDnXAOU 2024-08-17 3.8287482
+1016 912679 eEjldPhxojSjTnE 2024-01-09 1.3717892
+1017 630392 TcczYHXbwaCYzFSfXJlhsFjN 2023-10-07 4.7333374
-- !snappy_2 --
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]