This is an automated email from the ASF dual-hosted git repository.

leaves12138 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 69750b4  feat: add data evolution reader infrastructure (#64)
69750b4 is described below

commit 69750b41043a1194b3e78d1b00d3ed496c3a35aa
Author: lszskye <[email protected]>
AuthorDate: Tue Jun 9 01:25:01 2026 -0700

    feat: add data evolution reader infrastructure (#64)
---
 src/paimon/common/reader/data_evolution_array.h    | 167 ++++++
 .../common/reader/data_evolution_array_test.cpp    | 348 +++++++++++
 .../common/reader/data_evolution_file_reader.cpp   | 197 +++++++
 .../common/reader/data_evolution_file_reader.h     | 105 ++++
 .../reader/data_evolution_file_reader_test.cpp     | 635 +++++++++++++++++++++
 src/paimon/common/reader/data_evolution_row.h      | 151 +++++
 .../common/reader/data_evolution_row_test.cpp      | 242 ++++++++
 7 files changed, 1845 insertions(+)

diff --git a/src/paimon/common/reader/data_evolution_array.h b/src/paimon/common/reader/data_evolution_array.h
new file mode 100644
index 0000000..d311e63
--- /dev/null
+++ b/src/paimon/common/reader/data_evolution_array.h
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+#include <cassert>
+#include <cstdint>
+#include <memory>
+#include <string_view>
+#include <utility>
+#include <vector>
+
+#include "paimon/common/data/binary_array.h"
+#include "paimon/common/data/binary_string.h"
+#include "paimon/common/data/internal_array.h"
+#include "paimon/common/data/internal_map.h"
+#include "paimon/common/data/internal_row.h"
+#include "paimon/data/decimal.h"
+#include "paimon/data/timestamp.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+
+namespace paimon {
+class Bytes;
+class InternalMap;
+class InternalRow;
+
+/// An `InternalArray` view assembled from elements of several `BinaryArray`s.
+///
+/// Element `pos` of this array is element `field_offsets[pos]` of
+/// `arrays[array_offsets[pos]]`. A negative `array_offsets[pos]` marks the element as
+/// null (see `IsNullAt`); the typed getters must not be called on such positions.
+class DataEvolutionArray : public InternalArray {
+ public:
+    /// @param arrays Underlying arrays the elements are taken from; must not be empty.
+    /// @param array_offsets For each element, the index into `arrays` (negative means null).
+    /// @param field_offsets For each element, the position inside the selected array.
+    DataEvolutionArray(const std::vector<BinaryArray>& arrays,
+                       const std::vector<int32_t>& array_offsets,
+                       const std::vector<int32_t>& field_offsets)
+        : arrays_(arrays), array_offsets_(array_offsets), field_offsets_(field_offsets) {
+        assert(!arrays_.empty());
+        // Every getter indexes both offset vectors with the same position.
+        assert(array_offsets_.size() == field_offsets_.size());
+    }
+
+    /// Number of logical elements, i.e. the number of offset entries.
+    int32_t Size() const override {
+        return static_cast<int32_t>(array_offsets_.size());
+    }
+
+    /// An element is null when its array offset is negative or when the underlying
+    /// array reports null at the mapped position.
+    bool IsNullAt(int32_t pos) const override {
+        if (array_offsets_[pos] < 0) {
+            return true;
+        }
+        return SourceArray(pos).IsNullAt(field_offsets_[pos]);
+    }
+
+    bool GetBoolean(int32_t pos) const override {
+        return SourceArray(pos).GetBoolean(field_offsets_[pos]);
+    }
+
+    char GetByte(int32_t pos) const override {
+        return SourceArray(pos).GetByte(field_offsets_[pos]);
+    }
+
+    int16_t GetShort(int32_t pos) const override {
+        return SourceArray(pos).GetShort(field_offsets_[pos]);
+    }
+
+    int32_t GetInt(int32_t pos) const override {
+        return SourceArray(pos).GetInt(field_offsets_[pos]);
+    }
+
+    int32_t GetDate(int32_t pos) const override {
+        return SourceArray(pos).GetDate(field_offsets_[pos]);
+    }
+
+    int64_t GetLong(int32_t pos) const override {
+        return SourceArray(pos).GetLong(field_offsets_[pos]);
+    }
+
+    float GetFloat(int32_t pos) const override {
+        return SourceArray(pos).GetFloat(field_offsets_[pos]);
+    }
+
+    double GetDouble(int32_t pos) const override {
+        return SourceArray(pos).GetDouble(field_offsets_[pos]);
+    }
+
+    BinaryString GetString(int32_t pos) const override {
+        return SourceArray(pos).GetString(field_offsets_[pos]);
+    }
+
+    std::string_view GetStringView(int32_t pos) const override {
+        return SourceArray(pos).GetStringView(field_offsets_[pos]);
+    }
+
+    Decimal GetDecimal(int32_t pos, int32_t precision, int32_t scale) const override {
+        return SourceArray(pos).GetDecimal(field_offsets_[pos], precision, scale);
+    }
+
+    Timestamp GetTimestamp(int32_t pos, int32_t precision) const override {
+        return SourceArray(pos).GetTimestamp(field_offsets_[pos], precision);
+    }
+
+    std::shared_ptr<Bytes> GetBinary(int32_t pos) const override {
+        return SourceArray(pos).GetBinary(field_offsets_[pos]);
+    }
+
+    std::shared_ptr<InternalArray> GetArray(int32_t pos) const override {
+        return SourceArray(pos).GetArray(field_offsets_[pos]);
+    }
+
+    std::shared_ptr<InternalMap> GetMap(int32_t pos) const override {
+        return SourceArray(pos).GetMap(field_offsets_[pos]);
+    }
+
+    std::shared_ptr<InternalRow> GetRow(int32_t pos, int32_t num_fields) const override {
+        return SourceArray(pos).GetRow(field_offsets_[pos], num_fields);
+    }
+
+    Result<std::vector<char>> ToBooleanArray() const override {
+        return Status::Invalid("DataEvolutionArray not support ToBooleanArray");
+    }
+
+    Result<std::vector<char>> ToByteArray() const override {
+        return Status::Invalid("DataEvolutionArray not support ToByteArray");
+    }
+
+    Result<std::vector<int16_t>> ToShortArray() const override {
+        return Status::Invalid("DataEvolutionArray not support ToShortArray");
+    }
+
+    Result<std::vector<int32_t>> ToIntArray() const override {
+        return Status::Invalid("DataEvolutionArray not support ToIntArray");
+    }
+
+    /// Copies every element into a vector of longs.
+    ///
+    /// @return The values in element order, or an invalid status if any element is null
+    ///         (a null element has no underlying array to read from).
+    Result<std::vector<int64_t>> ToLongArray() const override {
+        const int32_t size = Size();
+        std::vector<int64_t> result;
+        result.reserve(size);
+        for (int32_t i = 0; i < size; i++) {
+            if (IsNullAt(i)) {
+                // Checked at runtime: with asserts compiled out, GetLong() would index
+                // arrays_ with a negative offset.
+                return Status::Invalid(
+                    "DataEvolutionArray not support ToLongArray with null element");
+            }
+            result.push_back(GetLong(i));
+        }
+        return result;
+    }
+
+    Result<std::vector<float>> ToFloatArray() const override {
+        return Status::Invalid("DataEvolutionArray not support ToFloatArray");
+    }
+
+    Result<std::vector<double>> ToDoubleArray() const override {
+        return Status::Invalid("DataEvolutionArray not support ToDoubleArray");
+    }
+
+ private:
+    /// Returns the underlying array that holds element `pos`. The position must be in
+    /// range and must not refer to a null (negative-offset) element.
+    const BinaryArray& SourceArray(int32_t pos) const {
+        assert(pos >= 0 && pos < static_cast<int32_t>(array_offsets_.size()));
+        assert(array_offsets_[pos] >= 0 &&
+               array_offsets_[pos] < static_cast<int32_t>(arrays_.size()));
+        return arrays_[array_offsets_[pos]];
+    }
+
+    std::vector<BinaryArray> arrays_;
+    std::vector<int32_t> array_offsets_;
+    std::vector<int32_t> field_offsets_;
+};
+}  // namespace paimon
diff --git a/src/paimon/common/reader/data_evolution_array_test.cpp b/src/paimon/common/reader/data_evolution_array_test.cpp
new file mode 100644
index 0000000..5ef2e17
--- /dev/null
+++ b/src/paimon/common/reader/data_evolution_array_test.cpp
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/reader/data_evolution_array.h"
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "gtest/gtest.h"
+#include "paimon/common/data/binary_array_writer.h"
+#include "paimon/common/data/binary_map.h"
+#include "paimon/common/data/binary_string.h"
+#include "paimon/data/decimal.h"
+#include "paimon/data/timestamp.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/testing/utils/binary_row_generator.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+// Reads long values through a DataEvolutionArray that interleaves three source arrays.
+TEST(DataEvolutionArrayTest, TestSimple) {
+    auto pool = GetDefaultPool();
+
+    // Element i is arrays[array_offsets[i]][field_offsets[i]].
+    std::vector<int32_t> array_offsets = {0, 2, 0, 1, 2, 1};
+    std::vector<int32_t> field_offsets = {0, 0, 1, 1, 1, 0};
+
+    std::vector<int64_t> src_array1 = {1, -100};
+    auto array1 = BinaryRowGenerator::FromLongArrayWithNull(src_array1, pool.get());
+
+    std::vector<int64_t> src_array2 = {2, -2};
+    auto array2 = BinaryRowGenerator::FromLongArrayWithNull(src_array2, pool.get());
+
+    std::vector<int64_t> src_array3 = {3, -3};
+    auto array3 = BinaryRowGenerator::FromLongArrayWithNull(src_array3, pool.get());
+
+    DataEvolutionArray data_evolution_array(std::vector<BinaryArray>({array1, array2, array3}),
+                                            array_offsets, field_offsets);
+
+    ASSERT_EQ(data_evolution_array.Size(), 6);
+    ASSERT_FALSE(data_evolution_array.IsNullAt(0));
+
+    ASSERT_EQ(data_evolution_array.GetLong(0), 1);
+    ASSERT_EQ(data_evolution_array.GetLong(1), 3);
+    ASSERT_EQ(data_evolution_array.GetLong(2), -100);
+    ASSERT_EQ(data_evolution_array.GetLong(3), -2);
+    ASSERT_EQ(data_evolution_array.GetLong(4), -3);
+    ASSERT_EQ(data_evolution_array.GetLong(5), 2);
+
+    ASSERT_OK_AND_ASSIGN(auto ret, data_evolution_array.ToLongArray());
+    ASSERT_EQ(ret, std::vector<int64_t>({1, 3, -100, -2, -3, 2}));
+
+    ASSERT_NOK_WITH_MSG(data_evolution_array.ToBooleanArray(),
+                        "DataEvolutionArray not support ToBooleanArray");
+}
+
+// Negative array offsets mark elements as null; the remaining elements stay readable.
+TEST(DataEvolutionArrayTest, TestNullValue) {
+    auto pool = GetDefaultPool();
+
+    std::vector<int32_t> array_offsets = {-2, -1, 0, 0};
+    std::vector<int32_t> field_offsets = {-1, -1, 0, 1};
+
+    std::vector<int64_t> src_array1 = {1, -100};
+    auto array1 = BinaryRowGenerator::FromLongArrayWithNull(src_array1, pool.get());
+
+    DataEvolutionArray data_evolution_array(std::vector<BinaryArray>({array1}), array_offsets,
+                                            field_offsets);
+
+    ASSERT_EQ(data_evolution_array.Size(), 4);
+    ASSERT_TRUE(data_evolution_array.IsNullAt(0));
+    ASSERT_TRUE(data_evolution_array.IsNullAt(1));
+    ASSERT_FALSE(data_evolution_array.IsNullAt(2));
+    ASSERT_EQ(data_evolution_array.GetLong(2), 1);
+    ASSERT_EQ(data_evolution_array.GetLong(3), -100);
+}
+
+// Exercises every typed getter of DataEvolutionArray, one underlying array per field type.
+TEST(DataEvolutionArrayTest, TestAllFieldTypes) {
+    auto pool = GetDefaultPool();
+
+    // We create separate BinaryArrays for each field type and use DataEvolutionArray
+    // to proxy access across them. Most arrays have 2 elements (map and row have 1); the
+    // evolution array picks elements from the underlying arrays via
+    // array_offsets/field_offsets.
+
+    // --- Boolean array (array index 0) ---
+    BinaryArray bool_array;
+    {
+        BinaryArrayWriter writer(&bool_array, 2, sizeof(bool), pool.get());
+        writer.WriteBoolean(0, true);
+        writer.WriteBoolean(1, false);
+        writer.Complete();
+    }
+
+    // --- Byte array (array index 1) ---
+    BinaryArray byte_array;
+    {
+        BinaryArrayWriter writer(&byte_array, 2, sizeof(int8_t), pool.get());
+        writer.WriteByte(0, 42);
+        writer.WriteByte(1, -7);
+        writer.Complete();
+    }
+
+    // --- Short array (array index 2) ---
+    BinaryArray short_array;
+    {
+        BinaryArrayWriter writer(&short_array, 2, sizeof(int16_t), pool.get());
+        writer.WriteShort(0, 1024);
+        writer.WriteShort(1, -512);
+        writer.Complete();
+    }
+
+    // --- Int array (array index 3) ---
+    BinaryArray int_array;
+    {
+        BinaryArrayWriter writer(&int_array, 2, sizeof(int32_t), pool.get());
+        writer.WriteInt(0, 100000);
+        writer.WriteInt(1, -99999);
+        writer.Complete();
+    }
+
+    // --- Long array (array index 4) ---
+    BinaryArray long_array;
+    {
+        BinaryArrayWriter writer(&long_array, 2, sizeof(int64_t), pool.get());
+        writer.WriteLong(0, 1234567890LL);
+        writer.WriteLong(1, -987654321LL);
+        writer.Complete();
+    }
+
+    // --- Float array (array index 5) ---
+    BinaryArray float_array;
+    {
+        BinaryArrayWriter writer(&float_array, 2, sizeof(float), pool.get());
+        writer.WriteFloat(0, 3.14f);
+        writer.WriteFloat(1, -2.71f);
+        writer.Complete();
+    }
+
+    // --- Double array (array index 6) ---
+    BinaryArray double_array;
+    {
+        BinaryArrayWriter writer(&double_array, 2, sizeof(double), pool.get());
+        writer.WriteDouble(0, 1.23456789);
+        writer.WriteDouble(1, -9.87654321);
+        writer.Complete();
+    }
+
+    // --- String array (array index 7) ---
+    BinaryArray string_array;
+    {
+        BinaryArrayWriter writer(&string_array, 2, 8, pool.get());
+        writer.WriteString(0, BinaryString::FromString("hello", pool.get()));
+        writer.WriteString(1, BinaryString::FromString("world", pool.get()));
+        writer.Complete();
+    }
+
+    // --- Decimal array (array index 8), precision=10, scale=2 ---
+    BinaryArray decimal_array;
+    {
+        BinaryArrayWriter writer(&decimal_array, 2, 8, pool.get());
+        writer.WriteDecimal(0, Decimal(10, 2, 12345), 10);
+        writer.WriteDecimal(1, Decimal(10, 2, 67890), 10);
+        writer.Complete();
+    }
+
+    // --- Timestamp array (array index 9), precision=9 (non-compact) ---
+    BinaryArray timestamp_array;
+    {
+        BinaryArrayWriter writer(&timestamp_array, 2, 8, pool.get());
+        writer.WriteTimestamp(0, Timestamp(1000, 500), 9);
+        writer.WriteTimestamp(1, Timestamp(2000, 999), 9);
+        writer.Complete();
+    }
+
+    // --- Binary/Bytes array (array index 10) ---
+    BinaryArray binary_array;
+    {
+        Bytes bytes_val1("abcde", pool.get());
+        Bytes bytes_val2("fghij", pool.get());
+        BinaryArrayWriter writer(&binary_array, 2, 8, pool.get());
+        writer.WriteBinary(0, bytes_val1);
+        writer.WriteBinary(1, bytes_val2);
+        writer.Complete();
+    }
+
+    // --- Nested Array array (array index 11) ---
+    BinaryArray nested_array;
+    {
+        auto inner = BinaryArray::FromIntArray({10, 20}, pool.get());
+        auto inner2 = BinaryArray::FromIntArray({30, 40}, pool.get());
+        BinaryArrayWriter writer(&nested_array, 2, 8, pool.get());
+        writer.WriteArray(0, inner);
+        writer.WriteArray(1, inner2);
+        writer.Complete();
+    }
+
+    // --- Map array (array index 12) ---
+    BinaryArray map_array;
+    {
+        auto map_key = BinaryArray::FromIntArray({1, 2}, pool.get());
+        auto map_val = BinaryArray::FromLongArray({100LL, 200LL}, pool.get());
+        auto map_obj = BinaryMap::ValueOf(map_key, map_val, pool.get());
+        BinaryArrayWriter writer(&map_array, 1, 8, pool.get());
+        writer.WriteMap(0, *map_obj);
+        writer.Complete();
+    }
+
+    // --- Row array (array index 13) ---
+    BinaryArray row_array;
+    {
+        BinaryRow row1 = BinaryRowGenerator::GenerateRow(
+            {std::string("Alice"), static_cast<int32_t>(30)}, pool.get());
+        BinaryArrayWriter writer(&row_array, 1, 8, pool.get());
+        writer.WriteRow(0, row1);
+        writer.Complete();
+    }
+
+    // --- Date array (array index 14), stored as int32 ---
+    BinaryArray date_array;
+    {
+        BinaryArrayWriter writer(&date_array, 2, sizeof(int32_t), pool.get());
+        writer.WriteInt(0, 19000);  // days since epoch
+        writer.WriteInt(1, 18500);
+        writer.Complete();
+    }
+
+    std::vector<BinaryArray> arrays = {
+        bool_array,       // 0
+        byte_array,       // 1
+        short_array,      // 2
+        int_array,        // 3
+        long_array,       // 4
+        float_array,      // 5
+        double_array,     // 6
+        string_array,     // 7
+        decimal_array,    // 8
+        timestamp_array,  // 9
+        binary_array,     // 10
+        nested_array,     // 11
+        map_array,        // 12
+        row_array,        // 13
+        date_array,       // 14
+    };
+
+    // Each evolution entry: (array_index, field_index_within_array)
+    // pos 0:  boolean   -> arrays[0][0]  = true
+    // pos 1:  byte      -> arrays[1][0]  = 42
+    // pos 2:  short     -> arrays[2][1]  = -512
+    // pos 3:  int       -> arrays[3][0]  = 100000
+    // pos 4:  long      -> arrays[4][1]  = -987654321
+    // pos 5:  float     -> arrays[5][0]  = 3.14f
+    // pos 6:  double    -> arrays[6][1]  = -9.87654321
+    // pos 7:  string    -> arrays[7][0]  = "hello"
+    // pos 8:  decimal   -> arrays[8][1]  = Decimal(10,2,67890)
+    // pos 9:  timestamp -> arrays[9][0]  = Timestamp(1000,500)
+    // pos 10: binary    -> arrays[10][1] = "fghij"
+    // pos 11: array     -> arrays[11][0] = [10,20]
+    // pos 12: map       -> arrays[12][0]
+    // pos 13: row       -> arrays[13][0] = ("Alice", 30)
+    // pos 14: date      -> arrays[14][0] = 19000
+    // pos 15: string    -> arrays[7][1]  = "world"
+    std::vector<int32_t> array_offsets = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 7};
+    std::vector<int32_t> field_offsets = {0, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 0, 0, 0, 1};
+
+    DataEvolutionArray evolution(arrays, array_offsets, field_offsets);
+
+    ASSERT_EQ(evolution.Size(), 16);
+
+    // Boolean
+    ASSERT_FALSE(evolution.IsNullAt(0));
+    ASSERT_EQ(evolution.GetBoolean(0), true);
+
+    // Byte
+    ASSERT_EQ(evolution.GetByte(1), 42);
+
+    // Short
+    ASSERT_EQ(evolution.GetShort(2), -512);
+
+    // Int
+    ASSERT_EQ(evolution.GetInt(3), 100000);
+
+    // Long
+    ASSERT_EQ(evolution.GetLong(4), -987654321LL);
+
+    // Float
+    ASSERT_FLOAT_EQ(evolution.GetFloat(5), 3.14f);
+
+    // Double
+    ASSERT_DOUBLE_EQ(evolution.GetDouble(6), -9.87654321);
+
+    // String
+    ASSERT_EQ(evolution.GetString(7).ToString(), "hello");
+
+    // StringView
+    ASSERT_EQ(evolution.GetStringView(15), "world");
+
+    // Decimal
+    ASSERT_EQ(evolution.GetDecimal(8, 10, 2), Decimal(10, 2, 67890));
+
+    // Timestamp
+    ASSERT_EQ(evolution.GetTimestamp(9, 9), Timestamp(1000, 500));
+
+    // Binary
+    auto retrieved_binary = evolution.GetBinary(10);
+    Bytes expected_binary("fghij", pool.get());
+    ASSERT_EQ(*retrieved_binary, expected_binary);
+
+    // Nested Array
+    auto retrieved_array = evolution.GetArray(11);
+    ASSERT_OK_AND_ASSIGN(auto inner_values, retrieved_array->ToIntArray());
+    ASSERT_EQ(inner_values, std::vector<int32_t>({10, 20}));
+
+    // Map
+    auto retrieved_map = evolution.GetMap(12);
+    ASSERT_EQ(retrieved_map->Size(), 2);
+
+    // Row
+    auto retrieved_row = evolution.GetRow(13, 2);
+    ASSERT_EQ(retrieved_row->GetString(0).ToString(), "Alice");
+    ASSERT_EQ(retrieved_row->GetInt(1), 30);
+
+    // Date
+    ASSERT_EQ(evolution.GetDate(14), 19000);
+
+    // Verify unsupported ToXxxArray methods
+    ASSERT_NOK_WITH_MSG(evolution.ToBooleanArray(),
+                        "DataEvolutionArray not support ToBooleanArray");
+    ASSERT_NOK_WITH_MSG(evolution.ToByteArray(), "DataEvolutionArray not support ToByteArray");
+    ASSERT_NOK_WITH_MSG(evolution.ToShortArray(), "DataEvolutionArray not support ToShortArray");
+    ASSERT_NOK_WITH_MSG(evolution.ToIntArray(), "DataEvolutionArray not support ToIntArray");
+    ASSERT_NOK_WITH_MSG(evolution.ToFloatArray(), "DataEvolutionArray not support ToFloatArray");
+    ASSERT_NOK_WITH_MSG(evolution.ToDoubleArray(), "DataEvolutionArray not support ToDoubleArray");
+}
+
+}  // namespace paimon::test
diff --git a/src/paimon/common/reader/data_evolution_file_reader.cpp b/src/paimon/common/reader/data_evolution_file_reader.cpp
new file mode 100644
index 0000000..8e8fbbc
--- /dev/null
+++ b/src/paimon/common/reader/data_evolution_file_reader.cpp
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/reader/data_evolution_file_reader.h"
+
+#include "arrow/c/abi.h"
+#include "arrow/c/bridge.h"
+#include "fmt/format.h"
+#include "paimon/common/metrics/metrics_impl.h"
+#include "paimon/common/reader/reader_utils.h"
+#include "paimon/common/utils/arrow/mem_utils.h"
+#include "paimon/common/utils/arrow/status_utils.h"
+namespace paimon {
+/// Validates the arguments and builds a DataEvolutionFileReader.
+///
+/// @param readers Inner readers; more than one is required. Entries may be null when no
+///        field is read from that reader.
+/// @param read_schema Output schema; must be non-null and contain at least one field.
+/// @param read_batch_size Maximum number of rows per produced batch; must be positive.
+/// @param reader_offsets For each output field, the index of the inner reader providing
+///        it, or -1 when no reader provides it (such a field is filled with nulls).
+/// @param field_offsets For each output field, its position inside the inner reader.
+/// @param pool Memory pool wrapped for the arrow allocations of the reader.
+/// @return The reader, or an invalid status describing the rejected argument.
+Result<std::unique_ptr<DataEvolutionFileReader>> DataEvolutionFileReader::Create(
+    std::vector<std::unique_ptr<BatchReader>>&& readers,
+    const std::shared_ptr<arrow::Schema>& read_schema, int32_t read_batch_size,
+    const std::vector<int32_t>& reader_offsets, const std::vector<int32_t>& field_offsets,
+    const std::shared_ptr<MemoryPool>& pool) {
+    if (read_schema == nullptr || read_schema->num_fields() == 0) {
+        return Status::Invalid("read schema must not be empty");
+    }
+    if (static_cast<size_t>(read_schema->num_fields()) != reader_offsets.size() ||
+        reader_offsets.size() != field_offsets.size()) {
+        return Status::Invalid(
+            "read schema, row offsets and field offsets must have the same size");
+    }
+    if (readers.size() <= 1) {
+        return Status::Invalid("readers size is supposed to be more than 1");
+    }
+    if (read_batch_size <= 0) {
+        return Status::Invalid(
+            fmt::format("read batch size must be positive, but got {}", read_batch_size));
+    }
+    // Reject offsets that would later index past the readers or dereference a null reader.
+    for (size_t i = 0; i < reader_offsets.size(); i++) {
+        const int32_t reader_offset = reader_offsets[i];
+        if (reader_offset == -1) {
+            // the field is not provided by any reader
+            continue;
+        }
+        if (reader_offset < 0 || static_cast<size_t>(reader_offset) >= readers.size() ||
+            !readers[reader_offset]) {
+            return Status::Invalid(fmt::format(
+                "reader offset {} of field {} does not refer to a valid reader", reader_offset,
+                i));
+        }
+        if (field_offsets[i] < 0) {
+            return Status::Invalid(
+                fmt::format("field offset {} of field {} is negative", field_offsets[i], i));
+        }
+    }
+    return std::unique_ptr<DataEvolutionFileReader>(
+        new DataEvolutionFileReader(std::move(readers), read_schema, read_batch_size,
+                                    reader_offsets, field_offsets, GetArrowPool(pool)));
+}
+
+/// Reads one batch from every inner reader and stitches the requested fields together.
+///
+/// Each non-null inner reader contributes one struct array; all of them must have the same
+/// length. Output field i is taken from child `field_offsets_[i]` of the array read from
+/// reader `reader_offsets_[i]`, or is an all-null array when `reader_offsets_[i]` is -1.
+///
+/// @return The assembled batch with an all-valid bitmap, an EOF batch as soon as any inner
+///         reader has no more data, or an invalid status on inconsistent input.
+Result<BatchReader::ReadBatchWithBitmap> DataEvolutionFileReader::NextBatchWithBitmap() {
+    std::vector<std::shared_ptr<arrow::StructArray>> array_for_each_reader;
+    array_for_each_reader.reserve(readers_.size());
+    int64_t array_length = -1;
+    for (size_t i = 0; i < readers_.size(); i++) {
+        if (!readers_[i]) {
+            // no read field from readers_[i]
+            array_for_each_reader.push_back(nullptr);
+            continue;
+        }
+        PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> array, NextBatchForSingleReader(i));
+        if (array == nullptr) {
+            // read eof
+            // NOTE(review): rows already fetched from earlier readers in this call are
+            // dropped here; confirm that all inner readers always hold the same row count.
+            return BatchReader::MakeEofBatchWithBitmap();
+        }
+        if (array_length == -1) {
+            array_length = array->length();
+        } else if (array_length != array->length()) {
+            return Status::Invalid("array for single reader length mismatch others");
+        }
+        // Checked at runtime because the cast below is only verified in debug builds.
+        if (array->type_id() != arrow::Type::STRUCT) {
+            return Status::Invalid(
+                fmt::format("array read from reader {} is not a struct array", i));
+        }
+        auto struct_array = arrow::internal::checked_pointer_cast<arrow::StructArray>(array);
+        array_for_each_reader.push_back(struct_array);
+    }
+    if (array_length < 0) {
+        // every reader is null, so there is nothing that defines the batch length
+        return Status::Invalid("no valid reader to read from");
+    }
+    int32_t read_field_count = read_schema_->num_fields();
+    arrow::ArrayVector target_sub_array_vec;
+    target_sub_array_vec.reserve(read_field_count);
+    for (int32_t i = 0; i < read_field_count; i++) {
+        const int32_t reader_offset = reader_offsets_[i];
+        if (reader_offset == -1) {
+            PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> null_array,
+                                   GetOrCreateNonExistArray(i, array_length));
+            target_sub_array_vec.push_back(null_array);
+            continue;
+        }
+        if (reader_offset < 0 ||
+            static_cast<size_t>(reader_offset) >= array_for_each_reader.size() ||
+            !array_for_each_reader[reader_offset]) {
+            return Status::Invalid(fmt::format(
+                "reader offset {} of field {} does not refer to a valid reader", reader_offset,
+                i));
+        }
+        const auto& sub_array = array_for_each_reader[reader_offset];
+        const int32_t field_offset = field_offsets_[i];
+        if (field_offset < 0 || field_offset >= sub_array->num_fields()) {
+            return Status::Invalid(
+                fmt::format("field offset {} of field {} exceeds the {} fields of reader {}",
+                            field_offset, i, sub_array->num_fields(), reader_offset));
+        }
+        target_sub_array_vec.push_back(sub_array->field(field_offset));
+    }
+    PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(
+        std::shared_ptr<arrow::Array> target_array,
+        arrow::StructArray::Make(target_sub_array_vec, read_schema_->field_names()));
+    std::unique_ptr<ArrowArray> target_c_arrow_array = std::make_unique<ArrowArray>();
+    std::unique_ptr<ArrowSchema> target_c_schema = std::make_unique<ArrowSchema>();
+    PAIMON_RETURN_NOT_OK_FROM_ARROW(
+        arrow::ExportArray(*target_array, target_c_arrow_array.get(), target_c_schema.get()));
+    auto target_batch = std::make_pair(std::move(target_c_arrow_array), std::move(target_c_schema));
+    return ReaderUtils::AddAllValidBitmap(std::move(target_batch));
+}
+
+/// Returns an all-null array of `array_length` rows typed like output field `field_idx`.
+///
+/// The array is kept per field and re-created only when a longer one is requested;
+/// shorter requests are served as a slice of the kept array.
+///
+/// @param field_idx Index of the field in the read schema.
+/// @param array_length Number of rows required; must not be negative.
+/// @return The null array, or an invalid status for an out-of-range argument.
+Result<std::shared_ptr<arrow::Array>> DataEvolutionFileReader::GetOrCreateNonExistArray(
+    int32_t field_idx, int64_t array_length) {
+    if (field_idx < 0 || static_cast<size_t>(field_idx) >= non_exist_array_vec_.size()) {
+        return Status::Invalid(
+            fmt::format("field index {} is out of range for non-exist arrays", field_idx));
+    }
+    if (array_length < 0) {
+        return Status::Invalid(
+            fmt::format("length of non-exist array must not be negative, got {}", array_length));
+    }
+    if (!non_exist_array_vec_[field_idx] ||
+        non_exist_array_vec_[field_idx]->length() < array_length) {
+        PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(
+            non_exist_array_vec_[field_idx],
+            arrow::MakeArrayOfNull(read_schema_->field(field_idx)->type(), array_length,
+                                   arrow_pool_.get()));
+    }
+    if (non_exist_array_vec_[field_idx]->length() == array_length) {
+        return non_exist_array_vec_[field_idx];
+    }
+    return non_exist_array_vec_[field_idx]->Slice(0, array_length);
+}
+
+/// Sums the lengths of the arrays currently cached for reader `reader_idx`.
+/// Returns 0 when no cache slot exists for that index.
+int64_t DataEvolutionFileReader::CalculateCachedArrayLength(size_t reader_idx) const {
+    if (reader_idx >= cached_array_vec_.size()) {
+        return 0;
+    }
+    int64_t total_length = 0;
+    for (const auto& array : cached_array_vec_[reader_idx]) {
+        total_length += array->length();
+    }
+    return total_length;
+}
+
+/// Produces the next array of at most `read_batch_size_` rows for one inner reader.
+///
+/// Rows cached from the previous turn are consumed first, then the inner reader is read
+/// until the batch is full or the reader reports EOF. Rows beyond the batch size are cached
+/// for the next turn, so the cache may hold any number of rows and is drained batch by batch.
+///
+/// @param reader_idx Index of a non-null inner reader.
+/// @return The array for this turn, a null pointer when no row is left, or an error status.
+Result<std::shared_ptr<arrow::Array>> DataEvolutionFileReader::NextBatchForSingleReader(
+    size_t reader_idx) {
+    if (reader_idx >= readers_.size() || reader_idx >= cached_array_vec_.size() ||
+        !readers_[reader_idx]) {
+        return Status::Invalid(fmt::format("reader {} is not available for read", reader_idx));
+    }
+    if (read_batch_size_ <= 0) {
+        return Status::Invalid(
+            fmt::format("read batch size must be positive, but got {}", read_batch_size_));
+    }
+    // arrays left from the last turn
+    arrow::ArrayVector pending_array_vec = std::move(cached_array_vec_[reader_idx]);
+    cached_array_vec_[reader_idx].clear();
+    auto& cached_arrays = cached_array_vec_[reader_idx];
+    arrow::ArrayVector concat_array_vec;
+    int64_t total_array_length = 0;
+    // Adds `array` to the current batch; whatever does not fit goes to the cache.
+    auto take_array = [&](const std::shared_ptr<arrow::Array>& array) {
+        if (total_array_length + array->length() > read_batch_size_) {
+            // need truncate current array to align read_batch_size_
+            int64_t truncated_length = read_batch_size_ - total_array_length;
+            if (truncated_length == 0) {
+                // the batch is already full, the whole array is kept for the next turn
+                cached_arrays.push_back(array);
+            } else {
+                concat_array_vec.push_back(array->Slice(0, truncated_length));
+                cached_arrays.push_back(array->Slice(truncated_length));
+                total_array_length += truncated_length;
+            }
+        } else {
+            concat_array_vec.push_back(array);
+            total_array_length += array->length();
+        }
+    };
+    for (const auto& pending_array : pending_array_vec) {
+        take_array(pending_array);
+    }
+    while (total_array_length < read_batch_size_) {
+        PAIMON_ASSIGN_OR_RAISE(ReadBatchWithBitmap src_array_with_bitmap,
+                               readers_[reader_idx]->NextBatchWithBitmap());
+        if (BatchReader::IsEofBatch(src_array_with_bitmap)) {
+            // read finish
+            break;
+        }
+        auto& [read_batch, bitmap] = src_array_with_bitmap;
+        auto& [c_array, c_schema] = read_batch;
+        PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr<arrow::Array> src_array,
+                                          arrow::ImportArray(c_array.get(), c_schema.get()));
+        PAIMON_ASSIGN_OR_RAISE(arrow::ArrayVector selected_array_vec,
+                               ReaderUtils::GenerateFilteredArrayVector(src_array, bitmap));
+        for (const auto& selected_array : selected_array_vec) {
+            take_array(selected_array);
+        }
+    }
+    if (concat_array_vec.empty()) {
+        return std::shared_ptr<arrow::Array>();
+    }
+    if (concat_array_vec.size() == 1) {
+        // avoid data copy
+        return concat_array_vec[0];
+    }
+    // TODO(xinyu.lxy) remove data copy for efficiency
+    PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr<arrow::Array> concat_array,
+                                      arrow::Concatenate(concat_array_vec, arrow_pool_.get()));
+    assert(concat_array->length() == total_array_length);
+    assert(concat_array->length() <= read_batch_size_);
+    return concat_array;
+}
+
+/// Releases the buffered arrays and closes every non-null inner reader.
+void DataEvolutionFileReader::Close() {
+    // Release the buffered data but keep one slot per reader / per field: other methods
+    // index these vectors directly, so emptying the outer vectors would leave those
+    // indexes out of range.
+    for (auto& cached_arrays : cached_array_vec_) {
+        cached_arrays.clear();
+    }
+    for (auto& non_exist_array : non_exist_array_vec_) {
+        non_exist_array.reset();
+    }
+    for (const auto& reader : readers_) {
+        if (reader) {
+            reader->Close();
+        }
+    }
+}
+
+/// Returns the result of MetricsImpl::CollectReadMetrics over the inner readers.
+std::shared_ptr<Metrics> DataEvolutionFileReader::GetReaderMetrics() const {
+    auto collected_metrics = MetricsImpl::CollectReadMetrics(readers_);
+    return collected_metrics;
+}
+
+}  // namespace paimon
diff --git a/src/paimon/common/reader/data_evolution_file_reader.h b/src/paimon/common/reader/data_evolution_file_reader.h
new file mode 100644
index 0000000..e1bea66
--- /dev/null
+++ b/src/paimon/common/reader/data_evolution_file_reader.h
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <utility>
+#include <vector>
+
+#include "arrow/api.h"
+#include "paimon/metrics.h"
+#include "paimon/reader/batch_reader.h"
+#include "paimon/result.h"
+
+namespace paimon {
+/// A union reader that wraps multiple inner readers.
+///
+/// It assembles the output of several inner readers into one combined reader: every field of
+/// a row it produces comes from one of the readers it contains.
+///
+/// For example, suppose the expected schema of this reader is: int, int, string, int, string,
+/// int (6 fields in total), and it contains three inner readers, called reader0, reader1 and
+/// reader2.
+///
+/// `reader_offsets` and `field_offsets` both have 6 elements, the same as the output schema.
+/// `reader_offsets[i]` indicates which inner reader output field i comes from, and
+/// `field_offsets[i]` indicates the offset of that field within that inner reader.
+///
+/// For example, if reader_offsets is {0, 2, 0, 1, 2, 1} and field_offsets is
+/// {0, 0, 1, 1, 1, 0}, it means:
+/// - The first field comes from reader0, and it is at offset 0 in reader0.
+/// - The second field comes from reader2, and it is at offset 0 in reader2.
+/// - The third field comes from reader0, and it is at offset 1 in reader0.
+/// - The fourth field comes from reader1, and it is at offset 1 in reader1.
+/// - The fifth field comes from reader2, and it is at offset 1 in reader2.
+/// - The sixth field comes from reader1, and it is at offset 0 in reader1.
+///
+/// Together, these three readers produce the final, complete rows.
+class DataEvolutionFileReader : public BatchReader {
+ public:
+    /// Factory for the union reader.
+    ///
+    /// @param readers inner readers; an entry may be null (the tests in this patch use a null
+    ///        entry to simulate a reader that has no fields to read).
+    /// @param read_schema output schema; one field per output column.
+    /// @param read_batch_size upper bound for the length of a produced batch.
+    /// @param reader_offsets for each output field, the index of the inner reader it comes
+    ///        from (the tests use -1 for a field that exists in no reader).
+    /// @param field_offsets for each output field, its offset inside that inner reader.
+    /// @param pool paimon memory pool. NOTE(review): the private constructor takes an
+    ///        arrow::MemoryPool, so Create presumably adapts `pool` -- confirm in the .cpp.
+    /// @return the reader, or an error status. The tests in this patch expect errors for an
+    ///         empty read schema, for offsets whose size differs from the schema, and for a
+    ///         reader list with a single entry.
+    static Result<std::unique_ptr<DataEvolutionFileReader>> Create(
+        std::vector<std::unique_ptr<BatchReader>>&& readers,
+        const std::shared_ptr<arrow::Schema>& read_schema, int32_t 
read_batch_size,
+        const std::vector<int32_t>& reader_offsets, const 
std::vector<int32_t>& field_offsets,
+        const std::shared_ptr<MemoryPool>& pool);
+
+    /// Not supported by this reader: always returns Status::Invalid. Callers must use
+    /// NextBatchWithBitmap() instead.
+    Result<ReadBatch> NextBatch() override {
+        return Status::Invalid(
+            "paimon inner reader DataEvolutionFileReader should use 
NextBatchWithBitmap");
+    }
+
+    /// Produces the next combined batch (implemented in the .cpp file).
+    Result<ReadBatchWithBitmap> NextBatchWithBitmap() override;
+
+    /// Clears the cached arrays and closes every non-null inner reader.
+    void Close() override;
+
+    /// Returns the read metrics collected from the inner readers.
+    std::shared_ptr<Metrics> GetReaderMetrics() const override;
+
+ private:
+    /// Stores the arguments as members; `cached_array_vec_` gets one (empty) slot per inner
+    /// reader and `non_exist_array_vec_` gets one null slot per field of `read_schema`.
+    DataEvolutionFileReader(std::vector<std::unique_ptr<BatchReader>>&& 
readers,
+                            const std::shared_ptr<arrow::Schema>& read_schema,
+                            int32_t read_batch_size, const 
std::vector<int32_t>& reader_offsets,
+                            const std::vector<int32_t>& field_offsets,
+                            const std::shared_ptr<arrow::MemoryPool>& 
arrow_pool)
+        : arrow_pool_(arrow_pool),
+          readers_(std::move(readers)),
+          read_schema_(read_schema),
+          read_batch_size_(read_batch_size),
+          reader_offsets_(reader_offsets),
+          field_offsets_(field_offsets),
+          cached_array_vec_(readers_.size()),
+          non_exist_array_vec_(read_schema->num_fields(), nullptr) {}
+
+    /// NOTE(review): presumably the total length of the arrays cached for `reader_idx` --
+    /// confirm against the definition in the .cpp file.
+    int64_t CalculateCachedArrayLength(size_t reader_idx) const;
+
+    /// Returns the next array for the inner reader at `reader_idx`, holding at most
+    /// `read_batch_size_` rows; returns a null array pointer when there is nothing to return.
+    Result<std::shared_ptr<arrow::Array>> NextBatchForSingleReader(size_t 
reader_idx);
+
+    /// NOTE(review): presumably returns (and caches in `non_exist_array_vec_`) a placeholder
+    /// array for a field that no inner reader provides; the tests in this patch expect such
+    /// fields to be read back as null -- confirm against the definition.
+    Result<std::shared_ptr<arrow::Array>> GetOrCreateNonExistArray(int32_t 
field_idx,
+                                                                   int64_t 
array_length);
+
+ private:
+    // Arrow memory pool, used e.g. when concatenating arrays.
+    std::shared_ptr<arrow::MemoryPool> arrow_pool_;
+    // Inner readers; entries may be null.
+    std::vector<std::unique_ptr<BatchReader>> readers_;
+    // Output schema of this reader.
+    std::shared_ptr<arrow::Schema> read_schema_;
+    // Maximum length of a produced batch.
+    int32_t read_batch_size_;
+    // Per output field: index of the inner reader providing it.
+    std::vector<int32_t> reader_offsets_;
+    // Per output field: offset of the field inside its inner reader.
+    std::vector<int32_t> field_offsets_;
+    // Per inner reader: arrays already read but not yet returned.
+    std::vector<arrow::ArrayVector> cached_array_vec_;
+    // Per output field: slot initialised to null; see GetOrCreateNonExistArray.
+    arrow::ArrayVector non_exist_array_vec_;
+};
+}  // namespace paimon
diff --git a/src/paimon/common/reader/data_evolution_file_reader_test.cpp 
b/src/paimon/common/reader/data_evolution_file_reader_test.cpp
new file mode 100644
index 0000000..39afb15
--- /dev/null
+++ b/src/paimon/common/reader/data_evolution_file_reader_test.cpp
@@ -0,0 +1,635 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/reader/data_evolution_file_reader.h"
+
+#include <map>
+
+#include "arrow/api.h"
+#include "arrow/array/array_base.h"
+#include "arrow/c/abi.h"
+#include "arrow/c/bridge.h"
+#include "arrow/ipc/api.h"
+#include "arrow/util/range.h"
+#include "gtest/gtest.h"
+#include "paimon/common/utils/arrow/mem_utils.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/testing/mock/mock_file_batch_reader.h"
+#include "paimon/testing/utils/read_result_collector.h"
+#include "paimon/testing/utils/testharness.h"
+namespace paimon::test {
+
+/// Test fixture for DataEvolutionFileReader.
+///
+/// The bool test parameter is forwarded to MockFileBatchReader::EnableRandomizeBatchSize for
+/// every mock inner reader created by the helpers below.
+class DataEvolutionFileReaderTest : public ::testing::Test,
+                                    public ::testing::WithParamInterface<bool> {
+ public:
+    void SetUp() override {
+        pool_ = GetDefaultPool();
+    }
+
+    void TearDown() override {
+        pool_.reset();
+    }
+
+    /// Reads `src_array_vec` through a DataEvolutionFileReader and compares with
+    /// `expected_array`, once for every batch size produced by Iota(1, 10).
+    ///
+    /// @param src_array_vec one source array per inner reader; a null entry creates a null
+    ///        reader (simulates a reader with no fields to read).
+    /// @param read_schema output schema passed to DataEvolutionFileReader::Create.
+    /// @param reader_offsets per output field, index of the inner reader providing it.
+    /// @param field_offsets per output field, offset inside that inner reader.
+    /// @param expected_array expected content of the concatenated result.
+    /// @param selection_bitmap optional selection handed to every mock inner reader.
+    void CheckResult(const arrow::ArrayVector& src_array_vec,
+                     const std::shared_ptr<arrow::Schema>& read_schema,
+                     const std::vector<int32_t>& reader_offsets,
+                     const std::vector<int32_t>& field_offsets,
+                     const std::shared_ptr<arrow::Array>& expected_array,
+                     const std::optional<RoaringBitmap32>& selection_bitmap = std::nullopt) const {
+        for (auto batch_size : arrow::internal::Iota(1, 10)) {
+            // Array lengths are int64_t; accumulate in the same width to avoid narrowing.
+            int64_t total_row_count = 0;
+            std::vector<std::unique_ptr<BatchReader>> readers;
+            for (const auto& array : src_array_vec) {
+                if (array == nullptr) {
+                    // simulate no fields read from current reader
+                    readers.push_back(nullptr);
+                    continue;
+                }
+                total_row_count += array->length();
+                std::unique_ptr<MockFileBatchReader> file_batch_reader;
+                if (selection_bitmap) {
+                    file_batch_reader = std::make_unique<MockFileBatchReader>(
+                        array, array->type(), selection_bitmap.value(), batch_size);
+                } else {
+                    file_batch_reader =
+                        std::make_unique<MockFileBatchReader>(array, array->type(), batch_size);
+                }
+                auto enable_randomize_batch_size = GetParam();
+                file_batch_reader->EnableRandomizeBatchSize(enable_randomize_batch_size);
+                readers.push_back(std::move(file_batch_reader));
+            }
+            ASSERT_OK_AND_ASSIGN(
+                auto data_evolution_file_reader,
+                DataEvolutionFileReader::Create(std::move(readers), read_schema, batch_size,
+                                                reader_offsets, field_offsets, pool_));
+            // check metrics: data_evolution_file_reader collects the row counts of all
+            // MockFileBatchReader instances
+            auto metrics = data_evolution_file_reader->GetReaderMetrics();
+            ASSERT_EQ(metrics->ToString(),
+                      "{\"mock.number.of.rows\":" + std::to_string(total_row_count) + "}");
+
+            // check result array
+            ASSERT_OK_AND_ASSIGN(
+                auto result_array,
+                paimon::test::ReadResultCollector::CollectResult(data_evolution_file_reader.get()));
+            data_evolution_file_reader->Close();
+            auto expected_chunk_array = std::make_shared<arrow::ChunkedArray>(expected_array);
+            ASSERT_TRUE(result_array->Equals(expected_chunk_array));
+        }
+    }
+
+    /// Drains NextBatchForSingleReader(0) of a reader wrapping one mock inner reader and
+    /// checks that every batch except the last has exactly `read_batch_size` rows and that
+    /// the concatenation of all batches equals `expected_array`.
+    ///
+    /// @param inner_batch_size batch size of the mock inner reader.
+    /// @param read_batch_size batch size of the reader under test; must be positive.
+    /// @param src_array data served by the mock inner reader.
+    /// @param selection_bitmap optional selection handed to the mock inner reader.
+    /// @param expected_array expected concatenated output.
+    void CheckNextBatchForSingleReader(int32_t inner_batch_size, int32_t read_batch_size,
+                                       const std::shared_ptr<arrow::Array>& src_array,
+                                       const std::optional<RoaringBitmap32>& selection_bitmap,
+                                       const std::shared_ptr<arrow::Array>& expected_array) const {
+        // Guards the integer division / modulo below.
+        ASSERT_GT(read_batch_size, 0);
+        std::unique_ptr<MockFileBatchReader> file_batch_reader;
+        if (selection_bitmap) {
+            file_batch_reader = std::make_unique<MockFileBatchReader>(
+                src_array, src_array->type(), selection_bitmap.value(), inner_batch_size);
+        } else {
+            file_batch_reader = std::make_unique<MockFileBatchReader>(src_array, src_array->type(),
+                                                                      inner_batch_size);
+        }
+        auto enable_randomize_batch_size = GetParam();
+        file_batch_reader->EnableRandomizeBatchSize(enable_randomize_batch_size);
+        std::vector<std::unique_ptr<BatchReader>> readers;
+        readers.push_back(std::move(file_batch_reader));
+        // Only NextBatchForSingleReader is exercised, so schema and offsets stay empty.
+        DataEvolutionFileReader fake_data_evolution_reader(
+            std::move(readers), /*read_schema=*/arrow::schema({}), read_batch_size,
+            /*reader_offsets=*/{}, /*field_offsets=*/{}, GetArrowPool(pool_));
+        arrow::ArrayVector result_array_vec;
+        while (true) {
+            ASSERT_OK_AND_ASSIGN(auto result_array,
+                                 fake_data_evolution_reader.NextBatchForSingleReader(0));
+            if (result_array == nullptr) {
+                break;
+            }
+            result_array_vec.push_back(result_array);
+        }
+        // Integer ceiling division; no floating point round trip needed.
+        const int64_t expected_batch_count =
+            (expected_array->length() + read_batch_size - 1) / read_batch_size;
+        ASSERT_EQ(static_cast<int64_t>(result_array_vec.size()), expected_batch_count);
+        if (result_array_vec.empty()) {
+            // Zero batches were expected and produced. Stop here: `size() - 1` would wrap
+            // around and back() on an empty vector is undefined behaviour.
+            return;
+        }
+        // except for the last batch, the length of each array is expected to be aligned to
+        // read_batch_size
+        for (size_t i = 0; i + 1 < result_array_vec.size(); i++) {
+            ASSERT_EQ(result_array_vec[i]->length(), read_batch_size);
+        }
+        // the last batch holds the remainder, or a full batch when the length divides evenly
+        const int64_t remainder = expected_array->length() % read_batch_size;
+        const int64_t expected_last_length = (remainder == 0) ? read_batch_size : remainder;
+        ASSERT_EQ(result_array_vec.back()->length(), expected_last_length);
+        auto result_chunk_array = std::make_shared<arrow::ChunkedArray>(result_array_vec);
+        auto expected_chunk_array = std::make_shared<arrow::ChunkedArray>(expected_array);
+        ASSERT_TRUE(result_chunk_array->Equals(expected_chunk_array));
+    }
+
+ private:
+    // Memory pool shared by the readers created in a test; set in SetUp, released in TearDown.
+    std::shared_ptr<MemoryPool> pool_;
+};
+
+// Verifies that DataEvolutionFileReader::Create rejects invalid arguments.
+TEST_F(DataEvolutionFileReaderTest, TestInvalid) {
+    {
+        // empty read schema
+        arrow::FieldVector read_fields;
+        auto read_schema = arrow::schema(read_fields);
+        ASSERT_NOK_WITH_MSG(
+            DataEvolutionFileReader::Create({}, read_schema, 
/*read_batch_size=*/10, {}, {}, pool_),
+            "read schema must not be empty");
+    }
+    {
+        // 4 schema fields but only 3 reader offsets / field offsets
+        arrow::FieldVector read_fields = {
+            arrow::field("f0", arrow::int32()),
+            arrow::field("f1", arrow::int32()),
+            arrow::field("f2", arrow::utf8()),
+            arrow::field("f3", arrow::int32()),
+        };
+        auto read_schema = arrow::schema(read_fields);
+        std::vector<int32_t> reader_offsets = {0, 0, 1};
+        std::vector<int32_t> field_offsets = {0, 1, 0};
+        ASSERT_NOK_WITH_MSG(DataEvolutionFileReader::Create({}, read_schema, 
/*read_batch_size=*/10,
+                                                            reader_offsets, 
field_offsets, pool_),
+                            "read schema, row offsets and field offsets must 
have the same size");
+    }
+    {
+        // sizes are consistent, but only a single (null) reader is supplied
+        std::vector<std::unique_ptr<BatchReader>> readers;
+        readers.push_back(nullptr);
+
+        arrow::FieldVector read_fields = {
+            arrow::field("f0", arrow::int32()),
+            arrow::field("f1", arrow::int32()),
+            arrow::field("f2", arrow::utf8()),
+            arrow::field("f3", arrow::int32()),
+        };
+        auto read_schema = arrow::schema(read_fields);
+        std::vector<int32_t> reader_offsets = {0, 0, 1, 1};
+        std::vector<int32_t> field_offsets = {0, 1, 1, 0};
+        ASSERT_NOK_WITH_MSG(
+            DataEvolutionFileReader::Create(std::move(readers), read_schema, 
/*read_batch_size=*/10,
+                                            reader_offsets, field_offsets, 
pool_),
+            "readers size is supposed to be more than 1");
+    }
+}
+
+// Exercises NextBatchForSingleReader with different combinations of inner batch size, read
+// batch size and selection bitmap; see CheckNextBatchForSingleReader for the checks applied.
+TEST_P(DataEvolutionFileReaderTest, TestNextBatchForSingleReader) {
+    // builds an int32 array holding the values 0, 1, ..., array_length - 1
+    auto prepare_array = [](int64_t array_length) -> 
std::shared_ptr<arrow::Array> {
+        auto array_builder = std::make_shared<arrow::Int32Builder>();
+        for (int32_t i = 0; i < array_length; ++i) {
+            EXPECT_TRUE(array_builder->Append(i).ok());
+        }
+        std::shared_ptr<arrow::Array> array;
+        EXPECT_TRUE(array_builder->Finish(&array).ok());
+        return array;
+    };
+    // builds an int32 array holding the positions contained in the bitmap, in iteration
+    // order; since prepare_array stores value i at position i, this is the expected result
+    // of applying the bitmap as a selection
+    auto prepare_array_with_bitmap =
+        [](const RoaringBitmap32& bitmap) -> std::shared_ptr<arrow::Array> {
+        auto array_builder = std::make_shared<arrow::Int32Builder>();
+        for (auto iter = bitmap.Begin(); iter != bitmap.End(); ++iter) {
+            EXPECT_TRUE(array_builder->Append(*iter).ok());
+        }
+        std::shared_ptr<arrow::Array> array;
+        EXPECT_TRUE(array_builder->Finish(&array).ok());
+        return array;
+    };
+    {
+        // src array length = 10, read batch size = 10
+        auto src_array = prepare_array(10);
+        for (int32_t inner_batch_size : arrow::internal::Iota(1, 10)) {
+            CheckNextBatchForSingleReader(inner_batch_size, 
/*read_batch_size=*/10, src_array,
+                                          /*selection_bitmap=*/std::nullopt,
+                                          /*expected_array=*/src_array);
+        }
+    }
+    {
+        // src array length = 10, read batch size = 6
+        auto src_array = prepare_array(10);
+        for (int32_t inner_batch_size : arrow::internal::Iota(1, 6)) {
+            CheckNextBatchForSingleReader(inner_batch_size, 
/*read_batch_size=*/6, src_array,
+                                          /*selection_bitmap=*/std::nullopt,
+                                          /*expected_array=*/src_array);
+        }
+    }
+    {
+        // src array length = 10, read batch size = 15
+        auto src_array = prepare_array(10);
+        for (int32_t inner_batch_size : arrow::internal::Iota(1, 15)) {
+            CheckNextBatchForSingleReader(inner_batch_size, 
/*read_batch_size=*/15, src_array,
+                                          /*selection_bitmap=*/std::nullopt,
+                                          /*expected_array=*/src_array);
+        }
+    }
+    {
+        // test bulk data, src array length = 10000, read batch size = 1024
+        auto src_array = prepare_array(10000);
+        for (int32_t inner_batch_size : {1, 2, 8, 16, 20, 100, 1024}) {
+            CheckNextBatchForSingleReader(inner_batch_size, 
/*read_batch_size=*/1024, src_array,
+                                          /*selection_bitmap=*/std::nullopt,
+                                          /*expected_array=*/src_array);
+        }
+    }
+    {
+        // src array length = 10, selection bitmap = {1, 3, 5}
+        auto src_array = prepare_array(10);
+        RoaringBitmap32 selected_bitmap = RoaringBitmap32::From({1, 3, 5});
+        auto expected_array = prepare_array_with_bitmap(selected_bitmap);
+        for (int32_t inner_batch_size : arrow::internal::Iota(1, 15)) {
+            CheckNextBatchForSingleReader(inner_batch_size, 
/*read_batch_size=*/15, src_array,
+                                          selected_bitmap, expected_array);
+        }
+    }
+    {
+        // src array length = 10, selection all
+        auto src_array = prepare_array(10);
+        RoaringBitmap32 selected_bitmap = RoaringBitmap32::From({0, 1, 2, 3, 
4, 5, 6, 7, 8, 9});
+        for (int32_t inner_batch_size : arrow::internal::Iota(1, 15)) {
+            CheckNextBatchForSingleReader(inner_batch_size, 
/*read_batch_size=*/15, src_array,
+                                          selected_bitmap, 
/*expected_array=*/src_array);
+        }
+    }
+    {
+        // src array length = 10, selection first
+        auto src_array = prepare_array(10);
+        RoaringBitmap32 selected_bitmap = RoaringBitmap32::From({0});
+        auto expected_array = prepare_array_with_bitmap(selected_bitmap);
+        for (int32_t inner_batch_size : arrow::internal::Iota(1, 15)) {
+            CheckNextBatchForSingleReader(inner_batch_size, 
/*read_batch_size=*/15, src_array,
+                                          selected_bitmap, expected_array);
+        }
+    }
+    {
+        // src array length = 10, selection last
+        auto src_array = prepare_array(10);
+        RoaringBitmap32 selected_bitmap = RoaringBitmap32::From({9});
+        auto expected_array = prepare_array_with_bitmap(selected_bitmap);
+        for (int32_t inner_batch_size : arrow::internal::Iota(1, 15)) {
+            CheckNextBatchForSingleReader(inner_batch_size, 
/*read_batch_size=*/15, src_array,
+                                          selected_bitmap, expected_array);
+        }
+    }
+    {
+        // src array length = 10, selection consecutive positions
+        auto src_array = prepare_array(10);
+        RoaringBitmap32 selected_bitmap = RoaringBitmap32::From({2, 3, 4, 5});
+        auto expected_array = prepare_array_with_bitmap(selected_bitmap);
+        for (int32_t inner_batch_size : arrow::internal::Iota(1, 15)) {
+            CheckNextBatchForSingleReader(inner_batch_size, 
/*read_batch_size=*/15, src_array,
+                                          selected_bitmap, expected_array);
+        }
+    }
+    {
+        // src array length = 10, selection with a gap (position 6 is not selected),
+        // inner batch size = 4, read batch size = 5
+        auto src_array = prepare_array(10);
+        RoaringBitmap32 selected_bitmap = RoaringBitmap32::From({0, 1, 2, 3, 
4, 5, 7, 8, 9});
+        auto expected_array = prepare_array_with_bitmap(selected_bitmap);
+        // inner batch: [0, 1, 2, 3] | [4, 5] [7, 8] | [9]
+        CheckNextBatchForSingleReader(/*inner_batch_size=*/4, 
/*read_batch_size=*/5, src_array,
+                                      selected_bitmap, expected_array);
+    }
+    {
+        // test bulk data, src array length = 10000, read batch size = 1024
+        auto src_array = prepare_array(10000);
+        RoaringBitmap32 selected_bitmap =
+            RoaringBitmap32::From({0, 10, 1000, 2333, 4566, 7838, 8787, 9999});
+        auto expected_array = prepare_array_with_bitmap(selected_bitmap);
+        for (int32_t inner_batch_size : {1, 2, 8, 16, 20, 100, 1024}) {
+            CheckNextBatchForSingleReader(inner_batch_size, 
/*read_batch_size=*/1024, src_array,
+                                          selected_bitmap, expected_array);
+        }
+    }
+}
+
+// Three inner readers, each providing two of the six output fields; the offsets describe
+// how the output row is stitched together from them.
+TEST_P(DataEvolutionFileReaderTest, TestSimple) {
+    arrow::FieldVector read_fields = {
+        arrow::field("f0", arrow::int32()), arrow::field("f1", arrow::int32()),
+        arrow::field("f2", arrow::utf8()),  arrow::field("f3", arrow::int32()),
+        arrow::field("f4", arrow::utf8()),  arrow::field("f5", arrow::int32()),
+    };
+    auto read_schema = arrow::schema(read_fields);
+
+    // output field i is read from reader reader_offsets[i] at position field_offsets[i]
+    std::vector<int32_t> reader_offsets = {0, 2, 0, 1, 2, 1};
+    std::vector<int32_t> field_offsets = {0, 0, 1, 1, 1, 0};
+
+    // reader0 provides (f0, f2)
+    auto array0 = arrow::ipc::internal::json::ArrayFromJSON(
+                      arrow::struct_({read_fields[0], read_fields[2]}), R"([
+        [0, "00"],
+        [1, "01"],
+        [2, "02"],
+        [3, "03"],
+        [4, "04"],
+        [5, "05"]
+])")
+                      .ValueOrDie();
+    // reader1 provides (f5, f3)
+    auto array1 = arrow::ipc::internal::json::ArrayFromJSON(
+                      arrow::struct_({read_fields[5], read_fields[3]}), R"([
+        [10, 110],
+        [11, 111],
+        [12, 112],
+        [13, 113],
+        [14, 114],
+        [15, 115]
+])")
+                      .ValueOrDie();
+    // reader2 provides (f1, f4)
+    auto array2 = arrow::ipc::internal::json::ArrayFromJSON(
+                      arrow::struct_({read_fields[1], read_fields[4]}), R"([
+        [20, "20"],
+        [21, "21"],
+        [22, "22"],
+        [23, "23"],
+        [24, "24"],
+        [25, "25"]
+])")
+                      .ValueOrDie();
+
+    auto expected_array =
+        arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(read_fields), 
R"([
+        [0, 20, "00", 110, "20", 10],
+        [1, 21, "01", 111, "21", 11],
+        [2, 22, "02", 112, "22", 12],
+        [3, 23, "03", 113, "23", 13],
+        [4, 24, "04", 114, "24", 14],
+        [5, 25, "05", 115, "25", 15]
+])")
+            .ValueOrDie();
+    CheckResult({array0, array1, array2}, read_schema, reader_offsets, 
field_offsets,
+                expected_array);
+}
+
+// Same layout as TestSimple plus a trailing field that no inner reader provides (offsets
+// are -1); that field is expected to be null in every output row.
+TEST_P(DataEvolutionFileReaderTest, TestWithNonExistField) {
+    arrow::FieldVector read_fields = {
+        arrow::field("f0", arrow::int32()),        arrow::field("f1", 
arrow::int32()),
+        arrow::field("f2", arrow::utf8()),         arrow::field("f3", 
arrow::int32()),
+        arrow::field("f4", arrow::utf8()),         arrow::field("f5", 
arrow::int32()),
+        arrow::field("non-field", arrow::int32()),
+    };
+    auto read_schema = arrow::schema(read_fields);
+
+    // -1 marks the field that is not available from any reader
+    std::vector<int32_t> reader_offsets = {0, 2, 0, 1, 2, 1, -1};
+    std::vector<int32_t> field_offsets = {0, 0, 1, 1, 1, 0, -1};
+
+    auto array0 = arrow::ipc::internal::json::ArrayFromJSON(
+                      arrow::struct_({read_fields[0], read_fields[2]}), R"([
+        [0, "00"],
+        [1, "01"],
+        [2, "02"],
+        [3, "03"],
+        [4, "04"],
+        [5, "05"]
+])")
+                      .ValueOrDie();
+    auto array1 = arrow::ipc::internal::json::ArrayFromJSON(
+                      arrow::struct_({read_fields[5], read_fields[3]}), R"([
+        [10, 110],
+        [11, 111],
+        [12, 112],
+        [13, 113],
+        [14, 114],
+        [15, 115]
+])")
+                      .ValueOrDie();
+    auto array2 = arrow::ipc::internal::json::ArrayFromJSON(
+                      arrow::struct_({read_fields[1], read_fields[4]}), R"([
+        [20, "20"],
+        [21, "21"],
+        [22, "22"],
+        [23, "23"],
+        [24, "24"],
+        [25, "25"]
+])")
+                      .ValueOrDie();
+
+    auto expected_array =
+        arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(read_fields), 
R"([
+        [0, 20, "00", 110, "20", 10, null],
+        [1, 21, "01", 111, "21", 11, null],
+        [2, 22, "02", 112, "22", 12, null],
+        [3, 23, "03", 113, "23", 13, null],
+        [4, 24, "04", 114, "24", 14, null],
+        [5, 25, "05", 115, "25", 15, null]
+])")
+            .ValueOrDie();
+    CheckResult({array0, array1, array2}, read_schema, reader_offsets, 
field_offsets,
+                expected_array);
+}
+
+// Four reader slots where slot 2 is a null reader: the offsets only reference readers 0, 1
+// and 3, and the result must be the same as if the null reader were absent.
+TEST_P(DataEvolutionFileReaderTest, TestReadFromPartialReaders) {
+    arrow::FieldVector read_fields = {
+        arrow::field("f0", arrow::int32()), arrow::field("f1", arrow::int32()),
+        arrow::field("f2", arrow::utf8()),  arrow::field("f3", arrow::int32()),
+        arrow::field("f4", arrow::utf8()),  arrow::field("f5", arrow::int32()),
+    };
+    auto read_schema = arrow::schema(read_fields);
+    // simulate reader2 has no field to read
+    std::vector<int32_t> reader_offsets = {0, 3, 0, 1, 3, 1};
+    std::vector<int32_t> field_offsets = {0, 0, 1, 1, 1, 0};
+
+    auto array0 = arrow::ipc::internal::json::ArrayFromJSON(
+                      arrow::struct_({read_fields[0], read_fields[2]}), R"([
+        [0, "00"],
+        [1, "01"],
+        [2, "02"],
+        [3, "03"],
+        [4, "04"],
+        [5, "05"]
+])")
+                      .ValueOrDie();
+    auto array1 = arrow::ipc::internal::json::ArrayFromJSON(
+                      arrow::struct_({read_fields[5], read_fields[3]}), R"([
+        [10, 110],
+        [11, 111],
+        [12, 112],
+        [13, 113],
+        [14, 114],
+        [15, 115]
+])")
+                      .ValueOrDie();
+    auto array3 = arrow::ipc::internal::json::ArrayFromJSON(
+                      arrow::struct_({read_fields[1], read_fields[4]}), R"([
+        [20, "20"],
+        [21, "21"],
+        [22, "22"],
+        [23, "23"],
+        [24, "24"],
+        [25, "25"]
+])")
+                      .ValueOrDie();
+
+    auto expected_array =
+        arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(read_fields), 
R"([
+        [0, 20, "00", 110, "20", 10],
+        [1, 21, "01", 111, "21", 11],
+        [2, 22, "02", 112, "22", 12],
+        [3, 23, "03", 113, "23", 13],
+        [4, 24, "04", 114, "24", 14],
+        [5, 25, "05", 115, "25", 15]
+])")
+            .ValueOrDie();
+
+    // the nullptr entry makes CheckResult create a null reader in slot 2
+    CheckResult({array0, array1, nullptr, array3}, read_schema, 
reader_offsets, field_offsets,
+                expected_array);
+}
+
+// Two inner readers providing nested and non-trivial types (map, list, struct, timestamp,
+// date32, decimal128); each reader stores its fields in a different order than the output
+// schema, so field_offsets is not monotonic.
+TEST_P(DataEvolutionFileReaderTest, TestNestedType) {
+    arrow::FieldVector read_fields = {
+        arrow::field("f1", arrow::map(arrow::int8(), arrow::int16())),
+        arrow::field("f2", arrow::list(arrow::float32())),
+        arrow::field("f3", arrow::struct_({arrow::field("f0", 
arrow::boolean()),
+                                           arrow::field("f1", 
arrow::int64())})),
+        arrow::field("f4", arrow::timestamp(arrow::TimeUnit::NANO)),
+        arrow::field("f5", arrow::date32()),
+        arrow::field("f6", arrow::decimal128(2, 2))};
+    auto read_schema = arrow::schema(read_fields);
+
+    std::vector<int32_t> reader_offsets = {0, 1, 0, 1, 0, 1};
+    std::vector<int32_t> field_offsets = {2, 0, 1, 1, 0, 2};
+
+    // reader0 provides (date32, struct, map) = read_fields[4], [2], [0]
+    auto array0 = arrow::ipc::internal::json::ArrayFromJSON(
+                      arrow::struct_({read_fields[4], read_fields[2], 
read_fields[0]}), R"([
+        [2456, [true, 2], [[0, 0]]],
+        [24, [true, 1], [[0, 1]]],
+        [2456, [false, 12], [[10, 10]]],
+        [245, [false, 2222], [[127, 32767], [-128, -32768]]],
+        [24, [true, 2], [[1, 64], [2, 32]]],
+        [24, [true, 2], [[11, 64], [12, 32]]]
+])")
+                      .ValueOrDie();
+    // reader1 provides (list, timestamp, decimal) = read_fields[1], [3], [5]
+    auto array1 = arrow::ipc::internal::json::ArrayFromJSON(
+                      arrow::struct_({read_fields[1], read_fields[3], 
read_fields[5]}), R"([
+        [[0.1, 0.2], "1970-01-01 00:02:03.123123", "0.22"],
+        [[0.1, 0.3], "1970-01-01 00:02:03.999999", "0.28"],
+        [[1.1, 1.2], "1970-01-01 00:02:03.123123", "0.22"],
+        [[1.1, 1.2], "1970-01-01 00:02:03.123123", "0.12"],
+        [[2.2, 3.2], "1970-01-01 00:00:00.0", "0.78"],
+        [[2.2, 3.2], "1970-01-01 00:00:00.123123", "0.78"]
+])")
+                      .ValueOrDie();
+
+    auto expected_array =
+        arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(read_fields), 
R"([
+        [[[0, 0]], [0.1, 0.2], [true, 2], "1970-01-01 00:02:03.123123", 2456, 
"0.22"],
+        [[[0, 1]], [0.1, 0.3], [true, 1], "1970-01-01 00:02:03.999999", 24, 
"0.28"],
+        [[[10, 10]], [1.1, 1.2], [false, 12], "1970-01-01 00:02:03.123123", 
2456, "0.22"],
+        [[[127, 32767], [-128, -32768]], [1.1, 1.2], [false, 2222], 
"1970-01-01 00:02:03.123123", 245, "0.12"],
+        [[[1, 64], [2, 32]], [2.2, 3.2], [true, 2], "1970-01-01 00:00:00.0", 
24, "0.78"],
+        [[[11, 64], [12, 32]], [2.2, 3.2], [true, 2], "1970-01-01 
00:00:00.123123", 24, "0.78"]
+])")
+            .ValueOrDie();
+    CheckResult({array0, array1}, read_schema, reader_offsets, field_offsets, 
expected_array);
+}
+
+// Same layout as TestWithNonExistField, but every mock inner reader gets the selection
+// bitmap {1, 3, 5}; only rows 1, 3 and 5 are expected in the output.
+TEST_P(DataEvolutionFileReaderTest, TestWithBitmap) {
+    arrow::FieldVector read_fields = {
+        arrow::field("f0", arrow::int32()),       arrow::field("f1", 
arrow::int32()),
+        arrow::field("f2", arrow::utf8()),        arrow::field("f3", 
arrow::int32()),
+        arrow::field("f4", arrow::utf8()),        arrow::field("f5", 
arrow::int32()),
+        arrow::field("non-exist", arrow::int32())};
+    auto read_schema = arrow::schema(read_fields);
+
+    // -1 marks the field that is not available from any reader
+    std::vector<int32_t> reader_offsets = {0, 2, 0, 1, 2, 1, -1};
+    std::vector<int32_t> field_offsets = {0, 0, 1, 1, 1, 0, -1};
+
+    RoaringBitmap32 selection_bitmap = RoaringBitmap32::From({1, 3, 5});
+    auto array0 = arrow::ipc::internal::json::ArrayFromJSON(
+                      arrow::struct_({read_fields[0], read_fields[2]}), R"([
+        [0, "00"],
+        [1, "01"],
+        [2, "02"],
+        [3, "03"],
+        [4, "04"],
+        [5, "05"]
+])")
+                      .ValueOrDie();
+    auto array1 = arrow::ipc::internal::json::ArrayFromJSON(
+                      arrow::struct_({read_fields[5], read_fields[3]}), R"([
+        [10, 110],
+        [11, 111],
+        [12, 112],
+        [13, 113],
+        [14, 114],
+        [15, 115]
+])")
+                      .ValueOrDie();
+    auto array2 = arrow::ipc::internal::json::ArrayFromJSON(
+                      arrow::struct_({read_fields[1], read_fields[4]}), R"([
+        [20, "20"],
+        [21, "21"],
+        [22, "22"],
+        [23, "23"],
+        [24, "24"],
+        [25, "25"]
+])")
+                      .ValueOrDie();
+
+    auto expected_array =
+        arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(read_fields), 
R"([
+        [1, 21, "01", 111, "21", 11, null],
+        [3, 23, "03", 113, "23", 13, null],
+        [5, 25, "05", 115, "25", 15, null]
+])")
+            .ValueOrDie();
+    CheckResult({array0, array1, array2}, read_schema, reader_offsets, 
field_offsets,
+                expected_array, selection_bitmap);
+}
+
+// Two inner readers with different row counts (6 vs 5): reading through the union reader is
+// expected to fail with a length-mismatch error instead of producing misaligned rows.
+TEST_P(DataEvolutionFileReaderTest, TestSingleReaderRowCountMismatch) {
+    arrow::FieldVector read_fields = {
+        arrow::field("f0", arrow::int32()), arrow::field("f1", arrow::int32()),
+        arrow::field("f2", arrow::utf8()), arrow::field("f3", arrow::int32())};
+    auto read_schema = arrow::schema(read_fields);
+
+    std::vector<int32_t> reader_offsets = {0, 1, 0, 1};
+    std::vector<int32_t> field_offsets = {0, 0, 1, 1};
+
+    auto array0 = arrow::ipc::internal::json::ArrayFromJSON(
+                      arrow::struct_({read_fields[0], read_fields[2]}), R"([
+        [0, "00"],
+        [1, "01"],
+        [2, "02"],
+        [3, "03"],
+        [4, "04"],
+        [5, "05"]
+])")
+                      .ValueOrDie();
+    auto array1 = arrow::ipc::internal::json::ArrayFromJSON(
+                      arrow::struct_({read_fields[1], read_fields[3]}), R"([
+        [10, 110],
+        [11, 111],
+        [12, 112],
+        [13, 113],
+        [14, 114]
+])")
+                      .ValueOrDie();
+    // built by hand (not via CheckResult) because the read itself must fail
+    std::vector<std::unique_ptr<BatchReader>> readers;
+    for (const auto& array : {array0, array1}) {
+        auto file_batch_reader =
+            std::make_unique<MockFileBatchReader>(array, array->type(), 
/*read_batch_size=*/10);
+        auto enable_randomize_batch_size = GetParam();
+        
file_batch_reader->EnableRandomizeBatchSize(enable_randomize_batch_size);
+        readers.push_back(std::move(file_batch_reader));
+    }
+    // creation succeeds; the mismatch is only detected while reading
+    ASSERT_OK_AND_ASSIGN(
+        auto data_evolution_file_reader,
+        DataEvolutionFileReader::Create(std::move(readers), read_schema, 
/*read_batch_size=*/10,
+                                        reader_offsets, field_offsets, pool_));
+    // array0 has 6 rows but array1 only has 5 rows
+    ASSERT_NOK_WITH_MSG(
+        
paimon::test::ReadResultCollector::CollectResult(data_evolution_file_reader.get()),
+        "array for single reader length mismatch others");
+}
+
+// Runs every TEST_P of DataEvolutionFileReaderTest twice: first with randomized inner batch
+// sizes enabled (true), then disabled (false).
+INSTANTIATE_TEST_SUITE_P(EnableRandomizeBatchSize, DataEvolutionFileReaderTest,
+                         ::testing::Values(true, false));
+
+}  // namespace paimon::test
diff --git a/src/paimon/common/reader/data_evolution_row.h 
b/src/paimon/common/reader/data_evolution_row.h
new file mode 100644
index 0000000..93393c0
--- /dev/null
+++ b/src/paimon/common/reader/data_evolution_row.h
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cassert>
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <string_view>
+#include <vector>
+
+#include "fmt/format.h"
+#include "paimon/common/data/binary_row.h"
+#include "paimon/common/data/binary_string.h"
+#include "paimon/common/data/internal_array.h"
+#include "paimon/common/data/internal_map.h"
+#include "paimon/common/data/internal_row.h"
+#include "paimon/common/types/row_kind.h"
+#include "paimon/data/decimal.h"
+#include "paimon/data/timestamp.h"
+#include "paimon/result.h"
+
+namespace paimon {
+class Bytes;
+class InternalArray;
+class InternalMap;
+class RowKind;
+
+/// A logical row assembled from the fields of several underlying rows.
+///
+/// For field `pos`, `row_offsets[pos]` selects one entry of `rows` and `field_offsets[pos]`
+/// selects the field inside that entry. A negative row offset marks a field that none of the
+/// underlying rows provides: `IsNullAt()` reports it as null and the typed getters must not be
+/// called for it.
+class DataEvolutionRow : public InternalRow {
+ public:
+    /// Copies `rows`, `row_offsets` and `field_offsets` into the new row.
+    /// `rows` must not be empty and both offset vectors must hold one entry per field.
+    DataEvolutionRow(const std::vector<BinaryRow>& rows, const std::vector<int32_t>& row_offsets,
+                     const std::vector<int32_t>& field_offsets)
+        : rows_(rows), row_offsets_(row_offsets), field_offsets_(field_offsets) {
+        assert(!rows_.empty());
+        // Every accessor indexes both vectors with the same position.
+        assert(row_offsets_.size() == field_offsets_.size());
+    }
+
+    /// Returns the row kind, taken from the first underlying row on first use and cached
+    /// afterwards; a kind installed through `SetRowKind()` takes precedence.
+    Result<const RowKind*> GetRowKind() const override {
+        if (!row_kind_) {
+            PAIMON_ASSIGN_OR_RAISE(row_kind_, rows_[0].GetRowKind());
+        }
+        return row_kind_;
+    }
+
+    /// Overrides the row kind reported by `GetRowKind()`.
+    void SetRowKind(const RowKind* kind) override {
+        row_kind_ = kind;
+    }
+
+    /// Number of fields of the assembled row, i.e. the number of offset entries.
+    int32_t GetFieldCount() const override {
+        return static_cast<int32_t>(field_offsets_.size());
+    }
+
+    /// Returns true when the field is not provided by any underlying row (negative row offset)
+    /// or when the underlying row stores a null at the mapped field.
+    bool IsNullAt(int32_t pos) const override {
+        return row_offsets_[pos] < 0 || ChooseRow(pos).IsNullAt(field_offsets_[pos]);
+    }
+
+    // The typed getters below forward to the underlying row selected for `pos`.
+
+    bool GetBoolean(int32_t pos) const override {
+        return ChooseRow(pos).GetBoolean(field_offsets_[pos]);
+    }
+
+    char GetByte(int32_t pos) const override {
+        return ChooseRow(pos).GetByte(field_offsets_[pos]);
+    }
+
+    int16_t GetShort(int32_t pos) const override {
+        return ChooseRow(pos).GetShort(field_offsets_[pos]);
+    }
+
+    int32_t GetInt(int32_t pos) const override {
+        return ChooseRow(pos).GetInt(field_offsets_[pos]);
+    }
+
+    int32_t GetDate(int32_t pos) const override {
+        return ChooseRow(pos).GetDate(field_offsets_[pos]);
+    }
+
+    int64_t GetLong(int32_t pos) const override {
+        return ChooseRow(pos).GetLong(field_offsets_[pos]);
+    }
+
+    float GetFloat(int32_t pos) const override {
+        return ChooseRow(pos).GetFloat(field_offsets_[pos]);
+    }
+
+    double GetDouble(int32_t pos) const override {
+        return ChooseRow(pos).GetDouble(field_offsets_[pos]);
+    }
+
+    BinaryString GetString(int32_t pos) const override {
+        return ChooseRow(pos).GetString(field_offsets_[pos]);
+    }
+
+    std::shared_ptr<Bytes> GetBinary(int32_t pos) const override {
+        return ChooseRow(pos).GetBinary(field_offsets_[pos]);
+    }
+
+    std::string_view GetStringView(int32_t pos) const override {
+        return ChooseRow(pos).GetStringView(field_offsets_[pos]);
+    }
+
+    Timestamp GetTimestamp(int32_t pos, int32_t precision) const override {
+        return ChooseRow(pos).GetTimestamp(field_offsets_[pos], precision);
+    }
+
+    Decimal GetDecimal(int32_t pos, int32_t precision, int32_t scale) const override {
+        return ChooseRow(pos).GetDecimal(field_offsets_[pos], precision, scale);
+    }
+
+    std::shared_ptr<InternalRow> GetRow(int32_t pos, int32_t num_fields) const override {
+        return ChooseRow(pos).GetRow(field_offsets_[pos], num_fields);
+    }
+
+    std::shared_ptr<InternalArray> GetArray(int32_t pos) const override {
+        return ChooseRow(pos).GetArray(field_offsets_[pos]);
+    }
+
+    std::shared_ptr<InternalMap> GetMap(int32_t pos) const override {
+        return ChooseRow(pos).GetMap(field_offsets_[pos]);
+    }
+
+    /// Returns the fixed label "DataEvolutionRow"; field values are not rendered.
+    std::string ToString() const override {
+        return "DataEvolutionRow";
+    }
+
+ private:
+    /// Returns the underlying row that holds field `pos`; the field must be mapped to a row.
+    const BinaryRow& ChooseRow(int32_t pos) const {
+        assert(pos >= 0 && pos < GetFieldCount());
+        assert(row_offsets_[pos] >= 0 && row_offsets_[pos] < static_cast<int32_t>(rows_.size()));
+        return rows_[row_offsets_[pos]];
+    }
+
+    // Cached row kind; mutable so the const GetRowKind() can fill it in on first use.
+    mutable const RowKind* row_kind_ = nullptr;
+    std::vector<BinaryRow> rows_;
+    std::vector<int32_t> row_offsets_;
+    std::vector<int32_t> field_offsets_;
+};
+}  // namespace paimon
diff --git a/src/paimon/common/reader/data_evolution_row_test.cpp b/src/paimon/common/reader/data_evolution_row_test.cpp
new file mode 100644
index 0000000..47b7f54
--- /dev/null
+++ b/src/paimon/common/reader/data_evolution_row_test.cpp
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/reader/data_evolution_row.h"
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <variant>
+#include <vector>
+
+#include "gtest/gtest.h"
+#include "paimon/common/data/binary_array_writer.h"
+#include "paimon/common/data/binary_map.h"
+#include "paimon/common/data/binary_row_writer.h"
+#include "paimon/common/data/data_define.h"
+#include "paimon/common/types/row_kind.h"
+#include "paimon/data/decimal.h"
+#include "paimon/data/timestamp.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/status.h"
+#include "paimon/testing/utils/binary_row_generator.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+TEST(DataEvolutionRowTest, TestSimple) {
+    auto pool = GetDefaultPool();
+
+    // Three source rows: (int, string), (int, int) and (int, string).
+    BinaryRow first = BinaryRowGenerator::GenerateRow({0, std::string("00")}, pool.get());
+    BinaryRow second = BinaryRowGenerator::GenerateRow({10, 110}, pool.get());
+    BinaryRow third = BinaryRowGenerator::GenerateRow({20, std::string("20")}, pool.get());
+    const std::vector<BinaryRow> sources = {first, second, third};
+
+    // Layout of the assembled row (field -> source row / field inside it):
+    // f0:int32  -> first[0]
+    // f1:int32  -> third[0]
+    // f2:string -> first[1]
+    // f3:int32  -> second[1]
+    // f4:string -> third[1]
+    // f5:int32  -> second[0]
+    const std::vector<int32_t> row_offsets{0, 2, 0, 1, 2, 1};
+    const std::vector<int32_t> field_offsets{0, 0, 1, 1, 1, 0};
+
+    DataEvolutionRow merged(sources, row_offsets, field_offsets);
+    ASSERT_EQ(merged.GetFieldCount(), 6);
+    ASSERT_FALSE(merged.IsNullAt(0));
+    ASSERT_EQ(merged.GetInt(0), 0);
+    ASSERT_EQ(merged.GetInt(1), 20);
+    ASSERT_EQ(merged.GetString(2).ToString(), "00");
+    ASSERT_EQ(merged.GetInt(3), 110);
+    ASSERT_EQ(merged.GetString(4).ToString(), "20");
+    ASSERT_EQ(merged.GetInt(5), 10);
+
+    // The row kind starts as Insert and follows SetRowKind afterwards.
+    ASSERT_OK_AND_ASSIGN(const RowKind* initial_kind, merged.GetRowKind());
+    ASSERT_EQ(*initial_kind, *RowKind::Insert());
+    merged.SetRowKind(RowKind::UpdateAfter());
+    ASSERT_OK_AND_ASSIGN(const RowKind* updated_kind, merged.GetRowKind());
+    ASSERT_EQ(*updated_kind, *RowKind::UpdateAfter());
+    ASSERT_EQ(merged.ToString(), "DataEvolutionRow");
+}
+
+TEST(DataEvolutionRowTest, TestNull) {
+    // Eight fields are mapped; the last two have no source row.
+    // f0:int32
+    // f1:int32
+    // f2:string
+    // f3:int32
+    // f4:string (stored as null in its source row)
+    // f5:int32
+    // f6:non-exist (row offset -1)
+    // f7:non-exist (row offset -2; any negative row offset marks a missing field)
+    std::vector<int32_t> row_offsets = {0, 2, 0, 1, 2, 1, -1, -2};
+    std::vector<int32_t> field_offsets = {0, 0, 1, 1, 1, 0, -1, -1};
+
+    auto pool = GetDefaultPool();
+    BinaryRow row1 = BinaryRowGenerator::GenerateRow({0, std::string("00")}, pool.get());
+    BinaryRow row2 = BinaryRowGenerator::GenerateRow({10, 110}, pool.get());
+    BinaryRow row3 = BinaryRowGenerator::GenerateRow({20, NullType()}, pool.get());
+
+    DataEvolutionRow row({row1, row2, row3}, row_offsets, field_offsets);
+    ASSERT_EQ(row.GetFieldCount(), 8);
+
+    // Mapped fields holding a value must report non-null before they are read.
+    ASSERT_FALSE(row.IsNullAt(0));
+    ASSERT_EQ(row.GetInt(0), 0);
+    ASSERT_FALSE(row.IsNullAt(1));
+    ASSERT_EQ(row.GetInt(1), 20);
+    ASSERT_FALSE(row.IsNullAt(2));
+    ASSERT_EQ(row.GetString(2).ToString(), "00");
+    ASSERT_FALSE(row.IsNullAt(3));
+    ASSERT_EQ(row.GetInt(3), 110);
+
+    // Null stored in the source row.
+    ASSERT_TRUE(row.IsNullAt(4));
+
+    ASSERT_FALSE(row.IsNullAt(5));
+    ASSERT_EQ(row.GetInt(5), 10);
+
+    // Fields without a source row are reported as null.
+    ASSERT_TRUE(row.IsNullAt(6));
+    ASSERT_TRUE(row.IsNullAt(7));
+}
+
+TEST(DataEvolutionRowTest, TestAllFieldTypes) {
+    auto pool = GetDefaultPool();
+
+    // Source 0: boolean(true), byte(42), short(1024), int(100000), date(19000)
+    BinaryRow fixed_row = BinaryRowGenerator::GenerateRow(
+        {true, static_cast<int8_t>(42), static_cast<int16_t>(1024),
+         static_cast<int32_t>(100000), static_cast<int32_t>(19000)},
+        pool.get());
+
+    // Source 1: long(-987654321), float(3.14f), double(-9.87654321)
+    BinaryRow numeric_row = BinaryRowGenerator::GenerateRow(
+        {static_cast<int64_t>(-987654321LL), 3.14f, -9.87654321}, pool.get());
+
+    // Source 2: string("hello"), string("world")
+    BinaryRow text_row =
+        BinaryRowGenerator::GenerateRow({std::string("hello"), std::string("world")}, pool.get());
+
+    // Source 3: decimal(10,2,67890), timestamp(1000,500) with precision 9
+    BinaryRow exact_row = BinaryRowGenerator::GenerateRow(
+        {Decimal(10, 2, 67890), TimestampType(Timestamp(1000, 500), 9)}, pool.get());
+
+    // Source 4: binary("fghij")
+    auto binary_payload = std::make_shared<Bytes>("fghij", pool.get());
+    BinaryRow binary_row = BinaryRowGenerator::GenerateRow({binary_payload}, pool.get());
+
+    // Source 5: nested array [10,20], nested map {1:100, 2:200}, nested row ("Alice", 30).
+    // Written by hand with BinaryRowWriter since BinaryRowGenerator doesn't support nested types.
+    BinaryRow nested_row(3);
+    {
+        BinaryRowWriter nested_writer(&nested_row, 0, pool.get());
+
+        // field 0: array [10, 20]
+        auto int_array = BinaryArray::FromIntArray({10, 20}, pool.get());
+        nested_writer.WriteArray(0, int_array);
+
+        // field 1: map {1->100, 2->200}
+        auto keys = BinaryArray::FromIntArray({1, 2}, pool.get());
+        auto values = BinaryArray::FromLongArray({100LL, 200LL}, pool.get());
+        auto binary_map = BinaryMap::ValueOf(keys, values, pool.get());
+        nested_writer.WriteMap(1, *binary_map);
+
+        // field 2: row ("Alice", 30)
+        BinaryRow person = BinaryRowGenerator::GenerateRow(
+            {std::string("Alice"), static_cast<int32_t>(30)}, pool.get());
+        nested_writer.WriteRow(2, person);
+
+        nested_writer.Complete();
+    }
+
+    // Mapping of each assembled field to (source row, field inside that row):
+    // pos 0:  boolean    -> source0[0] = true
+    // pos 1:  byte       -> source0[1] = 42
+    // pos 2:  short      -> source0[2] = 1024
+    // pos 3:  int        -> source0[3] = 100000
+    // pos 4:  date       -> source0[4] = 19000
+    // pos 5:  long       -> source1[0] = -987654321
+    // pos 6:  float      -> source1[1] = 3.14f
+    // pos 7:  double     -> source1[2] = -9.87654321
+    // pos 8:  string     -> source2[0] = "hello"
+    // pos 9:  stringview -> source2[1] = "world"
+    // pos 10: decimal    -> source3[0] = Decimal(10,2,67890)
+    // pos 11: timestamp  -> source3[1] = Timestamp(1000,500)
+    // pos 12: binary     -> source4[0] = "fghij"
+    // pos 13: array      -> source5[0] = [10,20]
+    // pos 14: map        -> source5[1] = {1:100, 2:200}
+    // pos 15: row        -> source5[2] = ("Alice", 30)
+    const std::vector<int32_t> row_offsets{0, 0, 0, 0, 0, 1, 1, 1, 2, 2, 3, 3, 4, 5, 5, 5};
+    const std::vector<int32_t> field_offsets{0, 1, 2, 3, 4, 0, 1, 2, 0, 1, 0, 1, 0, 0, 1, 2};
+    const std::vector<BinaryRow> sources = {fixed_row,  numeric_row, text_row,
+                                            exact_row,  binary_row,  nested_row};
+
+    DataEvolutionRow evolution(sources, row_offsets, field_offsets);
+    ASSERT_EQ(evolution.GetFieldCount(), 16);
+
+    // Fixed-width values served by source 0.
+    ASSERT_FALSE(evolution.IsNullAt(0));
+    ASSERT_TRUE(evolution.GetBoolean(0));
+    ASSERT_EQ(evolution.GetByte(1), 42);
+    ASSERT_EQ(evolution.GetShort(2), 1024);
+    ASSERT_EQ(evolution.GetInt(3), 100000);
+    ASSERT_EQ(evolution.GetDate(4), 19000);
+
+    // Long, float and double served by source 1.
+    ASSERT_EQ(evolution.GetLong(5), -987654321LL);
+    ASSERT_FLOAT_EQ(evolution.GetFloat(6), 3.14f);
+    ASSERT_DOUBLE_EQ(evolution.GetDouble(7), -9.87654321);
+
+    // String and string view served by source 2.
+    ASSERT_EQ(evolution.GetString(8).ToString(), "hello");
+    ASSERT_EQ(evolution.GetStringView(9), "world");
+
+    // Decimal and timestamp served by source 3.
+    ASSERT_EQ(evolution.GetDecimal(10, 10, 2), Decimal(10, 2, 67890));
+    ASSERT_EQ(evolution.GetTimestamp(11, 9), Timestamp(1000, 500));
+
+    // Binary served by source 4.
+    auto actual_binary = evolution.GetBinary(12);
+    Bytes wanted_binary("fghij", pool.get());
+    ASSERT_EQ(*actual_binary, wanted_binary);
+
+    // Nested array served by source 5.
+    auto actual_array = evolution.GetArray(13);
+    ASSERT_OK_AND_ASSIGN(auto actual_ints, actual_array->ToIntArray());
+    const std::vector<int32_t> wanted_ints = {10, 20};
+    ASSERT_EQ(actual_ints, wanted_ints);
+
+    // Nested map served by source 5 (only its size is checked).
+    auto actual_map = evolution.GetMap(14);
+    ASSERT_EQ(actual_map->Size(), 2);
+
+    // Nested row served by source 5.
+    auto actual_row = evolution.GetRow(15, 2);
+    ASSERT_EQ(actual_row->GetString(0).ToString(), "Alice");
+    ASSERT_EQ(actual_row->GetInt(1), 30);
+
+    // ToString
+    ASSERT_EQ(evolution.ToString(), "DataEvolutionRow");
+}
+
+}  // namespace paimon::test

Reply via email to