This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 7477107ca370ef4b0709855522f8d955e47edbec
Author: Joe McDonnell <joemcdonn...@cloudera.com>
AuthorDate: Tue Aug 5 13:29:47 2025 -0700

    IMPALA-12108: Add support for LZ4 high compression levels
    
    LZ4 has a high compression mode that gets higher compression ratios
    (at the cost of higher compression time) while maintaining the fast
    decompression speed. This type of compression would be useful for
    workloads that write data once and read it many times.
    
    This adds support for specifying a compression level for the
    LZ4 codec. Compression level 1 is the current fast API. Compression
    levels between LZ4HC_CLEVEL_MIN (3) and LZ4HC_CLEVEL_MAX (12) use
    the high compression API. This lines up with the behavior of the lz4
    commandline.
    
    TPC-H 42 scale comparison
    Compression codec | Avg Time (s) | Geomean Time (s) | Lineitem Size (GB) | Compression time for lineitem (s)
    ------------------+--------------+------------------+--------------------+------------------------------
    Snappy            | 2.75         | 2.08             | 8.76               |   7.436
    LZ4 level 1       | 2.58         | 1.91             | 9.1                |   6.864
    LZ4 level 3       | 2.58         | 1.93             | 7.9                |  43.918
    LZ4 level 9       | 2.68         | 1.98             | 7.6                | 125.0
    Zstd level 3      | 3.03         | 2.31             | 6.36               |  17.274
    Zstd level 6      | 3.10         | 2.38             | 6.33               |  44.955
    
    LZ4 level 3 is about 10% smaller in data size while being about as fast as
    regular LZ4. It compresses at about the same speed as Zstd level 6.
    
    Testing:
     - Ran perf-AB-test with lz4 high compression levels
     - Added test cases to decompress-test
    
    Change-Id: Ie7470ce38b8710c870cacebc80bc02cf5d022791
    Reviewed-on: http://gerrit.cloudera.org:8080/23254
    Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
    Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
---
 be/src/service/query-options-test.cc | 26 ++++++++++++++++++
 be/src/util/codec.cc                 | 11 +++++---
 be/src/util/compress.cc              | 51 ++++++++++++++++++++++++++++++------
 be/src/util/compress.h               | 14 ++++++++--
 be/src/util/decompress-test.cc       |  5 ++++
 5 files changed, 94 insertions(+), 13 deletions(-)

diff --git a/be/src/service/query-options-test.cc 
b/be/src/service/query-options-test.cc
index c16e86fec..32feedfa3 100644
--- a/be/src/service/query-options-test.cc
+++ b/be/src/service/query-options-test.cc
@@ -17,6 +17,7 @@
 
 #include "service/query-options.h"
 
+#include <lz4hc.h>
 #include <zstd.h>
 #include <zlib.h>
 #include <boost/preprocessor/seq/for_each.hpp>
@@ -630,6 +631,8 @@ TEST(QueryOptions, CompressionCodec) {
       case THdfsCompression::ZLIB:
       case THdfsCompression::BZIP2:
       case THdfsCompression::DEFLATE:
+      case THdfsCompression::LZ4:
+      case THdfsCompression::LZ4_BLOCKED:
         EXPECT_TRUE(SetQueryOption("compression_codec", Substitute("$0:1", 
codec),
         &options, nullptr).ok());
         break;
@@ -700,6 +703,29 @@ TEST(QueryOptions, CompressionCodec) {
     Substitute("BZIP2:$0", bzip_min_clevel - 1), &options, nullptr).ok());
   EXPECT_FALSE(SetQueryOption("compression_codec",
     Substitute("BZIP2:$0", bzip_max_clevel + 1), &options, nullptr).ok());
+
+  // Test compression levels for LZ4 / LZ4_BLOCKED
+  for (const string& lz4_codec : {"lz4", "lz4_blocked"}) {
+    // LZ4's default behavior is considered level 1 (and is tested above).
+    // LZ4's high compression API is used for levels between LZ4HC_CLEVEL_MIN 
(3) and
+    // LZ4HC_CLEVEL_MAX (12)
+    for (int i = LZ4HC_CLEVEL_MIN; i <= LZ4HC_CLEVEL_MAX; i++) {
+      EXPECT_TRUE(SetQueryOption("compression_codec", Substitute("$0:$1", 
lz4_codec, i),
+         &options, nullptr).ok());
+    }
+    // LZ4 added a level 2 in 1.10.0. If using a version without that support, 
test that
+    // level 2 fails. (If using a version with that support, this is tested by 
the loop
+    // above.)
+    if (LZ4HC_CLEVEL_MIN > 2) {
+      EXPECT_FALSE(SetQueryOption("compression_codec",
+          Substitute("$0:$1", lz4_codec, 2), &options, nullptr).ok());
+    }
+    EXPECT_FALSE(SetQueryOption("compression_codec",
+        Substitute("$0:$1", lz4_codec, LZ4HC_CLEVEL_MAX + 1), &options, 
nullptr).ok());
+    // Zero is not a valid level
+    EXPECT_FALSE(SetQueryOption("compression_codec",
+        Substitute("$0:$1", lz4_codec, 0), &options, nullptr).ok());
+  }
 #undef CASE
 #undef ENTRIES
 #undef ENTRY
diff --git a/be/src/util/codec.cc b/be/src/util/codec.cc
index 57307ffb0..9a5c15187 100644
--- a/be/src/util/codec.cc
+++ b/be/src/util/codec.cc
@@ -111,14 +111,16 @@ Status Codec::CreateCompressor(MemPool* mem_pool, bool 
reuse, const CodecInfo& c
       compressor->reset(new SnappyCompressor(mem_pool, reuse));
       break;
     case THdfsCompression::LZ4:
-      compressor->reset(new Lz4Compressor(mem_pool, reuse));
+      compressor->reset(new Lz4Compressor(mem_pool, reuse,
+          codec_info.compression_level_));
       break;
     case THdfsCompression::ZSTD:
       compressor->reset(new ZstandardCompressor(mem_pool, reuse,
           codec_info.compression_level_));
       break;
     case THdfsCompression::LZ4_BLOCKED:
-      compressor->reset(new Lz4BlockCompressor(mem_pool, reuse));
+      compressor->reset(new Lz4BlockCompressor(mem_pool, reuse,
+          codec_info.compression_level_));
       break;
     default: {
       if (format == THdfsCompression::LZO) return Status(NO_LZO_MSG);
@@ -140,10 +142,13 @@ Status 
Codec::ValidateCompressionLevel(THdfsCompression::type format,
       return ZstandardCompressor::ValidateCompressionLevel(compression_level);
     case THdfsCompression::BZIP2:
       return BzipCompressor::ValidateCompressionLevel(compression_level);
+    case THdfsCompression::LZ4:
+    case THdfsCompression::LZ4_BLOCKED:
+      return Lz4Compressor::ValidateCompressionLevel(compression_level);
     default:
       // Note: BZIP2 compression levels are supported for disk-spill
       // Parquet or ORC does not support BZIP compression
-      return Status("Compression level only supported for ZSTD, ZLIB(GZIP, 
DEFLATE)"
+      return Status("Compression level only supported for ZSTD, ZLIB(GZIP, 
DEFLATE), LZ4,"
           " and BZIP2. Note: BZIP2 is not supported by Parquet(i.e. to write 
tables)");
   }
 }
diff --git a/be/src/util/compress.cc b/be/src/util/compress.cc
index 9c28f6c59..e64aa1077 100644
--- a/be/src/util/compress.cc
+++ b/be/src/util/compress.cc
@@ -26,6 +26,7 @@
 #include <zlib.h>
 #include <boost/crc.hpp>
 #include <lz4.h>
+#include <lz4hc.h>
 #include <snappy.h>
 #include <zconf.h>
 #include <zstd.h>
@@ -341,8 +342,24 @@ uint32_t SnappyCompressor::ComputeChecksum(int64_t 
input_len, const uint8_t* inp
   return ((chk >> 15) | (chk << 17)) + 0xa282ead8;
 }
 
-Lz4Compressor::Lz4Compressor(MemPool* mem_pool, bool reuse_buffer)
-  : Codec(mem_pool, reuse_buffer) {
+Lz4Compressor::Lz4Compressor(MemPool* mem_pool, bool reuse_buffer,
+    std::optional<int> compression_level)
+  : Codec(mem_pool, reuse_buffer), 
compression_level_(compression_level.value_or(1)) {
+}
+
+Status Lz4Compressor::ValidateCompressionLevel(int compression_level) {
+  if (compression_level == 1 ||
+      (compression_level >= LZ4HC_CLEVEL_MIN &&
+       compression_level <= LZ4HC_CLEVEL_MAX)) {
+    return Status::OK();
+  }
+  return Status(Substitute("Invalid LZ4 compression level '$0'."
+      " Valid values are 1 or between [$1, $2] for high compression", 
compression_level,
+      LZ4HC_CLEVEL_MIN, LZ4HC_CLEVEL_MAX));
+}
+
+Status Lz4Compressor::Init() {
+  return ValidateCompressionLevel(compression_level_);
 }
 
 int64_t Lz4Compressor::MaxOutputLen(int64_t input_len, const uint8_t* input) {
@@ -357,8 +374,16 @@ Status Lz4Compressor::ProcessBlock(bool 
output_preallocated, int64_t input_lengt
   if (MaxOutputLen(input_length, input) == 0) {
     return Status(TErrorCode::LZ4_COMPRESSION_INPUT_TOO_LARGE, input_length);
   }
-  *output_length = LZ4_compress_default(reinterpret_cast<const char*>(input),
-      reinterpret_cast<char*>(*output), input_length, *output_length);
+  if (compression_level_ == 1) {
+    *output_length = LZ4_compress_default(reinterpret_cast<const char*>(input),
+        reinterpret_cast<char*>(*output), input_length, *output_length);
+  } else {
+    DCHECK(compression_level_ >= LZ4HC_CLEVEL_MIN &&
+        compression_level_ <= LZ4HC_CLEVEL_MAX);
+    *output_length = LZ4_compress_HC(reinterpret_cast<const char*>(input),
+        reinterpret_cast<char*>(*output), input_length, *output_length,
+        compression_level_);
+  }
   return Status::OK();
 }
 
@@ -411,8 +436,9 @@ Status ZstandardCompressor::ValidateCompressionLevel(int 
compression_level) {
   }
 }
 
-Lz4BlockCompressor::Lz4BlockCompressor(MemPool* mem_pool, bool reuse_buffer)
-  : Codec(mem_pool, reuse_buffer) {
+Lz4BlockCompressor::Lz4BlockCompressor(MemPool* mem_pool, bool reuse_buffer,
+    std::optional<int> compression_level)
+  : Codec(mem_pool, reuse_buffer), 
compression_level_(compression_level.value_or(1)) {
 }
 
 int64_t Lz4BlockCompressor::MaxOutputLen(int64_t input_length, const uint8_t* 
input) {
@@ -444,8 +470,17 @@ Status Lz4BlockCompressor::ProcessBlock(bool 
output_preallocated, int64_t input_
   if (input_length > 0) {
     uint8_t* sizep = outp;
     outp += sizeof(int32_t);
-    const int64_t size = LZ4_compress_default(reinterpret_cast<const 
char*>(input),
-        reinterpret_cast<char*>(outp), input_length, *output_length - (outp - 
*output));
+    int64_t size = 0;
+    if (compression_level_ == 1) {
+      size = LZ4_compress_default(reinterpret_cast<const char*>(input),
+          reinterpret_cast<char*>(outp), input_length, *output_length - (outp 
- *output));
+    } else {
+      DCHECK(compression_level_ >= LZ4HC_CLEVEL_MIN &&
+          compression_level_ <= LZ4HC_CLEVEL_MAX);
+      size = LZ4_compress_HC(reinterpret_cast<const char*>(input),
+          reinterpret_cast<char*>(outp), input_length, *output_length - (outp 
- *output),
+          compression_level_);
+    }
     if (size == 0) { return Status(TErrorCode::LZ4_COMPRESS_DEFAULT_FAILED); }
     ReadWriteUtil::PutInt(sizep, static_cast<uint32_t>(size));
     outp += size;
diff --git a/be/src/util/compress.h b/be/src/util/compress.h
index 7a4139c2c..d91d5194c 100644
--- a/be/src/util/compress.h
+++ b/be/src/util/compress.h
@@ -138,15 +138,22 @@ class SnappyCompressor : public Codec {
 /// allocated and will cause an error if asked to do so.
 class Lz4Compressor : public Codec {
  public:
-  Lz4Compressor(MemPool* mem_pool = nullptr, bool reuse_buffer = false);
+  Lz4Compressor(MemPool* mem_pool = nullptr, bool reuse_buffer = false,
+      std::optional<int> compression_level = std::nullopt);
   virtual ~Lz4Compressor() { }
 
+  virtual Status Init() override WARN_UNUSED_RESULT;
+
   virtual int64_t MaxOutputLen(
       int64_t input_len, const uint8_t* input = nullptr) override;
   virtual Status ProcessBlock(bool output_preallocated, int64_t input_length,
       const uint8_t* input, int64_t* output_length,
       uint8_t** output) override WARN_UNUSED_RESULT;
   virtual std::string file_extension() const override { return "lz4"; }
+
+  static Status ValidateCompressionLevel(int compression_level);
+ private:
+  int compression_level_;
 };
 
 /// ZStandard compression codec.
@@ -175,7 +182,8 @@ class ZstandardCompressor : public Codec {
 /// Hadoop's block compression scheme on top of LZ4.
 class Lz4BlockCompressor : public Codec {
  public:
-  Lz4BlockCompressor(MemPool* mem_pool = nullptr, bool reuse_buffer = false);
+  Lz4BlockCompressor(MemPool* mem_pool = nullptr, bool reuse_buffer = false,
+      std::optional<int> compression_level = std::nullopt);
   virtual ~Lz4BlockCompressor() { }
 
   virtual int64_t MaxOutputLen(
@@ -184,5 +192,7 @@ class Lz4BlockCompressor : public Codec {
       const uint8_t* input, int64_t* output_length,
       uint8_t** output) override WARN_UNUSED_RESULT;
   virtual std::string file_extension() const override { return "lz4"; }
+ private:
+  int compression_level_;
 };
 }
diff --git a/be/src/util/decompress-test.cc b/be/src/util/decompress-test.cc
index 98f6128a6..a2c1b8a38 100644
--- a/be/src/util/decompress-test.cc
+++ b/be/src/util/decompress-test.cc
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <lz4hc.h>
 #include <stdio.h>
 #include <stdlib.h>
 #include <zstd.h>
@@ -405,6 +406,8 @@ TEST_F(DecompressorTest, Snappy) {
 
 TEST_F(DecompressorTest, LZ4) {
   RunTest(THdfsCompression::LZ4);
+  RunTest(THdfsCompression::LZ4, LZ4HC_CLEVEL_MIN);
+  RunTest(THdfsCompression::LZ4, LZ4HC_CLEVEL_MAX);
 }
 
 TEST_F(DecompressorTest, Gzip) {
@@ -608,6 +611,8 @@ TEST_F(DecompressorTest, LZ4HadoopCompat) {
 
 TEST_F(DecompressorTest, LZ4Blocked) {
   RunTest(THdfsCompression::LZ4_BLOCKED);
+  RunTest(THdfsCompression::LZ4_BLOCKED, LZ4HC_CLEVEL_MIN);
+  RunTest(THdfsCompression::LZ4_BLOCKED, LZ4HC_CLEVEL_MAX);
 }
 }
 

Reply via email to