This is an automated email from the ASF dual-hosted git repository. alexey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 1733cfc5adaa27e361e2f2df25a8d8c12d8c8710 Author: Alexey Serbin <[email protected]> AuthorDate: Thu Feb 27 22:33:55 2025 -0800 KUDU-3647 more robust zlib wrapper code This patch introduces much more robust error handling for both zlib::{CompressLevel,Uncompress}() utility wrappers and fixes a few bugs introduced along with other modifications in [1]. In addition, the code now follows the documented API of zlib's inflate()/deflate() more closely per docs [2] and example [3], while zlib::CompressLevel() no longer allocates memory on the heap and uses a much smaller stack-allocated buffer. It seems that using a larger heap-allocated buffer inadvertently hid the improper usage of zlib's deflate() API. This patch adds a few test scenarios to cover the main functionality of zlib::CompressLevel() and zlib::Uncompress(), and also a few specific conditions attributed to truncated and corrupted input data that's fed to zlib::Uncompress(). [1] https://issues.apache.org/jira/browse/KUDU-3201 [2] https://zlib.net/zlib_how.html [3] https://zlib.net/zpipe.c Change-Id: I5babcaf0a07c284d44f16201aa765e277b6464d3 Reviewed-on: http://gerrit.cloudera.org:8080/22568 Reviewed-by: Abhishek Chennaka <[email protected]> Tested-by: Alexey Serbin <[email protected]> --- src/kudu/util/CMakeLists.txt | 9 +++ src/kudu/util/zlib-test.cc | 157 +++++++++++++++++++++++++++++++++++++++++++ src/kudu/util/zlib.cc | 97 +++++++++++++++++--------- src/kudu/util/zlib.h | 10 +-- 4 files changed, 236 insertions(+), 37 deletions(-) diff --git a/src/kudu/util/CMakeLists.txt b/src/kudu/util/CMakeLists.txt index 501ac5ec1..0bebe9453 100644 --- a/src/kudu/util/CMakeLists.txt +++ b/src/kudu/util/CMakeLists.txt @@ -707,3 +707,12 @@ if(NOT NO_TESTS) target_link_libraries(instance_detector-test kudu_cloud_util) endif() + +####################################### +# zlib-test +####################################### +ADD_KUDU_TEST(zlib-test) +if(NOT NO_TESTS) + target_link_libraries(zlib-test + zlib) +endif() diff --git a/src/kudu/util/zlib-test.cc b/src/kudu/util/zlib-test.cc new file mode 100644 index 000000000..9347d542a --- /dev/null +++ b/src/kudu/util/zlib-test.cc @@ -0,0 +1,157 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "kudu/util/zlib.h" + +#include <algorithm> +#include <cstddef> +#include <cstdint> +#include <cstring> +#include <iterator> +#include <memory> +#include <ostream> +#include <string> + +#include <gtest/gtest.h> +#include <zconf.h> +#include <zlib.h> + +#include "kudu/gutil/strings/substitute.h" +#include "kudu/util/random.h" +#include "kudu/util/slice.h" +#include "kudu/util/status.h" +#include "kudu/util/test_macros.h" +#include "kudu/util/test_util.h" + +using std::ostringstream; +using std::string; +using std::unique_ptr; +using strings::Substitute; + +namespace kudu { +namespace zlib { + +class ZlibTest : public KuduTest { + public: + ZlibTest() + : rand_(SeedRandom()) { + } + + void GenerateRandomData(uint8_t* out, size_t size) { + uint64_t* p64 = reinterpret_cast<uint64_t*>(out); + for (size_t i = 0; i < size / 8; ++i) { + *p64++ = rand_.Next64(); + } + const uint8_t* p8_end = out + size; + uint8_t* p8 = reinterpret_cast<uint8_t*>(p64); + if (p8 < p8_end) { + const uint64_t val = rand_.Next64(); + uint8_t bit_shift = 0; + do { + *p8++ = static_cast<uint8_t>(val >> bit_shift); + bit_shift += 8; + } while (p8 < p8_end); + } + } + +protected: + Random rand_; +}; + +TEST_F(ZlibTest, CompressUncompress) { + // Set this to be a multiple of the maximum possible LZ77 window size. + constexpr const size_t kMaxSize = 64 * 1024; + static_assert((kMaxSize / (1 << MAX_WBITS)) >= 2); + static_assert(Z_NO_COMPRESSION <= Z_BEST_COMPRESSION); + + SKIP_IF_SLOW_NOT_ALLOWED(); + + unique_ptr<uint8_t[]> chunk(new uint8_t[kMaxSize]); + for (size_t size = 8; size <= kMaxSize;) { + GenerateRandomData(chunk.get(), size); + for (int level = Z_NO_COMPRESSION; level <= Z_BEST_COMPRESSION; ++level) { + SCOPED_TRACE(Substitute("compression level $0 size $1", level, size)); + Slice data_p(chunk.get(), size); + ostringstream oss_c; + ASSERT_OK(CompressLevel(data_p, level, &oss_c)); + const string& data_str_c(oss_c.str()); + Slice data_c(data_str_c.data(), data_str_c.size()); + ostringstream oss_p; + ASSERT_OK(Uncompress(data_c, &oss_p)); + const string& data_str_p(oss_p.str()); + ASSERT_EQ(size, data_str_p.size()); + ASSERT_EQ(0, memcmp(data_str_p.c_str(), chunk.get(), size)); + } + size += (rand_.Uniform(1024) + 1); + } +} + +TEST_F(ZlibTest, UncompressCorruptedData) { + constexpr const size_t kMaxSize = 64 * 1024; + constexpr const int kLevel = Z_BEST_SPEED; + + SKIP_IF_SLOW_NOT_ALLOWED(); + + unique_ptr<uint8_t[]> chunk(new uint8_t[kMaxSize]); + for (size_t size = 256; size <= kMaxSize;) { + SCOPED_TRACE(Substitute("size $0", size)); + GenerateRandomData(chunk.get(), size); + Slice data_p(chunk.get(), size); + ostringstream oss_c; + ASSERT_OK(CompressLevel(data_p, kLevel, &oss_c)); + const string& data_str_c(oss_c.str()); + + for (size_t bytes_num = 1; bytes_num < 32; ++bytes_num) { + Slice data_c(data_str_c.data(), data_str_c.size() - bytes_num); + ostringstream oss_p; + const auto& s = Uncompress(data_c, &oss_p); + ASSERT_TRUE(s.IsCorruption()) << s.ToString(); + ASSERT_STR_CONTAINS(s.ToString(), "truncated gzip data"); + } + + { + // Reverse the sequence of bytes in the input data. + string data_str_c_reversed; + data_str_c_reversed.reserve(data_str_c.size()); + std::reverse_copy(data_str_c.begin(), + data_str_c.end(), + std::back_inserter(data_str_c_reversed)); + Slice data_c(data_str_c_reversed.data(), data_str_c_reversed.size()); + ostringstream oss_p; + const auto& s = Uncompress(data_c, &oss_p); + ASSERT_TRUE(s.IsCorruption()) << s.ToString(); + ASSERT_STR_CONTAINS(s.ToString(), "zlib error: DATA_ERROR"); + } + + { + // Replace the second half of the data with a bogus ASCII symbol. + string data_str_cc = data_str_c; + auto it = data_str_cc.begin(); + std::advance(it, size / 2); + std::fill(it, data_str_cc.end(), 'x'); + Slice data_c(data_str_cc.data(), data_str_cc.size()); + ostringstream oss_p; + const auto& s = Uncompress(data_c, &oss_p); + ASSERT_TRUE(s.IsCorruption()) << s.ToString(); + ASSERT_STR_CONTAINS(s.ToString(), "zlib error: DATA_ERROR"); + } + size += (rand_.Uniform(1024) + 1); + } +} + +} // namespace zlib +} // namespace kudu diff --git a/src/kudu/util/zlib.cc b/src/kudu/util/zlib.cc index 4bda79ad9..4d70c29a1 100644 --- a/src/kudu/util/zlib.cc +++ b/src/kudu/util/zlib.cc @@ -19,20 +19,21 @@ #include <cstdint> #include <cstring> -#include <memory> #include <string> +#include <glog/logging.h> #include <zconf.h> #include <zlib.h> -#include "kudu/gutil/macros.h" +#include "kudu/gutil/basictypes.h" +#include "kudu/gutil/port.h" #include "kudu/gutil/strings/substitute.h" #include "kudu/util/slice.h" #include "kudu/util/status.h" +using std::ios; using std::ostream; using std::string; -using std::unique_ptr; #define ZRETURN_NOT_OK(call) \ RETURN_NOT_OK(ZlibResultToStatus(call)) @@ -74,6 +75,12 @@ Status Compress(Slice input, ostream* out) { // See https://zlib.net/zlib_how.html for context on using zlib. Status CompressLevel(Slice input, int level, ostream* out) { + DCHECK(out); + // Output stream exceptions aren't handled in this function, so the stream + // isn't supposed to throw on errors, but just set bad/failure bits instead. + DCHECK_EQ(ios::goodbit, out->exceptions()); + DCHECK(level >= Z_DEFAULT_COMPRESSION && level <= Z_BEST_COMPRESSION); + z_stream zs; memset(&zs, 0, sizeof(zs)); ZRETURN_NOT_OK(deflateInit2(&zs, level, Z_DEFLATED, @@ -83,55 +90,79 @@ Status CompressLevel(Slice input, int level, ostream* out) { zs.avail_in = input.size(); zs.next_in = const_cast<uint8_t*>(input.data()); - const int kChunkSize = 256 * 1024; - unique_ptr<unsigned char[]> chunk(new unsigned char[kChunkSize]); - int flush; + constexpr const size_t kChunkSize = 16 * 1024; + unsigned char buf[kChunkSize]; + int rc; do { zs.avail_out = kChunkSize; - zs.next_out = chunk.get(); - flush = (zs.avail_in == 0) ? Z_FINISH : Z_NO_FLUSH; - Status s = ZlibResultToStatus(deflate(&zs, flush)); - if (!s.ok() && !s.IsEndOfFile()) { - return s; + zs.next_out = buf; + const int flush = (zs.avail_in == 0) ? Z_FINISH : Z_NO_FLUSH; + rc = deflate(&zs, flush); + if (PREDICT_FALSE(rc != Z_OK && rc != Z_STREAM_END)) { + ignore_result(deflateEnd(&zs)); + return ZlibResultToStatus(rc); } - int out_size = zs.next_out - chunk.get(); - if (out_size > 0) { - out->write(reinterpret_cast<char *>(chunk.get()), out_size); + DCHECK_GT(zs.next_out - buf, 0); + out->write(reinterpret_cast<char*>(buf), zs.next_out - buf); + if (PREDICT_FALSE(out->fail())) { + ignore_result(deflateEnd(&zs)); + return Status::IOError("error writing to output stream"); } - } while (flush != Z_FINISH); - ZRETURN_NOT_OK(deflateEnd(&zs)); - return Status::OK(); + } while (rc != Z_STREAM_END); + + return ZlibResultToStatus(deflateEnd(&zs)); } // See https://zlib.net/zlib_how.html for context on using zlib. -Status Uncompress(Slice compressed, std::ostream* out) { +Status Uncompress(Slice input, ostream* out) { + DCHECK(out); + // Output stream exceptions aren't handled in this function, so the stream + // isn't supposed to throw on errors, but just set bad/failure bits instead. + DCHECK_EQ(ios::goodbit, out->exceptions()); + // Initialize the z_stream at the start of the data with the // data size as the available input. z_stream zs; memset(&zs, 0, sizeof(zs)); - zs.next_in = const_cast<uint8_t*>(compressed.data()); - zs.avail_in = compressed.size(); + zs.next_in = const_cast<uint8_t*>(input.data()); + zs.avail_in = input.size(); // Initialize inflation with the windowBits set to be GZIP compatible. // The documentation (https://www.zlib.net/manual.html#Advanced) describes that // Adding 16 configures inflate to decode the gzip format. ZRETURN_NOT_OK(inflateInit2(&zs, MAX_WBITS + 16 /* enable gzip */)); - // Continue calling inflate, decompressing data into the buffer in `zs.next_out` and writing - // the buffer content to `out`, until an error is received or there is no more data - // to decompress. - Status s; + // Continue calling inflate, decompressing data into the buffer in + // `zs.next_out` and writing the buffer content to `out`, until an error + // condition is encountered or there is no more data to decompress. + constexpr const size_t kChunkSize = 16 * 1024; + unsigned char buf[kChunkSize]; + int rc; do { - unsigned char buf[4096]; zs.next_out = buf; - zs.avail_out = arraysize(buf); - s = ZlibResultToStatus(inflate(&zs, Z_NO_FLUSH)); - if (!s.ok() && !s.IsEndOfFile()) { - return s; + zs.avail_out = kChunkSize; + rc = inflate(&zs, Z_NO_FLUSH); + if (PREDICT_FALSE(rc != Z_OK && rc != Z_STREAM_END)) { + ignore_result(inflateEnd(&zs)); + // Special handling for Z_BUF_ERROR in certain conditions to produce + // Status::Corruption() instead of Status::RuntimeError(). + if (rc == Z_BUF_ERROR && zs.avail_in == 0) { + // This means the input was most likely incomplete/truncated. Because + // the input isn't a stream in this function, all the input data is + // available at once as Slice, so no more available bytes on input + // are ever expected at this point. + return Status::Corruption("truncated gzip data"); + } + return ZlibResultToStatus(rc); } - out->write(reinterpret_cast<char *>(buf), zs.next_out - buf); - } while (zs.avail_out == 0); + DCHECK_GT(zs.next_out - buf, 0); + out->write(reinterpret_cast<char*>(buf), zs.next_out - buf); + if (PREDICT_FALSE(out->fail())) { + ignore_result(inflateEnd(&zs)); + return Status::IOError("error writing to output stream"); + } + } while (rc != Z_STREAM_END); + // If we haven't returned early with a bad status, finalize inflation. - ZRETURN_NOT_OK(inflateEnd(&zs)); - return Status::OK(); + return ZlibResultToStatus(inflateEnd(&zs)); } } // namespace zlib diff --git a/src/kudu/util/zlib.h b/src/kudu/util/zlib.h index 478382b7b..0931d557f 100644 --- a/src/kudu/util/zlib.h +++ b/src/kudu/util/zlib.h @@ -26,18 +26,20 @@ namespace zlib { // Zlib-compress the data in 'input', appending the result to 'out'. // -// In case of an error, some data may still be appended to 'out'. +// In case of an error, non-OK status is returned and some data may still +// be appended to 'out'. Status Compress(Slice input, std::ostream* out); // The same as the above, but with a custom level (1-9, where 1 is fastest // and 9 is best compression). Status CompressLevel(Slice input, int level, std::ostream* out); -// Uncompress the zlib-compressed data in 'compressed', appending the result +// Uncompress the zlib-compressed data in 'input', appending the result // to 'out'. // -// In case of an error, some data may still be appended to 'out'. -Status Uncompress(Slice compressed, std::ostream* out); +// In case of an error, non-OK status is returned and some data may still +// be appended to 'out'. +Status Uncompress(Slice input, std::ostream* out); } // namespace zlib } // namespace kudu
