This is an automated email from the ASF dual-hosted git repository.
mgreber pushed a commit to branch branch-1.18.x
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/branch-1.18.x by this push:
new 4d20ea936 KUDU-3647 more robust zlib wrapper code
4d20ea936 is described below
commit 4d20ea93600acdf64617ed648f274d236369ed44
Author: Alexey Serbin <[email protected]>
AuthorDate: Thu Feb 27 22:33:55 2025 -0800
KUDU-3647 more robust zlib wrapper code
This patch introduces much more robust error handling for both
zlib::{CompressLevel,Uncompress}() utility wrappers and fixes a few bugs
introduced along with other modifications in [1]. In addition, the code
now follows the documented API of zlib's inflate()/deflate() more
closely per docs [2] and example [3], while zlib::CompressLevel()
no longer allocates memory on the heap and uses a much smaller
stack-allocated buffer. It seems that using a larger heap-allocated
buffer inadvertently hid the improper usage of zlib's deflate() API.
This patch adds a few test scenarios to cover the main functionality
of zlib::CompressLevel() and zlib::Uncompress(), as well as a few specific
error conditions caused by truncated or corrupted input data fed
to zlib::Uncompress().
[1] https://issues.apache.org/jira/browse/KUDU-3201
[2] https://zlib.net/zlib_how.html
[3] https://zlib.net/zpipe.c
Change-Id: I5babcaf0a07c284d44f16201aa765e277b6464d3
Reviewed-on: http://gerrit.cloudera.org:8080/22568
Reviewed-by: Abhishek Chennaka <[email protected]>
Tested-by: Alexey Serbin <[email protected]>
(cherry picked from commit 1733cfc5adaa27e361e2f2df25a8d8c12d8c8710)
Reviewed-on: http://gerrit.cloudera.org:8080/22608
Reviewed-by: Marton Greber <[email protected]>
---
src/kudu/util/CMakeLists.txt | 9 +++
src/kudu/util/zlib-test.cc | 157 +++++++++++++++++++++++++++++++++++++++++++
src/kudu/util/zlib.cc | 97 +++++++++++++++++---------
src/kudu/util/zlib.h | 10 +--
4 files changed, 236 insertions(+), 37 deletions(-)
diff --git a/src/kudu/util/CMakeLists.txt b/src/kudu/util/CMakeLists.txt
index 501ac5ec1..0bebe9453 100644
--- a/src/kudu/util/CMakeLists.txt
+++ b/src/kudu/util/CMakeLists.txt
@@ -707,3 +707,12 @@ if(NOT NO_TESTS)
target_link_libraries(instance_detector-test
kudu_cloud_util)
endif()
+
+#######################################
+# zlib-test
+#######################################
+ADD_KUDU_TEST(zlib-test)
+if(NOT NO_TESTS)
+ target_link_libraries(zlib-test
+ zlib)
+endif()
diff --git a/src/kudu/util/zlib-test.cc b/src/kudu/util/zlib-test.cc
new file mode 100644
index 000000000..9347d542a
--- /dev/null
+++ b/src/kudu/util/zlib-test.cc
@@ -0,0 +1,157 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/zlib.h"
+
+#include <algorithm>
+#include <cstddef>
+#include <cstdint>
+#include <cstring>
+#include <iterator>
+#include <memory>
+#include <ostream>
+#include <string>
+
+#include <gtest/gtest.h>
+#include <zconf.h>
+#include <zlib.h>
+
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/random.h"
+#include "kudu/util/slice.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+using std::ostringstream;
+using std::string;
+using std::unique_ptr;
+using strings::Substitute;
+
+namespace kudu {
+namespace zlib {
+
+class ZlibTest : public KuduTest {
+ public:
+ ZlibTest()
+ : rand_(SeedRandom()) {
+ }
+
+ void GenerateRandomData(uint8_t* out, size_t size) {
+ uint64_t* p64 = reinterpret_cast<uint64_t*>(out);
+ for (size_t i = 0; i < size / 8; ++i) {
+ *p64++ = rand_.Next64();
+ }
+ const uint8_t* p8_end = out + size;
+ uint8_t* p8 = reinterpret_cast<uint8_t*>(p64);
+ if (p8 < p8_end) {
+ const uint64_t val = rand_.Next64();
+ uint8_t bit_shift = 0;
+ do {
+ *p8++ = static_cast<uint8_t>(val >> bit_shift);
+ bit_shift += 8;
+ } while (p8 < p8_end);
+ }
+ }
+
+protected:
+ Random rand_;
+};
+
+TEST_F(ZlibTest, CompressUncompress) {
+ // Set this to be a multiple of the maximum possible LZ77 window size.
+ constexpr const size_t kMaxSize = 64 * 1024;
+ static_assert((kMaxSize / (1 << MAX_WBITS)) >= 2);
+ static_assert(Z_NO_COMPRESSION <= Z_BEST_COMPRESSION);
+
+ SKIP_IF_SLOW_NOT_ALLOWED();
+
+ unique_ptr<uint8_t[]> chunk(new uint8_t[kMaxSize]);
+ for (size_t size = 8; size <= kMaxSize;) {
+ GenerateRandomData(chunk.get(), size);
+ for (int level = Z_NO_COMPRESSION; level <= Z_BEST_COMPRESSION; ++level) {
+ SCOPED_TRACE(Substitute("compression level $0 size $1", level, size));
+ Slice data_p(chunk.get(), size);
+ ostringstream oss_c;
+ ASSERT_OK(CompressLevel(data_p, level, &oss_c));
+ const string& data_str_c(oss_c.str());
+ Slice data_c(data_str_c.data(), data_str_c.size());
+ ostringstream oss_p;
+ ASSERT_OK(Uncompress(data_c, &oss_p));
+ const string& data_str_p(oss_p.str());
+ ASSERT_EQ(size, data_str_p.size());
+ ASSERT_EQ(0, memcmp(data_str_p.c_str(), chunk.get(), size));
+ }
+ size += (rand_.Uniform(1024) + 1);
+ }
+}
+
+TEST_F(ZlibTest, UncompressCorruptedData) {
+ constexpr const size_t kMaxSize = 64 * 1024;
+ constexpr const int kLevel = Z_BEST_SPEED;
+
+ SKIP_IF_SLOW_NOT_ALLOWED();
+
+ unique_ptr<uint8_t[]> chunk(new uint8_t[kMaxSize]);
+ for (size_t size = 256; size <= kMaxSize;) {
+ SCOPED_TRACE(Substitute("size $0", size));
+ GenerateRandomData(chunk.get(), size);
+ Slice data_p(chunk.get(), size);
+ ostringstream oss_c;
+ ASSERT_OK(CompressLevel(data_p, kLevel, &oss_c));
+ const string& data_str_c(oss_c.str());
+
+ for (size_t bytes_num = 1; bytes_num < 32; ++bytes_num) {
+ Slice data_c(data_str_c.data(), data_str_c.size() - bytes_num);
+ ostringstream oss_p;
+ const auto& s = Uncompress(data_c, &oss_p);
+ ASSERT_TRUE(s.IsCorruption()) << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(), "truncated gzip data");
+ }
+
+ {
+ // Reverse the sequence of bytes in the input data.
+ string data_str_c_reversed;
+ data_str_c_reversed.reserve(data_str_c.size());
+ std::reverse_copy(data_str_c.begin(),
+ data_str_c.end(),
+ std::back_inserter(data_str_c_reversed));
+ Slice data_c(data_str_c_reversed.data(), data_str_c_reversed.size());
+ ostringstream oss_p;
+ const auto& s = Uncompress(data_c, &oss_p);
+ ASSERT_TRUE(s.IsCorruption()) << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(), "zlib error: DATA_ERROR");
+ }
+
+ {
+ // Replace the second half of the data with a bogus ASCII symbol.
+ string data_str_cc = data_str_c;
+ auto it = data_str_cc.begin();
+ std::advance(it, size / 2);
+ std::fill(it, data_str_cc.end(), 'x');
+ Slice data_c(data_str_cc.data(), data_str_cc.size());
+ ostringstream oss_p;
+ const auto& s = Uncompress(data_c, &oss_p);
+ ASSERT_TRUE(s.IsCorruption()) << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(), "zlib error: DATA_ERROR");
+ }
+ size += (rand_.Uniform(1024) + 1);
+ }
+}
+
+} // namespace zlib
+} // namespace kudu
diff --git a/src/kudu/util/zlib.cc b/src/kudu/util/zlib.cc
index 4bda79ad9..4d70c29a1 100644
--- a/src/kudu/util/zlib.cc
+++ b/src/kudu/util/zlib.cc
@@ -19,20 +19,21 @@
#include <cstdint>
#include <cstring>
-#include <memory>
#include <string>
+#include <glog/logging.h>
#include <zconf.h>
#include <zlib.h>
-#include "kudu/gutil/macros.h"
+#include "kudu/gutil/basictypes.h"
+#include "kudu/gutil/port.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/util/slice.h"
#include "kudu/util/status.h"
+using std::ios;
using std::ostream;
using std::string;
-using std::unique_ptr;
#define ZRETURN_NOT_OK(call) \
RETURN_NOT_OK(ZlibResultToStatus(call))
@@ -74,6 +75,12 @@ Status Compress(Slice input, ostream* out) {
// See https://zlib.net/zlib_how.html for context on using zlib.
Status CompressLevel(Slice input, int level, ostream* out) {
+ DCHECK(out);
+ // Output stream exceptions aren't handled in this function, so the stream
+ // isn't supposed to throw on errors, but just set bad/failure bits instead.
+ DCHECK_EQ(ios::goodbit, out->exceptions());
+ DCHECK(level >= Z_DEFAULT_COMPRESSION && level <= Z_BEST_COMPRESSION);
+
z_stream zs;
memset(&zs, 0, sizeof(zs));
ZRETURN_NOT_OK(deflateInit2(&zs, level, Z_DEFLATED,
@@ -83,55 +90,79 @@ Status CompressLevel(Slice input, int level, ostream* out) {
zs.avail_in = input.size();
zs.next_in = const_cast<uint8_t*>(input.data());
- const int kChunkSize = 256 * 1024;
- unique_ptr<unsigned char[]> chunk(new unsigned char[kChunkSize]);
- int flush;
+ constexpr const size_t kChunkSize = 16 * 1024;
+ unsigned char buf[kChunkSize];
+ int rc;
do {
zs.avail_out = kChunkSize;
- zs.next_out = chunk.get();
- flush = (zs.avail_in == 0) ? Z_FINISH : Z_NO_FLUSH;
- Status s = ZlibResultToStatus(deflate(&zs, flush));
- if (!s.ok() && !s.IsEndOfFile()) {
- return s;
+ zs.next_out = buf;
+ const int flush = (zs.avail_in == 0) ? Z_FINISH : Z_NO_FLUSH;
+ rc = deflate(&zs, flush);
+ if (PREDICT_FALSE(rc != Z_OK && rc != Z_STREAM_END)) {
+ ignore_result(deflateEnd(&zs));
+ return ZlibResultToStatus(rc);
}
- int out_size = zs.next_out - chunk.get();
- if (out_size > 0) {
- out->write(reinterpret_cast<char *>(chunk.get()), out_size);
+ DCHECK_GT(zs.next_out - buf, 0);
+ out->write(reinterpret_cast<char*>(buf), zs.next_out - buf);
+ if (PREDICT_FALSE(out->fail())) {
+ ignore_result(deflateEnd(&zs));
+ return Status::IOError("error writing to output stream");
}
- } while (flush != Z_FINISH);
- ZRETURN_NOT_OK(deflateEnd(&zs));
- return Status::OK();
+ } while (rc != Z_STREAM_END);
+
+ return ZlibResultToStatus(deflateEnd(&zs));
}
// See https://zlib.net/zlib_how.html for context on using zlib.
-Status Uncompress(Slice compressed, std::ostream* out) {
+Status Uncompress(Slice input, ostream* out) {
+ DCHECK(out);
+ // Output stream exceptions aren't handled in this function, so the stream
+ // isn't supposed to throw on errors, but just set bad/failure bits instead.
+ DCHECK_EQ(ios::goodbit, out->exceptions());
+
// Initialize the z_stream at the start of the data with the
// data size as the available input.
z_stream zs;
memset(&zs, 0, sizeof(zs));
- zs.next_in = const_cast<uint8_t*>(compressed.data());
- zs.avail_in = compressed.size();
+ zs.next_in = const_cast<uint8_t*>(input.data());
+ zs.avail_in = input.size();
// Initialize inflation with the windowBits set to be GZIP compatible.
// The documentation (https://www.zlib.net/manual.html#Advanced) describes that
// Adding 16 configures inflate to decode the gzip format.
ZRETURN_NOT_OK(inflateInit2(&zs, MAX_WBITS + 16 /* enable gzip */));
- // Continue calling inflate, decompressing data into the buffer in `zs.next_out` and writing
- // the buffer content to `out`, until an error is received or there is no more data
- // to decompress.
- Status s;
+ // Continue calling inflate, decompressing data into the buffer in
+ // `zs.next_out` and writing the buffer content to `out`, until an error
+ // condition is encountered or there is no more data to decompress.
+ constexpr const size_t kChunkSize = 16 * 1024;
+ unsigned char buf[kChunkSize];
+ int rc;
do {
- unsigned char buf[4096];
zs.next_out = buf;
- zs.avail_out = arraysize(buf);
- s = ZlibResultToStatus(inflate(&zs, Z_NO_FLUSH));
- if (!s.ok() && !s.IsEndOfFile()) {
- return s;
+ zs.avail_out = kChunkSize;
+ rc = inflate(&zs, Z_NO_FLUSH);
+ if (PREDICT_FALSE(rc != Z_OK && rc != Z_STREAM_END)) {
+ ignore_result(inflateEnd(&zs));
+ // Special handling for Z_BUF_ERROR in certain conditions to produce
+ // Status::Corruption() instead of Status::RuntimeError().
+ if (rc == Z_BUF_ERROR && zs.avail_in == 0) {
+ // This means the input was most likely incomplete/truncated. Because
+ // the input isn't a stream in this function, all the input data is
+ // available at once as Slice, so no more available bytes on input
+ // are ever expected at this point.
+ return Status::Corruption("truncated gzip data");
+ }
+ return ZlibResultToStatus(rc);
}
- out->write(reinterpret_cast<char *>(buf), zs.next_out - buf);
- } while (zs.avail_out == 0);
+ DCHECK_GT(zs.next_out - buf, 0);
+ out->write(reinterpret_cast<char*>(buf), zs.next_out - buf);
+ if (PREDICT_FALSE(out->fail())) {
+ ignore_result(inflateEnd(&zs));
+ return Status::IOError("error writing to output stream");
+ }
+ } while (rc != Z_STREAM_END);
+
// If we haven't returned early with a bad status, finalize inflation.
- ZRETURN_NOT_OK(inflateEnd(&zs));
- return Status::OK();
+ return ZlibResultToStatus(inflateEnd(&zs));
}
} // namespace zlib
diff --git a/src/kudu/util/zlib.h b/src/kudu/util/zlib.h
index 478382b7b..0931d557f 100644
--- a/src/kudu/util/zlib.h
+++ b/src/kudu/util/zlib.h
@@ -26,18 +26,20 @@ namespace zlib {
// Zlib-compress the data in 'input', appending the result to 'out'.
//
-// In case of an error, some data may still be appended to 'out'.
+// In case of an error, non-OK status is returned and some data may still
+// be appended to 'out'.
Status Compress(Slice input, std::ostream* out);
// The same as the above, but with a custom level (1-9, where 1 is fastest
// and 9 is best compression).
Status CompressLevel(Slice input, int level, std::ostream* out);
-// Uncompress the zlib-compressed data in 'compressed', appending the result
+// Uncompress the zlib-compressed data in 'input', appending the result
// to 'out'.
//
-// In case of an error, some data may still be appended to 'out'.
-Status Uncompress(Slice compressed, std::ostream* out);
+// In case of an error, non-OK status is returned and some data may still
+// be appended to 'out'.
+Status Uncompress(Slice input, std::ostream* out);
} // namespace zlib
} // namespace kudu