This is an automated email from the ASF dual-hosted git repository.

leaves12138 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 80c3563  feat: add binary row serializers and blob implementation (#31)
80c3563 is described below

commit 80c3563461b1a3439abe5c75b88f579a2e2e949f
Author: lszskye <[email protected]>
AuthorDate: Mon Jun 1 14:09:42 2026 +0800

    feat: add binary row serializers and blob implementation (#31)
---
 src/paimon/common/data/binary_row_writer.cpp       |   1 +
 src/paimon/common/data/blob.cpp                    | 118 +++
 .../data/serializer/binary_row_serializer.cpp      |  56 ++
 .../common/data/serializer/binary_row_serializer.h |  69 ++
 .../data/serializer/binary_row_serializer_test.cpp |  73 ++
 .../data/serializer/binary_serializer_utils.cpp    | 218 ++++++
 .../data/serializer/binary_serializer_utils.h      |  55 ++
 .../serializer/binary_serializer_utils_test.cpp    | 206 ++++++
 .../data/serializer/row_compacted_serializer.cpp   | 637 ++++++++++++++++
 .../data/serializer/row_compacted_serializer.h     | 202 +++++
 .../serializer/row_compacted_serializer_test.cpp   | 821 +++++++++++++++++++++
 11 files changed, 2456 insertions(+)

diff --git a/src/paimon/common/data/binary_row_writer.cpp 
b/src/paimon/common/data/binary_row_writer.cpp
index 3c1deb4..c9b5ce0 100644
--- a/src/paimon/common/data/binary_row_writer.cpp
+++ b/src/paimon/common/data/binary_row_writer.cpp
@@ -161,6 +161,7 @@ Result<BinaryRowWriter::FieldSetterFunc> 
BinaryRowWriter::CreateFieldSetter(
                 }
                 auto decimal_value = 
DataDefine::GetVariantValue<Decimal>(field);
                 assert(decimal_value.Scale() == scale);
+                (void)scale;
                 return writer->WriteDecimal(field_idx, decimal_value, 
precision);
             };
             return field_setter;
diff --git a/src/paimon/common/data/blob.cpp b/src/paimon/common/data/blob.cpp
new file mode 100644
index 0000000..6695327
--- /dev/null
+++ b/src/paimon/common/data/blob.cpp
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/data/blob.h"
+
+#include <memory>
+#include <utility>
+
+#include "arrow/c/bridge.h"
+#include "paimon/common/data/blob_descriptor.h"
+#include "paimon/common/data/blob_utils.h"
+#include "paimon/common/io/offset_input_stream.h"
+#include "paimon/common/utils/arrow/status_utils.h"
+#include "paimon/common/utils/path_util.h"
+#include "paimon/common/utils/stream_utils.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/status.h"
+
+namespace paimon {
+
+class MemoryPool;
+
+class Blob::Impl {
+ public:
+    explicit Impl(std::unique_ptr<BlobDescriptor>&& descriptor)
+        : descriptor_(std::move(descriptor)) {}
+
+    PAIMON_UNIQUE_PTR<Bytes> SerializeDescriptor(const 
std::shared_ptr<MemoryPool>& pool) const {
+        return descriptor_->Serialize(pool);
+    }
+
+    const BlobDescriptor* GetDescriptor() const {
+        return descriptor_.get();
+    }
+
+    const std::string& Uri() const {
+        return descriptor_->Uri();
+    }
+
+ private:
+    std::unique_ptr<BlobDescriptor> descriptor_;
+};
+
+Result<std::unique_ptr<Blob>> Blob::FromPath(const std::string& path) {
+    return FromPath(path, /*offset=*/0, /*length=*/-1);
+}
+
+Result<std::unique_ptr<Blob>> Blob::FromPath(const std::string& path, int64_t 
offset,
+                                             int64_t length) {
+    PAIMON_ASSIGN_OR_RAISE(std::string normalized_path, 
PathUtil::NormalizePath(path));
+    PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<BlobDescriptor> descriptor,
+                           BlobDescriptor::Create(normalized_path, offset, 
length));
+    auto impl = std::make_unique<Impl>(std::move(descriptor));
+    return std::unique_ptr<Blob>(new Blob(std::move(impl)));
+}
+
+Blob::Blob(std::unique_ptr<Impl>&& impl) : impl_(std::move(impl)) {}
+Blob::~Blob() = default;
+
+Result<std::unique_ptr<Blob>> Blob::FromDescriptor(const char* buffer, 
uint64_t length) {
+    PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<BlobDescriptor> descriptor,
+                           BlobDescriptor::Deserialize(buffer, length));
+
+    auto impl = std::make_unique<Impl>(std::move(descriptor));
+    return std::unique_ptr<Blob>(new Blob(std::move(impl)));
+}
+
+PAIMON_UNIQUE_PTR<Bytes> Blob::ToDescriptor(const std::shared_ptr<MemoryPool>& 
pool) const {
+    return impl_->SerializeDescriptor(pool);
+}
+
+const std::string& Blob::Uri() const {
+    return impl_->Uri();
+}
+
+Result<std::unique_ptr<InputStream>> Blob::NewInputStream(
+    const std::shared_ptr<FileSystem>& fs) const {
+    if (fs == nullptr) {
+        return Status::Invalid("file system is nullptr");
+    }
+    PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<InputStream> file,
+                           fs->Open(impl_->GetDescriptor()->Uri()));
+
+    return OffsetInputStream::Create(std::move(file), 
impl_->GetDescriptor()->Length(),
+                                     impl_->GetDescriptor()->Offset());
+}
+
+Result<PAIMON_UNIQUE_PTR<Bytes>> Blob::ToData(const 
std::shared_ptr<FileSystem>& fs,
+                                              const 
std::shared_ptr<MemoryPool>& pool) const {
+    PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<InputStream> input_stream, 
NewInputStream(fs));
+    return StreamUtils::ReadAsyncFully(std::move(input_stream), pool);
+}
+
+Result<std::unique_ptr<ArrowSchema>> Blob::ArrowField(
+    const std::string& field_name, std::unordered_map<std::string, 
std::string> metadata) {
+    auto blob_field = BlobUtils::ToArrowField(field_name, /*nullable=*/false, 
metadata);
+    auto field = std::make_unique<::ArrowSchema>();
+    PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportField(*blob_field, 
field.get()));
+    return field;
+}
+
+}  // namespace paimon
diff --git a/src/paimon/common/data/serializer/binary_row_serializer.cpp 
b/src/paimon/common/data/serializer/binary_row_serializer.cpp
new file mode 100644
index 0000000..826f755
--- /dev/null
+++ b/src/paimon/common/data/serializer/binary_row_serializer.cpp
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/data/serializer/binary_row_serializer.h"
+
+#include <algorithm>
+#include <cassert>
+#include <utility>
+#include <vector>
+
+#include "fmt/format.h"
+#include "paimon/common/io/memory_segment_output_stream.h"
+#include "paimon/common/memory/memory_segment.h"
+#include "paimon/io/data_input_stream.h"
+#include "paimon/memory/bytes.h"
+
+namespace paimon {
+
+Status BinaryRowSerializer::Serialize(const BinaryRow& record,
+                                      MemorySegmentOutputStream* target) const 
{
+    target->WriteValue<int32_t>(record.GetSizeInBytes());
+    return SerializeWithoutLength(record, target);
+}
+
+Status BinaryRowSerializer::SerializeWithoutLength(const BinaryRow& record,
+                                                   MemorySegmentOutputStream* 
target) const {
+    target->Write(record.GetSegment(), record.GetOffset(), 
record.GetSizeInBytes());
+    return Status::OK();
+}
+
+Result<BinaryRow> BinaryRowSerializer::Deserialize(DataInputStream* source) 
const {
+    BinaryRow row(num_fields_);
+    PAIMON_ASSIGN_OR_RAISE(int32_t read_length, source->ReadValue<int32_t>());
+    std::shared_ptr<Bytes> bytes = Bytes::AllocateBytes(read_length, 
pool_.get());
+    PAIMON_RETURN_NOT_OK(source->ReadBytes(bytes.get()));
+    row.PointTo(MemorySegment::Wrap(bytes), 0, read_length);
+    return row;
+}
+
+}  // namespace paimon
diff --git a/src/paimon/common/data/serializer/binary_row_serializer.h 
b/src/paimon/common/data/serializer/binary_row_serializer.h
new file mode 100644
index 0000000..b1f0a05
--- /dev/null
+++ b/src/paimon/common/data/serializer/binary_row_serializer.h
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+
+#include "paimon/common/data/binary_row.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+
+namespace paimon {
+
+class DataInputStream;
+class MemorySegmentOutputStream;
+class MemoryPool;
+
+/// Serializer for `BinaryRow`.
+class BinaryRowSerializer {
+ public:
+    BinaryRowSerializer(int32_t num_fields, const std::shared_ptr<MemoryPool>& 
pool)
+        : num_fields_(num_fields),
+          
fixed_length_part_size_(BinaryRow::CalculateFixPartSizeInBytes(num_fields)),
+          pool_(pool) {}
+
+    int32_t GetArity() const {
+        return num_fields_;
+    }
+
+    static constexpr int32_t LENGTH_SIZE_IN_BYTES = 4;
+
+    int32_t GetFixedLengthPartSize() const {
+        return fixed_length_part_size_;
+    }
+
+    /// @return Fixed part length to serialize one row.
+    int32_t GetSerializedRowFixedPartLength() const {
+        return GetFixedLengthPartSize() + LENGTH_SIZE_IN_BYTES;
+    }
+
+    Result<BinaryRow> Deserialize(DataInputStream* source) const;
+
+    Status Serialize(const BinaryRow& row, MemorySegmentOutputStream* target) 
const;
+
+ private:
+    Status SerializeWithoutLength(const BinaryRow& record, 
MemorySegmentOutputStream* target) const;
+
+    int32_t num_fields_;
+    int32_t fixed_length_part_size_;
+    std::shared_ptr<MemoryPool> pool_;
+};
+}  // namespace paimon
diff --git a/src/paimon/common/data/serializer/binary_row_serializer_test.cpp 
b/src/paimon/common/data/serializer/binary_row_serializer_test.cpp
new file mode 100644
index 0000000..22c2f6b
--- /dev/null
+++ b/src/paimon/common/data/serializer/binary_row_serializer_test.cpp
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/data/serializer/binary_row_serializer.h"
+
+#include <string>
+
+#include "gtest/gtest.h"
+#include "paimon/common/data/binary_row_writer.h"
+#include "paimon/common/data/binary_string.h"
+#include "paimon/common/io/memory_segment_output_stream.h"
+#include "paimon/common/memory/memory_segment.h"
+#include "paimon/common/memory/memory_segment_utils.h"
+#include "paimon/io/byte_array_input_stream.h"
+#include "paimon/io/data_input_stream.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+
+class BinaryRowSerializerTest : public ::testing::Test {
+ protected:
+    void SetUp() override {
+        pool_ = GetDefaultPool();
+        serializer_ = std::make_unique<BinaryRowSerializer>(3, pool_);
+    }
+
+    std::shared_ptr<MemoryPool> pool_;
+    std::unique_ptr<BinaryRowSerializer> serializer_;
+};
+
+TEST_F(BinaryRowSerializerTest, SerializeAndDeserialize) {
+    // Create a BinaryRow with some test data
+    BinaryRow row(3);
+    BinaryRowWriter writer(&row, 20, pool_.get());
+    writer.WriteInt(0, 42);
+    writer.WriteString(1, BinaryString::FromString("test", pool_.get()));
+    writer.WriteDouble(2, 3.14);
+
+    // Serialize the BinaryRow
+    MemorySegmentOutputStream output_stream(1, pool_);
+    ASSERT_OK(serializer_->Serialize(row, &output_stream));
+    auto segments = output_stream.Segments();
+    auto bytes =
+        MemorySegmentUtils::CopyToBytes(segments, 0, 
output_stream.CurrentSize(), pool_.get());
+
+    // Deserialize the BinaryRow
+    auto input_stream = std::make_shared<ByteArrayInputStream>(bytes->data(), 
bytes->size());
+    DataInputStream in(input_stream);
+    ASSERT_OK_AND_ASSIGN(BinaryRow deserialized_row, 
serializer_->Deserialize(&in));
+    EXPECT_EQ(deserialized_row.GetInt(0), 42);
+    EXPECT_EQ(deserialized_row.GetString(1).ToString(), "test");
+    EXPECT_EQ(deserialized_row.GetDouble(2), 3.14);
+}
+
+}  // namespace paimon::test
diff --git a/src/paimon/common/data/serializer/binary_serializer_utils.cpp 
b/src/paimon/common/data/serializer/binary_serializer_utils.cpp
new file mode 100644
index 0000000..4675e54
--- /dev/null
+++ b/src/paimon/common/data/serializer/binary_serializer_utils.cpp
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/data/serializer/binary_serializer_utils.h"
+
+#include "paimon/common/data/binary_array_writer.h"
+#include "paimon/common/data/binary_row_writer.h"
+#include "paimon/common/utils/date_time_utils.h"
+namespace paimon {
+Result<std::shared_ptr<BinaryArray>> BinarySerializerUtils::WriteBinaryArray(
+    const std::shared_ptr<InternalArray>& value, const 
std::shared_ptr<arrow::DataType>& type,
+    MemoryPool* pool) {
+    if (auto binary_array = std::dynamic_pointer_cast<BinaryArray>(value)) {
+        return binary_array;
+    }
+    auto binary_array = std::make_shared<BinaryArray>();
+    auto list_type = std::dynamic_pointer_cast<arrow::ListType>(type);
+    assert(list_type);
+    auto value_type = list_type->value_type();
+    // TODO(xinyu.lxy): reuse BinaryWriter
+    BinaryArrayWriter binary_writer(binary_array.get(), value->Size(),
+                                    
BinaryArrayWriter::GetElementSize(value_type->id()), pool);
+    for (int32_t i = 0; i < value->Size(); i++) {
+        PAIMON_RETURN_NOT_OK(WriteBinaryData(value_type, value.get(), i, 
&binary_writer, pool));
+    }
+    binary_writer.Complete();
+    return binary_array;
+}
+
+Result<std::shared_ptr<BinaryMap>> BinarySerializerUtils::WriteBinaryMap(
+    const std::shared_ptr<InternalMap>& value, const 
std::shared_ptr<arrow::DataType>& type,
+    MemoryPool* pool) {
+    if (auto binary_map = std::dynamic_pointer_cast<BinaryMap>(value)) {
+        return binary_map;
+    }
+    auto map_type = std::dynamic_pointer_cast<arrow::MapType>(type);
+    assert(map_type);
+    auto key_type = map_type->key_type();
+    auto value_type = map_type->item_type();
+    auto key_array = value->KeyArray();
+    auto value_array = value->ValueArray();
+    assert(key_array && value_array);
+    assert(key_array->Size() == value_array->Size());
+    BinaryArray binary_key_array;
+    BinaryArrayWriter binary_key_writer(&binary_key_array, key_array->Size(),
+                                        
BinaryArrayWriter::GetElementSize(key_type->id()), pool);
+    BinaryArray binary_value_array;
+    BinaryArrayWriter binary_value_writer(&binary_value_array, 
value_array->Size(),
+                                          
BinaryArrayWriter::GetElementSize(value_type->id()),
+                                          pool);
+    for (int32_t i = 0; i < key_array->Size(); i++) {
+        PAIMON_RETURN_NOT_OK(
+            WriteBinaryData(key_type, key_array.get(), i, &binary_key_writer, 
pool));
+        PAIMON_RETURN_NOT_OK(
+            WriteBinaryData(value_type, value_array.get(), i, 
&binary_value_writer, pool));
+    }
+    binary_key_writer.Complete();
+    binary_value_writer.Complete();
+    return BinaryMap::ValueOf(binary_key_array, binary_value_array, pool);
+}
+
+Result<std::shared_ptr<BinaryRow>> BinarySerializerUtils::WriteBinaryRow(
+    const std::shared_ptr<InternalRow>& value, const 
std::shared_ptr<arrow::DataType>& type,
+    MemoryPool* pool) {
+    if (auto binary_row = std::dynamic_pointer_cast<BinaryRow>(value)) {
+        return binary_row;
+    }
+
+    auto struct_type = std::dynamic_pointer_cast<arrow::StructType>(type);
+    assert(struct_type);
+    auto field_count = struct_type->num_fields();
+    auto binary_row = std::make_shared<BinaryRow>(field_count);
+    BinaryRowWriter binary_writer(binary_row.get(), /*initial_size=*/1024, 
pool);
+    for (int32_t i = 0; i < field_count; i++) {
+        PAIMON_RETURN_NOT_OK(
+            WriteBinaryData(struct_type->field(i)->type(), value.get(), i, 
&binary_writer, pool));
+    }
+    binary_writer.Complete();
+    return binary_row;
+}
+
+Status BinarySerializerUtils::WriteBinaryData(const 
std::shared_ptr<arrow::DataType>& type,
+                                              const DataGetters* getter, 
int32_t pos,
+                                              BinaryWriter* writer, 
MemoryPool* pool) {
+    assert(getter && writer && pool);
+    arrow::Type::type type_id = type->id();
+    auto array_writer = dynamic_cast<BinaryArrayWriter*>(writer);
+    if (getter->IsNullAt(pos)) {
+        // compatible with Java Paimon
+        if (array_writer) {
+            array_writer->SetNullAt(pos, type_id);
+            return Status::OK();
+        } else if (type_id != arrow::Type::type::DECIMAL &&
+                   type_id != arrow::Type::type::TIMESTAMP) {
+            // if row writer, exclude decimal and timestamp when set null
+            writer->SetNullAt(pos);
+            return Status::OK();
+        }
+    }
+    switch (type_id) {
+        case arrow::Type::type::BOOL: {
+            writer->WriteBoolean(pos, getter->GetBoolean(pos));
+            break;
+        }
+        case arrow::Type::type::INT8: {
+            writer->WriteByte(pos, getter->GetByte(pos));
+            break;
+        }
+        case arrow::Type::type::INT16: {
+            writer->WriteShort(pos, getter->GetShort(pos));
+            break;
+        }
+        case arrow::Type::type::DATE32: {
+            writer->WriteInt(pos, getter->GetDate(pos));
+            break;
+        }
+        case arrow::Type::type::INT32: {
+            writer->WriteInt(pos, getter->GetInt(pos));
+            break;
+        }
+        case arrow::Type::type::INT64: {
+            writer->WriteLong(pos, getter->GetLong(pos));
+            break;
+        }
+        case arrow::Type::type::FLOAT: {
+            writer->WriteFloat(pos, getter->GetFloat(pos));
+            break;
+        }
+        case arrow::Type::type::DOUBLE: {
+            writer->WriteDouble(pos, getter->GetDouble(pos));
+            break;
+        }
+        case arrow::Type::type::STRING: {
+            writer->WriteStringView(pos, getter->GetStringView(pos));
+            break;
+        }
+        case arrow::Type::type::BINARY: {
+            writer->WriteStringView(pos, getter->GetStringView(pos));
+            break;
+        }
+        case arrow::Type::type::TIMESTAMP: {
+            auto timestamp_type = 
arrow::internal::checked_pointer_cast<arrow::TimestampType>(type);
+            assert(timestamp_type);
+            int32_t precision = 
DateTimeUtils::GetPrecisionFromType(timestamp_type);
+            if (getter->IsNullAt(pos)) {
+                // compatible with Java Paimon
+                if (!Timestamp::IsCompact(precision)) {
+                    writer->WriteTimestamp(pos, std::nullopt, precision);
+                } else {
+                    writer->SetNullAt(pos);
+                }
+            } else {
+                writer->WriteTimestamp(pos, getter->GetTimestamp(pos, 
precision), precision);
+            }
+            break;
+        }
+        case arrow::Type::type::DECIMAL: {
+            auto* decimal_type = 
arrow::internal::checked_cast<arrow::Decimal128Type*>(type.get());
+            assert(decimal_type);
+            auto precision = decimal_type->precision();
+            auto scale = decimal_type->scale();
+            if (getter->IsNullAt(pos)) {
+                // compatible with Java Paimon
+                if (!Decimal::IsCompact(precision)) {
+                    writer->WriteDecimal(pos, std::nullopt, precision);
+                } else {
+                    writer->SetNullAt(pos);
+                }
+            } else {
+                writer->WriteDecimal(pos, getter->GetDecimal(pos, precision, 
scale), precision);
+            }
+            break;
+        }
+        case arrow::Type::type::LIST: {
+            auto internal_array = getter->GetArray(pos);
+            PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<BinaryArray> binary_array,
+                                   WriteBinaryArray(internal_array, type, 
pool));
+            writer->WriteArray(pos, *binary_array);
+            break;
+        }
+        case arrow::Type::type::MAP: {
+            auto internal_map = getter->GetMap(pos);
+            PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<BinaryMap> binary_map,
+                                   WriteBinaryMap(internal_map, type, pool));
+            writer->WriteMap(pos, *binary_map);
+            break;
+        }
+        case arrow::Type::type::STRUCT: {
+            auto internal_row = getter->GetRow(pos, type->num_fields());
+            PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<BinaryRow> binary_row,
+                                   WriteBinaryRow(internal_row, type, pool));
+            writer->WriteRow(pos, *binary_row);
+            break;
+        }
+        default:
+            return Status::Invalid(
+                fmt::format("type {} not support in WriteBinaryData in binary 
serializer utils",
+                            type->ToString()));
+    }
+    return Status::OK();
+}
+}  // namespace paimon
diff --git a/src/paimon/common/data/serializer/binary_serializer_utils.h 
b/src/paimon/common/data/serializer/binary_serializer_utils.h
new file mode 100644
index 0000000..f024b94
--- /dev/null
+++ b/src/paimon/common/data/serializer/binary_serializer_utils.h
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+
+#include "paimon/common/data/binary_array.h"
+#include "paimon/common/data/binary_map.h"
+#include "paimon/common/data/binary_row.h"
+#include "paimon/common/data/binary_writer.h"
+
+namespace paimon {
+/// Utils for generate `BinaryRow`, `BinaryArray`, `BinaryMap` from 
`InternalRow`, `InternalArray`,
+/// and `InternalMap`.
+class BinarySerializerUtils {
+ public:
+    BinarySerializerUtils() = delete;
+    ~BinarySerializerUtils() = delete;
+
+    static Result<std::shared_ptr<BinaryArray>> WriteBinaryArray(
+        const std::shared_ptr<InternalArray>& value, const 
std::shared_ptr<arrow::DataType>& type,
+        MemoryPool* pool);
+
+    static Result<std::shared_ptr<BinaryMap>> WriteBinaryMap(
+        const std::shared_ptr<InternalMap>& value, const 
std::shared_ptr<arrow::DataType>& type,
+        MemoryPool* pool);
+
+    static Result<std::shared_ptr<BinaryRow>> WriteBinaryRow(
+        const std::shared_ptr<InternalRow>& value, const 
std::shared_ptr<arrow::DataType>& type,
+        MemoryPool* pool);
+
+ private:
+    static Status WriteBinaryData(const std::shared_ptr<arrow::DataType>& type,
+                                  const DataGetters* getter, int32_t pos, 
BinaryWriter* writer,
+                                  MemoryPool* pool);
+};
+}  // namespace paimon
diff --git a/src/paimon/common/data/serializer/binary_serializer_utils_test.cpp 
b/src/paimon/common/data/serializer/binary_serializer_utils_test.cpp
new file mode 100644
index 0000000..f557d46
--- /dev/null
+++ b/src/paimon/common/data/serializer/binary_serializer_utils_test.cpp
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/data/serializer/binary_serializer_utils.h"
+
+#include <string>
+
+#include "arrow/ipc/json_simple.h"
+#include "gtest/gtest.h"
+#include "paimon/common/data/columnar/columnar_row.h"
+#include "paimon/common/utils/decimal_utils.h"
+#include "paimon/testing/utils/testharness.h"
+namespace paimon::test {
+TEST(BinarySerializerUtilsTest, TestSimple) {
+    auto pool = GetDefaultPool();
+    // prepare data
+    std::shared_ptr<arrow::DataType> arrow_type =
+        arrow::struct_({arrow::field("f1", arrow::boolean()), 
arrow::field("f2", arrow::int8()),
+                        arrow::field("f3", arrow::int16()), arrow::field("f4", 
arrow::int32()),
+                        arrow::field("field_null", arrow::int32()),
+                        arrow::field("f5", arrow::int64()), arrow::field("f6", 
arrow::float32()),
+                        arrow::field("f7", arrow::float64()), 
arrow::field("f8", arrow::utf8()),
+                        arrow::field("f9", arrow::binary()), 
arrow::field("f10", arrow::date32()),
+                        arrow::field("f11", 
arrow::timestamp(arrow::TimeUnit::NANO)),
+                        arrow::field("f12", arrow::decimal128(5, 2))});
+
+    std::shared_ptr<arrow::Array> array = 
arrow::ipc::internal::json::ArrayFromJSON(arrow_type,
+                                                                               
     R"([
+        [true, 0, 32767, 2147483647, null, 4294967295, 0.5, 1.141, "20250327", 
"banana", 2026, 5000001, "5.12"]
+    ])")
+                                              .ValueOrDie();
+    auto struct_array = std::dynamic_pointer_cast<arrow::StructArray>(array);
+    ASSERT_TRUE(struct_array);
+    auto columnar_row = std::make_shared<ColumnarRow>(/*struct_array=*/nullptr,
+                                                      struct_array->fields(), 
pool, /*row_id=*/0);
+
+    ASSERT_OK_AND_ASSIGN(auto binary_row, 
BinarySerializerUtils::WriteBinaryRow(
+                                              columnar_row, arrow_type, 
pool.get()));
+
+    // check result
+    ASSERT_EQ(binary_row->GetFieldCount(), 13);
+    ASSERT_FALSE(binary_row->IsNullAt(0));
+
+    ASSERT_EQ(binary_row->GetBoolean(0), true);
+    ASSERT_EQ(binary_row->GetByte(1), 0);
+    ASSERT_EQ(binary_row->GetShort(2), 32767);
+    ASSERT_EQ(binary_row->GetInt(3), 2147483647);
+    ASSERT_TRUE(binary_row->IsNullAt(4));
+    ASSERT_EQ(binary_row->GetLong(5), 4294967295l);
+    ASSERT_EQ(binary_row->GetFloat(6), 0.5);
+    ASSERT_EQ(binary_row->GetDouble(7), 1.141);
+    ASSERT_EQ(binary_row->GetString(8), BinaryString::FromString("20250327", 
pool.get()));
+    auto f9_bytes = std::make_shared<Bytes>("banana", pool.get());
+    ASSERT_EQ(*binary_row->GetBinary(9), *f9_bytes);
+    ASSERT_EQ(binary_row->GetDate(10), 2026);
+    ASSERT_EQ(binary_row->GetTimestamp(11, 9),
+              Timestamp(/*millisecond=*/5, /*nano_of_millisecond*/ 1));
+    ASSERT_EQ(binary_row->GetDecimal(12, 5, 2),
+              Decimal(5, 2, DecimalUtils::StrToInt128("512").value()));
+}
+
+TEST(BinarySerializerUtilsTest, TestNestedType) {
+    auto pool = GetDefaultPool();
+    // prepare data
+    auto key_field = arrow::field("key", arrow::int32());
+    auto value_field = arrow::field("value", arrow::int32());
+
+    auto inner_child1 = arrow::field(
+        "inner1", arrow::map(arrow::int32(), arrow::field("inner_list", 
arrow::list(value_field))));
+    auto inner_child2 = arrow::field(
+        "inner2", arrow::map(arrow::int32(),
+                             arrow::field("inner_map", 
arrow::map(arrow::int32(), value_field))));
+    auto inner_child3 = arrow::field(
+        "inner3",
+        arrow::map(arrow::int32(),
+                   arrow::field("inner_struct", arrow::struct_({key_field, 
value_field}))));
+
+    auto arrow_type = arrow::struct_({inner_child1, inner_child2, 
inner_child3});
+    // each inner child per row
+    std::shared_ptr<arrow::Array> array = 
arrow::ipc::internal::json::ArrayFromJSON(arrow_type,
+                                                                               
     R"([
+[[[100, [1, 2, 3, 4]], [101, [5, 6, 7]]],
+[[200, [[500, 1]]], [201, [[501, 2]]]],
+[[600, [100, 200]]]]
+    ])")
+                                              .ValueOrDie();
+    auto struct_array = std::dynamic_pointer_cast<arrow::StructArray>(array);
+    ASSERT_TRUE(struct_array);
+    auto columnar_row =
+        std::make_shared<ColumnarRow>(/*struct_array=*/nullptr, 
struct_array->fields(), pool,
+                                      /*row_id=*/0);
+
+    ASSERT_OK_AND_ASSIGN(auto binary_row, 
BinarySerializerUtils::WriteBinaryRow(
+                                              columnar_row, arrow_type, 
pool.get()));
+
+    // check result
+    ASSERT_EQ(binary_row->GetFieldCount(), 3);
+
+    // for inner_child1
+    ASSERT_EQ(binary_row->GetMap(0)->KeyArray()->ToIntArray().value(),
+              std::vector<int32_t>({100, 101}));
+    auto value1 = binary_row->GetMap(0)->ValueArray();
+    ASSERT_EQ(value1->Size(), 2);
+    ASSERT_EQ(value1->GetArray(0)->ToIntArray().value(), 
std::vector<int32_t>({1, 2, 3, 4}));
+    ASSERT_EQ(value1->GetArray(1)->ToIntArray().value(), 
std::vector<int32_t>({5, 6, 7}));
+
+    // for inner_child2
+    ASSERT_EQ(binary_row->GetMap(1)->KeyArray()->ToIntArray().value(),
+              std::vector<int32_t>({200, 201}));
+    auto value2 = binary_row->GetMap(1)->ValueArray();
+    ASSERT_EQ(value2->Size(), 2);
+    ASSERT_EQ(value2->GetMap(0)->KeyArray()->ToIntArray().value(), 
std::vector<int32_t>({500}));
+    ASSERT_EQ(value2->GetMap(0)->ValueArray()->ToIntArray().value(), 
std::vector<int32_t>({1}));
+    ASSERT_EQ(value2->GetMap(1)->KeyArray()->ToIntArray().value(), 
std::vector<int32_t>({501}));
+    ASSERT_EQ(value2->GetMap(1)->ValueArray()->ToIntArray().value(), 
std::vector<int32_t>({2}));
+
+    // for inner_child3
+    ASSERT_EQ(binary_row->GetMap(2)->KeyArray()->ToIntArray().value(), 
std::vector<int32_t>({600}));
+    auto value3 = binary_row->GetMap(2)->ValueArray();
+    ASSERT_EQ(value3->Size(), 1);
+    ASSERT_EQ(value3->GetRow(0, /*num_fields=*/2)->GetInt(0), 100);
+    ASSERT_EQ(value3->GetRow(0, /*num_fields=*/2)->GetInt(1), 200);
+}
+
+TEST(BinarySerializerUtilsTest, TestNestedTypeWithNull) {
+    auto pool = GetDefaultPool();
+    // prepare data
+    auto key_field = arrow::field("key", arrow::int32());
+    auto value_field = arrow::field("value", arrow::int32());
+
+    auto inner_child1 = arrow::field(
+        "inner1", arrow::map(arrow::int32(), arrow::field("inner_list", 
arrow::list(value_field))));
+    auto inner_child2 = arrow::field(
+        "inner2", arrow::map(arrow::int32(),
+                             arrow::field("inner_map", 
arrow::map(arrow::int32(), value_field))));
+    auto inner_child3 = arrow::field(
+        "inner3",
+        arrow::map(arrow::int32(),
+                   arrow::field("inner_struct", arrow::struct_({key_field, 
value_field}))));
+
+    auto arrow_type = arrow::struct_({inner_child1, inner_child2, 
inner_child3});
+    // each inner child per row
+    std::shared_ptr<arrow::Array> array = 
arrow::ipc::internal::json::ArrayFromJSON(arrow_type,
+                                                                               
     R"([
+[[[100, null], [101, [5, 6, null]]],
+[[200, [[500, null]]], [201, [[501, 2]]]],
+[[600, [100, null]]]]
+    ])")
+                                              .ValueOrDie();
+    auto struct_array = std::dynamic_pointer_cast<arrow::StructArray>(array);
+    ASSERT_TRUE(struct_array);
+    auto columnar_row =
+        std::make_shared<ColumnarRow>(/*struct_array=*/nullptr, 
struct_array->fields(), pool,
+                                      /*row_id=*/0);
+
+    ASSERT_OK_AND_ASSIGN(auto binary_row, 
BinarySerializerUtils::WriteBinaryRow(
+                                              columnar_row, arrow_type, 
pool.get()));
+
+    // check result
+    ASSERT_EQ(binary_row->GetFieldCount(), 3);
+
+    // for inner_child1
+    ASSERT_EQ(binary_row->GetMap(0)->KeyArray()->ToIntArray().value(),
+              std::vector<int32_t>({100, 101}));
+    auto value1 = binary_row->GetMap(0)->ValueArray();
+    ASSERT_EQ(value1->Size(), 2);
+    ASSERT_TRUE(value1->IsNullAt(0));
+    ASSERT_EQ(value1->GetArray(1)->GetInt(0), 5);
+    ASSERT_EQ(value1->GetArray(1)->GetInt(1), 6);
+    ASSERT_TRUE(value1->GetArray(1)->IsNullAt(2));
+
+    // for inner_child2
+    ASSERT_EQ(binary_row->GetMap(1)->KeyArray()->ToIntArray().value(),
+              std::vector<int32_t>({200, 201}));
+    auto value2 = binary_row->GetMap(1)->ValueArray();
+    ASSERT_EQ(value2->Size(), 2);
+    ASSERT_EQ(value2->GetMap(0)->KeyArray()->ToIntArray().value(), 
std::vector<int32_t>({500}));
+    ASSERT_TRUE(value2->GetMap(0)->ValueArray()->IsNullAt(0));
+    ASSERT_EQ(value2->GetMap(1)->KeyArray()->ToIntArray().value(), 
std::vector<int32_t>({501}));
+    ASSERT_EQ(value2->GetMap(1)->ValueArray()->ToIntArray().value(), 
std::vector<int32_t>({2}));
+
+    // for inner_child3
+    ASSERT_EQ(binary_row->GetMap(2)->KeyArray()->ToIntArray().value(), 
std::vector<int32_t>({600}));
+    auto value3 = binary_row->GetMap(2)->ValueArray();
+    ASSERT_EQ(value3->Size(), 1);
+    ASSERT_EQ(value3->GetRow(0, /*num_fields=*/2)->GetInt(0), 100);
+    ASSERT_TRUE(value3->GetRow(0, /*num_fields=*/2)->IsNullAt(1));
+}
+
+}  // namespace paimon::test
diff --git a/src/paimon/common/data/serializer/row_compacted_serializer.cpp 
b/src/paimon/common/data/serializer/row_compacted_serializer.cpp
new file mode 100644
index 0000000..4bf247b
--- /dev/null
+++ b/src/paimon/common/data/serializer/row_compacted_serializer.cpp
@@ -0,0 +1,637 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "paimon/common/data/serializer/row_compacted_serializer.h"
+
+#include "paimon/common/data/binary_array.h"
+#include "paimon/common/data/binary_array_writer.h"
+#include "paimon/common/data/binary_map.h"
+#include "paimon/common/data/binary_row_writer.h"
+#include "paimon/common/data/data_define.h"
+#include "paimon/common/data/generic_row.h"
+#include "paimon/common/data/serializer/binary_serializer_utils.h"
+#include "paimon/common/utils/date_time_utils.h"
+#include "paimon/common/utils/fields_comparator.h"
+
+namespace paimon {
+Result<std::unique_ptr<RowCompactedSerializer>> RowCompactedSerializer::Create(
+    const std::shared_ptr<arrow::Schema>& schema, const 
std::shared_ptr<MemoryPool>& pool) {
+    std::vector<InternalRow::FieldGetterFunc> getters(schema->num_fields());
+    std::vector<RowCompactedSerializer::FieldWriter> 
writers(schema->num_fields());
+    std::vector<RowCompactedSerializer::FieldReader> 
readers(schema->num_fields());
+    for (int32_t i = 0; i < schema->num_fields(); i++) {
+        auto field_type = schema->field(i)->type();
+        PAIMON_ASSIGN_OR_RAISE(getters[i],
+                               InternalRow::CreateFieldGetter(i, field_type, 
/*use_view=*/true));
+        PAIMON_ASSIGN_OR_RAISE(writers[i], CreateFieldWriter(field_type, 
pool));
+        PAIMON_ASSIGN_OR_RAISE(readers[i], CreateFieldReader(field_type, 
pool));
+    }
+    return std::unique_ptr<RowCompactedSerializer>(new RowCompactedSerializer(
+        schema, std::move(getters), std::move(writers), std::move(readers), 
pool));
+}
+
+Result<int32_t> RowCompactedSerializer::CompareField(const FieldInfo& 
field_info,
+                                                     RowReader* reader1, 
RowReader* reader2) {
+    auto type = field_info.type_id;
+    switch (type) {
+        case arrow::Type::type::BOOL: {
+            auto val1 = reader1->ReadValue<bool>();
+            auto val2 = reader2->ReadValue<bool>();
+            return val1 == val2 ? 0 : (val1 < val2 ? -1 : 1);
+        }
+        case arrow::Type::type::INT8: {
+            auto val1 = reader1->ReadValue<char>();
+            auto val2 = reader2->ReadValue<char>();
+            return val1 == val2 ? 0 : (val1 < val2 ? -1 : 1);
+        }
+        case arrow::Type::type::INT16: {
+            auto val1 = reader1->ReadValue<int16_t>();
+            auto val2 = reader2->ReadValue<int16_t>();
+            return val1 == val2 ? 0 : (val1 < val2 ? -1 : 1);
+        }
+        case arrow::Type::type::INT32:
+        case arrow::Type::type::DATE32: {
+            auto val1 = reader1->ReadValue<int32_t>();
+            auto val2 = reader2->ReadValue<int32_t>();
+            return val1 == val2 ? 0 : (val1 < val2 ? -1 : 1);
+        }
+        case arrow::Type::type::INT64: {
+            auto val1 = reader1->ReadValue<int64_t>();
+            auto val2 = reader2->ReadValue<int64_t>();
+            return val1 == val2 ? 0 : (val1 < val2 ? -1 : 1);
+        }
+        case arrow::Type::type::FLOAT: {
+            auto val1 = reader1->ReadValue<float>();
+            auto val2 = reader2->ReadValue<float>();
+            return FieldsComparator::CompareFloatingPoint(val1, val2);
+        }
+        case arrow::Type::type::DOUBLE: {
+            auto val1 = reader1->ReadValue<double>();
+            auto val2 = reader2->ReadValue<double>();
+            return FieldsComparator::CompareFloatingPoint(val1, val2);
+        }
+        case arrow::Type::type::STRING:
+        case arrow::Type::type::BINARY: {
+            PAIMON_ASSIGN_OR_RAISE(std::string_view val1, 
reader1->ReadStringView());
+            PAIMON_ASSIGN_OR_RAISE(std::string_view val2, 
reader2->ReadStringView());
+            int32_t cmp = val1.compare(val2);
+            return cmp == 0 ? 0 : (cmp > 0 ? 1 : -1);
+        }
+        case arrow::Type::type::TIMESTAMP: {
+            PAIMON_ASSIGN_OR_RAISE(Timestamp val1, 
reader1->ReadTimestamp(field_info.precision));
+            PAIMON_ASSIGN_OR_RAISE(Timestamp val2, 
reader2->ReadTimestamp(field_info.precision));
+            return val1 == val2 ? 0 : (val1 < val2 ? -1 : 1);
+        }
+        case arrow::Type::type::DECIMAL: {
+            PAIMON_ASSIGN_OR_RAISE(Decimal val1,
+                                   reader1->ReadDecimal(field_info.precision, 
field_info.scale));
+            PAIMON_ASSIGN_OR_RAISE(Decimal val2,
+                                   reader2->ReadDecimal(field_info.precision, 
field_info.scale));
+            int32_t cmp = val1.CompareTo(val2);
+            return cmp == 0 ? 0 : (cmp > 0 ? 1 : -1);
+        }
+        default:
+            return Status::NotImplemented(
+                fmt::format("Do not support comparing type {} in CompareField",
+                            static_cast<int32_t>(field_info.type_id)));
+    }
+}
+
+Result<MemorySlice::SliceComparator> 
RowCompactedSerializer::CreateSliceComparator(
+    const std::shared_ptr<arrow::Schema>& schema, const 
std::shared_ptr<MemoryPool>& pool) {
+    int32_t bit_set_in_bytes = 
RowCompactedSerializer::CalculateBitSetInBytes(schema->num_fields());
+    auto row_reader1 = std::make_shared<RowReader>(bit_set_in_bytes, pool);
+    auto row_reader2 = std::make_shared<RowReader>(bit_set_in_bytes, pool);
+
+    std::vector<FieldInfo> field_infos(schema->num_fields());
+    for (int32_t i = 0; i < schema->num_fields(); i++) {
+        auto field_type = schema->field(i)->type();
+        field_infos[i].type_id = field_type->id();
+        if (field_type->id() == arrow::Type::type::TIMESTAMP) {
+            auto timestamp_type =
+                
arrow::internal::checked_pointer_cast<arrow::TimestampType>(field_type);
+            assert(timestamp_type);
+            field_infos[i].precision = 
DateTimeUtils::GetPrecisionFromType(timestamp_type);
+        } else if (field_type->id() == arrow::Type::type::DECIMAL) {
+            auto decimal_type =
+                
arrow::internal::checked_pointer_cast<arrow::Decimal128Type>(field_type);
+            assert(decimal_type);
+            field_infos[i].precision = decimal_type->precision();
+            field_infos[i].scale = decimal_type->scale();
+        }
+    }
+
+    auto comparator = [row_reader1, row_reader2, field_infos](
+                          const MemorySlice& slice1, const MemorySlice& 
slice2) -> Result<int32_t> {
+        row_reader1->PointTo(slice1.GetSegment(), slice1.Offset());
+        row_reader2->PointTo(slice2.GetSegment(), slice2.Offset());
+        for (int32_t i = 0; i < static_cast<int32_t>(field_infos.size()); i++) 
{
+            bool is_null1 = row_reader1->IsNullAt(i);
+            bool is_null2 = row_reader2->IsNullAt(i);
+            if (!is_null1 || !is_null2) {
+                if (is_null1) {
+                    return -1;
+                } else if (is_null2) {
+                    return 1;
+                } else {
+                    PAIMON_ASSIGN_OR_RAISE(
+                        int32_t comp,
+                        CompareField(field_infos[i], row_reader1.get(), 
row_reader2.get()));
+                    if (comp != 0) {
+                        return comp;
+                    }
+                }
+            }
+        }
+        return 0;
+    };
+    return std::function<Result<int32_t>(const MemorySlice&, const 
MemorySlice&)>(comparator);
+}
+
+Result<std::shared_ptr<Bytes>> RowCompactedSerializer::SerializeToBytes(const 
InternalRow& row) {
+    if (!row_writer_) {
+        row_writer_ = 
std::make_unique<RowWriter>(CalculateBitSetInBytes(getters_.size()), pool_);
+    }
+    row_writer_->Reset();
+    PAIMON_ASSIGN_OR_RAISE(const RowKind* row_kind, row.GetRowKind());
+    row_writer_->WriteRowKind(*row_kind);
+    for (size_t i = 0; i < getters_.size(); i++) {
+        VariantType field = getters_[i](row);
+        PAIMON_RETURN_NOT_OK(writers_[i](i, field, row_writer_.get()));
+    }
+    return row_writer_->CopyBuffer();
+}
+
+Result<std::unique_ptr<InternalRow>> RowCompactedSerializer::Deserialize(
+    const std::shared_ptr<Bytes>& bytes) {
+    if (!row_reader_) {
+        row_reader_ = 
std::make_unique<RowReader>(CalculateBitSetInBytes(getters_.size()), pool_);
+    }
+    row_reader_->PointTo(bytes);
+    auto row = std::make_unique<GenericRow>(getters_.size());
+    PAIMON_ASSIGN_OR_RAISE(const RowKind* row_kind, 
row_reader_->ReadRowKind());
+    row->SetRowKind(row_kind);
+    for (size_t i = 0; i < readers_.size(); i++) {
+        PAIMON_ASSIGN_OR_RAISE(VariantType field, readers_[i](i, 
row_reader_.get()));
+        row->SetField(i, field);
+    }
+    row->AddDataHolder(bytes);
+    return row;
+}
+
+RowCompactedSerializer::RowCompactedSerializer(
+    const std::shared_ptr<arrow::Schema>& schema,
+    std::vector<InternalRow::FieldGetterFunc>&& getters,
+    std::vector<RowCompactedSerializer::FieldWriter>&& writers,
+    std::vector<RowCompactedSerializer::FieldReader>&& readers,
+    const std::shared_ptr<MemoryPool>& pool)
+    : pool_(pool),
+      schema_(schema),
+      getters_(std::move(getters)),
+      writers_(std::move(writers)),
+      readers_(std::move(readers)) {}
+
+Result<RowCompactedSerializer::FieldReader> 
RowCompactedSerializer::CreateFieldReader(
+    const std::shared_ptr<arrow::DataType>& field_type, const 
std::shared_ptr<MemoryPool>& pool) {
+    arrow::Type::type type = field_type->id();
+    RowCompactedSerializer::FieldReader field_reader;
+    switch (type) {
+        case arrow::Type::type::BOOL: {
+            field_reader = [](int32_t pos, RowReader* reader) -> 
Result<VariantType> {
+                return VariantType(reader->ReadValue<bool>());
+            };
+            break;
+        }
+        case arrow::Type::type::INT8: {
+            field_reader = [](int32_t pos, RowReader* reader) -> 
Result<VariantType> {
+                return VariantType(reader->ReadValue<char>());
+            };
+            break;
+        }
+        case arrow::Type::type::INT16: {
+            field_reader = [](int32_t pos, RowReader* reader) -> 
Result<VariantType> {
+                return VariantType(reader->ReadValue<int16_t>());
+            };
+            break;
+        }
+        case arrow::Type::type::INT32:
+        case arrow::Type::type::DATE32: {
+            field_reader = [](int32_t pos, RowReader* reader) -> 
Result<VariantType> {
+                return VariantType(reader->ReadValue<int32_t>());
+            };
+            break;
+        }
+        case arrow::Type::type::INT64: {
+            field_reader = [](int32_t pos, RowReader* reader) -> 
Result<VariantType> {
+                return VariantType(reader->ReadValue<int64_t>());
+            };
+            break;
+        }
+        case arrow::Type::type::FLOAT: {
+            field_reader = [](int32_t pos, RowReader* reader) -> 
Result<VariantType> {
+                return VariantType(reader->ReadValue<float>());
+            };
+            break;
+        }
+        case arrow::Type::type::DOUBLE: {
+            field_reader = [](int32_t pos, RowReader* reader) -> 
Result<VariantType> {
+                return VariantType(reader->ReadValue<double>());
+            };
+            break;
+        }
+        case arrow::Type::type::STRING: {
+            field_reader = [](int32_t pos, RowReader* reader) -> 
Result<VariantType> {
+                PAIMON_ASSIGN_OR_RAISE(VariantType value, 
reader->ReadStringView());
+                return value;
+            };
+            break;
+        }
+        case arrow::Type::type::BINARY: {
+            field_reader = [](int32_t pos, RowReader* reader) -> 
Result<VariantType> {
+                PAIMON_ASSIGN_OR_RAISE(VariantType value, 
reader->ReadBinary());
+                return value;
+            };
+            break;
+        }
+        case arrow::Type::type::TIMESTAMP: {
+            auto timestamp_type =
+                
arrow::internal::checked_pointer_cast<arrow::TimestampType>(field_type);
+            int32_t precision = 
DateTimeUtils::GetPrecisionFromType(timestamp_type);
+            field_reader = [precision](int32_t pos, RowReader* reader) -> 
Result<VariantType> {
+                PAIMON_ASSIGN_OR_RAISE(VariantType value, 
reader->ReadTimestamp(precision));
+                return value;
+            };
+            break;
+        }
+        case arrow::Type::type::DECIMAL: {
+            auto* decimal_type =
+                
arrow::internal::checked_cast<arrow::Decimal128Type*>(field_type.get());
+            assert(decimal_type);
+            auto precision = decimal_type->precision();
+            auto scale = decimal_type->scale();
+            field_reader = [precision, scale](int32_t pos,
+                                              RowReader* reader) -> 
Result<VariantType> {
+                PAIMON_ASSIGN_OR_RAISE(VariantType value, 
reader->ReadDecimal(precision, scale));
+                return value;
+            };
+            break;
+        }
+        case arrow::Type::type::LIST: {
+            field_reader = [](int32_t pos, RowReader* reader) -> 
Result<VariantType> {
+                PAIMON_ASSIGN_OR_RAISE(VariantType value, reader->ReadArray());
+                return value;
+            };
+            break;
+        }
+        case arrow::Type::type::MAP: {
+            field_reader = [](int32_t pos, RowReader* reader) -> 
Result<VariantType> {
+                PAIMON_ASSIGN_OR_RAISE(VariantType value, reader->ReadMap());
+                return value;
+            };
+            break;
+        }
+        case arrow::Type::type::STRUCT: {
+            auto* struct_type = 
arrow::internal::checked_cast<arrow::StructType*>(field_type.get());
+            assert(struct_type);
+            PAIMON_ASSIGN_OR_RAISE(
+                std::shared_ptr<RowCompactedSerializer> serializer,
+                
RowCompactedSerializer::Create(arrow::schema(struct_type->fields()), pool));
+            field_reader = [serializer](int32_t pos, RowReader* reader) -> 
Result<VariantType> {
+                PAIMON_ASSIGN_OR_RAISE(VariantType value, 
reader->ReadRow(serializer));
+                return value;
+            };
+            break;
+        }
+        default:
+            return Status::Invalid(
+                fmt::format("type {} not support in FieldReader in row 
compacted serializer",
+                            field_type->ToString()));
+    }
+
+    RowCompactedSerializer::FieldReader ret =
+        [field_reader](int32_t pos, RowReader* reader) -> Result<VariantType> {
+        if (reader->IsNullAt(pos)) {
+            return VariantType(NullType());
+        }
+        return field_reader(pos, reader);
+    };
+    return ret;
+}
+
+Result<RowCompactedSerializer::FieldWriter> 
RowCompactedSerializer::CreateFieldWriter(
+    const std::shared_ptr<arrow::DataType>& field_type, const 
std::shared_ptr<MemoryPool>& pool) {
+    arrow::Type::type type = field_type->id();
+    RowCompactedSerializer::FieldWriter field_writer;
+    switch (type) {
+        case arrow::Type::type::BOOL: {
+            field_writer = [](int32_t pos, const VariantType& field, 
RowWriter* writer) -> Status {
+                return 
writer->WriteValue<bool>(DataDefine::GetVariantValue<bool>(field));
+            };
+            break;
+        }
+        case arrow::Type::type::INT8: {
+            field_writer = [](int32_t pos, const VariantType& field, 
RowWriter* writer) -> Status {
+                return 
writer->WriteValue<char>(DataDefine::GetVariantValue<char>(field));
+            };
+            break;
+        }
+        case arrow::Type::type::INT16: {
+            field_writer = [](int32_t pos, const VariantType& field, 
RowWriter* writer) -> Status {
+                return 
writer->WriteValue<int16_t>(DataDefine::GetVariantValue<int16_t>(field));
+            };
+            break;
+        }
+        case arrow::Type::type::INT32:
+        case arrow::Type::type::DATE32: {
+            field_writer = [](int32_t pos, const VariantType& field, 
RowWriter* writer) -> Status {
+                return 
writer->WriteValue<int32_t>(DataDefine::GetVariantValue<int32_t>(field));
+            };
+            break;
+        }
+        case arrow::Type::type::INT64: {
+            field_writer = [](int32_t pos, const VariantType& field, 
RowWriter* writer) -> Status {
+                return 
writer->WriteValue<int64_t>(DataDefine::GetVariantValue<int64_t>(field));
+            };
+            break;
+        }
+        case arrow::Type::type::FLOAT: {
+            field_writer = [](int32_t pos, const VariantType& field, 
RowWriter* writer) -> Status {
+                return 
writer->WriteValue<float>(DataDefine::GetVariantValue<float>(field));
+            };
+            break;
+        }
+        case arrow::Type::type::DOUBLE: {
+            field_writer = [](int32_t pos, const VariantType& field, 
RowWriter* writer) -> Status {
+                return 
writer->WriteValue<double>(DataDefine::GetVariantValue<double>(field));
+            };
+            break;
+        }
+        case arrow::Type::type::STRING: {
+            field_writer = [](int32_t pos, const VariantType& field, 
RowWriter* writer) -> Status {
+                const auto* view = 
DataDefine::GetVariantPtr<std::string_view>(field);
+                if (view) {
+                    return writer->WriteStringView(*view);
+                }
+                return 
writer->WriteString(DataDefine::GetVariantValue<BinaryString>(field));
+            };
+            break;
+        }
+        case arrow::Type::type::BINARY: {
+            field_writer = [](int32_t pos, const VariantType& field, 
RowWriter* writer) -> Status {
+                const auto* view = 
DataDefine::GetVariantPtr<std::string_view>(field);
+                if (view) {
+                    return writer->WriteStringView(*view);
+                }
+                return writer->WriteBinary(
+                    
DataDefine::GetVariantValue<std::shared_ptr<Bytes>>(field).get());
+            };
+            break;
+        }
+        case arrow::Type::type::TIMESTAMP: {
+            auto timestamp_type =
+                
arrow::internal::checked_pointer_cast<arrow::TimestampType>(field_type);
+            int32_t precision = 
DateTimeUtils::GetPrecisionFromType(timestamp_type);
+            field_writer = [precision](int32_t pos, const VariantType& field,
+                                       RowWriter* writer) -> Status {
+                return 
writer->WriteTimestamp(DataDefine::GetVariantValue<Timestamp>(field),
+                                              precision);
+            };
+            break;
+        }
+        case arrow::Type::type::DECIMAL: {
+            auto* decimal_type =
+                
arrow::internal::checked_cast<arrow::Decimal128Type*>(field_type.get());
+            assert(decimal_type);
+            auto precision = decimal_type->precision();
+            field_writer = [precision](int32_t pos, const VariantType& field,
+                                       RowWriter* writer) -> Status {
+                return 
writer->WriteDecimal(DataDefine::GetVariantValue<Decimal>(field), precision);
+            };
+            break;
+        }
+        case arrow::Type::type::LIST: {
+            field_writer = [field_type](int32_t pos, const VariantType& field,
+                                        RowWriter* writer) -> Status {
+                return writer->WriteArray(
+                    
DataDefine::GetVariantValue<std::shared_ptr<InternalArray>>(field), field_type);
+            };
+            break;
+        }
+        case arrow::Type::type::MAP: {
+            field_writer = [field_type](int32_t pos, const VariantType& field,
+                                        RowWriter* writer) -> Status {
+                return writer->WriteMap(
+                    
DataDefine::GetVariantValue<std::shared_ptr<InternalMap>>(field), field_type);
+            };
+            break;
+        }
+        case arrow::Type::type::STRUCT: {
+            auto struct_type = 
arrow::internal::checked_pointer_cast<arrow::StructType>(field_type);
+            assert(struct_type);
+            PAIMON_ASSIGN_OR_RAISE(
+                std::shared_ptr<RowCompactedSerializer> serializer,
+                
RowCompactedSerializer::Create(arrow::schema(struct_type->fields()), pool));
+            field_writer = [serializer](int32_t pos, const VariantType& field,
+                                        RowWriter* writer) -> Status {
+                return writer->WriteRow(
+                    
DataDefine::GetVariantValue<std::shared_ptr<InternalRow>>(field), serializer);
+            };
+            break;
+        }
+        default:
+            return Status::Invalid(
+                fmt::format("type {} not support in FieldWriter in row 
compacted serializer",
+                            field_type->ToString()));
+    }
+
+    RowCompactedSerializer::FieldWriter ret = [field_writer](int32_t pos, 
const VariantType& field,
+                                                             RowWriter* 
writer) -> Status {
+        if (DataDefine::IsVariantNull(field)) {
+            writer->SetNullAt(pos);
+            return Status::OK();
+        }
+        return field_writer(pos, field, writer);
+    };
+    return ret;
+}
+
+RowCompactedSerializer::RowWriter::RowWriter(int32_t header_size_in_bytes,
+                                             const 
std::shared_ptr<MemoryPool>& pool)
+    : header_size_in_bytes_(header_size_in_bytes), pool_(pool) {
+    int32_t initial_capacity = std::max(64, header_size_in_bytes);
+    SetBuffer(std::make_shared<Bytes>(initial_capacity, pool_.get()));
+    position_ = header_size_in_bytes_;
+}
+
+Status RowCompactedSerializer::RowWriter::WriteDecimal(const Decimal& value, 
int32_t precision) {
+    if (Decimal::IsCompact(precision)) {
+        return WriteValue<int64_t>(value.ToUnscaledLong());
+    } else {
+        auto value_bytes = value.ToUnscaledBytes();
+        return WriteBinary(&value_bytes);
+    }
+}
+
+Status RowCompactedSerializer::RowWriter::WriteTimestamp(const Timestamp& 
value,
+                                                         int32_t precision) {
+    if (Timestamp::IsCompact(precision)) {
+        return WriteValue<int64_t>(value.GetMillisecond());
+    } else {
+        PAIMON_RETURN_NOT_OK(WriteValue<int64_t>(value.GetMillisecond()));
+        return WriteUnsignedInt(value.GetNanoOfMillisecond());
+    }
+}
+
+Status RowCompactedSerializer::RowWriter::WriteRow(
+    const std::shared_ptr<InternalRow>& value,
+    const std::shared_ptr<RowCompactedSerializer>& serializer) {
+    PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<Bytes> bytes, 
serializer->SerializeToBytes(*value));
+    return WriteBinary(bytes.get());
+}
+
+Status RowCompactedSerializer::RowWriter::WriteArray(const 
std::shared_ptr<InternalArray>& value,
+                                                     const 
std::shared_ptr<arrow::DataType>& type) {
+    PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<BinaryArray> binary_array,
+                           BinarySerializerUtils::WriteBinaryArray(value, 
type, pool_.get()));
+    return WriteSegment(binary_array->GetSegment(), binary_array->GetOffset(),
+                        binary_array->GetSizeInBytes());
+}
+
+Status RowCompactedSerializer::RowWriter::WriteMap(const 
std::shared_ptr<InternalMap>& value,
+                                                   const 
std::shared_ptr<arrow::DataType>& type) {
+    PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<BinaryMap> binary_map,
+                           BinarySerializerUtils::WriteBinaryMap(value, type, 
pool_.get()));
+    return WriteSegment(binary_map->GetSegment(), binary_map->GetOffset(),
+                        binary_map->GetSizeInBytes());
+}
+
+std::shared_ptr<Bytes> RowCompactedSerializer::RowWriter::CopyBuffer() const {
+    return Bytes::CopyOf(*buffer_, position_, pool_.get());
+}
+
+Status RowCompactedSerializer::RowWriter::WriteUnsignedInt(int32_t value) {
+    EnsureCapacity(VarLengthIntUtils::kMaxVarIntSize);
+    PAIMON_ASSIGN_OR_RAISE(int32_t len,
+                           VarLengthIntUtils::EncodeInt(value, buffer_->data() 
+ position_));
+    position_ += len;
+    return Status::OK();
+}
+
+Status RowCompactedSerializer::RowWriter::WriteSegment(const MemorySegment& 
segment, int32_t off,
+                                                       int32_t len) {
+    PAIMON_RETURN_NOT_OK(WriteUnsignedInt(len));
+    EnsureCapacity(len);
+    MemorySegmentUtils::CopyToBytes({segment}, off, buffer_.get(), position_, 
len);
+    position_ += len;
+    return Status::OK();
+}
+
+void RowCompactedSerializer::RowWriter::EnsureCapacity(int32_t size) {
+    if (static_cast<int32_t>(buffer_->size()) - position_ < size) {
+        Grow(size);
+    }
+}
+
+void RowCompactedSerializer::RowWriter::Grow(int32_t min_capacity_add) {
+    auto current_len = static_cast<int32_t>(buffer_->size());
+    int32_t new_len = std::max(current_len * 2, current_len + 
min_capacity_add);
+
+    auto new_buffer = std::make_shared<Bytes>(new_len, pool_.get());
+    memcpy(new_buffer->data(), buffer_->data(), current_len);
+    SetBuffer(std::move(new_buffer));
+}
+
+void RowCompactedSerializer::RowWriter::SetBuffer(std::shared_ptr<Bytes> 
new_buffer) {
+    buffer_ = std::move(new_buffer);
+    segment_ = MemorySegment::Wrap(buffer_);
+}
+
+void RowCompactedSerializer::RowReader::PointTo(const std::shared_ptr<Bytes>& 
bytes) {
+    MemorySegment seg = MemorySegment::Wrap(bytes);
+    PointTo(seg, 0);
+}
+
+void RowCompactedSerializer::RowReader::PointTo(const MemorySegment& segment, 
int32_t offset) {
+    segment_ = segment;
+
+    offset_ = offset;
+    position_ = offset + header_size_in_bytes_;
+}
+
+Result<const RowKind*> RowCompactedSerializer::RowReader::ReadRowKind() const {
+    char b = segment_.Get(offset_);
+    return RowKind::FromByteValue(static_cast<int8_t>(b));
+}
+
+Result<std::string_view> RowCompactedSerializer::RowReader::ReadStringView() {
+    PAIMON_ASSIGN_OR_RAISE(int32_t length, ReadUnsignedInt());
+    std::string_view str(segment_.Data() + position_, length);
+    position_ += length;
+    return str;
+}
+
+Result<std::shared_ptr<Bytes>> RowCompactedSerializer::RowReader::ReadBinary() 
{
+    PAIMON_ASSIGN_OR_RAISE(int32_t length, ReadUnsignedInt());
+    auto bytes = std::make_shared<Bytes>(length, pool_.get());
+    segment_.Get(position_, bytes.get(), /*offset=*/0, length);
+    position_ += length;
+    return bytes;
+}
+
+Result<Decimal> RowCompactedSerializer::RowReader::ReadDecimal(int32_t 
precision, int32_t scale) {
+    if (Decimal::IsCompact(precision)) {
+        return Decimal::FromUnscaledLong(ReadValue<int64_t>(), precision, 
scale);
+    } else {
+        PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<Bytes> bytes, ReadBinary());
+        return Decimal::FromUnscaledBytes(precision, scale, bytes.get());
+    }
+}
+
+Result<Timestamp> RowCompactedSerializer::RowReader::ReadTimestamp(int32_t 
precision) {
+    if (Timestamp::IsCompact(precision)) {
+        return Timestamp::FromEpochMillis(ReadValue<int64_t>());
+    }
+    auto milliseconds = ReadValue<int64_t>();
+    PAIMON_ASSIGN_OR_RAISE(int32_t nanos_of_millisecond, ReadUnsignedInt());
+    return Timestamp::FromEpochMillis(milliseconds, nanos_of_millisecond);
+}
+
+Result<std::shared_ptr<InternalArray>> 
RowCompactedSerializer::RowReader::ReadArray() {
+    auto value = std::make_shared<BinaryArray>();
+    PAIMON_ASSIGN_OR_RAISE(int32_t length, ReadUnsignedInt());
+    value->PointTo(segment_, position_, length);
+    position_ += length;
+    return value;
+}
+
+Result<std::shared_ptr<InternalMap>> 
RowCompactedSerializer::RowReader::ReadMap() {
+    auto value = std::make_shared<BinaryMap>();
+    PAIMON_ASSIGN_OR_RAISE(int32_t length, ReadUnsignedInt());
+    value->PointTo(segment_, position_, length);
+    position_ += length;
+    return value;
+}
+
+Result<std::shared_ptr<InternalRow>> 
RowCompactedSerializer::RowReader::ReadRow(
+    const std::shared_ptr<RowCompactedSerializer>& serializer) {
+    PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<Bytes> bytes, ReadBinary());
+    PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<InternalRow> result, 
serializer->Deserialize(bytes));
+    return result;
+}
+
+}  // namespace paimon
diff --git a/src/paimon/common/data/serializer/row_compacted_serializer.h 
b/src/paimon/common/data/serializer/row_compacted_serializer.h
new file mode 100644
index 0000000..9acda1c
--- /dev/null
+++ b/src/paimon/common/data/serializer/row_compacted_serializer.h
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+#include <functional>
+
+#include "arrow/api.h"
+#include "paimon/common/data/binary_row.h"
+#include "paimon/common/data/binary_writer.h"
+#include "paimon/common/memory/memory_segment.h"
+#include "paimon/common/memory/memory_segment_utils.h"
+#include "paimon/common/memory/memory_slice.h"
+#include "paimon/common/utils/var_length_int_utils.h"
+namespace paimon {
+
+class RowCompactedSerializer {
+ public:
+    static Result<std::unique_ptr<RowCompactedSerializer>> Create(
+        const std::shared_ptr<arrow::Schema>& schema, const 
std::shared_ptr<MemoryPool>& pool);
+
+    static int32_t CalculateBitSetInBytes(int32_t arity) {
+        return (arity + 7 + BinaryRow::HEADER_SIZE_IN_BITS) / 8;
+    }
+
+    Result<std::shared_ptr<Bytes>> SerializeToBytes(const InternalRow& row);
+
+    Result<std::unique_ptr<InternalRow>> Deserialize(const 
std::shared_ptr<Bytes>& bytes);
+
+    static Result<MemorySlice::SliceComparator> CreateSliceComparator(
+        const std::shared_ptr<arrow::Schema>& schema, const 
std::shared_ptr<MemoryPool>& pool);
+
+ private:
+    struct FieldInfo {
+        arrow::Type::type type_id;
+        int32_t precision = -1;
+        int32_t scale = -1;
+    };
+
+    class RowWriter {
+     public:
+        RowWriter(int32_t header_size_in_bytes, const 
std::shared_ptr<MemoryPool>& pool);
+
+        void Reset() {
+            position_ = header_size_in_bytes_;
+            std::memset(buffer_->data(), 0, header_size_in_bytes_);
+        }
+
+        void WriteRowKind(const RowKind& kind) {
+            (*buffer_)[0] = static_cast<char>(kind.ToByteValue());
+        }
+
+        void SetNullAt(int32_t pos) {
+            MemorySegmentUtils::BitSet(&segment_, 0, pos + 
BinaryRow::HEADER_SIZE_IN_BITS);
+        }
+
+        template <typename T>
+        Status WriteValue(const T& value) {
+            EnsureCapacity(sizeof(T));
+            segment_.PutValue<T>(position_, value);
+            position_ += sizeof(T);
+            return Status::OK();
+        }
+
+        Status WriteString(const BinaryString& value) {
+            return WriteSegment(value.GetSegment(), value.GetOffset(), 
value.GetSizeInBytes());
+        }
+
+        Status WriteStringView(const std::string_view& view) {
+            return WriteBinary(&view);
+        }
+
+        template <typename T>
+        Status WriteBinary(const T* bytes) {
+            PAIMON_RETURN_NOT_OK(WriteUnsignedInt(bytes->size()));
+            EnsureCapacity(bytes->size());
+            memcpy(buffer_->data() + position_, bytes->data(), bytes->size());
+            position_ += bytes->size();
+            return Status::OK();
+        }
+
+        Status WriteDecimal(const Decimal& value, int32_t precision);
+        Status WriteTimestamp(const Timestamp& value, int32_t precision);
+
+        Status WriteArray(const std::shared_ptr<InternalArray>& value,
+                          const std::shared_ptr<arrow::DataType>& type);
+        Status WriteMap(const std::shared_ptr<InternalMap>& value,
+                        const std::shared_ptr<arrow::DataType>& type);
+
+        Status WriteRow(const std::shared_ptr<InternalRow>& value,
+                        const std::shared_ptr<RowCompactedSerializer>& 
serializer);
+
+        std::shared_ptr<Bytes> CopyBuffer() const;
+
+     private:
+        Status WriteUnsignedInt(int32_t value);
+        Status WriteSegment(const MemorySegment& segment, int32_t off, int32_t 
len);
+        void EnsureCapacity(int32_t size);
+        void Grow(int32_t min_capacity_add);
+        void SetBuffer(std::shared_ptr<Bytes> new_buffer);
+
+     private:
+        const int32_t header_size_in_bytes_;
+        std::shared_ptr<MemoryPool> pool_;
+        std::shared_ptr<Bytes> buffer_;
+        MemorySegment segment_;
+        int32_t position_ = 0;
+    };
+
+    class RowReader {
+     public:
+        RowReader(int32_t header_size_in_bytes, const 
std::shared_ptr<MemoryPool>& pool)
+            : header_size_in_bytes_(header_size_in_bytes), pool_(pool) {}
+
+        void PointTo(const std::shared_ptr<Bytes>& bytes);
+
+        void PointTo(const MemorySegment& segment, int32_t offset);
+
+        Result<const RowKind*> ReadRowKind() const;
+
+        bool IsNullAt(int32_t pos) const {
+            return MemorySegmentUtils::BitGet(segment_, offset_,
+                                              pos + 
BinaryRow::HEADER_SIZE_IN_BITS);
+        }
+
+        template <typename T>
+        T ReadValue() {
+            T value = segment_.GetValue<T>(position_);
+            position_ += sizeof(T);
+            return value;
+        }
+
+        Result<std::string_view> ReadStringView();
+
+        Result<std::shared_ptr<Bytes>> ReadBinary();
+
+        Result<Decimal> ReadDecimal(int32_t precision, int32_t scale);
+
+        Result<Timestamp> ReadTimestamp(int32_t precision);
+
+        Result<std::shared_ptr<InternalArray>> ReadArray();
+        Result<std::shared_ptr<InternalMap>> ReadMap();
+        Result<std::shared_ptr<InternalRow>> ReadRow(
+            const std::shared_ptr<RowCompactedSerializer>& serializer);
+
+     private:
+        Result<int32_t> ReadUnsignedInt() {
+            return VarLengthIntUtils::DecodeInt(segment_.Data(), &position_);
+        }
+
+     private:
+        const int32_t header_size_in_bytes_;
+        std::shared_ptr<MemoryPool> pool_;
+        MemorySegment segment_;
+        int32_t offset_ = 0;
+        int32_t position_ = 0;
+    };
+
+    /// Read and compare a single field from two RowReaders.
+    static Result<int32_t> CompareField(const FieldInfo& field_info, 
RowReader* reader1,
+                                        RowReader* reader2);
+
+    using FieldWriter = std::function<Status(int32_t, const VariantType&, 
RowWriter*)>;
+    using FieldReader = std::function<Result<VariantType>(int32_t, 
RowReader*)>;
+
+ private:
+    RowCompactedSerializer(const std::shared_ptr<arrow::Schema>& schema,
+                           std::vector<InternalRow::FieldGetterFunc>&& getters,
+                           std::vector<FieldWriter>&& writers, 
std::vector<FieldReader>&& readers,
+                           const std::shared_ptr<MemoryPool>& pool);
+
+    static Result<FieldReader> CreateFieldReader(const 
std::shared_ptr<arrow::DataType>& field_type,
+                                                 const 
std::shared_ptr<MemoryPool>& pool);
+    static Result<FieldWriter> CreateFieldWriter(const 
std::shared_ptr<arrow::DataType>& field_type,
+                                                 const 
std::shared_ptr<MemoryPool>& pool);
+
+ private:
+    std::shared_ptr<MemoryPool> pool_;
+    std::shared_ptr<arrow::Schema> schema_;
+    std::vector<InternalRow::FieldGetterFunc> getters_;
+    std::vector<FieldWriter> writers_;
+    std::vector<FieldReader> readers_;
+    std::unique_ptr<RowWriter> row_writer_;
+    std::unique_ptr<RowReader> row_reader_;
+};
+
+}  // namespace paimon
diff --git 
a/src/paimon/common/data/serializer/row_compacted_serializer_test.cpp 
b/src/paimon/common/data/serializer/row_compacted_serializer_test.cpp
new file mode 100644
index 0000000..cf11108
--- /dev/null
+++ b/src/paimon/common/data/serializer/row_compacted_serializer_test.cpp
@@ -0,0 +1,821 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/data/serializer/row_compacted_serializer.h"
+
+#include <string>
+
+#include "arrow/ipc/json_simple.h"
+#include "gtest/gtest.h"
+#include "paimon/common/data/columnar/columnar_row.h"
+#include "paimon/common/utils/decimal_utils.h"
+#include "paimon/testing/utils/testharness.h"
+namespace paimon::test {
+TEST(RowCompactedSerializerTest, TestSimple) {
+    auto pool = GetDefaultPool();
+    // prepare data
+    std::shared_ptr<arrow::DataType> arrow_type = arrow::struct_({
+        arrow::field("f1", arrow::boolean()),
+        arrow::field("f2", arrow::int8()),
+        arrow::field("f3", arrow::int16()),
+        arrow::field("f4", arrow::int32()),
+        arrow::field("f5", arrow::int64()),
+        arrow::field("f6", arrow::float32()),
+        arrow::field("f7", arrow::float64()),
+        arrow::field("f8", arrow::utf8()),
+        arrow::field("f9", arrow::binary()),
+        arrow::field("f10", arrow::date32()),
+        arrow::field("f11", arrow::timestamp(arrow::TimeUnit::NANO)),
+        arrow::field("f12", arrow::timestamp(arrow::TimeUnit::SECOND)),
+        arrow::field("f13", arrow::decimal128(5, 2)),
+        arrow::field("f14", arrow::decimal128(30, 5)),
+    });
+
+    std::shared_ptr<arrow::Array> array = 
arrow::ipc::internal::json::ArrayFromJSON(arrow_type,
+                                                                               
     R"([
+        [true, 0, 32767, 2147483647, 4294967295, 0.5, 1.141, "20250327", 
"banana", 2026, 1732603136054000054, 11, "55.02", "-123456789987654321.45678"],
+        [null, null, null, null, null, null, null, null, null, null, null, 
null, null, null]
+    ])")
+                                              .ValueOrDie();
+    auto struct_array = std::dynamic_pointer_cast<arrow::StructArray>(array);
+    ASSERT_TRUE(struct_array);
+    ASSERT_OK_AND_ASSIGN(auto serializer,
+                         
RowCompactedSerializer::Create(arrow::schema(arrow_type->fields()), pool));
+    {
+        auto columnar_row = std::make_shared<ColumnarRow>(
+            /*struct_array=*/nullptr, struct_array->fields(), pool, 
/*row_id=*/0);
+        columnar_row->SetRowKind(RowKind::UpdateAfter());
+
+        // serialize and deserialize
+        ASSERT_OK_AND_ASSIGN(auto bytes, 
serializer->SerializeToBytes(*columnar_row));
+        ASSERT_OK_AND_ASSIGN(auto de_row, serializer->Deserialize(bytes));
+
+        // check result
+        ASSERT_EQ(*(de_row->GetRowKind().value()), *(RowKind::UpdateAfter()));
+        ASSERT_EQ(de_row->GetFieldCount(), 14);
+        ASSERT_FALSE(de_row->IsNullAt(0));
+
+        ASSERT_EQ(de_row->GetBoolean(0), true);
+        ASSERT_EQ(de_row->GetByte(1), 0);
+        ASSERT_EQ(de_row->GetShort(2), 32767);
+        ASSERT_EQ(de_row->GetInt(3), 2147483647);
+        ASSERT_EQ(de_row->GetLong(4), 4294967295l);
+        ASSERT_EQ(de_row->GetFloat(5), 0.5);
+        ASSERT_EQ(de_row->GetDouble(6), 1.141);
+        ASSERT_EQ(de_row->GetStringView(7), "20250327");
+        auto f9_bytes = std::make_shared<Bytes>("banana", pool.get());
+        ASSERT_EQ(*de_row->GetBinary(8), *f9_bytes);
+        ASSERT_EQ(de_row->GetDate(9), 2026);
+        ASSERT_EQ(de_row->GetTimestamp(10, 9),
+                  Timestamp(/*millisecond=*/1732603136054ll, 
/*nano_of_millisecond*/ 54));
+        ASSERT_EQ(de_row->GetTimestamp(11, 0),
+                  Timestamp(/*millisecond=*/11000ll, /*nano_of_millisecond*/ 
0));
+        ASSERT_EQ(de_row->GetDecimal(12, 5, 2),
+                  Decimal(5, 2, DecimalUtils::StrToInt128("5502").value()));
+        ASSERT_EQ(de_row->GetDecimal(13, 30, 5),
+                  Decimal(30, 5, 
DecimalUtils::StrToInt128("-12345678998765432145678").value()));
+
+        // test compatibility
+        std::vector<uint8_t> java_bytes = {
+            2,   0,   0,  1,  0,  255, 127, 255, 255, 255, 127, 255, 255, 255, 
255, 0,  0,   0,
+            0,   0,   0,  0,  63, 168, 198, 75,  55,  137, 65,  242, 63,  8,   
50,  48, 50,  53,
+            48,  51,  50, 55, 6,  98,  97,  110, 97,  110, 97,  234, 7,   0,   
0,   54, 200, 49,
+            103, 147, 1,  0,  0,  54,  248, 42,  0,   0,   0,   0,   0,   0,   
126, 21, 0,   0,
+            0,   0,   0,  0,  10, 253, 98,  189, 73,  88,  213, 98,  56,  248, 
242};
+        std::vector<uint8_t> cpp_bytes(bytes->data(), bytes->data() + 
bytes->size());
+        ASSERT_EQ(java_bytes, cpp_bytes);
+    }
+    {
+        // all null
+        auto columnar_row = std::make_shared<ColumnarRow>(
+            /*struct_array=*/nullptr, struct_array->fields(), pool, 
/*row_id=*/1);
+        columnar_row->SetRowKind(RowKind::UpdateAfter());
+
+        // serialize and deserialize
+        ASSERT_OK_AND_ASSIGN(auto bytes, 
serializer->SerializeToBytes(*columnar_row));
+        ASSERT_OK_AND_ASSIGN(auto de_row, serializer->Deserialize(bytes));
+
+        // check result
+        ASSERT_EQ(*(de_row->GetRowKind().value()), *(RowKind::UpdateAfter()));
+        ASSERT_EQ(de_row->GetFieldCount(), 14);
+        ASSERT_TRUE(de_row->IsNullAt(0));
+
+        // test compatibility
+        std::vector<uint8_t> java_bytes = {2, 255, 63};
+        std::vector<uint8_t> cpp_bytes(bytes->data(), bytes->data() + 
bytes->size());
+        ASSERT_EQ(java_bytes, cpp_bytes);
+    }
+}
+
+TEST(RowCompactedSerializerTest, TestNestedType) {
+    auto pool = GetDefaultPool();
+    // prepare data
+    auto key_field = arrow::field("key", arrow::int32());
+    auto value_field = arrow::field("value", arrow::int32());
+
+    auto inner_child1 = arrow::field(
+        "inner1", arrow::map(arrow::int32(), arrow::field("inner_list", 
arrow::list(value_field))));
+    auto inner_child2 = arrow::field(
+        "inner2", arrow::map(arrow::int32(),
+                             arrow::field("inner_map", 
arrow::map(arrow::int32(), value_field))));
+    auto inner_child3 = arrow::field(
+        "inner3",
+        arrow::map(arrow::int32(),
+                   arrow::field("inner_struct", arrow::struct_({key_field, 
value_field}))));
+
+    auto arrow_type = arrow::struct_({inner_child1, inner_child2, 
inner_child3});
+    // each inner child per row
+    std::shared_ptr<arrow::Array> array = 
arrow::ipc::internal::json::ArrayFromJSON(arrow_type,
+                                                                               
     R"([
+[[[100, [1, 2, 3, 4]], [101, [5, 6, 7]]],
+[[200, [[500, 1]]], [201, [[501, 2]]]],
+[[600, [100, 200]]]]
+    ])")
+                                              .ValueOrDie();
+    auto struct_array = std::dynamic_pointer_cast<arrow::StructArray>(array);
+    ASSERT_TRUE(struct_array);
+    auto columnar_row =
+        std::make_shared<ColumnarRow>(/*struct_array=*/nullptr, 
struct_array->fields(), pool,
+                                      /*row_id=*/0);
+
+    // serialize and deserialize
+    ASSERT_OK_AND_ASSIGN(auto serializer,
+                         
RowCompactedSerializer::Create(arrow::schema(arrow_type->fields()), pool));
+    ASSERT_OK_AND_ASSIGN(auto bytes, 
serializer->SerializeToBytes(*columnar_row));
+    ASSERT_OK_AND_ASSIGN(auto row, serializer->Deserialize(bytes));
+
+    // check result
+    ASSERT_EQ(row->GetFieldCount(), 3);
+
+    // for inner_child1
+    ASSERT_EQ(row->GetMap(0)->KeyArray()->ToIntArray().value(), 
std::vector<int32_t>({100, 101}));
+    auto value1 = row->GetMap(0)->ValueArray();
+    ASSERT_EQ(value1->Size(), 2);
+    ASSERT_EQ(value1->GetArray(0)->ToIntArray().value(), 
std::vector<int32_t>({1, 2, 3, 4}));
+    ASSERT_EQ(value1->GetArray(1)->ToIntArray().value(), 
std::vector<int32_t>({5, 6, 7}));
+
+    // for inner_child2
+    ASSERT_EQ(row->GetMap(1)->KeyArray()->ToIntArray().value(), 
std::vector<int32_t>({200, 201}));
+    auto value2 = row->GetMap(1)->ValueArray();
+    ASSERT_EQ(value2->Size(), 2);
+    ASSERT_EQ(value2->GetMap(0)->KeyArray()->ToIntArray().value(), 
std::vector<int32_t>({500}));
+    ASSERT_EQ(value2->GetMap(0)->ValueArray()->ToIntArray().value(), 
std::vector<int32_t>({1}));
+    ASSERT_EQ(value2->GetMap(1)->KeyArray()->ToIntArray().value(), 
std::vector<int32_t>({501}));
+    ASSERT_EQ(value2->GetMap(1)->ValueArray()->ToIntArray().value(), 
std::vector<int32_t>({2}));
+
+    // for inner_child3
+    ASSERT_EQ(row->GetMap(2)->KeyArray()->ToIntArray().value(), 
std::vector<int32_t>({600}));
+    auto value3 = row->GetMap(2)->ValueArray();
+    ASSERT_EQ(value3->Size(), 1);
+    ASSERT_EQ(value3->GetRow(0, /*num_fields=*/2)->GetInt(0), 100);
+    ASSERT_EQ(value3->GetRow(0, /*num_fields=*/2)->GetInt(1), 200);
+
+    // test compatibility
+    std::vector<uint8_t> java_bytes = {
+        0,  0,   92,  16, 0,  0, 0, 2,   0,  0,   0, 0,  0, 0,  0,  100, 0,   
0,  0,  101, 0,   0,
+        0,  2,   0,   0,  0,  0, 0, 0,   0,  24,  0, 0,  0, 24, 0,  0,   0,   
24, 0,  0,   0,   48,
+        0,  0,   0,   4,  0,  0, 0, 0,   0,  0,   0, 1,  0, 0,  0,  2,   0,   
0,  0,  3,   0,   0,
+        0,  4,   0,   0,  0,  3, 0, 0,   0,  0,   0, 0,  0, 5,  0,  0,   0,   
6,  0,  0,   0,   7,
+        0,  0,   0,   0,  0,  0, 0, 124, 16, 0,   0, 0,  2, 0,  0,  0,   0,   
0,  0,  0,   200, 0,
+        0,  0,   201, 0,  0,  0, 2, 0,   0,  0,   0, 0,  0, 0,  36, 0,   0,   
0,  24, 0,   0,   0,
+        36, 0,   0,   0,  64, 0, 0, 0,   16, 0,   0, 0,  1, 0,  0,  0,   0,   
0,  0,  0,   244, 1,
+        0,  0,   0,   0,  0,  0, 1, 0,   0,  0,   0, 0,  0, 0,  1,  0,   0,   
0,  0,  0,   0,   0,
+        0,  0,   0,   0,  16, 0, 0, 0,   1,  0,   0, 0,  0, 0,  0,  0,   245, 
1,  0,  0,   0,   0,
+        0,  0,   1,   0,  0,  0, 0, 0,   0,  0,   2, 0,  0, 0,  0,  0,   0,   
0,  0,  0,   0,   0,
+        60, 16,  0,   0,  0,  1, 0, 0,   0,  0,   0, 0,  0, 88, 2,  0,   0,   
0,  0,  0,   0,   1,
+        0,  0,   0,   0,  0,  0, 0, 24,  0,  0,   0, 16, 0, 0,  0,  0,   0,   
0,  0,  0,   0,   0,
+        0,  100, 0,   0,  0,  0, 0, 0,   0,  200, 0, 0,  0, 0,  0,  0,   0};
+    std::vector<uint8_t> cpp_bytes(bytes->data(), bytes->data() + 
bytes->size());
+    ASSERT_EQ(java_bytes, cpp_bytes);
+}
+
+TEST(RowCompactedSerializerTest, TestNestedTypeWithNull) {
+    auto pool = GetDefaultPool();
+    // prepare data
+    auto key_field = arrow::field("key", arrow::int32());
+    auto value_field = arrow::field("value", arrow::int32());
+
+    auto inner_child1 = arrow::field(
+        "inner1", arrow::map(arrow::int32(), arrow::field("inner_list", 
arrow::list(value_field))));
+    auto inner_child2 = arrow::field(
+        "inner2", arrow::map(arrow::int32(),
+                             arrow::field("inner_map", 
arrow::map(arrow::int32(), value_field))));
+    auto inner_child3 = arrow::field(
+        "inner3",
+        arrow::map(arrow::int32(),
+                   arrow::field("inner_struct", arrow::struct_({key_field, 
value_field}))));
+
+    auto arrow_type = arrow::struct_({inner_child1, inner_child2, 
inner_child3});
+    // each inner child per row
+    std::shared_ptr<arrow::Array> array = 
arrow::ipc::internal::json::ArrayFromJSON(arrow_type,
+                                                                               
     R"([
+[[[100, null], [101, [5, 6, null]]],
+[[200, [[500, null]]], [201, [[501, 2]]]],
+[[600, [100, null]]]]
+    ])")
+                                              .ValueOrDie();
+    auto struct_array = std::dynamic_pointer_cast<arrow::StructArray>(array);
+    ASSERT_TRUE(struct_array);
+    auto columnar_row =
+        std::make_shared<ColumnarRow>(/*struct_array=*/nullptr, 
struct_array->fields(), pool,
+                                      /*row_id=*/0);
+
+    // serialize and deserialize
+    ASSERT_OK_AND_ASSIGN(auto serializer,
+                         
RowCompactedSerializer::Create(arrow::schema(arrow_type->fields()), pool));
+    ASSERT_OK_AND_ASSIGN(auto bytes, 
serializer->SerializeToBytes(*columnar_row));
+    ASSERT_OK_AND_ASSIGN(auto row, serializer->Deserialize(bytes));
+
+    // check result
+    ASSERT_EQ(row->GetFieldCount(), 3);
+
+    // for inner_child1
+    ASSERT_EQ(row->GetMap(0)->KeyArray()->ToIntArray().value(), 
std::vector<int32_t>({100, 101}));
+    auto value1 = row->GetMap(0)->ValueArray();
+    ASSERT_EQ(value1->Size(), 2);
+    ASSERT_TRUE(value1->IsNullAt(0));
+    ASSERT_EQ(value1->GetArray(1)->GetInt(0), 5);
+    ASSERT_EQ(value1->GetArray(1)->GetInt(1), 6);
+    ASSERT_TRUE(value1->GetArray(1)->IsNullAt(2));
+
+    // for inner_child2
+    ASSERT_EQ(row->GetMap(1)->KeyArray()->ToIntArray().value(), 
std::vector<int32_t>({200, 201}));
+    auto value2 = row->GetMap(1)->ValueArray();
+    ASSERT_EQ(value2->Size(), 2);
+    ASSERT_EQ(value2->GetMap(0)->KeyArray()->ToIntArray().value(), 
std::vector<int32_t>({500}));
+    ASSERT_TRUE(value2->GetMap(0)->ValueArray()->IsNullAt(0));
+    ASSERT_EQ(value2->GetMap(1)->KeyArray()->ToIntArray().value(), 
std::vector<int32_t>({501}));
+    ASSERT_EQ(value2->GetMap(1)->ValueArray()->ToIntArray().value(), 
std::vector<int32_t>({2}));
+
+    // for inner_child3
+    ASSERT_EQ(row->GetMap(2)->KeyArray()->ToIntArray().value(), 
std::vector<int32_t>({600}));
+    auto value3 = row->GetMap(2)->ValueArray();
+    ASSERT_EQ(value3->Size(), 1);
+    ASSERT_EQ(value3->GetRow(0, /*num_fields=*/2)->GetInt(0), 100);
+    ASSERT_TRUE(value3->GetRow(0, /*num_fields=*/2)->IsNullAt(1));
+
+    // test compatibility
+    std::vector<uint8_t> java_bytes = {
+        0,   0, 68, 16, 0, 0,   0,  2, 0, 0,  0, 0,  0,  0, 0,   100, 0,  0,  
0,   101, 0,  0,
+        0,   2, 0,  0,  0, 1,   0,  0, 0, 0,  0, 0,  0,  0, 0,   0,   0,  24, 
0,   0,   0,  24,
+        0,   0, 0,  3,  0, 0,   0,  4, 0, 0,  0, 5,  0,  0, 0,   6,   0,  0,  
0,   0,   0,  0,
+        0,   0, 0,  0,  0, 124, 16, 0, 0, 0,  2, 0,  0,  0, 0,   0,   0,  0,  
200, 0,   0,  0,
+        201, 0, 0,  0,  2, 0,   0,  0, 0, 0,  0, 0,  36, 0, 0,   0,   24, 0,  
0,   0,   36, 0,
+        0,   0, 64, 0,  0, 0,   16, 0, 0, 0,  1, 0,  0,  0, 0,   0,   0,  0,  
244, 1,   0,  0,
+        0,   0, 0,  0,  1, 0,   0,  0, 1, 0,  0, 0,  0,  0, 0,   0,   0,  0,  
0,   0,   0,  0,
+        0,   0, 16, 0,  0, 0,   1,  0, 0, 0,  0, 0,  0,  0, 245, 1,   0,  0,  
0,   0,   0,  0,
+        1,   0, 0,  0,  0, 0,   0,  0, 2, 0,  0, 0,  0,  0, 0,   0,   0,  0,  
0,   0,   60, 16,
+        0,   0, 0,  1,  0, 0,   0,  0, 0, 0,  0, 88, 2,  0, 0,   0,   0,  0,  
0,   1,   0,  0,
+        0,   0, 0,  0,  0, 24,  0,  0, 0, 16, 0, 0,  0,  0, 2,   0,   0,  0,  
0,   0,   0,  100,
+        0,   0, 0,  0,  0, 0,   0,  0, 0, 0,  0, 0,  0,  0, 0};
+    std::vector<uint8_t> cpp_bytes(bytes->data(), bytes->data() + 
bytes->size());
+    ASSERT_EQ(java_bytes, cpp_bytes);
+}
+
+TEST(RowCompactedSerializerTest, TestNestedNullWithTimestampAndDecimal) {
+    // test map/list with nested child types
+    auto pool = GetDefaultPool();
+    // prepare data
+    auto inner_child1 = arrow::field(
+        "inner1",
+        arrow::map(
+            arrow::int32(),
+            arrow::field("inner_list", arrow::list(arrow::field("f0", 
arrow::decimal128(5, 2))))));
+    auto inner_child2 = arrow::field(
+        "inner2",
+        arrow::map(
+            arrow::int32(),
+            arrow::field("inner_list", arrow::list(arrow::field("f1", 
arrow::decimal128(30, 5))))));
+    auto inner_child3 = arrow::field(
+        "inner3",
+        arrow::map(arrow::int32(),
+                   arrow::field("inner_struct",
+                                arrow::struct_({
+                                    arrow::field("f0", 
arrow::timestamp(arrow::TimeUnit::NANO)),
+                                    arrow::field("f1", 
arrow::timestamp(arrow::TimeUnit::SECOND)),
+                                    arrow::field("f2", arrow::decimal128(5, 
2)),
+                                    arrow::field("f3", arrow::decimal128(30, 
5)),
+                                }))));
+
+    auto arrow_type = arrow::struct_({inner_child1, inner_child2, 
inner_child3});
+    // each inner child per row
+    std::shared_ptr<arrow::Array> array = 
arrow::ipc::internal::json::ArrayFromJSON(arrow_type,
+                                                                               
     R"([
+[[[100, ["5.12", "6.12"]]],
+[[200, ["-123456789987654321.45678", "23456789987654321.45678"]]],
+[[300, [1732603136054000054, 11, "7.89", "3456789987654321.45678"]]]],
+[[[102, [null, "6.12"]]],
+[[202, ["-123456789987654321.45678", null]]],
+[[302, [null, null, null, null]]]]
+    ])")
+                                              .ValueOrDie();
+
+    auto struct_array = std::dynamic_pointer_cast<arrow::StructArray>(array);
+    ASSERT_TRUE(struct_array);
+    ASSERT_OK_AND_ASSIGN(auto serializer,
+                         
RowCompactedSerializer::Create(arrow::schema(arrow_type->fields()), pool));
+
+    {
+        auto columnar_row =
+            std::make_shared<ColumnarRow>(/*struct_array=*/nullptr, 
struct_array->fields(), pool,
+                                          /*row_id=*/0);
+
+        // serialize and deserialize
+        ASSERT_OK_AND_ASSIGN(auto bytes, 
serializer->SerializeToBytes(*columnar_row));
+        ASSERT_OK_AND_ASSIGN(auto row, serializer->Deserialize(bytes));
+
+        // check result
+        ASSERT_EQ(row->GetFieldCount(), 3);
+
+        // test compatibility
+        std::vector<uint8_t> java_bytes = {
+            0,   0,   60,  16,  0,   0,  0,   1,   0,   0,  0,   0,   0,   0,  
0,   100, 0,   0,
+            0,   0,   0,   0,   0,   1,  0,   0,   0,   0,  0,   0,   0,   24, 
0,   0,   0,   16,
+            0,   0,   0,   2,   0,   0,  0,   0,   0,   0,  0,   0,   2,   0,  
0,   0,   0,   0,
+            0,   100, 2,   0,   0,   0,  0,   0,   0,   92, 16,  0,   0,   0,  
1,   0,   0,   0,
+            0,   0,   0,   0,   200, 0,  0,   0,   0,   0,  0,   0,   1,   0,  
0,   0,   0,   0,
+            0,   0,   56,  0,   0,   0,  16,  0,   0,   0,  2,   0,   0,   0,  
0,   0,   0,   0,
+            10,  0,   0,   0,   24,  0,  0,   0,   9,   0,  0,   0,   40,  0,  
0,   0,   253, 98,
+            189, 73,  88,  213, 98,  56, 248, 242, 0,   0,  0,   0,   0,   0,  
127, 40,  213, 221,
+            111, 235, 135, 7,   14,  0,  0,   0,   0,   0,  0,   0,   100, 16, 
0,   0,   0,   1,
+            0,   0,   0,   0,   0,   0,  0,   44,  1,   0,  0,   0,   0,   0,  
0,   1,   0,   0,
+            0,   0,   0,   0,   0,   64, 0,   0,   0,   16, 0,   0,   0,   0,  
0,   0,   0,   0,
+            0,   0,   0,   54,  0,   0,  0,   40,  0,   0,  0,   248, 42,  0,  
0,   0,   0,   0,
+            0,   21,  3,   0,   0,   0,  0,   0,   0,   9,  0,   0,   0,   48, 
0,   0,   0,   54,
+            200, 49,  103, 147, 1,   0,  0,   18,  189, 66, 129, 228, 46,  71, 
7,   14,  0,   0,
+            0,   0,   0,   0,   0};
+        std::vector<uint8_t> cpp_bytes(bytes->data(), bytes->data() + 
bytes->size());
+        ASSERT_EQ(java_bytes, cpp_bytes);
+    }
+    {
+        auto columnar_row =
+            std::make_shared<ColumnarRow>(/*struct_array=*/nullptr, 
struct_array->fields(), pool,
+                                          /*row_id=*/1);
+
+        // serialize and deserialize
+        ASSERT_OK_AND_ASSIGN(auto bytes, 
serializer->SerializeToBytes(*columnar_row));
+        ASSERT_OK_AND_ASSIGN(auto row, serializer->Deserialize(bytes));
+
+        // check result
+        ASSERT_EQ(row->GetFieldCount(), 3);
+
+        // test compatibility
+        std::vector<uint8_t> java_bytes = {
+            0,   0,  60, 16,  0,  0,  0,   1,   0,  0, 0,  0, 0,  0,   0,   
102, 0, 0, 0, 0,   0,
+            0,   0,  1,  0,   0,  0,  0,   0,   0,  0, 24, 0, 0,  0,   16,  0, 
  0, 0, 2, 0,   0,
+            0,   1,  0,  0,   0,  0,  0,   0,   0,  0, 0,  0, 0,  100, 2,   0, 
  0, 0, 0, 0,   0,
+            76,  16, 0,  0,   0,  1,  0,   0,   0,  0, 0,  0, 0,  202, 0,   0, 
  0, 0, 0, 0,   0,
+            1,   0,  0,  0,   0,  0,  0,   0,   40, 0, 0,  0, 16, 0,   0,   0, 
  2, 0, 0, 0,   2,
+            0,   0,  0,  10,  0,  0,  0,   24,  0,  0, 0,  0, 0,  0,   0,   0, 
  0, 0, 0, 253, 98,
+            189, 73, 88, 213, 98, 56, 248, 242, 0,  0, 0,  0, 0,  0,   100, 
16,  0, 0, 0, 1,   0,
+            0,   0,  0,  0,   0,  0,  46,  1,   0,  0, 0,  0, 0,  0,   1,   0, 
  0, 0, 0, 0,   0,
+            0,   64, 0,  0,   0,  16, 0,   0,   0,  0, 15, 0, 0,  0,   0,   0, 
  0, 0, 0, 0,   0,
+            40,  0,  0,  0,   0,  0,  0,   0,   0,  0, 0,  0, 0,  0,   0,   0, 
  0, 0, 0, 0,   0,
+            0,   0,  0,  48,  0,  0,  0,   0,   0,  0, 0,  0, 0,  0,   0,   0, 
  0, 0, 0, 0,   0,
+            0,   0,  0,  0,   0,  0,  0,   0,   0,  0};
+        std::vector<uint8_t> cpp_bytes(bytes->data(), bytes->data() + 
bytes->size());
+        ASSERT_EQ(java_bytes, cpp_bytes);
+    }
+}
+TEST(RowCompactedSerializerTest, TestNestedNullWithTimestampAndDecimal2) {
+    // test struct with nested child types
+    auto pool = GetDefaultPool();
+    // prepare data
+    auto inner_child1 = arrow::field(
+        "inner1",
+        arrow::map(
+            arrow::int32(),
+            arrow::field("inner_list", arrow::list(arrow::field("f0", 
arrow::decimal128(5, 2))))));
+    auto inner_child2 = arrow::field(
+        "inner2",
+        arrow::map(
+            arrow::int32(),
+            arrow::field("inner_list", arrow::list(arrow::field("f1", 
arrow::decimal128(30, 5))))));
+    auto inner_child3 = arrow::field(
+        "inner3",
+        arrow::map(arrow::int32(),
+                   arrow::field("inner_struct",
+                                arrow::struct_({
+                                    arrow::field("f0", 
arrow::timestamp(arrow::TimeUnit::NANO)),
+                                    arrow::field("f1", 
arrow::timestamp(arrow::TimeUnit::SECOND)),
+                                    arrow::field("f2", arrow::decimal128(5, 
2)),
+                                    arrow::field("f3", arrow::decimal128(30, 
5)),
+                                }))));
+    auto child1 =
+        arrow::field("child1", arrow::struct_({inner_child1, inner_child2, 
inner_child3}));
+    auto child2 = arrow::field("child2", arrow::utf8());
+    auto arrow_type = arrow::struct_({child1, child2});
+    // each inner child per row
+    std::shared_ptr<arrow::Array> array = 
arrow::ipc::internal::json::ArrayFromJSON(arrow_type,
+                                                                               
     R"([
+[[[[100, ["5.12", "6.12"]]],
+[[200, ["-123456789987654321.45678", "23456789987654321.45678"]]],
+[[300, [1732603136054000054, 11, "7.89", "3456789987654321.45678"]]]],
+"Alice"],
+[[[[102, [null, "6.12"]]],
+[[202, ["-123456789987654321.45678", null]]],
+[[302, [null, null, null, null]]]],
+"Bob"]
+    ])")
+                                              .ValueOrDie();
+
+    auto struct_array = std::dynamic_pointer_cast<arrow::StructArray>(array);
+    ASSERT_TRUE(struct_array);
+    ASSERT_OK_AND_ASSIGN(auto serializer,
+                         
RowCompactedSerializer::Create(arrow::schema(arrow_type->fields()), pool));
+
+    {
+        auto columnar_row =
+            std::make_shared<ColumnarRow>(/*struct_array=*/nullptr, 
struct_array->fields(), pool,
+                                          /*row_id=*/0);
+
+        // serialize and deserialize
+        ASSERT_OK_AND_ASSIGN(auto bytes, 
serializer->SerializeToBytes(*columnar_row));
+        ASSERT_OK_AND_ASSIGN(auto row, serializer->Deserialize(bytes));
+
+        // check result
+        ASSERT_EQ(row->GetFieldCount(), 2);
+        auto inner_row = row->GetRow(0, 3);
+        ASSERT_EQ(inner_row->GetFieldCount(), 3);
+        ASSERT_EQ(row->GetStringView(1), "Alice");
+
+        // test compatibility
+        std::vector<uint8_t> java_bytes = {
+            0,   0,   129, 2,   0,   0,   60,  16,  0,   0,  0,   1,   0,   0, 
 0,   0,   0,   0,
+            0,   100, 0,   0,   0,   0,   0,   0,   0,   1,  0,   0,   0,   0, 
 0,   0,   0,   24,
+            0,   0,   0,   16,  0,   0,   0,   2,   0,   0,  0,   0,   0,   0, 
 0,   0,   2,   0,
+            0,   0,   0,   0,   0,   100, 2,   0,   0,   0,  0,   0,   0,   
92, 16,  0,   0,   0,
+            1,   0,   0,   0,   0,   0,   0,   0,   200, 0,  0,   0,   0,   0, 
 0,   0,   1,   0,
+            0,   0,   0,   0,   0,   0,   56,  0,   0,   0,  16,  0,   0,   0, 
 2,   0,   0,   0,
+            0,   0,   0,   0,   10,  0,   0,   0,   24,  0,  0,   0,   9,   0, 
 0,   0,   40,  0,
+            0,   0,   253, 98,  189, 73,  88,  213, 98,  56, 248, 242, 0,   0, 
 0,   0,   0,   0,
+            127, 40,  213, 221, 111, 235, 135, 7,   14,  0,  0,   0,   0,   0, 
 0,   0,   100, 16,
+            0,   0,   0,   1,   0,   0,   0,   0,   0,   0,  0,   44,  1,   0, 
 0,   0,   0,   0,
+            0,   1,   0,   0,   0,   0,   0,   0,   0,   64, 0,   0,   0,   
16, 0,   0,   0,   0,
+            0,   0,   0,   0,   0,   0,   0,   54,  0,   0,  0,   40,  0,   0, 
 0,   248, 42,  0,
+            0,   0,   0,   0,   0,   21,  3,   0,   0,   0,  0,   0,   0,   9, 
 0,   0,   0,   48,
+            0,   0,   0,   54,  200, 49,  103, 147, 1,   0,  0,   18,  189, 
66, 129, 228, 46,  71,
+            7,   14,  0,   0,   0,   0,   0,   0,   0,   5,  65,  108, 105, 
99, 101};
+        std::vector<uint8_t> cpp_bytes(bytes->data(), bytes->data() + 
bytes->size());
+        ASSERT_EQ(java_bytes, cpp_bytes);
+    }
+    {
+        auto columnar_row =
+            std::make_shared<ColumnarRow>(/*struct_array=*/nullptr, 
struct_array->fields(), pool,
+                                          /*row_id=*/1);
+
+        // serialize and deserialize
+        ASSERT_OK_AND_ASSIGN(auto bytes, 
serializer->SerializeToBytes(*columnar_row));
+        ASSERT_OK_AND_ASSIGN(auto row, serializer->Deserialize(bytes));
+
+        // check result
+        ASSERT_EQ(row->GetFieldCount(), 2);
+        auto inner_row = row->GetRow(0, 3);
+        ASSERT_EQ(inner_row->GetFieldCount(), 3);
+        ASSERT_EQ(row->GetStringView(1), "Bob");
+
+        // test compatibility
+        std::vector<uint8_t> java_bytes = {
+            0, 0, 241, 1,  0,   0,  60, 16,  0,  0,  0,   1,   0,  0, 0,  0,  
0,   0,   0,   102, 0,
+            0, 0, 0,   0,  0,   0,  1,  0,   0,  0,  0,   0,   0,  0, 24, 0,  
0,   0,   16,  0,   0,
+            0, 2, 0,   0,  0,   1,  0,  0,   0,  0,  0,   0,   0,  0, 0,  0,  
0,   100, 2,   0,   0,
+            0, 0, 0,   0,  76,  16, 0,  0,   0,  1,  0,   0,   0,  0, 0,  0,  
0,   202, 0,   0,   0,
+            0, 0, 0,   0,  1,   0,  0,  0,   0,  0,  0,   0,   40, 0, 0,  0,  
16,  0,   0,   0,   2,
+            0, 0, 0,   2,  0,   0,  0,  10,  0,  0,  0,   24,  0,  0, 0,  0,  
0,   0,   0,   0,   0,
+            0, 0, 253, 98, 189, 73, 88, 213, 98, 56, 248, 242, 0,  0, 0,  0,  
0,   0,   100, 16,  0,
+            0, 0, 1,   0,  0,   0,  0,  0,   0,  0,  46,  1,   0,  0, 0,  0,  
0,   0,   1,   0,   0,
+            0, 0, 0,   0,  0,   64, 0,  0,   0,  16, 0,   0,   0,  0, 15, 0,  
0,   0,   0,   0,   0,
+            0, 0, 0,   0,  40,  0,  0,  0,   0,  0,  0,   0,   0,  0, 0,  0,  
0,   0,   0,   0,   0,
+            0, 0, 0,   0,  0,   0,  0,  48,  0,  0,  0,   0,   0,  0, 0,  0,  
0,   0,   0,   0,   0,
+            0, 0, 0,   0,  0,   0,  0,  0,   0,  0,  0,   0,   0,  0, 3,  66, 
111, 98};
+        std::vector<uint8_t> cpp_bytes(bytes->data(), bytes->data() + 
bytes->size());
+        ASSERT_EQ(java_bytes, cpp_bytes);
+    }
+}
+
+TEST(RowCompactedSerializerTest, TestListType) {
+    auto pool = GetDefaultPool();
+    // prepare data
+    auto inner_child1 = arrow::field("inner1", arrow::list(arrow::int32()));
+    auto arrow_type = arrow::struct_({inner_child1});
+    // each inner child per row
+    std::shared_ptr<arrow::Array> array = 
arrow::ipc::internal::json::ArrayFromJSON(arrow_type,
+                                                                               
     R"([
+[[5, 6, 7]],
+[[1, 2, 3]],
+[[4]]
+    ])")
+                                              .ValueOrDie();
+    auto struct_array = std::dynamic_pointer_cast<arrow::StructArray>(array);
+    ASSERT_TRUE(struct_array);
+    auto columnar_row =
+        std::make_shared<ColumnarRow>(/*struct_array=*/nullptr, 
struct_array->fields(), pool,
+                                      /*row_id=*/0);
+
+    // serialize and deserialize
+    ASSERT_OK_AND_ASSIGN(auto serializer,
+                         
RowCompactedSerializer::Create(arrow::schema(arrow_type->fields()), pool));
+    ASSERT_OK_AND_ASSIGN(auto bytes, 
serializer->SerializeToBytes(*columnar_row));
+    ASSERT_OK_AND_ASSIGN(auto row, serializer->Deserialize(bytes));
+
+    // check result
+    ASSERT_EQ(row->GetFieldCount(), 1);
+
+    // for inner_child1
+    ASSERT_EQ(row->GetArray(0)->ToIntArray().value(), std::vector<int32_t>({5, 
6, 7}));
+}
+
+TEST(RowCompactedSerializerTest, TestSliceComparator) {
+    auto pool = GetDefaultPool();
+    arrow::FieldVector fields = {
+        arrow::field("f1", arrow::boolean()),
+        arrow::field("f2", arrow::int8()),
+        arrow::field("f3", arrow::int16()),
+        arrow::field("f4", arrow::int32()),
+        arrow::field("f5", arrow::int64()),
+        arrow::field("f6", arrow::float32()),
+        arrow::field("f7", arrow::float64()),
+        arrow::field("f8", arrow::utf8()),
+        arrow::field("f9", arrow::binary()),
+        arrow::field("f10", arrow::date32()),
+        arrow::field("f11", arrow::timestamp(arrow::TimeUnit::NANO)),
+        arrow::field("f12", arrow::timestamp(arrow::TimeUnit::SECOND)),
+        arrow::field("f13", arrow::decimal128(5, 2)),
+        arrow::field("f14", arrow::decimal128(30, 5)),
+    };
+    auto check_result = [&](const std::shared_ptr<arrow::DataType>& type,
+                            const std::shared_ptr<arrow::Array>& array) {
+        auto struct_array = 
std::dynamic_pointer_cast<arrow::StructArray>(array);
+        ASSERT_TRUE(struct_array);
+        ASSERT_OK_AND_ASSIGN(auto serializer,
+                             
RowCompactedSerializer::Create(arrow::schema(type->fields()), pool));
+        auto columnar_row1 = std::make_shared<ColumnarRow>(
+            /*struct_array=*/nullptr, struct_array->fields(), pool, 
/*row_id=*/0);
+        auto columnar_row2 = std::make_shared<ColumnarRow>(
+            /*struct_array=*/nullptr, struct_array->fields(), pool, 
/*row_id=*/1);
+
+        ASSERT_OK_AND_ASSIGN(auto bytes1, 
serializer->SerializeToBytes(*columnar_row1));
+        auto slice1 = MemorySlice::Wrap(bytes1);
+        ASSERT_OK_AND_ASSIGN(auto bytes2, 
serializer->SerializeToBytes(*columnar_row2));
+        auto slice2 = MemorySlice::Wrap(bytes2);
+
+        ASSERT_OK_AND_ASSIGN(auto comparator, 
RowCompactedSerializer::CreateSliceComparator(
+                                                  
arrow::schema(type->fields()), pool));
+        ASSERT_EQ(-1, comparator(slice1, slice2).value());
+        ASSERT_EQ(1, comparator(slice2, slice1).value());
+        ASSERT_EQ(0, comparator(slice1, slice1).value());
+        ASSERT_EQ(0, comparator(slice2, slice2).value());
+    };
+    // test various type
+    {
+        auto type = arrow::struct_({fields[0], fields[1], fields[2], 
fields[3]});
+        std::shared_ptr<arrow::Array> array = 
arrow::ipc::internal::json::ArrayFromJSON(type,
+                                                                               
         R"([
+        [false, 0, 32767, 2147483647],
+        [true, 0, 32767, 2147483647]
+    ])")
+                                                  .ValueOrDie();
+        check_result(type, array);
+    }
+    {
+        auto type = arrow::struct_({fields[0], fields[1], fields[2], 
fields[3]});
+        std::shared_ptr<arrow::Array> array = 
arrow::ipc::internal::json::ArrayFromJSON(type,
+                                                                               
         R"([
+        [true, 0, 32767, 2147483647],
+        [true, 1, 32767, 2147483647]
+    ])")
+                                                  .ValueOrDie();
+        check_result(type, array);
+    }
+    {
+        auto type = arrow::struct_({fields[0], fields[1], fields[2], 
fields[3]});
+        std::shared_ptr<arrow::Array> array = 
arrow::ipc::internal::json::ArrayFromJSON(type,
+                                                                               
         R"([
+        [true, 0, 32766, 2147483647],
+        [true, 0, 32767, 2147483647]
+    ])")
+                                                  .ValueOrDie();
+        check_result(type, array);
+    }
+    {
+        auto type = arrow::struct_({fields[0], fields[1], fields[2], 
fields[3]});
+        std::shared_ptr<arrow::Array> array = 
arrow::ipc::internal::json::ArrayFromJSON(type,
+                                                                               
         R"([
+        [true, 0, 32767, 2147483646],
+        [true, 0, 32767, 2147483647]
+    ])")
+                                                  .ValueOrDie();
+        check_result(type, array);
+    }
+    {
+        auto type = arrow::struct_({fields[4], fields[5], fields[6]});
+        std::shared_ptr<arrow::Array> array = 
arrow::ipc::internal::json::ArrayFromJSON(type,
+                                                                               
         R"([
+        [4294967295, 10.1, 100.123],
+        [4294967296, 10.1, 100.123]
+    ])")
+                                                  .ValueOrDie();
+        check_result(type, array);
+    }
+    {
+        auto type = arrow::struct_({fields[4], fields[5], fields[6]});
+        std::shared_ptr<arrow::Array> array = 
arrow::ipc::internal::json::ArrayFromJSON(type,
+                                                                               
         R"([
+        [4294967295, 10.1, 100.123],
+        [4294967295, 10.11, 100.123]
+    ])")
+                                                  .ValueOrDie();
+        check_result(type, array);
+    }
+    {
+        auto type = arrow::struct_({fields[4], fields[5], fields[6]});
+        std::shared_ptr<arrow::Array> array = 
arrow::ipc::internal::json::ArrayFromJSON(type,
+                                                                               
         R"([
+        [4294967295, 10.1, 100.123],
+        [4294967295, 10.1, 100.124]
+    ])")
+                                                  .ValueOrDie();
+        check_result(type, array);
+    }
+    {
+        auto type = arrow::struct_({fields[7], fields[8], fields[9]});
+        std::shared_ptr<arrow::Array> array = 
arrow::ipc::internal::json::ArrayFromJSON(type,
+                                                                               
         R"([
+        ["Alice", "这是一个中文", 10],
+        ["Bob", "这是一个中文", 10]
+    ])")
+                                                  .ValueOrDie();
+        check_result(type, array);
+    }
+    {
+        auto type = arrow::struct_({fields[7], fields[8], fields[9]});
+        std::shared_ptr<arrow::Array> array = 
arrow::ipc::internal::json::ArrayFromJSON(type,
+                                                                               
         R"([
+        ["Alice", "这是一个中文", 10],
+        ["Alice", "这是一个中文!", 10]
+    ])")
+                                                  .ValueOrDie();
+        check_result(type, array);
+    }
+    {
+        auto type = arrow::struct_({fields[7], fields[8], fields[9]});
+        std::shared_ptr<arrow::Array> array = 
arrow::ipc::internal::json::ArrayFromJSON(type,
+                                                                               
         R"([
+        ["Alice", "这是一个中文", 10],
+        ["Alice", "这是一个中文", 20]
+    ])")
+                                                  .ValueOrDie();
+        check_result(type, array);
+    }
+    {
+        auto type = arrow::struct_({fields[10], fields[11], fields[12], 
fields[13]});
+        std::shared_ptr<arrow::Array> array = 
arrow::ipc::internal::json::ArrayFromJSON(type,
+                                                                               
         R"([
+        [1732603136054000054, 11, "55.02", "-123456789987654321.45678"],
+        [1732603136054000055, 11, "55.02", "-123456789987654321.45678"]
+    ])")
+                                                  .ValueOrDie();
+        check_result(type, array);
+    }
+    {
+        auto type = arrow::struct_({fields[10], fields[11], fields[12], 
fields[13]});
+        std::shared_ptr<arrow::Array> array = 
arrow::ipc::internal::json::ArrayFromJSON(type,
+                                                                               
         R"([
+        [1732603136054000054, 11, "55.02", "-123456789987654321.45678"],
+        [1732603136054000054, 12, "55.02", "-123456789987654321.45678"]
+    ])")
+                                                  .ValueOrDie();
+        check_result(type, array);
+    }
+    {
+        auto type = arrow::struct_({fields[10], fields[11], fields[12], 
fields[13]});
+        std::shared_ptr<arrow::Array> array = 
arrow::ipc::internal::json::ArrayFromJSON(type,
+                                                                               
         R"([
+        [1732603136054000054, 11, "55.02", "-123456789987654321.45678"],
+        [1732603136054000054, 11, "55.03", "-123456789987654321.45678"]
+    ])")
+                                                  .ValueOrDie();
+        check_result(type, array);
+    }
+    {
+        auto type = arrow::struct_({fields[10], fields[11], fields[12], 
fields[13]});
+        std::shared_ptr<arrow::Array> array = 
arrow::ipc::internal::json::ArrayFromJSON(type,
+                                                                               
         R"([
+        [1732603136054000054, 11, "55.02", "-123456789987654321.45678"],
+        [1732603136054000054, 11, "55.02", "-123456789987654321.45670"]
+    ])")
+                                                  .ValueOrDie();
+        check_result(type, array);
+    }
+    // test null
+    {
+        auto type = arrow::struct_({fields[0], fields[1]});
+        std::shared_ptr<arrow::Array> array = 
arrow::ipc::internal::json::ArrayFromJSON(type,
+                                                                               
         R"([
+        [false, null],
+        [false, 20]
+    ])")
+                                                  .ValueOrDie();
+        check_result(type, array);
+    }
+    {
+        auto type = arrow::struct_({fields[0], fields[1]});
+        std::shared_ptr<arrow::Array> array = 
arrow::ipc::internal::json::ArrayFromJSON(type,
+                                                                               
         R"([
+        [null, 20],
+        [null, 21]
+    ])")
+                                                  .ValueOrDie();
+        check_result(type, array);
+    }
+    {
+        auto type = arrow::struct_({fields[0], fields[1]});
+        std::shared_ptr<arrow::Array> array = 
arrow::ipc::internal::json::ArrayFromJSON(type,
+                                                                               
         R"([
+        [false, null],
+        [true, 20]
+    ])")
+                                                  .ValueOrDie();
+        check_result(type, array);
+    }
+    {
+        auto type = arrow::struct_({fields[0], fields[1]});
+        std::shared_ptr<arrow::Array> array = 
arrow::ipc::internal::json::ArrayFromJSON(type,
+                                                                               
         R"([
+        [null, 21],
+        [false, 20]
+    ])")
+                                                  .ValueOrDie();
+        check_result(type, array);
+    }
+    {
+        auto type = arrow::struct_({fields[0], fields[1]});
+        std::shared_ptr<arrow::Array> array = 
arrow::ipc::internal::json::ArrayFromJSON(type,
+                                                                               
         R"([
+        [null, null],
+        [null, 20]
+    ])")
+                                                  .ValueOrDie();
+        check_result(type, array);
+    }
+    // test float & double
+    // -infinity < -0.0 < +0.0 < +infinity < NaN == NaN
+    {
+        auto type = arrow::struct_({fields[5], fields[5], fields[5], 
fields[6], fields[6]});
+        std::shared_ptr<arrow::Array> array = 
arrow::ipc::internal::json::ArrayFromJSON(type,
+                                                                               
         R"([
+        [-Inf, -0.0, 0.0, Inf, NaN],
+        [-0.0, -0.0, 0.0, Inf, NaN]
+    ])")
+                                                  .ValueOrDie();
+        check_result(type, array);
+    }
+    {
+        auto type = arrow::struct_({fields[5], fields[5], fields[5], 
fields[6], fields[6]});
+        std::shared_ptr<arrow::Array> array = 
arrow::ipc::internal::json::ArrayFromJSON(type,
+                                                                               
         R"([
+        [-Inf, -0.0, 0.0, Inf, NaN],
+        [-Inf, 0.0, 0.0, Inf, NaN]
+    ])")
+                                                  .ValueOrDie();
+        check_result(type, array);
+    }
+    {
+        auto type = arrow::struct_({fields[5], fields[5], fields[5], 
fields[6], fields[6]});
+        std::shared_ptr<arrow::Array> array = 
arrow::ipc::internal::json::ArrayFromJSON(type,
+                                                                               
         R"([
+        [-Inf, -0.0, 0.0, 0.0, NaN],
+        [-Inf, -0.0, 0.0, Inf, NaN]
+    ])")
+                                                  .ValueOrDie();
+        check_result(type, array);
+    }
+    {
+        auto type = arrow::struct_({fields[5], fields[5], fields[5], 
fields[6], fields[6]});
+        std::shared_ptr<arrow::Array> array = 
arrow::ipc::internal::json::ArrayFromJSON(type,
+                                                                               
         R"([
+        [-Inf, -0.0, 0.0, Inf, Inf],
+        [-Inf, -0.0, 0.0, Inf, NaN]
+    ])")
+                                                  .ValueOrDie();
+        check_result(type, array);
+    }
+}
+
+}  // namespace paimon::test

Reply via email to