This is an automated email from the ASF dual-hosted git repository.

mgreber pushed a commit to branch branch-1.18.x
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/branch-1.18.x by this push:
     new 4d20ea936 KUDU-3647 more robust zlib wrapper code
4d20ea936 is described below

commit 4d20ea93600acdf64617ed648f274d236369ed44
Author: Alexey Serbin <[email protected]>
AuthorDate: Thu Feb 27 22:33:55 2025 -0800

    KUDU-3647 more robust zlib wrapper code
    
    This patch introduces much more robust error handling for both
    zlib::{CompressLevel,Uncompress}() utility wrappers and fixes a few bugs
    introduced along with other modifications in [1].  In addition, the code
    now follows the documented API of zlib's inflate()/deflate() more
    closely per docs [2] and example [3], while zlib::CompressLevel()
    no longer allocates memory on the heap and uses a much smaller
    stack-allocated buffer.  It seems that using a larger heap-allocated
    buffer inadvertently hid the improper usage of zlib's deflate() API.
    
    This patch adds a few test scenarios to cover the main functionality
    of zlib::CompressLevel() and zlib::Uncompress(), and also a few specific
    conditions attributed to truncated and corrupted input data that's fed
    to zlib::Uncompress().
    
    [1] https://issues.apache.org/jira/browse/KUDU-3201
    [2] https://zlib.net/zlib_how.html
    [3] https://zlib.net/zpipe.c
    
    Change-Id: I5babcaf0a07c284d44f16201aa765e277b6464d3
    Reviewed-on: http://gerrit.cloudera.org:8080/22568
    Reviewed-by: Abhishek Chennaka <[email protected]>
    Tested-by: Alexey Serbin <[email protected]>
    (cherry picked from commit 1733cfc5adaa27e361e2f2df25a8d8c12d8c8710)
    Reviewed-on: http://gerrit.cloudera.org:8080/22608
    Reviewed-by: Marton Greber <[email protected]>
---
 src/kudu/util/CMakeLists.txt |   9 +++
 src/kudu/util/zlib-test.cc   | 157 +++++++++++++++++++++++++++++++++++++++++++
 src/kudu/util/zlib.cc        |  97 +++++++++++++++++---------
 src/kudu/util/zlib.h         |  10 +--
 4 files changed, 236 insertions(+), 37 deletions(-)

diff --git a/src/kudu/util/CMakeLists.txt b/src/kudu/util/CMakeLists.txt
index 501ac5ec1..0bebe9453 100644
--- a/src/kudu/util/CMakeLists.txt
+++ b/src/kudu/util/CMakeLists.txt
@@ -707,3 +707,12 @@ if(NOT NO_TESTS)
   target_link_libraries(instance_detector-test
     kudu_cloud_util)
 endif()
+
+#######################################
+# zlib-test
+#######################################
+ADD_KUDU_TEST(zlib-test)
+if(NOT NO_TESTS)
+  target_link_libraries(zlib-test
+    zlib)
+endif()
diff --git a/src/kudu/util/zlib-test.cc b/src/kudu/util/zlib-test.cc
new file mode 100644
index 000000000..9347d542a
--- /dev/null
+++ b/src/kudu/util/zlib-test.cc
@@ -0,0 +1,157 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/zlib.h"
+
+#include <algorithm>
+#include <cstddef>
+#include <cstdint>
+#include <cstring>
+#include <iterator>
+#include <memory>
+#include <ostream>
+#include <string>
+
+#include <gtest/gtest.h>
+#include <zconf.h>
+#include <zlib.h>
+
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/random.h"
+#include "kudu/util/slice.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+using std::ostringstream;
+using std::string;
+using std::unique_ptr;
+using strings::Substitute;
+
+namespace kudu {
+namespace zlib {
+
+class ZlibTest : public KuduTest {
+ public:
+  ZlibTest()
+      : rand_(SeedRandom()) {
+  }
+
+  void GenerateRandomData(uint8_t* out, size_t size) {
+    uint64_t* p64 = reinterpret_cast<uint64_t*>(out);
+    for (size_t i = 0; i < size / 8; ++i) {
+      *p64++ = rand_.Next64();
+    }
+    const uint8_t* p8_end = out + size;
+    uint8_t* p8 = reinterpret_cast<uint8_t*>(p64);
+    if (p8 < p8_end) {
+      const uint64_t val = rand_.Next64();
+      uint8_t bit_shift = 0;
+      do {
+        *p8++ = static_cast<uint8_t>(val >> bit_shift);
+        bit_shift += 8;
+      } while (p8 < p8_end);
+    }
+  }
+
+protected:
+  Random rand_;
+};
+
+TEST_F(ZlibTest, CompressUncompress) {
+  // Set this to be a multiple of the maximum possible LZ77 window size.
+  constexpr const size_t kMaxSize = 64 * 1024;
+  static_assert((kMaxSize / (1 << MAX_WBITS)) >= 2);
+  static_assert(Z_NO_COMPRESSION <= Z_BEST_COMPRESSION);
+
+  SKIP_IF_SLOW_NOT_ALLOWED();
+
+  unique_ptr<uint8_t[]> chunk(new uint8_t[kMaxSize]);
+  for (size_t size = 8; size <= kMaxSize;) {
+    GenerateRandomData(chunk.get(), size);
+    for (int level = Z_NO_COMPRESSION; level <= Z_BEST_COMPRESSION; ++level) {
+      SCOPED_TRACE(Substitute("compression level $0 size $1", level, size));
+      Slice data_p(chunk.get(), size);
+      ostringstream oss_c;
+      ASSERT_OK(CompressLevel(data_p, level, &oss_c));
+      const string& data_str_c(oss_c.str());
+      Slice data_c(data_str_c.data(), data_str_c.size());
+      ostringstream oss_p;
+      ASSERT_OK(Uncompress(data_c, &oss_p));
+      const string& data_str_p(oss_p.str());
+      ASSERT_EQ(size, data_str_p.size());
+      ASSERT_EQ(0, memcmp(data_str_p.c_str(), chunk.get(), size));
+    }
+    size += (rand_.Uniform(1024) + 1);
+  }
+}
+
+TEST_F(ZlibTest, UncompressCorruptedData) {
+  constexpr const size_t kMaxSize = 64 * 1024;
+  constexpr const int kLevel = Z_BEST_SPEED;
+
+  SKIP_IF_SLOW_NOT_ALLOWED();
+
+  unique_ptr<uint8_t[]> chunk(new uint8_t[kMaxSize]);
+  for (size_t size = 256; size <= kMaxSize;) {
+    SCOPED_TRACE(Substitute("size $0", size));
+    GenerateRandomData(chunk.get(), size);
+    Slice data_p(chunk.get(), size);
+    ostringstream oss_c;
+    ASSERT_OK(CompressLevel(data_p, kLevel, &oss_c));
+    const string& data_str_c(oss_c.str());
+
+    for (size_t bytes_num = 1; bytes_num < 32; ++bytes_num) {
+      Slice data_c(data_str_c.data(), data_str_c.size() - bytes_num);
+      ostringstream oss_p;
+      const auto& s = Uncompress(data_c, &oss_p);
+      ASSERT_TRUE(s.IsCorruption()) << s.ToString();
+      ASSERT_STR_CONTAINS(s.ToString(), "truncated gzip data");
+    }
+
+    {
+      // Reverse the sequence of bytes in the input data.
+      string data_str_c_reversed;
+      data_str_c_reversed.reserve(data_str_c.size());
+      std::reverse_copy(data_str_c.begin(),
+                        data_str_c.end(),
+                        std::back_inserter(data_str_c_reversed));
+      Slice data_c(data_str_c_reversed.data(), data_str_c_reversed.size());
+      ostringstream oss_p;
+      const auto& s = Uncompress(data_c, &oss_p);
+      ASSERT_TRUE(s.IsCorruption()) << s.ToString();
+      ASSERT_STR_CONTAINS(s.ToString(), "zlib error: DATA_ERROR");
+    }
+
+    {
+      // Replace the second half of the data with a bogus ASCII symbol.
+      string data_str_cc = data_str_c;
+      auto it = data_str_cc.begin();
+      std::advance(it, size / 2);
+      std::fill(it, data_str_cc.end(), 'x');
+      Slice data_c(data_str_cc.data(), data_str_cc.size());
+      ostringstream oss_p;
+      const auto& s = Uncompress(data_c, &oss_p);
+      ASSERT_TRUE(s.IsCorruption()) << s.ToString();
+      ASSERT_STR_CONTAINS(s.ToString(), "zlib error: DATA_ERROR");
+    }
+    size += (rand_.Uniform(1024) + 1);
+  }
+}
+
+} // namespace zlib
+} // namespace kudu
diff --git a/src/kudu/util/zlib.cc b/src/kudu/util/zlib.cc
index 4bda79ad9..4d70c29a1 100644
--- a/src/kudu/util/zlib.cc
+++ b/src/kudu/util/zlib.cc
@@ -19,20 +19,21 @@
 
 #include <cstdint>
 #include <cstring>
-#include <memory>
 #include <string>
 
+#include <glog/logging.h>
 #include <zconf.h>
 #include <zlib.h>
 
-#include "kudu/gutil/macros.h"
+#include "kudu/gutil/basictypes.h"
+#include "kudu/gutil/port.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/util/slice.h"
 #include "kudu/util/status.h"
 
+using std::ios;
 using std::ostream;
 using std::string;
-using std::unique_ptr;
 
 #define ZRETURN_NOT_OK(call) \
   RETURN_NOT_OK(ZlibResultToStatus(call))
@@ -74,6 +75,12 @@ Status Compress(Slice input, ostream* out) {
 
 // See https://zlib.net/zlib_how.html for context on using zlib.
 Status CompressLevel(Slice input, int level, ostream* out) {
+  DCHECK(out);
+  // Output stream exceptions aren't handled in this function, so the stream
+  // isn't supposed to throw on errors, but just set bad/failure bits instead.
+  DCHECK_EQ(ios::goodbit, out->exceptions());
+  DCHECK(level >= Z_DEFAULT_COMPRESSION && level <= Z_BEST_COMPRESSION);
+
   z_stream zs;
   memset(&zs, 0, sizeof(zs));
   ZRETURN_NOT_OK(deflateInit2(&zs, level, Z_DEFLATED,
@@ -83,55 +90,79 @@ Status CompressLevel(Slice input, int level, ostream* out) {
 
   zs.avail_in = input.size();
   zs.next_in = const_cast<uint8_t*>(input.data());
-  const int kChunkSize = 256 * 1024;
-  unique_ptr<unsigned char[]> chunk(new unsigned char[kChunkSize]);
-  int flush;
+  constexpr const size_t kChunkSize = 16 * 1024;
+  unsigned char buf[kChunkSize];
+  int rc;
   do {
     zs.avail_out = kChunkSize;
-    zs.next_out = chunk.get();
-    flush = (zs.avail_in == 0) ? Z_FINISH : Z_NO_FLUSH;
-    Status s = ZlibResultToStatus(deflate(&zs, flush));
-    if (!s.ok() && !s.IsEndOfFile()) {
-      return s;
+    zs.next_out = buf;
+    const int flush = (zs.avail_in == 0) ? Z_FINISH : Z_NO_FLUSH;
+    rc = deflate(&zs, flush);
+    if (PREDICT_FALSE(rc != Z_OK && rc != Z_STREAM_END)) {
+      ignore_result(deflateEnd(&zs));
+      return ZlibResultToStatus(rc);
     }
-    int out_size = zs.next_out - chunk.get();
-    if (out_size > 0) {
-      out->write(reinterpret_cast<char *>(chunk.get()), out_size);
+    DCHECK_GT(zs.next_out - buf, 0);
+    out->write(reinterpret_cast<char*>(buf), zs.next_out - buf);
+    if (PREDICT_FALSE(out->fail())) {
+      ignore_result(deflateEnd(&zs));
+      return Status::IOError("error writing to output stream");
     }
-  } while (flush != Z_FINISH);
-  ZRETURN_NOT_OK(deflateEnd(&zs));
-  return Status::OK();
+  } while (rc != Z_STREAM_END);
+
+  return ZlibResultToStatus(deflateEnd(&zs));
 }
 
 // See https://zlib.net/zlib_how.html for context on using zlib.
-Status Uncompress(Slice compressed, std::ostream* out) {
+Status Uncompress(Slice input, ostream* out) {
+  DCHECK(out);
+  // Output stream exceptions aren't handled in this function, so the stream
+  // isn't supposed to throw on errors, but just set bad/failure bits instead.
+  DCHECK_EQ(ios::goodbit, out->exceptions());
+
   // Initialize the z_stream at the start of the data with the
   // data size as the available input.
   z_stream zs;
   memset(&zs, 0, sizeof(zs));
-  zs.next_in = const_cast<uint8_t*>(compressed.data());
-  zs.avail_in = compressed.size();
+  zs.next_in = const_cast<uint8_t*>(input.data());
+  zs.avail_in = input.size();
   // Initialize inflation with the windowBits set to be GZIP compatible.
   // The documentation (https://www.zlib.net/manual.html#Advanced) describes that
   // Adding 16 configures inflate to decode the gzip format.
   ZRETURN_NOT_OK(inflateInit2(&zs, MAX_WBITS + 16 /* enable gzip */));
-  // Continue calling inflate, decompressing data into the buffer in `zs.next_out` and writing
-  // the buffer content to `out`, until an error is received or there is no more data
-  // to decompress.
-  Status s;
+  // Continue calling inflate, decompressing data into the buffer in
+  // `zs.next_out` and writing the buffer content to `out`, until an error
+  // condition is encountered or there is no more data to decompress.
+  constexpr const size_t kChunkSize = 16 * 1024;
+  unsigned char buf[kChunkSize];
+  int rc;
   do {
-    unsigned char buf[4096];
     zs.next_out = buf;
-    zs.avail_out = arraysize(buf);
-    s = ZlibResultToStatus(inflate(&zs, Z_NO_FLUSH));
-    if (!s.ok() && !s.IsEndOfFile()) {
-      return s;
+    zs.avail_out = kChunkSize;
+    rc = inflate(&zs, Z_NO_FLUSH);
+    if (PREDICT_FALSE(rc != Z_OK && rc != Z_STREAM_END)) {
+      ignore_result(inflateEnd(&zs));
+      // Special handling for Z_BUF_ERROR in certain conditions to produce
+      // Status::Corruption() instead of Status::RuntimeError().
+      if (rc == Z_BUF_ERROR && zs.avail_in == 0) {
+        // This means the input was most likely incomplete/truncated. Because
+        // the input isn't a stream in this function, all the input data is
+        // available at once as Slice, so no more available bytes on input
+        // are ever expected at this point.
+        return Status::Corruption("truncated gzip data");
+      }
+      return ZlibResultToStatus(rc);
     }
-    out->write(reinterpret_cast<char *>(buf), zs.next_out - buf);
-  } while (zs.avail_out == 0);
+    DCHECK_GT(zs.next_out - buf, 0);
+    out->write(reinterpret_cast<char*>(buf), zs.next_out - buf);
+    if (PREDICT_FALSE(out->fail())) {
+      ignore_result(inflateEnd(&zs));
+      return Status::IOError("error writing to output stream");
+    }
+  } while (rc != Z_STREAM_END);
+
   // If we haven't returned early with a bad status, finalize inflation.
-  ZRETURN_NOT_OK(inflateEnd(&zs));
-  return Status::OK();
+  return ZlibResultToStatus(inflateEnd(&zs));
 }
 
 } // namespace zlib
diff --git a/src/kudu/util/zlib.h b/src/kudu/util/zlib.h
index 478382b7b..0931d557f 100644
--- a/src/kudu/util/zlib.h
+++ b/src/kudu/util/zlib.h
@@ -26,18 +26,20 @@ namespace zlib {
 
 // Zlib-compress the data in 'input', appending the result to 'out'.
 //
-// In case of an error, some data may still be appended to 'out'.
+// In case of an error, non-OK status is returned and some data may still
+// be appended to 'out'.
 Status Compress(Slice input, std::ostream* out);
 
 // The same as the above, but with a custom level (1-9, where 1 is fastest
 // and 9 is best compression).
 Status CompressLevel(Slice input, int level, std::ostream* out);
 
-// Uncompress the zlib-compressed data in 'compressed', appending the result
+// Uncompress the zlib-compressed data in 'input', appending the result
 // to 'out'.
 //
-// In case of an error, some data may still be appended to 'out'.
-Status Uncompress(Slice compressed, std::ostream* out);
+// In case of an error, non-OK status is returned and some data may still
+// be appended to 'out'.
+Status Uncompress(Slice input, std::ostream* out);
 
 } // namespace zlib
 } // namespace kudu

Reply via email to