This is an automated email from the ASF dual-hosted git repository. joemcdonnell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 7477107ca370ef4b0709855522f8d955e47edbec
Author: Joe McDonnell <joemcdonn...@cloudera.com>
AuthorDate: Tue Aug 5 13:29:47 2025 -0700

IMPALA-12108: Add support for LZ4 high compression levels

LZ4 has a high compression mode that achieves higher compression ratios (at the cost of longer compression time) while maintaining LZ4's fast decompression speed. This type of compression would be useful for workloads that write data once and read it many times.

This adds support for specifying a compression level for the LZ4 codec. Compression level 1 uses the current fast API. Compression levels between LZ4HC_CLEVEL_MIN (3) and LZ4HC_CLEVEL_MAX (12), inclusive, use the high compression API. This lines up with the behavior of the lz4 command-line tool.

TPC-H scale factor 42 comparison:

Compression codec | Avg Time (s) | Geomean Time (s) | Lineitem Size (GB) | Compression time for lineitem (s)
------------------+--------------+------------------+--------------------+------------------------------
Snappy            | 2.75         | 2.08             | 8.76               | 7.436
LZ4 level 1       | 2.58         | 1.91             | 9.1                | 6.864
LZ4 level 3       | 2.58         | 1.93             | 7.9                | 43.918
LZ4 level 9       | 2.68         | 1.98             | 7.6                | 125.0
Zstd level 3      | 3.03         | 2.31             | 6.36               | 17.274
Zstd level 6      | 3.10         | 2.38             | 6.33               | 44.955

LZ4 level 3 produces data that is roughly 10-13% smaller (7.9 GB vs. 8.76 GB for Snappy and 9.1 GB for LZ4 level 1) while queries run about as fast as with regular LZ4. It compresses at about the same speed as Zstd level 6.
Testing: - Ran perf-AB-test with lz4 high compression levels - Added test cases to decompress-test Change-Id: Ie7470ce38b8710c870cacebc80bc02cf5d022791 Reviewed-on: http://gerrit.cloudera.org:8080/23254 Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com> --- be/src/service/query-options-test.cc | 26 ++++++++++++++++++ be/src/util/codec.cc | 11 +++++--- be/src/util/compress.cc | 51 ++++++++++++++++++++++++++++++------ be/src/util/compress.h | 14 ++++++++-- be/src/util/decompress-test.cc | 5 ++++ 5 files changed, 94 insertions(+), 13 deletions(-) diff --git a/be/src/service/query-options-test.cc b/be/src/service/query-options-test.cc index c16e86fec..32feedfa3 100644 --- a/be/src/service/query-options-test.cc +++ b/be/src/service/query-options-test.cc @@ -17,6 +17,7 @@ #include "service/query-options.h" +#include <lz4hc.h> #include <zstd.h> #include <zlib.h> #include <boost/preprocessor/seq/for_each.hpp> @@ -630,6 +631,8 @@ TEST(QueryOptions, CompressionCodec) { case THdfsCompression::ZLIB: case THdfsCompression::BZIP2: case THdfsCompression::DEFLATE: + case THdfsCompression::LZ4: + case THdfsCompression::LZ4_BLOCKED: EXPECT_TRUE(SetQueryOption("compression_codec", Substitute("$0:1", codec), &options, nullptr).ok()); break; @@ -700,6 +703,29 @@ TEST(QueryOptions, CompressionCodec) { Substitute("BZIP2:$0", bzip_min_clevel - 1), &options, nullptr).ok()); EXPECT_FALSE(SetQueryOption("compression_codec", Substitute("BZIP2:$0", bzip_max_clevel + 1), &options, nullptr).ok()); + + // Test compression levels for LZ4 / LZ4_BLOCKED + for (const string& lz4_codec : {"lz4", "lz4_blocked"}) { + // LZ4's default behavior is considered level 1 (and is tested above). 
+ // LZ4's high compression API is used for levels between LZ4HC_CLEVEL_MIN (3) and + // LZ4HC_CLEVEL_MAX (12) + for (int i = LZ4HC_CLEVEL_MIN; i <= LZ4HC_CLEVEL_MAX; i++) { + EXPECT_TRUE(SetQueryOption("compression_codec", Substitute("$0:$1", lz4_codec, i), + &options, nullptr).ok()); + } + // LZ4 added a level 2 in 1.10.0. If using a version without that support, test that + // level 2 fails. (If using a version with that support, this is tested by the loop + // above.) + if (LZ4HC_CLEVEL_MIN > 2) { + EXPECT_FALSE(SetQueryOption("compression_codec", + Substitute("$0:$1", lz4_codec, 2), &options, nullptr).ok()); + } + EXPECT_FALSE(SetQueryOption("compression_codec", + Substitute("$0:$1", lz4_codec, LZ4HC_CLEVEL_MAX + 1), &options, nullptr).ok()); + // Zero is not a valid level + EXPECT_FALSE(SetQueryOption("compression_codec", + Substitute("$0:$1", lz4_codec, 0), &options, nullptr).ok()); + } #undef CASE #undef ENTRIES #undef ENTRY diff --git a/be/src/util/codec.cc b/be/src/util/codec.cc index 57307ffb0..9a5c15187 100644 --- a/be/src/util/codec.cc +++ b/be/src/util/codec.cc @@ -111,14 +111,16 @@ Status Codec::CreateCompressor(MemPool* mem_pool, bool reuse, const CodecInfo& c compressor->reset(new SnappyCompressor(mem_pool, reuse)); break; case THdfsCompression::LZ4: - compressor->reset(new Lz4Compressor(mem_pool, reuse)); + compressor->reset(new Lz4Compressor(mem_pool, reuse, + codec_info.compression_level_)); break; case THdfsCompression::ZSTD: compressor->reset(new ZstandardCompressor(mem_pool, reuse, codec_info.compression_level_)); break; case THdfsCompression::LZ4_BLOCKED: - compressor->reset(new Lz4BlockCompressor(mem_pool, reuse)); + compressor->reset(new Lz4BlockCompressor(mem_pool, reuse, + codec_info.compression_level_)); break; default: { if (format == THdfsCompression::LZO) return Status(NO_LZO_MSG); @@ -140,10 +142,13 @@ Status Codec::ValidateCompressionLevel(THdfsCompression::type format, return 
ZstandardCompressor::ValidateCompressionLevel(compression_level); case THdfsCompression::BZIP2: return BzipCompressor::ValidateCompressionLevel(compression_level); + case THdfsCompression::LZ4: + case THdfsCompression::LZ4_BLOCKED: + return Lz4Compressor::ValidateCompressionLevel(compression_level); default: // Note: BZIP2 compression levels are supported for disk-spill // Parquet or ORC does not support BZIP compression - return Status("Compression level only supported for ZSTD, ZLIB(GZIP, DEFLATE)" + return Status("Compression level only supported for ZSTD, ZLIB(GZIP, DEFLATE), LZ4," " and BZIP2. Note: BZIP2 is not supported by Parquet(i.e. to write tables)"); } } diff --git a/be/src/util/compress.cc b/be/src/util/compress.cc index 9c28f6c59..e64aa1077 100644 --- a/be/src/util/compress.cc +++ b/be/src/util/compress.cc @@ -26,6 +26,7 @@ #include <zlib.h> #include <boost/crc.hpp> #include <lz4.h> +#include <lz4hc.h> #include <snappy.h> #include <zconf.h> #include <zstd.h> @@ -341,8 +342,24 @@ uint32_t SnappyCompressor::ComputeChecksum(int64_t input_len, const uint8_t* inp return ((chk >> 15) | (chk << 17)) + 0xa282ead8; } -Lz4Compressor::Lz4Compressor(MemPool* mem_pool, bool reuse_buffer) - : Codec(mem_pool, reuse_buffer) { +Lz4Compressor::Lz4Compressor(MemPool* mem_pool, bool reuse_buffer, + std::optional<int> compression_level) + : Codec(mem_pool, reuse_buffer), compression_level_(compression_level.value_or(1)) { +} + +Status Lz4Compressor::ValidateCompressionLevel(int compression_level) { + if (compression_level == 1 || + (compression_level >= LZ4HC_CLEVEL_MIN && + compression_level <= LZ4HC_CLEVEL_MAX)) { + return Status::OK(); + } + return Status(Substitute("Invalid LZ4 compression level '$0'." 
+ " Valid values are 1 or between [$1, $2] for high compression", compression_level, + LZ4HC_CLEVEL_MIN, LZ4HC_CLEVEL_MAX)); +} + +Status Lz4Compressor::Init() { + return ValidateCompressionLevel(compression_level_); } int64_t Lz4Compressor::MaxOutputLen(int64_t input_len, const uint8_t* input) { @@ -357,8 +374,16 @@ Status Lz4Compressor::ProcessBlock(bool output_preallocated, int64_t input_lengt if (MaxOutputLen(input_length, input) == 0) { return Status(TErrorCode::LZ4_COMPRESSION_INPUT_TOO_LARGE, input_length); } - *output_length = LZ4_compress_default(reinterpret_cast<const char*>(input), - reinterpret_cast<char*>(*output), input_length, *output_length); + if (compression_level_ == 1) { + *output_length = LZ4_compress_default(reinterpret_cast<const char*>(input), + reinterpret_cast<char*>(*output), input_length, *output_length); + } else { + DCHECK(compression_level_ >= LZ4HC_CLEVEL_MIN && + compression_level_ <= LZ4HC_CLEVEL_MAX); + *output_length = LZ4_compress_HC(reinterpret_cast<const char*>(input), + reinterpret_cast<char*>(*output), input_length, *output_length, + compression_level_); + } return Status::OK(); } @@ -411,8 +436,9 @@ Status ZstandardCompressor::ValidateCompressionLevel(int compression_level) { } } -Lz4BlockCompressor::Lz4BlockCompressor(MemPool* mem_pool, bool reuse_buffer) - : Codec(mem_pool, reuse_buffer) { +Lz4BlockCompressor::Lz4BlockCompressor(MemPool* mem_pool, bool reuse_buffer, + std::optional<int> compression_level) + : Codec(mem_pool, reuse_buffer), compression_level_(compression_level.value_or(1)) { } int64_t Lz4BlockCompressor::MaxOutputLen(int64_t input_length, const uint8_t* input) { @@ -444,8 +470,17 @@ Status Lz4BlockCompressor::ProcessBlock(bool output_preallocated, int64_t input_ if (input_length > 0) { uint8_t* sizep = outp; outp += sizeof(int32_t); - const int64_t size = LZ4_compress_default(reinterpret_cast<const char*>(input), - reinterpret_cast<char*>(outp), input_length, *output_length - (outp - *output)); + int64_t 
size = 0; + if (compression_level_ == 1) { + size = LZ4_compress_default(reinterpret_cast<const char*>(input), + reinterpret_cast<char*>(outp), input_length, *output_length - (outp - *output)); + } else { + DCHECK(compression_level_ >= LZ4HC_CLEVEL_MIN && + compression_level_ <= LZ4HC_CLEVEL_MAX); + size = LZ4_compress_HC(reinterpret_cast<const char*>(input), + reinterpret_cast<char*>(outp), input_length, *output_length - (outp - *output), + compression_level_); + } if (size == 0) { return Status(TErrorCode::LZ4_COMPRESS_DEFAULT_FAILED); } ReadWriteUtil::PutInt(sizep, static_cast<uint32_t>(size)); outp += size; diff --git a/be/src/util/compress.h b/be/src/util/compress.h index 7a4139c2c..d91d5194c 100644 --- a/be/src/util/compress.h +++ b/be/src/util/compress.h @@ -138,15 +138,22 @@ class SnappyCompressor : public Codec { /// allocated and will cause an error if asked to do so. class Lz4Compressor : public Codec { public: - Lz4Compressor(MemPool* mem_pool = nullptr, bool reuse_buffer = false); + Lz4Compressor(MemPool* mem_pool = nullptr, bool reuse_buffer = false, + std::optional<int> compression_level = std::nullopt); virtual ~Lz4Compressor() { } + virtual Status Init() override WARN_UNUSED_RESULT; + virtual int64_t MaxOutputLen( int64_t input_len, const uint8_t* input = nullptr) override; virtual Status ProcessBlock(bool output_preallocated, int64_t input_length, const uint8_t* input, int64_t* output_length, uint8_t** output) override WARN_UNUSED_RESULT; virtual std::string file_extension() const override { return "lz4"; } + + static Status ValidateCompressionLevel(int compression_level); + private: + int compression_level_; }; /// ZStandard compression codec. @@ -175,7 +182,8 @@ class ZstandardCompressor : public Codec { /// Hadoop's block compression scheme on top of LZ4. 
class Lz4BlockCompressor : public Codec { public: - Lz4BlockCompressor(MemPool* mem_pool = nullptr, bool reuse_buffer = false); + Lz4BlockCompressor(MemPool* mem_pool = nullptr, bool reuse_buffer = false, + std::optional<int> compression_level = std::nullopt); virtual ~Lz4BlockCompressor() { } virtual int64_t MaxOutputLen( @@ -184,5 +192,7 @@ class Lz4BlockCompressor : public Codec { const uint8_t* input, int64_t* output_length, uint8_t** output) override WARN_UNUSED_RESULT; virtual std::string file_extension() const override { return "lz4"; } + private: + int compression_level_; }; } diff --git a/be/src/util/decompress-test.cc b/be/src/util/decompress-test.cc index 98f6128a6..a2c1b8a38 100644 --- a/be/src/util/decompress-test.cc +++ b/be/src/util/decompress-test.cc @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +#include <lz4hc.h> #include <stdio.h> #include <stdlib.h> #include <zstd.h> @@ -405,6 +406,8 @@ TEST_F(DecompressorTest, Snappy) { TEST_F(DecompressorTest, LZ4) { RunTest(THdfsCompression::LZ4); + RunTest(THdfsCompression::LZ4, LZ4HC_CLEVEL_MIN); + RunTest(THdfsCompression::LZ4, LZ4HC_CLEVEL_MAX); } TEST_F(DecompressorTest, Gzip) { @@ -608,6 +611,8 @@ TEST_F(DecompressorTest, LZ4HadoopCompat) { TEST_F(DecompressorTest, LZ4Blocked) { RunTest(THdfsCompression::LZ4_BLOCKED); + RunTest(THdfsCompression::LZ4_BLOCKED, LZ4HC_CLEVEL_MIN); + RunTest(THdfsCompression::LZ4_BLOCKED, LZ4HC_CLEVEL_MAX); } }