This is an automated email from the ASF dual-hosted git repository.
leaves12138 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 1bac51f feat: Migrate block compression infrastructure with LZ4,
ZSTD, and none backends (#52)
1bac51f is described below
commit 1bac51f98c32e837f3474189032d3ba58a0a2100
Author: lxy <[email protected]>
AuthorDate: Mon Jun 8 14:47:26 2026 +0800
feat: Migrate block compression infrastructure with LZ4, ZSTD, and none
backends (#52)
* feat: Migrate block compression infrastructure with LZ4, ZSTD, and none
backends
* fix review
* fix comments
---
.../compression/block_compression_factory.cpp | 58 ++++++++++
.../common/compression/block_compression_factory.h | 55 ++++++++++
.../compression/block_compression_factory_test.cpp | 120 +++++++++++++++++++++
.../common/compression/block_compression_type.h | 28 +++++
src/paimon/common/compression/block_compressor.cpp | 30 ++++++
src/paimon/common/compression/block_compressor.h | 55 ++++++++++
.../common/compression/block_decompressor.cpp | 41 +++++++
src/paimon/common/compression/block_decompressor.h | 52 +++++++++
.../lz4/lz4_block_compression_factory.h | 45 ++++++++
.../common/compression/lz4/lz4_block_compressor.h | 53 +++++++++
.../compression/lz4/lz4_block_decompressor.h | 61 +++++++++++
.../compression/none_block_compression_factory.h | 43 ++++++++
.../zstd/zstd_block_compression_factory.h | 51 +++++++++
.../compression/zstd/zstd_block_compressor.h | 49 +++++++++
.../compression/zstd/zstd_block_decompressor.h | 41 +++++++
.../format/parquet/predicate_pushdown_test.cpp | 2 +-
16 files changed, 783 insertions(+), 1 deletion(-)
diff --git a/src/paimon/common/compression/block_compression_factory.cpp
b/src/paimon/common/compression/block_compression_factory.cpp
new file mode 100644
index 0000000..4f5e2fd
--- /dev/null
+++ b/src/paimon/common/compression/block_compression_factory.cpp
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/compression/block_compression_factory.h"
+
+#include "fmt/format.h"
+#include "paimon/common/compression/lz4/lz4_block_compression_factory.h"
+#include "paimon/common/compression/none_block_compression_factory.h"
+#include "paimon/common/compression/zstd/zstd_block_compression_factory.h"
+#include "paimon/common/utils/string_utils.h"
+
+namespace paimon {
+
+// Creates the factory for the codec named in `compression.compress`. The name is passed through
+// StringUtils::ToLowerCase before matching; `compression.zstd_level` is forwarded for "zstd".
+// Any other name yields Status::Invalid.
+Result<std::shared_ptr<BlockCompressionFactory>> BlockCompressionFactory::Create(
+    const CompressOptions& compression) {
+    const auto codec_name = StringUtils::ToLowerCase(compression.compress);
+    if (codec_name == "lz4") {
+        return std::make_shared<Lz4BlockCompressionFactory>();
+    }
+    if (codec_name == "zstd") {
+        return std::make_shared<ZstdBlockCompressionFactory>(compression.zstd_level);
+    }
+    if (codec_name == "none") {
+        return std::make_shared<NoneBlockCompressionFactory>();
+    }
+    // TODO(liangzi): LZO support
+    return Status::Invalid(fmt::format("Unsupported compression type: {}", codec_name));
+}
+
+// Creates the factory for an enum-identified codec. ZSTD uses the class-level default level
+// ZSTD_COMPRESSION_LEVEL; any value without a backend here (including LZO) yields
+// Status::Invalid carrying the numeric enum value.
+Result<std::shared_ptr<BlockCompressionFactory>> BlockCompressionFactory::Create(
+    BlockCompressionType compression) {
+    if (compression == BlockCompressionType::NONE) {
+        return std::make_shared<NoneBlockCompressionFactory>();
+    }
+    if (compression == BlockCompressionType::LZ4) {
+        return std::make_shared<Lz4BlockCompressionFactory>();
+    }
+    if (compression == BlockCompressionType::ZSTD) {
+        return std::make_shared<ZstdBlockCompressionFactory>(ZSTD_COMPRESSION_LEVEL);
+    }
+    // TODO(liangzi): LZO support
+    return Status::Invalid(
+        fmt::format("Unsupported compression type: {}", static_cast<int32_t>(compression)));
+}
+} // namespace paimon
diff --git a/src/paimon/common/compression/block_compression_factory.h
b/src/paimon/common/compression/block_compression_factory.h
new file mode 100644
index 0000000..f103fc8
--- /dev/null
+++ b/src/paimon/common/compression/block_compression_factory.h
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+
+#include "paimon/common/compression/block_compression_type.h"
+#include "paimon/common/compression/block_compressor.h"
+#include "paimon/common/compression/block_decompressor.h"
+#include "paimon/core/options/compress_options.h"
+#include "paimon/result.h"
+#include "paimon/visibility.h"
+namespace paimon {
+
+/// Abstract factory for block codecs: every supported codec provides one
+/// {@link BlockCompressionFactory} subclass that hands out its compressor and decompressor.
+class PAIMON_EXPORT BlockCompressionFactory {
+ public:
+    /// Selects a factory from the codec name (and zstd level) carried by `compression`.
+    static Result<std::shared_ptr<BlockCompressionFactory>> Create(
+        const CompressOptions& compression);
+
+    /// Selects a factory from an enum value; zstd uses ZSTD_COMPRESSION_LEVEL.
+    static Result<std::shared_ptr<BlockCompressionFactory>> Create(
+        BlockCompressionType compress_type);
+
+    BlockCompressionFactory() = default;
+    virtual ~BlockCompressionFactory() = default;
+
+    /// @return The codec this factory produces.
+    virtual BlockCompressionType GetCompressionType() const = 0;
+
+    /// @return A compressor for this codec.
+    virtual std::shared_ptr<BlockCompressor> GetCompressor() = 0;
+
+    /// @return A decompressor for this codec.
+    virtual std::shared_ptr<BlockDecompressor> GetDecompressor() = 0;
+
+ private:
+    // Align java implementation
+    static constexpr int32_t ZSTD_COMPRESSION_LEVEL = 1;
+};
+} // namespace paimon
diff --git a/src/paimon/common/compression/block_compression_factory_test.cpp
b/src/paimon/common/compression/block_compression_factory_test.cpp
new file mode 100644
index 0000000..107d20a
--- /dev/null
+++ b/src/paimon/common/compression/block_compression_factory_test.cpp
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/compression/block_compression_factory.h"
+
+#include <cstdint>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "gtest/gtest.h"
+#include "paimon/defs.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/status.h"
+#include "paimon/testing/mock/mock_file_batch_reader.h"
+#include "paimon/testing/utils/read_result_collector.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon {
+class Predicate;
+} // namespace paimon
+
+namespace paimon::test {
+// Parameterized fixture: each TEST_P below runs once per BlockCompressionType listed in the
+// INSTANTIATE_TEST_SUITE_P call in this file.
+class CompressionFactoryTest : public
::testing::TestWithParam<BlockCompressionType> {};
+
+// Round-trips a 16-byte buffer through the codec under test, then checks that
+// WriteIntLE/ReadIntLE agree on several values (positive, negative and zero).
+TEST_P(CompressionFactoryTest, TESTCompressThenDecompress) {
+    int32_t original_len = 16;
+    BlockCompressionType type = GetParam();
+
+    std::string data(original_len, '\0');
+    for (int32_t i = 0; i < original_len; i++) {
+        data[i] = static_cast<char>(i);
+    }
+
+    ASSERT_OK_AND_ASSIGN(auto factory, BlockCompressionFactory::Create(type));
+    ASSERT_EQ(type, factory->GetCompressionType());
+
+    // compress
+    auto compressor = factory->GetCompressor();
+    auto max_len = compressor->GetMaxCompressedSize(data.size());
+    std::string compressed_data(max_len, '\0');
+    auto compressed_size =
+        compressor->Compress(data.data(), data.size(), compressed_data.data(), max_len);
+    ASSERT_OK(compressed_size);
+    ASSERT_GT(compressed_size.value(), 0);
+    compressed_data.resize(compressed_size.value());
+
+    // decompress
+    auto decompressor = factory->GetDecompressor();
+    std::string decompressed_data(original_len, '\0');
+    auto decompressed_size =
+        decompressor->Decompress(compressed_data.data(), compressed_data.size(),
+                                 decompressed_data.data(), decompressed_data.size());
+    ASSERT_OK(decompressed_size);
+    // The decompressor must report exactly the number of bytes that were compressed.
+    ASSERT_EQ(original_len, decompressed_size.value());
+    ASSERT_EQ(data, decompressed_data);
+
+    // Parentheses, not braces: `std::string{4, '\0'}` selects the initializer_list constructor
+    // and builds a 2-character string, so the 4-byte writes below would run past its end.
+    std::string read_write_le(4, '\0');
+    compressor->WriteIntLE(123, read_write_le.data());
+    ASSERT_EQ(123, decompressor->ReadIntLE(read_write_le.data()));
+    compressor->WriteIntLE(100000, read_write_le.data());
+    ASSERT_EQ(100000, decompressor->ReadIntLE(read_write_le.data()));
+    compressor->WriteIntLE(-6555, read_write_le.data());
+    ASSERT_EQ(-6555, decompressor->ReadIntLE(read_write_le.data()));
+    compressor->WriteIntLE(0, read_write_le.data());
+    ASSERT_EQ(0, decompressor->ReadIntLE(read_write_le.data()));
+}
+
+// Feeding a 3-byte source must make Decompress fail for every parameterized codec.
+TEST_P(CompressionFactoryTest, TestDecompressTruncatedHeader) {
+    ASSERT_OK_AND_ASSIGN(auto factory, BlockCompressionFactory::Create(GetParam()));
+    auto decompressor = factory->GetDecompressor();
+
+    // Source shorter than 8-byte header should return Invalid, not read out of bounds
+    const char truncated_src[] = {0x01, 0x02, 0x03};
+    char output[64] = {};
+    ASSERT_NOK(
+        decompressor->Decompress(truncated_src, sizeof(truncated_src), output, sizeof(output)));
+}
+
+// Compress must report an error, rather than write a partial block, whenever the destination
+// is too small: header-sized, smaller than the header, or empty.
+TEST_P(CompressionFactoryTest, TestCompressInsufficientOutputBuffer) {
+    ASSERT_OK_AND_ASSIGN(auto factory, BlockCompressionFactory::Create(GetParam()));
+    auto compressor = factory->GetCompressor();
+
+    // Incompressible data with a tiny output buffer should fail, not produce a corrupt block
+    std::string payload(1024, '\0');
+    int32_t position = 0;
+    for (char& byte : payload) {
+        byte = static_cast<char>(position % 251);
+        ++position;
+    }
+
+    // Output buffer too small: only HEADER_LENGTH bytes, no room for compressed payload
+    std::string header_only_dst(8, '\0');
+    ASSERT_NOK(compressor->Compress(payload.data(), payload.size(), header_only_dst.data(),
+                                    header_only_dst.size()));
+
+    // Output buffer smaller than HEADER_LENGTH — must not trigger UB from negative capacity
+    char sub_header_dst[4] = {};
+    ASSERT_NOK(compressor->Compress(payload.data(), payload.size(), sub_header_dst,
+                                    sizeof(sub_header_dst)));
+
+    // Zero-length output buffer
+    ASSERT_NOK(compressor->Compress(payload.data(), payload.size(), nullptr, 0));
+}
+
+// Runs the suite for LZ4 and ZSTD only. NONE is not listed, presumably because
+// NoneBlockCompressionFactory returns null compressor/decompressor pointers, which these tests
+// dereference.
+INSTANTIATE_TEST_SUITE_P(BlockCompressionTypeGroup, CompressionFactoryTest,
+ ::testing::Values(BlockCompressionType::LZ4,
BlockCompressionType::ZSTD));
+
+} // namespace paimon::test
diff --git a/src/paimon/common/compression/block_compression_type.h
b/src/paimon/common/compression/block_compression_type.h
new file mode 100644
index 0000000..16126a8
--- /dev/null
+++ b/src/paimon/common/compression/block_compression_type.h
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "paimon/result.h"
+
+namespace paimon {
+
+/// Identifies the codec used for a compressed block. The numeric values are spelled out
+/// explicitly.
+enum class BlockCompressionType {
+    NONE = 0,
+    ZSTD = 1,
+    LZ4 = 2,
+    LZO = 3,
+};
+
+} // namespace paimon
diff --git a/src/paimon/common/compression/block_compressor.cpp
b/src/paimon/common/compression/block_compressor.cpp
new file mode 100644
index 0000000..f2ed4ca
--- /dev/null
+++ b/src/paimon/common/compression/block_compressor.cpp
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/compression/block_compressor.h"
+
+namespace paimon {
+
+// Stores the four bytes of `val` into buf[0..3], least-significant byte first.
+void BlockCompressor::WriteIntLE(int32_t val, char* buf) {
+    for (int32_t byte_index = 0; byte_index < 4; ++byte_index) {
+        buf[byte_index] = static_cast<char>(val >> (8 * byte_index));
+    }
+}
+
+} // namespace paimon
diff --git a/src/paimon/common/compression/block_compressor.h
b/src/paimon/common/compression/block_compressor.h
new file mode 100644
index 0000000..473afb7
--- /dev/null
+++ b/src/paimon/common/compression/block_compressor.h
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "paimon/result.h"
+
+namespace paimon {
+
+/// Interface for codecs that compress one whole byte array per call.
+class BlockCompressor {
+ public:
+    /// We put two integers before each compressed block, the first integer represents the
+    /// compressed length of the block, and the second one represents the original length of the
+    /// block.
+    static constexpr int32_t HEADER_LENGTH = 8;
+
+    /// Writes `val` into `buf[0..3]`, least-significant byte first.
+    static void WriteIntLE(int32_t val, char* buf);
+
+    virtual ~BlockCompressor() = default;
+
+    /// Get the max compressed size for a given original size.
+    /// @param src_size The original size
+    /// @return The max compressed size
+    virtual int32_t GetMaxCompressedSize(int32_t src_size) = 0;
+
+    /// Compress data read from src, and write the compressed data to dst.
+    ///
+    /// @param src Uncompressed data to read from
+    /// @param src_length Number of bytes of `src` to compress
+    /// @param dst The target to write compressed data
+    /// @param dst_length Capacity of `dst` in bytes
+    /// @return Length of compressed data
+    virtual Result<int32_t> Compress(const char* src, int32_t src_length, char* dst,
+                                     int32_t dst_length) = 0;
+};
+} // namespace paimon
diff --git a/src/paimon/common/compression/block_decompressor.cpp
b/src/paimon/common/compression/block_decompressor.cpp
new file mode 100644
index 0000000..5356d74
--- /dev/null
+++ b/src/paimon/common/compression/block_decompressor.cpp
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/compression/block_decompressor.h"
+
+#include "fmt/format.h"
+namespace paimon {
+
+// Assembles a 32-bit value from buf[0..3], treating buf[0] as the least-significant byte.
+int32_t BlockDecompressor::ReadIntLE(const char* buf) {
+    uint32_t assembled = 0;
+    for (int32_t byte_index = 0; byte_index < 4; ++byte_index) {
+        // Go through uint8_t first so a negative `char` does not sign-extend into higher bits.
+        const auto byte = static_cast<uint32_t>(static_cast<uint8_t>(buf[byte_index]));
+        assembled |= byte << (8 * byte_index);
+    }
+    return static_cast<int32_t>(assembled);
+}
+
+// Sanity-checks the two lengths read from a block header.
+//
+// Neither length may be negative, and they must be either both zero or both non-zero.
+// Returns Status::Invalid describing both values otherwise, Status::OK() when consistent.
+Status BlockDecompressor::ValidateLength(int32_t compressed_len, int32_t original_len) {
+    const bool has_negative = original_len < 0 || compressed_len < 0;
+    // Exactly one of the two being zero means the header is inconsistent.
+    const bool zero_mismatch = (original_len == 0) != (compressed_len == 0);
+    if (has_negative || zero_mismatch) {
+        return Status::Invalid(
+            fmt::format("Input is corrupted, compressed_len={}, original_len={}", compressed_len,
+                        original_len));
+    }
+    return Status::OK();
+}
+
+} // namespace paimon
diff --git a/src/paimon/common/compression/block_decompressor.h
b/src/paimon/common/compression/block_decompressor.h
new file mode 100644
index 0000000..582ad4e
--- /dev/null
+++ b/src/paimon/common/compression/block_decompressor.h
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "paimon/result.h"
+
+namespace paimon {
+
+/// Interface for codecs that decompress one block per call.
+class BlockDecompressor {
+ public:
+    /// We put two integers before each compressed block, the first integer represents the
+    /// compressed length of the block, and the second one represents the original length of the
+    /// block.
+    static constexpr int32_t HEADER_LENGTH = 8;
+
+    /// Reads a 32-bit value from `buf[0..3]`, treating `buf[0]` as the least-significant byte.
+    static int32_t ReadIntLE(const char* buf);
+
+    /// Checks that a header's compressed/original lengths are mutually consistent.
+    static Status ValidateLength(int32_t compressed_len, int32_t original_len);
+
+    virtual ~BlockDecompressor() = default;
+
+    /// Decompress data read from src, and write the decompressed data to dst.
+    ///
+    /// @param src Compressed data to read from
+    /// @param src_length Number of bytes available in `src`
+    /// @param dst The target to write decompressed data
+    /// @param dst_length Capacity of `dst` in bytes
+    /// @return Length of decompressed data
+    virtual Result<int32_t> Decompress(const char* src, int32_t src_length, char* dst,
+                                       int32_t dst_length) = 0;
+};
+} // namespace paimon
diff --git a/src/paimon/common/compression/lz4/lz4_block_compression_factory.h
b/src/paimon/common/compression/lz4/lz4_block_compression_factory.h
new file mode 100644
index 0000000..99c2362
--- /dev/null
+++ b/src/paimon/common/compression/lz4/lz4_block_compression_factory.h
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+
+#include "paimon/common/compression/block_compression_factory.h"
+#include "paimon/common/compression/lz4/lz4_block_compressor.h"
+#include "paimon/common/compression/lz4/lz4_block_decompressor.h"
+#include "paimon/result.h"
+
+namespace paimon {
+
+/// Implementation of {@link BlockCompressionFactory} for lz4 codec.
+class Lz4BlockCompressionFactory : public BlockCompressionFactory {
+ public:
+ /// @return Always BlockCompressionType::LZ4.
+ BlockCompressionType GetCompressionType() const override {
+ return BlockCompressionType::LZ4;
+ }
+
+ /// @return A newly allocated Lz4BlockCompressor on every call.
+ std::shared_ptr<BlockCompressor> GetCompressor() override {
+ return std::make_shared<Lz4BlockCompressor>();
+ }
+
+ /// @return A newly allocated Lz4BlockDecompressor on every call.
+ std::shared_ptr<BlockDecompressor> GetDecompressor() override {
+ return std::make_shared<Lz4BlockDecompressor>();
+ }
+};
+} // namespace paimon
diff --git a/src/paimon/common/compression/lz4/lz4_block_compressor.h
b/src/paimon/common/compression/lz4/lz4_block_compressor.h
new file mode 100644
index 0000000..f8d941c
--- /dev/null
+++ b/src/paimon/common/compression/lz4/lz4_block_compressor.h
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <lz4.h>
+
+#include "fmt/format.h"
+#include "paimon/common/compression/block_compressor.h"
+
+namespace paimon {
+
+/// Encode data into LZ4 format (not compatible with the LZ4 Frame format).
+///
+/// Each block is written as an 8-byte header (compressed length, then original length, both as
+/// little-endian int32) followed by the LZ4 payload.
+class Lz4BlockCompressor : public BlockCompressor {
+ public:
+    /// @return The header size plus LZ4's bound for `src_size` input bytes.
+    int32_t GetMaxCompressedSize(int32_t src_size) override {
+        return BlockCompressor::HEADER_LENGTH + LZ4_compressBound(src_size);
+    }
+
+    /// Compresses `src` into `dst`, prefixing the payload with the block header.
+    ///
+    /// @return Header size plus payload size, or Status::Invalid when `dst` cannot hold the
+    ///         header or LZ4 reports a non-positive result.
+    Result<int32_t> Compress(const char* src, int32_t src_length, char* dst,
+                             int32_t dst_length) override {
+        constexpr int32_t header_len = BlockCompressor::HEADER_LENGTH;
+        if (dst_length < header_len) {
+            return Status::Invalid(fmt::format(
+                "Output buffer too small for LZ4 block header, expected at least {} bytes, got {}",
+                header_len, dst_length));
+        }
+
+        // The payload goes right after the header; only the remaining capacity is offered to LZ4.
+        char* payload = dst + header_len;
+        const int32_t payload_capacity = dst_length - header_len;
+        const int32_t compressed_size =
+            LZ4_compress_default(src, payload, src_length, payload_capacity);
+        if (compressed_size <= 0) {
+            return Status::Invalid(
+                fmt::format("Compression failed with code {}", compressed_size));
+        }
+
+        WriteIntLE(compressed_size, dst);
+        WriteIntLE(src_length, dst + 4);
+        return header_len + compressed_size;
+    }
+};
+} // namespace paimon
diff --git a/src/paimon/common/compression/lz4/lz4_block_decompressor.h
b/src/paimon/common/compression/lz4/lz4_block_decompressor.h
new file mode 100644
index 0000000..edbb2a8
--- /dev/null
+++ b/src/paimon/common/compression/lz4/lz4_block_decompressor.h
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <lz4.h>
+
+#include "fmt/format.h"
+#include "paimon/common/compression/block_decompressor.h"
+
+namespace paimon {
+
+/// Decode data written with {@link Lz4BlockCompressor}.
+///
+/// A block is an 8-byte header (compressed length, then original length, both read with
+/// ReadIntLE) followed by the LZ4 payload.
+class Lz4BlockDecompressor : public BlockDecompressor {
+ public:
+    /// Decompresses one block from `src` into `dst`.
+    ///
+    /// @param src Start of the block (header included)
+    /// @param src_length Number of bytes available in `src`
+    /// @param dst The target to write decompressed data
+    /// @param dst_length Capacity of `dst` in bytes
+    /// @return The original (decompressed) length, or Status::Invalid when the header is
+    ///         truncated or inconsistent, `dst` is too small, the payload is incomplete, or LZ4
+    ///         does not produce exactly the length recorded in the header.
+    Result<int32_t> Decompress(const char* src, int32_t src_length, char* dst,
+                               int32_t dst_length) override {
+        if (src_length < HEADER_LENGTH) {
+            return Status::Invalid(fmt::format(
+                "Source data too short for LZ4 block header, expected at least {} bytes, got {}",
+                HEADER_LENGTH, src_length));
+        }
+        auto compressed_len = ReadIntLE(src);
+        auto original_len = ReadIntLE(src + 4);
+        PAIMON_RETURN_NOT_OK(ValidateLength(compressed_len, original_len));
+
+        if (dst_length < original_len) {
+            return Status::Invalid(
+                fmt::format("Buffer length too small, compressed_len= {}, original_len={}",
+                            compressed_len, original_len));
+        }
+
+        if (src_length - HEADER_LENGTH < compressed_len) {
+            return Status::Invalid("Source data is not integral for decompression.");
+        }
+
+        // Hand LZ4 exactly the payload size recorded in the header. The check above only rejects
+        // a source that is too short, so `src` may carry bytes past this block; passing
+        // `src_length - HEADER_LENGTH` would feed those bytes to LZ4 and make a valid block fail.
+        int32_t decompressed_size =
+            LZ4_decompress_safe(src + HEADER_LENGTH, dst, compressed_len, dst_length);
+        if (decompressed_size != original_len) {
+            return Status::Invalid(fmt::format("Input is corrupted, expected {}, but got {}",
+                                               original_len, decompressed_size));
+        }
+        return decompressed_size;
+    }
+};
+} // namespace paimon
diff --git a/src/paimon/common/compression/none_block_compression_factory.h
b/src/paimon/common/compression/none_block_compression_factory.h
new file mode 100644
index 0000000..19afbfa
--- /dev/null
+++ b/src/paimon/common/compression/none_block_compression_factory.h
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+
+#include "paimon/common/compression/block_compression_factory.h"
+#include "paimon/result.h"
+
+namespace paimon {
+
+/// Implementation of {@link BlockCompressionFactory} for none.
+///
+/// No codec object exists for this type: both getters return a null pointer, so callers must
+/// check the result before dereferencing it.
+class NoneBlockCompressionFactory : public BlockCompressionFactory {
+ public:
+ /// @return Always BlockCompressionType::NONE.
+ BlockCompressionType GetCompressionType() const override {
+ return BlockCompressionType::NONE;
+ }
+
+ /// @return Always nullptr.
+ std::shared_ptr<BlockCompressor> GetCompressor() override {
+ return nullptr;
+ }
+
+ /// @return Always nullptr.
+ std::shared_ptr<BlockDecompressor> GetDecompressor() override {
+ return nullptr;
+ }
+};
+} // namespace paimon
diff --git
a/src/paimon/common/compression/zstd/zstd_block_compression_factory.h
b/src/paimon/common/compression/zstd/zstd_block_compression_factory.h
new file mode 100644
index 0000000..ab8c8ac
--- /dev/null
+++ b/src/paimon/common/compression/zstd/zstd_block_compression_factory.h
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+
+#include "paimon/common/compression/block_compression_factory.h"
+#include "paimon/common/compression/zstd/zstd_block_compressor.h"
+#include "paimon/common/compression/zstd/zstd_block_decompressor.h"
+#include "paimon/result.h"
+
+namespace paimon {
+
+/// Implementation of {@link BlockCompressionFactory} for zstd codec.
+class ZstdBlockCompressionFactory : public BlockCompressionFactory {
+ public:
+ /// @param level Compression level handed to every compressor this factory creates
+ explicit ZstdBlockCompressionFactory(int32_t level = 1) : level_(level) {}
+
+ public:
+ /// @return Always BlockCompressionType::ZSTD.
+ BlockCompressionType GetCompressionType() const override {
+ return BlockCompressionType::ZSTD;
+ }
+
+ /// @return A newly allocated ZstdBlockCompressor configured with this factory's level.
+ std::shared_ptr<BlockCompressor> GetCompressor() override {
+ return std::make_shared<ZstdBlockCompressor>(level_);
+ }
+
+ /// @return A newly allocated ZstdBlockDecompressor on every call.
+ std::shared_ptr<BlockDecompressor> GetDecompressor() override {
+ return std::make_shared<ZstdBlockDecompressor>();
+ }
+
+ private:
+ // Compression level forwarded to ZstdBlockCompressor.
+ int32_t level_;
+};
+} // namespace paimon
diff --git a/src/paimon/common/compression/zstd/zstd_block_compressor.h
b/src/paimon/common/compression/zstd/zstd_block_compressor.h
new file mode 100644
index 0000000..3645309
--- /dev/null
+++ b/src/paimon/common/compression/zstd/zstd_block_compressor.h
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <zstd.h>
+
+#include "fmt/format.h"
+#include "paimon/common/compression/block_compressor.h"
+
+namespace paimon {
+
+/// A {@link BlockCompressor} for zstd.
+///
+/// The output is whatever ZSTD_compress writes into `dst`; unlike the LZ4 compressor, this
+/// class does not write the 8-byte length header itself.
+class ZstdBlockCompressor : public BlockCompressor {
+ public:
+    /// @param level Compression level passed to ZSTD_compress
+    explicit ZstdBlockCompressor(int32_t level) : level_(level) {}
+
+    /// @return ZSTD's bound for `src_size` input bytes, narrowed to int32_t.
+    int32_t GetMaxCompressedSize(int32_t src_size) override {
+        return static_cast<int32_t>(ZSTD_compressBound(src_size));
+    }
+
+    /// Compresses `src` into `dst`.
+    ///
+    /// @param src Uncompressed data to read from
+    /// @param src_length Number of bytes of `src` to compress
+    /// @param dst The target to write compressed data
+    /// @param dst_length Capacity of `dst` in bytes
+    /// @return Number of bytes written, or Status::Invalid when a length is negative or zstd
+    ///         reports an error (for example a destination that is too small).
+    Result<int32_t> Compress(const char* src, int32_t src_length, char* dst,
+                             int32_t dst_length) override {
+        // zstd takes size_t sizes; a negative int32_t would silently become a huge capacity.
+        if (src_length < 0 || dst_length < 0) {
+            return Status::Invalid(
+                fmt::format("Invalid buffer length for ZSTD compression, src_length={}, "
+                            "dst_length={}",
+                            src_length, dst_length));
+        }
+        const size_t compressed_size =
+            ZSTD_compress(dst, static_cast<size_t>(dst_length), src,
+                          static_cast<size_t>(src_length), level_);
+        if (ZSTD_isError(compressed_size)) {
+            return Status::Invalid(fmt::format("Compression failed with code {} ({})",
+                                               compressed_size,
+                                               ZSTD_getErrorName(compressed_size)));
+        }
+        // On success the result is at most dst_length, so it fits in int32_t.
+        return static_cast<int32_t>(compressed_size);
+    }
+
+ private:
+    int32_t level_;
+};
+} // namespace paimon
diff --git a/src/paimon/common/compression/zstd/zstd_block_decompressor.h
b/src/paimon/common/compression/zstd/zstd_block_decompressor.h
new file mode 100644
index 0000000..2494258
--- /dev/null
+++ b/src/paimon/common/compression/zstd/zstd_block_decompressor.h
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <zstd.h>
+
+#include "fmt/format.h"
+#include "paimon/common/compression/block_decompressor.h"
+
+namespace paimon {
+
+/// Decode data written with {@link ZstdBlockCompressor}.
+class ZstdBlockDecompressor : public BlockDecompressor {
+ public:
+    /// Decompresses `src` into `dst` with ZSTD_decompress.
+    ///
+    /// @param src Compressed data to read from
+    /// @param src_length Number of bytes available in `src`
+    /// @param dst The target to write decompressed data
+    /// @param dst_length Capacity of `dst` in bytes
+    /// @return Number of decompressed bytes, or Status::Invalid when a length is negative or
+    ///         zstd reports an error.
+    Result<int32_t> Decompress(const char* src, int32_t src_length, char* dst,
+                               int32_t dst_length) override {
+        // zstd takes size_t sizes; a negative int32_t would silently become a huge size.
+        if (src_length < 0 || dst_length < 0) {
+            return Status::Invalid(
+                fmt::format("Invalid buffer length for ZSTD decompression, src_length={}, "
+                            "dst_length={}",
+                            src_length, dst_length));
+        }
+        // Keep the result as size_t: ZSTD_isError expects the untruncated return value.
+        const size_t decompressed_size =
+            ZSTD_decompress(dst, static_cast<size_t>(dst_length), src,
+                            static_cast<size_t>(src_length));
+        if (ZSTD_isError(decompressed_size)) {
+            return Status::Invalid(fmt::format("Input is corrupted with return code {} ({})",
+                                               decompressed_size,
+                                               ZSTD_getErrorName(decompressed_size)));
+        }
+        // On success the result is at most dst_length, so it fits in int32_t.
+        return static_cast<int32_t>(decompressed_size);
+    }
+};
+} // namespace paimon
diff --git a/src/paimon/format/parquet/predicate_pushdown_test.cpp
b/src/paimon/format/parquet/predicate_pushdown_test.cpp
index 67d7cd5..7ccbaa2 100644
--- a/src/paimon/format/parquet/predicate_pushdown_test.cpp
+++ b/src/paimon/format/parquet/predicate_pushdown_test.cpp
@@ -345,7 +345,7 @@ TEST_F(PredicatePushdownTest, TestStringData) {
CheckResult(read_schema, predicate, /*expected_array=*/expected_array);
}
{
- // f0 like 'me', no data
+ // f0 like 'me', has data
ASSERT_OK_AND_ASSIGN(const auto predicate,
PredicateBuilder::Like(
/*field_index=*/0, /*field_name=*/"f0",
FieldType::STRING,