This is an automated email from the ASF dual-hosted git repository.

leaves12138 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 1bac51f  feat: Migrate block compression infrastructure with LZ4, 
ZSTD, and none backends (#52)
1bac51f is described below

commit 1bac51f98c32e837f3474189032d3ba58a0a2100
Author: lxy <[email protected]>
AuthorDate: Mon Jun 8 14:47:26 2026 +0800

    feat: Migrate block compression infrastructure with LZ4, ZSTD, and none 
backends (#52)
    
    * feat: Migrate block compression infrastructure with LZ4, ZSTD, and none 
backends
    
    * fix review
    
    * fix comments
---
 .../compression/block_compression_factory.cpp      |  58 ++++++++++
 .../common/compression/block_compression_factory.h |  55 ++++++++++
 .../compression/block_compression_factory_test.cpp | 120 +++++++++++++++++++++
 .../common/compression/block_compression_type.h    |  28 +++++
 src/paimon/common/compression/block_compressor.cpp |  30 ++++++
 src/paimon/common/compression/block_compressor.h   |  55 ++++++++++
 .../common/compression/block_decompressor.cpp      |  41 +++++++
 src/paimon/common/compression/block_decompressor.h |  52 +++++++++
 .../lz4/lz4_block_compression_factory.h            |  45 ++++++++
 .../common/compression/lz4/lz4_block_compressor.h  |  53 +++++++++
 .../compression/lz4/lz4_block_decompressor.h       |  61 +++++++++++
 .../compression/none_block_compression_factory.h   |  43 ++++++++
 .../zstd/zstd_block_compression_factory.h          |  51 +++++++++
 .../compression/zstd/zstd_block_compressor.h       |  49 +++++++++
 .../compression/zstd/zstd_block_decompressor.h     |  41 +++++++
 .../format/parquet/predicate_pushdown_test.cpp     |   2 +-
 16 files changed, 783 insertions(+), 1 deletion(-)

diff --git a/src/paimon/common/compression/block_compression_factory.cpp 
b/src/paimon/common/compression/block_compression_factory.cpp
new file mode 100644
index 0000000..4f5e2fd
--- /dev/null
+++ b/src/paimon/common/compression/block_compression_factory.cpp
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/compression/block_compression_factory.h"
+
+#include "fmt/format.h"
+#include "paimon/common/compression/lz4/lz4_block_compression_factory.h"
+#include "paimon/common/compression/none_block_compression_factory.h"
+#include "paimon/common/compression/zstd/zstd_block_compression_factory.h"
+#include "paimon/common/utils/string_utils.h"
+
+namespace paimon {
+
+Result<std::shared_ptr<BlockCompressionFactory>> 
BlockCompressionFactory::Create(
+    const CompressOptions& compression) {
+    auto compress = StringUtils::ToLowerCase(compression.compress);
+    if (compress == "none") {
+        return std::make_shared<NoneBlockCompressionFactory>();
+    } else if (compress == "zstd") {
+        return 
std::make_shared<ZstdBlockCompressionFactory>(compression.zstd_level);
+    } else if (compress == "lz4") {
+        return std::make_shared<Lz4BlockCompressionFactory>();
+    }
+    // TODO(liangzi): LZO support
+    return Status::Invalid(fmt::format("Unsupported compression type: {}", 
compress));
+}
+
+Result<std::shared_ptr<BlockCompressionFactory>> 
BlockCompressionFactory::Create(
+    BlockCompressionType compression) {
+    switch (compression) {
+        case BlockCompressionType::NONE:
+            return std::make_shared<NoneBlockCompressionFactory>();
+        case BlockCompressionType::LZ4:
+            return std::make_shared<Lz4BlockCompressionFactory>();
+        case BlockCompressionType::ZSTD:
+            return 
std::make_shared<ZstdBlockCompressionFactory>(ZSTD_COMPRESSION_LEVEL);
+        default:
+            // TODO(liangzi): LZO support
+            return Status::Invalid(
+                fmt::format("Unsupported compression type: {}", 
static_cast<int32_t>(compression)));
+    }
+}
+}  // namespace paimon
diff --git a/src/paimon/common/compression/block_compression_factory.h 
b/src/paimon/common/compression/block_compression_factory.h
new file mode 100644
index 0000000..f103fc8
--- /dev/null
+++ b/src/paimon/common/compression/block_compression_factory.h
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+
+#include "paimon/common/compression/block_compression_type.h"
+#include "paimon/common/compression/block_compressor.h"
+#include "paimon/common/compression/block_decompressor.h"
+#include "paimon/core/options/compress_options.h"
+#include "paimon/result.h"
+#include "paimon/visibility.h"
+namespace paimon {
+
+/// Each compression codec has an implementation of {@link 
BlockCompressionFactory} to create
+/// compressors and decompressors.
+class PAIMON_EXPORT BlockCompressionFactory {
+ public:
+    static Result<std::shared_ptr<BlockCompressionFactory>> Create(
+        const CompressOptions& compression);
+
+    static Result<std::shared_ptr<BlockCompressionFactory>> Create(
+        BlockCompressionType compress_type);
+
+    BlockCompressionFactory() = default;
+    virtual ~BlockCompressionFactory() = default;
+
+ public:
+    virtual BlockCompressionType GetCompressionType() const = 0;
+
+    virtual std::shared_ptr<BlockCompressor> GetCompressor() = 0;
+
+    virtual std::shared_ptr<BlockDecompressor> GetDecompressor() = 0;
+
+ private:
+    // Align java implementation
+    static constexpr int32_t ZSTD_COMPRESSION_LEVEL = 1;
+};
+}  // namespace paimon
diff --git a/src/paimon/common/compression/block_compression_factory_test.cpp 
b/src/paimon/common/compression/block_compression_factory_test.cpp
new file mode 100644
index 0000000..107d20a
--- /dev/null
+++ b/src/paimon/common/compression/block_compression_factory_test.cpp
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/compression/block_compression_factory.h"
+
+#include <cstdint>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "gtest/gtest.h"
+#include "paimon/defs.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/status.h"
+#include "paimon/testing/mock/mock_file_batch_reader.h"
+#include "paimon/testing/utils/read_result_collector.h"
+#include "paimon/testing/utils/testharness.h"
+
// Forward declaration only: nothing in this test file refers to Predicate.
// NOTE(review): looks like an include-what-you-use leftover; candidate for removal.
namespace paimon { class Predicate; }  // namespace paimon
+
+namespace paimon::test {
+class CompressionFactoryTest : public 
::testing::TestWithParam<BlockCompressionType> {};
+
+TEST_P(CompressionFactoryTest, TESTCompressThenDecompress) {
+    int32_t original_len = 16;
+    BlockCompressionType type = GetParam();
+
+    std::string data(original_len, '\0');
+    for (int32_t i = 0; i < original_len; i++) {
+        data[i] = static_cast<char>(i);
+    }
+
+    ASSERT_OK_AND_ASSIGN(auto factory, BlockCompressionFactory::Create(type));
+    ASSERT_EQ(type, factory->GetCompressionType());
+
+    // compress
+    auto compressor = factory->GetCompressor();
+    auto max_len = compressor->GetMaxCompressedSize(data.size());
+    std::string compressed_data(max_len, '\0');
+    auto compressed_size =
+        compressor->Compress(data.data(), data.size(), compressed_data.data(), 
max_len);
+    ASSERT_OK(compressed_size);
+    ASSERT_GT(compressed_size.value(), 0);
+    compressed_data.resize(compressed_size.value());
+
+    // decompress
+    auto decompressor = factory->GetDecompressor();
+    std::string decompressed_data(original_len, '\0');
+    auto decompressed_size =
+        decompressor->Decompress(compressed_data.data(), 
compressed_data.size(),
+                                 decompressed_data.data(), 
decompressed_data.size());
+    ASSERT_OK(decompressed_size);
+    ASSERT_GT(decompressed_size.value(), 0);
+    ASSERT_EQ(data, decompressed_data);
+
+    std::string read_write_le{4, '\0'};
+    compressor->WriteIntLE(123, read_write_le.data());
+    ASSERT_EQ(123, decompressor->ReadIntLE(read_write_le.data()));
+    compressor->WriteIntLE(100000, read_write_le.data());
+    ASSERT_EQ(100000, decompressor->ReadIntLE(read_write_le.data()));
+    compressor->WriteIntLE(-6555, read_write_le.data());
+    ASSERT_EQ(-6555, decompressor->ReadIntLE(read_write_le.data()));
+    compressor->WriteIntLE(0, read_write_le.data());
+    ASSERT_EQ(0, decompressor->ReadIntLE(read_write_le.data()));
+}
+
+TEST_P(CompressionFactoryTest, TestDecompressTruncatedHeader) {
+    BlockCompressionType type = GetParam();
+    ASSERT_OK_AND_ASSIGN(auto factory, BlockCompressionFactory::Create(type));
+    auto decompressor = factory->GetDecompressor();
+
+    // Source shorter than 8-byte header should return Invalid, not read out 
of bounds
+    char short_buf[] = {0x01, 0x02, 0x03};
+    char dst[64] = {};
+    ASSERT_NOK(decompressor->Decompress(short_buf, 3, dst, sizeof(dst)));
+}
+
+TEST_P(CompressionFactoryTest, TestCompressInsufficientOutputBuffer) {
+    BlockCompressionType type = GetParam();
+    ASSERT_OK_AND_ASSIGN(auto factory, BlockCompressionFactory::Create(type));
+    auto compressor = factory->GetCompressor();
+
+    // Incompressible data with a tiny output buffer should fail, not produce 
a corrupt block
+    std::string data(1024, '\0');
+    for (int32_t i = 0; i < static_cast<int32_t>(data.size()); i++) {
+        data[i] = static_cast<char>(i % 251);
+    }
+    // Output buffer too small: only HEADER_LENGTH bytes, no room for 
compressed payload
+    std::string tiny_dst(8, '\0');
+    ASSERT_NOK(compressor->Compress(data.data(), data.size(), tiny_dst.data(), 
tiny_dst.size()));
+
+    // Output buffer smaller than HEADER_LENGTH — must not trigger UB from 
negative capacity
+    char micro_dst[4] = {};
+    ASSERT_NOK(compressor->Compress(data.data(), data.size(), micro_dst, 
sizeof(micro_dst)));
+
+    // Zero-length output buffer
+    ASSERT_NOK(compressor->Compress(data.data(), data.size(), nullptr, 0));
+}
+
+INSTANTIATE_TEST_SUITE_P(BlockCompressionTypeGroup, CompressionFactoryTest,
+                         ::testing::Values(BlockCompressionType::LZ4, 
BlockCompressionType::ZSTD));
+
+}  // namespace paimon::test
diff --git a/src/paimon/common/compression/block_compression_type.h 
b/src/paimon/common/compression/block_compression_type.h
new file mode 100644
index 0000000..16126a8
--- /dev/null
+++ b/src/paimon/common/compression/block_compression_type.h
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "paimon/result.h"
+
+namespace paimon {
+
/// Identifies the codec used for a compressed block. Numeric values are spelled out
/// explicitly.
enum class BlockCompressionType {
    NONE = 0,  ///< No compression.
    ZSTD = 1,  ///< Zstandard.
    LZ4 = 2,   ///< LZ4 block format.
    LZO = 3,   ///< LZO; BlockCompressionFactory::Create has no implementation for it yet.
};
+
+}  // namespace paimon
diff --git a/src/paimon/common/compression/block_compressor.cpp 
b/src/paimon/common/compression/block_compressor.cpp
new file mode 100644
index 0000000..f2ed4ca
--- /dev/null
+++ b/src/paimon/common/compression/block_compressor.cpp
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/compression/block_compressor.h"
+
+namespace paimon {
+
+void BlockCompressor::WriteIntLE(int32_t val, char* buf) {
+    buf[0] = static_cast<char>(val);
+    buf[1] = static_cast<char>(val >> 8);
+    buf[2] = static_cast<char>(val >> 16);
+    buf[3] = static_cast<char>(val >> 24);
+}
+
+}  // namespace paimon
diff --git a/src/paimon/common/compression/block_compressor.h 
b/src/paimon/common/compression/block_compressor.h
new file mode 100644
index 0000000..473afb7
--- /dev/null
+++ b/src/paimon/common/compression/block_compressor.h
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "paimon/result.h"
+
+namespace paimon {
+
+/// A compressor which compresses a whole byte array each time.
+class BlockCompressor {
+ public:
+    static void WriteIntLE(int32_t val, char* buf);
+
+ public:
+    virtual ~BlockCompressor() = default;
+
+ public:
+    /// Get the max compressed size for a given original size.
+    /// @param src_size The original size
+    /// @return The max compressed size
+    virtual int32_t GetMaxCompressedSize(int32_t src_size) = 0;
+
+    /// Compress data read from src, and write the compressed data to dst.
+    ///
+    /// @param src Uncompressed data to read from
+    /// @param src_length The length of data which want to be compressed
+    /// @param dst The target to write compressed data
+    /// @param dst_length The max length of data
+    /// @return Length of compressed data
+    virtual Result<int32_t> Compress(const char* src, int32_t src_length, 
char* dst,
+                                     int32_t dst_length) = 0;
+
+ public:
+    /// We put two integers before each compressed block, the first integer 
represents the
+    /// compressed length of the block, and the second one represents the 
original length of the
+    /// block.
+    static constexpr int32_t HEADER_LENGTH = 8;
+};
+}  // namespace paimon
diff --git a/src/paimon/common/compression/block_decompressor.cpp 
b/src/paimon/common/compression/block_decompressor.cpp
new file mode 100644
index 0000000..5356d74
--- /dev/null
+++ b/src/paimon/common/compression/block_decompressor.cpp
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/compression/block_decompressor.h"
+
+#include "fmt/format.h"
+namespace paimon {
+
+int32_t BlockDecompressor::ReadIntLE(const char* buf) {
+    return 
static_cast<int32_t>(static_cast<uint32_t>(static_cast<uint8_t>(buf[0])) |
+                                
(static_cast<uint32_t>(static_cast<uint8_t>(buf[1])) << 8) |
+                                
(static_cast<uint32_t>(static_cast<uint8_t>(buf[2])) << 16) |
+                                
(static_cast<uint32_t>(static_cast<uint8_t>(buf[3])) << 24));
+}
+
+Status BlockDecompressor::ValidateLength(int32_t compressed_len, int32_t 
original_len) {
+    if (original_len < 0 || compressed_len < 0 || (original_len == 0 && 
compressed_len != 0) ||
+        (original_len != 0 && compressed_len == 0)) {
+        return Status::Invalid(
+            fmt::format("Input is corrupted, compressed_len={}, , 
original_len={}", compressed_len,
+                        original_len));
+    }
+    return Status::OK();
+}
+
+}  // namespace paimon
diff --git a/src/paimon/common/compression/block_decompressor.h 
b/src/paimon/common/compression/block_decompressor.h
new file mode 100644
index 0000000..582ad4e
--- /dev/null
+++ b/src/paimon/common/compression/block_decompressor.h
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "paimon/result.h"
+
+namespace paimon {
+
+/// A decompressor which decompresses a block each time.
+class BlockDecompressor {
+ public:
+    static int32_t ReadIntLE(const char* buf);
+
+    static Status ValidateLength(int32_t compressed_len, int32_t original_len);
+
+ public:
+    virtual ~BlockDecompressor() = default;
+
+ public:
+    /// Decompress data read from src, and write the decompressed data to dst.
+    ///
+    /// @param src Compressed data to read from
+    /// @param src_length The length of data which want to be decompressed
+    /// @param dst The target to write decompressed data
+    /// @param dst_length The max length of data
+    /// @return Length of decompressed data
+    virtual Result<int32_t> Decompress(const char* src, int32_t src_length, 
char* dst,
+                                       int32_t dst_length) = 0;
+
+ public:
+    /// We put two integers before each compressed block, the first integer 
represents the
+    /// compressed length of the block, and the second one represents the 
original length of the
+    /// block.
+    static constexpr int32_t HEADER_LENGTH = 8;
+};
+}  // namespace paimon
diff --git a/src/paimon/common/compression/lz4/lz4_block_compression_factory.h 
b/src/paimon/common/compression/lz4/lz4_block_compression_factory.h
new file mode 100644
index 0000000..99c2362
--- /dev/null
+++ b/src/paimon/common/compression/lz4/lz4_block_compression_factory.h
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+
+#include "paimon/common/compression/block_compression_factory.h"
+#include "paimon/common/compression/lz4/lz4_block_compressor.h"
+#include "paimon/common/compression/lz4/lz4_block_decompressor.h"
+#include "paimon/result.h"
+
+namespace paimon {
+
+/// Implementation of {@link BlockCompressionFactory} for lz4 codec.
+class Lz4BlockCompressionFactory : public BlockCompressionFactory {
+ public:
+    BlockCompressionType GetCompressionType() const override {
+        return BlockCompressionType::LZ4;
+    }
+
+    std::shared_ptr<BlockCompressor> GetCompressor() override {
+        return std::make_shared<Lz4BlockCompressor>();
+    }
+
+    std::shared_ptr<BlockDecompressor> GetDecompressor() override {
+        return std::make_shared<Lz4BlockDecompressor>();
+    }
+};
+}  // namespace paimon
diff --git a/src/paimon/common/compression/lz4/lz4_block_compressor.h 
b/src/paimon/common/compression/lz4/lz4_block_compressor.h
new file mode 100644
index 0000000..f8d941c
--- /dev/null
+++ b/src/paimon/common/compression/lz4/lz4_block_compressor.h
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <lz4.h>
+
+#include "fmt/format.h"
+#include "paimon/common/compression/block_compressor.h"
+
+namespace paimon {
+
+/// Encode data into LZ4 format (not compatible with the LZ4 Frame format).
+class Lz4BlockCompressor : public BlockCompressor {
+ public:
+    int32_t GetMaxCompressedSize(int32_t src_size) override {
+        return BlockCompressor::HEADER_LENGTH + LZ4_compressBound(src_size);
+    }
+
+    Result<int32_t> Compress(const char* src, int32_t src_length, char* dst,
+                             int32_t dst_length) override {
+        if (dst_length < BlockCompressor::HEADER_LENGTH) {
+            return Status::Invalid(fmt::format(
+                "Output buffer too small for LZ4 block header, expected at 
least {} bytes, got {}",
+                BlockCompressor::HEADER_LENGTH, dst_length));
+        }
+        int32_t compressed_size =
+            LZ4_compress_default(src, dst + BlockCompressor::HEADER_LENGTH, 
src_length,
+                                 dst_length - BlockCompressor::HEADER_LENGTH);
+        if (compressed_size <= 0) {
+            return Status::Invalid(fmt::format("Compression failed with code 
{}", compressed_size));
+        }
+        WriteIntLE(compressed_size, dst);
+        WriteIntLE(src_length, dst + 4);
+        return BlockCompressor::HEADER_LENGTH + compressed_size;
+    }
+};
+}  // namespace paimon
diff --git a/src/paimon/common/compression/lz4/lz4_block_decompressor.h 
b/src/paimon/common/compression/lz4/lz4_block_decompressor.h
new file mode 100644
index 0000000..edbb2a8
--- /dev/null
+++ b/src/paimon/common/compression/lz4/lz4_block_decompressor.h
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <lz4.h>
+
+#include "fmt/format.h"
+#include "paimon/common/compression/block_decompressor.h"
+
+namespace paimon {
+
+/// Decode data written with {@link Lz4BlockCompressor}.
+class Lz4BlockDecompressor : public BlockDecompressor {
+ public:
+    Result<int32_t> Decompress(const char* src, int32_t src_length, char* dst,
+                               int32_t dst_length) override {
+        if (src_length < HEADER_LENGTH) {
+            return Status::Invalid(fmt::format(
+                "Source data too short for LZ4 block header, expected at least 
{} bytes, got {}",
+                HEADER_LENGTH, src_length));
+        }
+        auto compressed_len = ReadIntLE(src);
+        auto original_len = ReadIntLE(src + 4);
+        PAIMON_RETURN_NOT_OK(ValidateLength(compressed_len, original_len));
+
+        if (dst_length < original_len) {
+            return Status::Invalid(
+                fmt::format("Buffer length too small, compressed_len= {}, 
original_len={}",
+                            compressed_len, original_len));
+        }
+
+        if (src_length - HEADER_LENGTH < compressed_len) {
+            return Status::Invalid("Source data is not integral for 
decompression.");
+        }
+
+        int32_t decompressed_size =
+            LZ4_decompress_safe(src + HEADER_LENGTH, dst, src_length - 
HEADER_LENGTH, dst_length);
+        if (decompressed_size != original_len) {
+            return Status::Invalid(fmt::format("Input is corrupted, expected 
{}, but got {}",
+                                               original_len, 
decompressed_size));
+        }
+        return decompressed_size;
+    }
+};
+}  // namespace paimon
diff --git a/src/paimon/common/compression/none_block_compression_factory.h 
b/src/paimon/common/compression/none_block_compression_factory.h
new file mode 100644
index 0000000..19afbfa
--- /dev/null
+++ b/src/paimon/common/compression/none_block_compression_factory.h
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+
+#include "paimon/common/compression/block_compression_factory.h"
+#include "paimon/result.h"
+
+namespace paimon {
+
+/// Implementation of {@link BlockCompressionFactory} for none.
+class NoneBlockCompressionFactory : public BlockCompressionFactory {
+ public:
+    BlockCompressionType GetCompressionType() const override {
+        return BlockCompressionType::NONE;
+    }
+
+    std::shared_ptr<BlockCompressor> GetCompressor() override {
+        return nullptr;
+    }
+
+    std::shared_ptr<BlockDecompressor> GetDecompressor() override {
+        return nullptr;
+    }
+};
+}  // namespace paimon
diff --git 
a/src/paimon/common/compression/zstd/zstd_block_compression_factory.h 
b/src/paimon/common/compression/zstd/zstd_block_compression_factory.h
new file mode 100644
index 0000000..ab8c8ac
--- /dev/null
+++ b/src/paimon/common/compression/zstd/zstd_block_compression_factory.h
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+
+#include "paimon/common/compression/block_compression_factory.h"
+#include "paimon/common/compression/zstd/zstd_block_compressor.h"
+#include "paimon/common/compression/zstd/zstd_block_decompressor.h"
+#include "paimon/result.h"
+
+namespace paimon {
+
+/// {@link BlockCompressionFactory} backed by the zstd codec.
+///
+/// Each call to GetCompressor()/GetDecompressor() creates a new object; the
+/// level given at construction is forwarded to every compressor it creates.
+class ZstdBlockCompressionFactory : public BlockCompressionFactory {
+ public:
+    /// @param level zstd compression level handed to each ZstdBlockCompressor
+    ///              (defaults to 1).
+    explicit ZstdBlockCompressionFactory(int32_t level = 1) : level_{level} {}
+
+    /// Reports BlockCompressionType::ZSTD.
+    BlockCompressionType GetCompressionType() const override {
+        return BlockCompressionType::ZSTD;
+    }
+
+    /// Creates a new ZstdBlockCompressor using the configured level.
+    std::shared_ptr<BlockCompressor> GetCompressor() override {
+        auto compressor = std::make_shared<ZstdBlockCompressor>(level_);
+        return compressor;
+    }
+
+    /// Creates a new ZstdBlockDecompressor.
+    std::shared_ptr<BlockDecompressor> GetDecompressor() override {
+        auto decompressor = std::make_shared<ZstdBlockDecompressor>();
+        return decompressor;
+    }
+
+ private:
+    int32_t level_;
+};
+}  // namespace paimon
diff --git a/src/paimon/common/compression/zstd/zstd_block_compressor.h b/src/paimon/common/compression/zstd/zstd_block_compressor.h
new file mode 100644
index 0000000..3645309
--- /dev/null
+++ b/src/paimon/common/compression/zstd/zstd_block_compressor.h
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <zstd.h>
+
+#include <cstdint>
+#include <limits>
+
+#include "fmt/format.h"
+#include "paimon/common/compression/block_compressor.h"
+
+namespace paimon {
+
+/// A {@link BlockCompressor} for zstd.
+///
+/// Wraps the one-shot ZSTD_compress() API. zstd works in size_t while this
+/// interface works in int32_t, so lengths are validated before they are widened
+/// and results are range-checked before they are narrowed.
+class ZstdBlockCompressor : public BlockCompressor {
+ public:
+    /// @param level zstd compression level passed to ZSTD_compress().
+    explicit ZstdBlockCompressor(int32_t level) : level_(level) {}
+
+    /// Returns the worst-case compressed size for `src_size` input bytes.
+    ///
+    /// Returns 0 when `src_size` is negative or the bound does not fit in an
+    /// int32_t (the same "no valid bound" convention as LZ4_compressBound()).
+    int32_t GetMaxCompressedSize(int32_t src_size) override {
+        if (src_size < 0) {
+            // A negative int32_t would wrap to a huge size_t.
+            return 0;
+        }
+        size_t const bound = ZSTD_compressBound(static_cast<size_t>(src_size));
+        // Close to INT32_MAX the bound exceeds the int32_t range; narrowing it
+        // silently would produce a negative or truncated buffer size.
+        if (ZSTD_isError(bound) ||
+            bound > static_cast<size_t>(std::numeric_limits<int32_t>::max())) {
+            return 0;
+        }
+        return static_cast<int32_t>(bound);
+    }
+
+    /// Compresses `src_length` bytes from `src` into `dst` (capacity `dst_length`).
+    ///
+    /// @return the number of bytes written to `dst`, or Status::Invalid when a
+    ///         length is negative or zstd reports an error (e.g. `dst` too small).
+    Result<int32_t> Compress(const char* src, int32_t src_length, char* dst,
+                             int32_t dst_length) override {
+        // A negative length would wrap to a huge size_t and let zstd read or
+        // write out of bounds.
+        if (src_length < 0 || dst_length < 0) {
+            return Status::Invalid(fmt::format(
+                "Compression failed: negative src_length {} or dst_length {}", src_length,
+                dst_length));
+        }
+        size_t const compressed_size =
+            ZSTD_compress(dst, static_cast<size_t>(dst_length), src,
+                          static_cast<size_t>(src_length), level_);
+        if (ZSTD_isError(compressed_size)) {
+            // The raw code is a huge size_t; the error name is what is useful.
+            return Status::Invalid(
+                fmt::format("Compression failed: {}", ZSTD_getErrorName(compressed_size)));
+        }
+        // Safe: zstd never writes more than dst_length (<= INT32_MAX) bytes.
+        return static_cast<int32_t>(compressed_size);
+    }
+
+ private:
+    int32_t level_;
+};
+}  // namespace paimon
diff --git a/src/paimon/common/compression/zstd/zstd_block_decompressor.h b/src/paimon/common/compression/zstd/zstd_block_decompressor.h
new file mode 100644
index 0000000..2494258
--- /dev/null
+++ b/src/paimon/common/compression/zstd/zstd_block_decompressor.h
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <zstd.h>
+
+#include "fmt/format.h"
+#include "paimon/common/compression/block_decompressor.h"
+
+namespace paimon {
+
+/// Decode data written with {@link ZstdBlockCompressor}.
+///
+/// Wraps the one-shot ZSTD_decompress() API.
+class ZstdBlockDecompressor : public BlockDecompressor {
+ public:
+    /// Decompresses `src_length` bytes from `src` into `dst` (capacity `dst_length`).
+    ///
+    /// @return the number of bytes written to `dst`, or Status::Invalid when a
+    ///         length is negative or zstd rejects the input.
+    Result<int32_t> Decompress(const char* src, int32_t src_length, char* dst,
+                               int32_t dst_length) override {
+        // A negative length would wrap to a huge size_t and let zstd read or
+        // write out of bounds.
+        if (src_length < 0 || dst_length < 0) {
+            return Status::Invalid(fmt::format(
+                "Decompression failed: negative src_length {} or dst_length {}", src_length,
+                dst_length));
+        }
+        // zstd error codes are large size_t values: keep the full result so that
+        // ZSTD_isError() sees it unmodified instead of a narrowed int32_t.
+        size_t const decompressed_size = ZSTD_decompress(
+            dst, static_cast<size_t>(dst_length), src, static_cast<size_t>(src_length));
+        if (ZSTD_isError(decompressed_size)) {
+            return Status::Invalid(
+                fmt::format("Input is corrupted: {}", ZSTD_getErrorName(decompressed_size)));
+        }
+        // Safe: zstd never writes more than dst_length (<= INT32_MAX) bytes.
+        return static_cast<int32_t>(decompressed_size);
+    }
+};
+}  // namespace paimon
diff --git a/src/paimon/format/parquet/predicate_pushdown_test.cpp b/src/paimon/format/parquet/predicate_pushdown_test.cpp
index 67d7cd5..7ccbaa2 100644
--- a/src/paimon/format/parquet/predicate_pushdown_test.cpp
+++ b/src/paimon/format/parquet/predicate_pushdown_test.cpp
@@ -345,7 +345,7 @@ TEST_F(PredicatePushdownTest, TestStringData) {
         CheckResult(read_schema, predicate, /*expected_array=*/expected_array);
     }
     {
-        // f0 like 'me', no data
+        // f0 like 'me', has data
         ASSERT_OK_AND_ASSIGN(const auto predicate,
                              PredicateBuilder::Like(
                                  /*field_index=*/0, /*field_name=*/"f0", 
FieldType::STRING,


Reply via email to