This is an automated email from the ASF dual-hosted git repository.
leaves12138 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 69750b4 feat: add data evolution reader infrastructure (#64)
69750b4 is described below
commit 69750b41043a1194b3e78d1b00d3ed496c3a35aa
Author: lszskye <[email protected]>
AuthorDate: Tue Jun 9 01:25:01 2026 -0700
feat: add data evolution reader infrastructure (#64)
---
src/paimon/common/reader/data_evolution_array.h | 167 ++++++
.../common/reader/data_evolution_array_test.cpp | 348 +++++++++++
.../common/reader/data_evolution_file_reader.cpp | 197 +++++++
.../common/reader/data_evolution_file_reader.h | 105 ++++
.../reader/data_evolution_file_reader_test.cpp | 635 +++++++++++++++++++++
src/paimon/common/reader/data_evolution_row.h | 151 +++++
.../common/reader/data_evolution_row_test.cpp | 242 ++++++++
7 files changed, 1845 insertions(+)
diff --git a/src/paimon/common/reader/data_evolution_array.h b/src/paimon/common/reader/data_evolution_array.h
new file mode 100644
index 0000000..d311e63
--- /dev/null
+++ b/src/paimon/common/reader/data_evolution_array.h
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+#include <cassert>
+#include <cstdint>
+#include <memory>
+#include <string_view>
+#include <utility>
+#include <vector>
+
+#include "paimon/common/data/binary_array.h"
+#include "paimon/common/data/binary_string.h"
+#include "paimon/common/data/internal_array.h"
+#include "paimon/common/data/internal_map.h"
+#include "paimon/common/data/internal_row.h"
+#include "paimon/data/decimal.h"
+#include "paimon/data/timestamp.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+
+namespace paimon {
+class Bytes;
+class InternalMap;
+class InternalRow;
+
+/// An InternalArray view that assembles its elements from several underlying BinaryArrays
+/// without copying them.
+///
+/// Element `pos` of this array is element `field_offsets[pos]` of
+/// `arrays[array_offsets[pos]]`. A negative value in `array_offsets` marks the element as null:
+/// no underlying array provides it, so only IsNullAt() may be called for such a position; the
+/// typed getters must not be used on it.
+class DataEvolutionArray : public InternalArray {
+ public:
+    /// @param arrays the underlying arrays; must not be empty.
+    /// @param array_offsets for each element, the index into `arrays` (negative means null).
+    /// @param field_offsets for each element, the position inside the selected underlying
+    ///        array; must have the same size as `array_offsets`.
+    DataEvolutionArray(const std::vector<BinaryArray>& arrays,
+                       const std::vector<int32_t>& array_offsets,
+                       const std::vector<int32_t>& field_offsets)
+        : arrays_(arrays), array_offsets_(array_offsets), field_offsets_(field_offsets) {
+        assert(!arrays_.empty());
+        assert(array_offsets_.size() == field_offsets_.size());
+    }
+
+    int32_t Size() const override {
+        return static_cast<int32_t>(array_offsets_.size());
+    }
+
+    bool IsNullAt(int32_t pos) const override {
+        if (array_offsets_[pos] < 0) {
+            // No underlying array provides this element, so it is null.
+            return true;
+        }
+        return arrays_[array_offsets_[pos]].IsNullAt(field_offsets_[pos]);
+    }
+
+    bool GetBoolean(int32_t pos) const override {
+        return arrays_[array_offsets_[pos]].GetBoolean(field_offsets_[pos]);
+    }
+
+    char GetByte(int32_t pos) const override {
+        return arrays_[array_offsets_[pos]].GetByte(field_offsets_[pos]);
+    }
+
+    int16_t GetShort(int32_t pos) const override {
+        return arrays_[array_offsets_[pos]].GetShort(field_offsets_[pos]);
+    }
+
+    int32_t GetInt(int32_t pos) const override {
+        return arrays_[array_offsets_[pos]].GetInt(field_offsets_[pos]);
+    }
+
+    int32_t GetDate(int32_t pos) const override {
+        return arrays_[array_offsets_[pos]].GetDate(field_offsets_[pos]);
+    }
+
+    int64_t GetLong(int32_t pos) const override {
+        return arrays_[array_offsets_[pos]].GetLong(field_offsets_[pos]);
+    }
+
+    float GetFloat(int32_t pos) const override {
+        return arrays_[array_offsets_[pos]].GetFloat(field_offsets_[pos]);
+    }
+
+    double GetDouble(int32_t pos) const override {
+        return arrays_[array_offsets_[pos]].GetDouble(field_offsets_[pos]);
+    }
+
+    BinaryString GetString(int32_t pos) const override {
+        return arrays_[array_offsets_[pos]].GetString(field_offsets_[pos]);
+    }
+
+    std::string_view GetStringView(int32_t pos) const override {
+        return arrays_[array_offsets_[pos]].GetStringView(field_offsets_[pos]);
+    }
+
+    Decimal GetDecimal(int32_t pos, int32_t precision, int32_t scale) const override {
+        return arrays_[array_offsets_[pos]].GetDecimal(field_offsets_[pos], precision, scale);
+    }
+
+    Timestamp GetTimestamp(int32_t pos, int32_t precision) const override {
+        return arrays_[array_offsets_[pos]].GetTimestamp(field_offsets_[pos], precision);
+    }
+
+    std::shared_ptr<Bytes> GetBinary(int32_t pos) const override {
+        return arrays_[array_offsets_[pos]].GetBinary(field_offsets_[pos]);
+    }
+
+    std::shared_ptr<InternalArray> GetArray(int32_t pos) const override {
+        return arrays_[array_offsets_[pos]].GetArray(field_offsets_[pos]);
+    }
+
+    std::shared_ptr<InternalMap> GetMap(int32_t pos) const override {
+        return arrays_[array_offsets_[pos]].GetMap(field_offsets_[pos]);
+    }
+
+    std::shared_ptr<InternalRow> GetRow(int32_t pos, int32_t num_fields) const override {
+        return arrays_[array_offsets_[pos]].GetRow(field_offsets_[pos], num_fields);
+    }
+
+    Result<std::vector<char>> ToBooleanArray() const override {
+        return Status::Invalid("DataEvolutionArray not support ToBooleanArray");
+    }
+
+    Result<std::vector<char>> ToByteArray() const override {
+        return Status::Invalid("DataEvolutionArray not support ToByteArray");
+    }
+
+    Result<std::vector<int16_t>> ToShortArray() const override {
+        return Status::Invalid("DataEvolutionArray not support ToShortArray");
+    }
+
+    Result<std::vector<int32_t>> ToIntArray() const override {
+        return Status::Invalid("DataEvolutionArray not support ToIntArray");
+    }
+
+    /// Materializes every element as an int64 value.
+    ///
+    /// Returns an Invalid status if any element is null: a primitive vector cannot represent
+    /// nulls, and a null element may not map to any underlying array at all (negative offset),
+    /// so reading it would index out of bounds.
+    Result<std::vector<int64_t>> ToLongArray() const override {
+        const int32_t size = Size();
+        std::vector<int64_t> result;
+        result.reserve(size);
+        for (int32_t i = 0; i < size; i++) {
+            if (IsNullAt(i)) {
+                return Status::Invalid(
+                    "DataEvolutionArray ToLongArray: primitive array must not contain null value");
+            }
+            result.push_back(GetLong(i));
+        }
+        return result;
+    }
+
+    Result<std::vector<float>> ToFloatArray() const override {
+        return Status::Invalid("DataEvolutionArray not support ToFloatArray");
+    }
+
+    Result<std::vector<double>> ToDoubleArray() const override {
+        return Status::Invalid("DataEvolutionArray not support ToDoubleArray");
+    }
+
+ private:
+    std::vector<BinaryArray> arrays_;
+    std::vector<int32_t> array_offsets_;
+    std::vector<int32_t> field_offsets_;
+};
+} // namespace paimon
diff --git a/src/paimon/common/reader/data_evolution_array_test.cpp b/src/paimon/common/reader/data_evolution_array_test.cpp
new file mode 100644
index 0000000..5ef2e17
--- /dev/null
+++ b/src/paimon/common/reader/data_evolution_array_test.cpp
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/reader/data_evolution_array.h"
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "gtest/gtest.h"
+#include "paimon/common/data/binary_array_writer.h"
+#include "paimon/common/data/binary_map.h"
+#include "paimon/common/data/binary_string.h"
+#include "paimon/data/decimal.h"
+#include "paimon/data/timestamp.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/testing/utils/binary_row_generator.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+TEST(DataEvolutionArrayTest, TestSimple) {
+    auto pool = GetDefaultPool();
+
+    // Three underlying two-element long arrays; the evolution array interleaves their elements
+    // according to the (array offset, field offset) pairs below.
+    std::vector<std::vector<int64_t>> sources = {{1, -100}, {2, -2}, {3, -3}};
+    std::vector<BinaryArray> arrays;
+    arrays.reserve(sources.size());
+    for (auto& source : sources) {
+        arrays.push_back(BinaryRowGenerator::FromLongArrayWithNull(source, pool.get()));
+    }
+
+    DataEvolutionArray evolution_array(arrays, {0, 2, 0, 1, 2, 1}, {0, 0, 1, 1, 1, 0});
+
+    ASSERT_FALSE(evolution_array.IsNullAt(0));
+
+    const std::vector<int64_t> expected = {1, 3, -100, -2, -3, 2};
+    for (int32_t pos = 0; pos < static_cast<int32_t>(expected.size()); ++pos) {
+        ASSERT_EQ(evolution_array.GetLong(pos), expected[pos]) << "pos " << pos;
+    }
+
+    ASSERT_OK_AND_ASSIGN(auto long_values, evolution_array.ToLongArray());
+    ASSERT_EQ(long_values, expected);
+
+    ASSERT_NOK_WITH_MSG(evolution_array.ToBooleanArray(),
+                        "DataEvolutionArray not support ToBooleanArray");
+}
+
+TEST(DataEvolutionArrayTest, TestNullValue) {
+    auto pool = GetDefaultPool();
+
+    // Negative array offsets mark positions that no underlying array provides (null); the
+    // remaining positions read both elements of the single underlying array.
+    std::vector<int64_t> values = {1, -100};
+    std::vector<BinaryArray> arrays = {
+        BinaryRowGenerator::FromLongArrayWithNull(values, pool.get())};
+    DataEvolutionArray evolution_array(arrays, {-2, -1, 0, 0}, {-1, -1, 0, 1});
+
+    for (int32_t null_pos : {0, 1}) {
+        ASSERT_TRUE(evolution_array.IsNullAt(null_pos)) << "pos " << null_pos;
+    }
+    ASSERT_EQ(evolution_array.GetLong(2), 1);
+    ASSERT_EQ(evolution_array.GetLong(3), -100);
+}
+
+TEST(DataEvolutionArrayTest, TestAllFieldTypes) {
+    auto pool = GetDefaultPool();
+
+    // We create separate BinaryArrays for each field type and use DataEvolutionArray to proxy
+    // access across them. Most arrays have 2 elements (the map and row arrays have 1); the
+    // evolution array picks one element from each underlying array via
+    // array_offsets/field_offsets.
+
+    // --- Boolean array (array index 0) ---
+    BinaryArray bool_array;
+    {
+        BinaryArrayWriter writer(&bool_array, 2, sizeof(bool), pool.get());
+        writer.WriteBoolean(0, true);
+        writer.WriteBoolean(1, false);
+        writer.Complete();
+    }
+
+    // --- Byte array (array index 1) ---
+    BinaryArray byte_array;
+    {
+        BinaryArrayWriter writer(&byte_array, 2, sizeof(int8_t), pool.get());
+        writer.WriteByte(0, 42);
+        writer.WriteByte(1, -7);
+        writer.Complete();
+    }
+
+    // --- Short array (array index 2) ---
+    BinaryArray short_array;
+    {
+        BinaryArrayWriter writer(&short_array, 2, sizeof(int16_t), pool.get());
+        writer.WriteShort(0, 1024);
+        writer.WriteShort(1, -512);
+        writer.Complete();
+    }
+
+    // --- Int array (array index 3) ---
+    BinaryArray int_array;
+    {
+        BinaryArrayWriter writer(&int_array, 2, sizeof(int32_t), pool.get());
+        writer.WriteInt(0, 100000);
+        writer.WriteInt(1, -99999);
+        writer.Complete();
+    }
+
+    // --- Long array (array index 4) ---
+    BinaryArray long_array;
+    {
+        BinaryArrayWriter writer(&long_array, 2, sizeof(int64_t), pool.get());
+        writer.WriteLong(0, 1234567890LL);
+        writer.WriteLong(1, -987654321LL);
+        writer.Complete();
+    }
+
+    // --- Float array (array index 5) ---
+    BinaryArray float_array;
+    {
+        BinaryArrayWriter writer(&float_array, 2, sizeof(float), pool.get());
+        writer.WriteFloat(0, 3.14f);
+        writer.WriteFloat(1, -2.71f);
+        writer.Complete();
+    }
+
+    // --- Double array (array index 6) ---
+    BinaryArray double_array;
+    {
+        BinaryArrayWriter writer(&double_array, 2, sizeof(double), pool.get());
+        writer.WriteDouble(0, 1.23456789);
+        writer.WriteDouble(1, -9.87654321);
+        writer.Complete();
+    }
+
+    // --- String array (array index 7) ---
+    BinaryArray string_array;
+    {
+        BinaryArrayWriter writer(&string_array, 2, 8, pool.get());
+        writer.WriteString(0, BinaryString::FromString("hello", pool.get()));
+        writer.WriteString(1, BinaryString::FromString("world", pool.get()));
+        writer.Complete();
+    }
+
+    // --- Decimal array (array index 8), precision=10, scale=2 ---
+    BinaryArray decimal_array;
+    {
+        BinaryArrayWriter writer(&decimal_array, 2, 8, pool.get());
+        writer.WriteDecimal(0, Decimal(10, 2, 12345), 10);
+        writer.WriteDecimal(1, Decimal(10, 2, 67890), 10);
+        writer.Complete();
+    }
+
+    // --- Timestamp array (array index 9), precision=9 (non-compact) ---
+    BinaryArray timestamp_array;
+    {
+        BinaryArrayWriter writer(&timestamp_array, 2, 8, pool.get());
+        writer.WriteTimestamp(0, Timestamp(1000, 500), 9);
+        writer.WriteTimestamp(1, Timestamp(2000, 999), 9);
+        writer.Complete();
+    }
+
+    // --- Binary/Bytes array (array index 10) ---
+    BinaryArray binary_array;
+    {
+        Bytes bytes_val1("abcde", pool.get());
+        Bytes bytes_val2("fghij", pool.get());
+        BinaryArrayWriter writer(&binary_array, 2, 8, pool.get());
+        writer.WriteBinary(0, bytes_val1);
+        writer.WriteBinary(1, bytes_val2);
+        writer.Complete();
+    }
+
+    // --- Nested Array array (array index 11) ---
+    BinaryArray nested_array;
+    {
+        auto inner = BinaryArray::FromIntArray({10, 20}, pool.get());
+        auto inner2 = BinaryArray::FromIntArray({30, 40}, pool.get());
+        BinaryArrayWriter writer(&nested_array, 2, 8, pool.get());
+        writer.WriteArray(0, inner);
+        writer.WriteArray(1, inner2);
+        writer.Complete();
+    }
+
+    // --- Map array (array index 12), single element ---
+    BinaryArray map_array;
+    {
+        auto map_key = BinaryArray::FromIntArray({1, 2}, pool.get());
+        auto map_val = BinaryArray::FromLongArray({100LL, 200LL}, pool.get());
+        auto map_obj = BinaryMap::ValueOf(map_key, map_val, pool.get());
+        BinaryArrayWriter writer(&map_array, 1, 8, pool.get());
+        writer.WriteMap(0, *map_obj);
+        writer.Complete();
+    }
+
+    // --- Row array (array index 13), single element ---
+    BinaryArray row_array;
+    {
+        BinaryRow row1 = BinaryRowGenerator::GenerateRow(
+            {std::string("Alice"), static_cast<int32_t>(30)}, pool.get());
+        BinaryArrayWriter writer(&row_array, 1, 8, pool.get());
+        writer.WriteRow(0, row1);
+        writer.Complete();
+    }
+
+    // --- Date array (array index 14), stored as int32 ---
+    BinaryArray date_array;
+    {
+        BinaryArrayWriter writer(&date_array, 2, sizeof(int32_t), pool.get());
+        writer.WriteInt(0, 19000);  // days since epoch
+        writer.WriteInt(1, 18500);
+        writer.Complete();
+    }
+
+    std::vector<BinaryArray> arrays = {
+        bool_array,       // 0
+        byte_array,       // 1
+        short_array,      // 2
+        int_array,        // 3
+        long_array,       // 4
+        float_array,      // 5
+        double_array,     // 6
+        string_array,     // 7
+        decimal_array,    // 8
+        timestamp_array,  // 9
+        binary_array,     // 10
+        nested_array,     // 11
+        map_array,        // 12
+        row_array,        // 13
+        date_array,       // 14
+    };
+
+    // Each evolution entry: (array_index, field_index_within_array)
+    //   pos 0:  boolean   -> arrays[0][0]  = true
+    //   pos 1:  byte      -> arrays[1][0]  = 42
+    //   pos 2:  short     -> arrays[2][1]  = -512
+    //   pos 3:  int       -> arrays[3][0]  = 100000
+    //   pos 4:  long      -> arrays[4][1]  = -987654321
+    //   pos 5:  float     -> arrays[5][0]  = 3.14f
+    //   pos 6:  double    -> arrays[6][1]  = -9.87654321
+    //   pos 7:  string    -> arrays[7][0]  = "hello"
+    //   pos 8:  decimal   -> arrays[8][1]  = Decimal(10,2,67890)
+    //   pos 9:  timestamp -> arrays[9][0]  = Timestamp(1000,500)
+    //   pos 10: binary    -> arrays[10][1] = "fghij"
+    //   pos 11: array     -> arrays[11][0] = [10,20]
+    //   pos 12: map       -> arrays[12][0]
+    //   pos 13: row       -> arrays[13][0] = ("Alice", 30)
+    //   pos 14: date      -> arrays[14][0] = 19000
+    //   pos 15: string    -> arrays[7][1]  = "world"
+    std::vector<int32_t> array_offsets = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 7};
+    std::vector<int32_t> field_offsets = {0, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 0, 0, 0, 1};
+
+    DataEvolutionArray evolution(arrays, array_offsets, field_offsets);
+
+    ASSERT_EQ(evolution.Size(), 16);
+
+    // Boolean
+    ASSERT_FALSE(evolution.IsNullAt(0));
+    ASSERT_EQ(evolution.GetBoolean(0), true);
+
+    // Byte
+    ASSERT_EQ(evolution.GetByte(1), 42);
+
+    // Short
+    ASSERT_EQ(evolution.GetShort(2), -512);
+
+    // Int
+    ASSERT_EQ(evolution.GetInt(3), 100000);
+
+    // Long
+    ASSERT_EQ(evolution.GetLong(4), -987654321LL);
+
+    // Float
+    ASSERT_FLOAT_EQ(evolution.GetFloat(5), 3.14f);
+
+    // Double
+    ASSERT_DOUBLE_EQ(evolution.GetDouble(6), -9.87654321);
+
+    // String
+    ASSERT_EQ(evolution.GetString(7).ToString(), "hello");
+
+    // StringView
+    ASSERT_EQ(evolution.GetStringView(15), "world");
+
+    // Decimal
+    ASSERT_EQ(evolution.GetDecimal(8, 10, 2), Decimal(10, 2, 67890));
+
+    // Timestamp
+    ASSERT_EQ(evolution.GetTimestamp(9, 9), Timestamp(1000, 500));
+
+    // Binary
+    auto retrieved_binary = evolution.GetBinary(10);
+    Bytes expected_binary("fghij", pool.get());
+    ASSERT_EQ(*retrieved_binary, expected_binary);
+
+    // Nested Array
+    auto retrieved_array = evolution.GetArray(11);
+    ASSERT_OK_AND_ASSIGN(auto inner_values, retrieved_array->ToIntArray());
+    ASSERT_EQ(inner_values, std::vector<int32_t>({10, 20}));
+
+    // Map
+    auto retrieved_map = evolution.GetMap(12);
+    ASSERT_EQ(retrieved_map->Size(), 2);
+
+    // Row
+    auto retrieved_row = evolution.GetRow(13, 2);
+    ASSERT_EQ(retrieved_row->GetString(0).ToString(), "Alice");
+    ASSERT_EQ(retrieved_row->GetInt(1), 30);
+
+    // Date
+    ASSERT_EQ(evolution.GetDate(14), 19000);
+
+    // Verify unsupported ToXxxArray methods
+    ASSERT_NOK_WITH_MSG(evolution.ToBooleanArray(),
+                        "DataEvolutionArray not support ToBooleanArray");
+    ASSERT_NOK_WITH_MSG(evolution.ToByteArray(), "DataEvolutionArray not support ToByteArray");
+    ASSERT_NOK_WITH_MSG(evolution.ToShortArray(), "DataEvolutionArray not support ToShortArray");
+    ASSERT_NOK_WITH_MSG(evolution.ToIntArray(), "DataEvolutionArray not support ToIntArray");
+    ASSERT_NOK_WITH_MSG(evolution.ToFloatArray(), "DataEvolutionArray not support ToFloatArray");
+    ASSERT_NOK_WITH_MSG(evolution.ToDoubleArray(),
+                        "DataEvolutionArray not support ToDoubleArray");
+}
+
+} // namespace paimon::test
diff --git a/src/paimon/common/reader/data_evolution_file_reader.cpp b/src/paimon/common/reader/data_evolution_file_reader.cpp
new file mode 100644
index 0000000..8e8fbbc
--- /dev/null
+++ b/src/paimon/common/reader/data_evolution_file_reader.cpp
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/reader/data_evolution_file_reader.h"
+
+#include "arrow/c/abi.h"
+#include "arrow/c/bridge.h"
+#include "fmt/format.h"
+#include "paimon/common/metrics/metrics_impl.h"
+#include "paimon/common/reader/reader_utils.h"
+#include "paimon/common/utils/arrow/mem_utils.h"
+#include "paimon/common/utils/arrow/status_utils.h"
+namespace paimon {
+Result<std::unique_ptr<DataEvolutionFileReader>> DataEvolutionFileReader::Create(
+    std::vector<std::unique_ptr<BatchReader>>&& readers,
+    const std::shared_ptr<arrow::Schema>& read_schema, int32_t read_batch_size,
+    const std::vector<int32_t>& reader_offsets, const std::vector<int32_t>& field_offsets,
+    const std::shared_ptr<MemoryPool>& pool) {
+    if (read_schema == nullptr || read_schema->num_fields() == 0) {
+        return Status::Invalid("read schema must not be empty");
+    }
+    if (static_cast<size_t>(read_schema->num_fields()) != reader_offsets.size() ||
+        reader_offsets.size() != field_offsets.size()) {
+        return Status::Invalid(
+            "read schema, row offsets and field offsets must have the same size");
+    }
+    if (readers.size() <= 1) {
+        return Status::Invalid("readers size is supposed to be more than 1");
+    }
+    // NextBatchForSingleReader fills batches up to read_batch_size rows; a non-positive size
+    // could never make progress.
+    if (read_batch_size <= 0) {
+        return Status::Invalid(
+            fmt::format("read batch size must be positive, but got {}", read_batch_size));
+    }
+    // NextBatchWithBitmap indexes the per-reader batches with these offsets, so validate them
+    // up front. A reader offset of -1 marks a field that no inner reader provides (it is filled
+    // with nulls); any other value must refer to an existing, non-null inner reader.
+    bool any_field_from_reader = false;
+    for (size_t i = 0; i < reader_offsets.size(); i++) {
+        const int32_t reader_offset = reader_offsets[i];
+        if (reader_offset == -1) {
+            continue;
+        }
+        if (reader_offset < 0 || static_cast<size_t>(reader_offset) >= readers.size() ||
+            readers[reader_offset] == nullptr) {
+            return Status::Invalid(fmt::format(
+                "reader offset {} of field {} does not refer to an existing reader",
+                reader_offset, i));
+        }
+        if (field_offsets[i] < 0) {
+            return Status::Invalid(fmt::format(
+                "field offset {} of field {} must not be negative", field_offsets[i], i));
+        }
+        any_field_from_reader = true;
+    }
+    // Without any field backed by an inner reader, no reader would ever be read and the
+    // batch length (and EOF) could not be determined.
+    if (!any_field_from_reader) {
+        return Status::Invalid("at least one field must be read from an inner reader");
+    }
+    // The constructor is private, so std::make_unique cannot be used here.
+    return std::unique_ptr<DataEvolutionFileReader>(new DataEvolutionFileReader(
+        std::move(readers), read_schema, read_batch_size, reader_offsets, field_offsets,
+        GetArrowPool(pool)));
+}
+
+Result<BatchReader::ReadBatchWithBitmap> DataEvolutionFileReader::NextBatchWithBitmap() {
+    // Step 1: read one batch from every inner reader; all batches must have the same length.
+    std::vector<std::shared_ptr<arrow::StructArray>> array_for_each_reader;
+    array_for_each_reader.reserve(readers_.size());
+    int64_t array_length = -1;
+    for (size_t i = 0; i < readers_.size(); i++) {
+        if (!readers_[i]) {
+            // no read field from readers_[i]
+            array_for_each_reader.push_back(nullptr);
+            continue;
+        }
+        PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> array, NextBatchForSingleReader(i));
+        if (array == nullptr) {
+            // read eof; the inner readers are expected to be row-aligned, so one exhausted
+            // reader ends the whole union reader
+            return BatchReader::MakeEofBatchWithBitmap();
+        }
+        if (array_length == -1) {
+            array_length = array->length();
+        } else if (array_length != array->length()) {
+            return Status::Invalid("array for single reader length mismatch others");
+        }
+        // checked_pointer_cast is an unchecked static cast in release builds, so verify the
+        // type explicitly before downcasting.
+        if (array->type_id() != arrow::Type::STRUCT) {
+            return Status::Invalid(
+                fmt::format("array from reader {} is expected to be a struct array, but got {}",
+                            i, array->type()->ToString()));
+        }
+        array_for_each_reader.push_back(std::static_pointer_cast<arrow::StructArray>(array));
+    }
+    if (array_length < 0) {
+        return Status::Invalid("no inner reader is available to read from");
+    }
+
+    // Step 2: pick every output field from its source reader, or use an all-null array for
+    // fields that no reader provides.
+    const int32_t read_field_count = read_schema_->num_fields();
+    arrow::ArrayVector target_sub_array_vec;
+    target_sub_array_vec.reserve(read_field_count);
+    for (int32_t i = 0; i < read_field_count; i++) {
+        const int32_t reader_offset = reader_offsets_[i];
+        if (reader_offset == -1) {
+            PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> null_array,
+                                   GetOrCreateNonExistArray(i, array_length));
+            target_sub_array_vec.push_back(null_array);
+            continue;
+        }
+        const int32_t field_offset = field_offsets_[i];
+        if (reader_offset < 0 ||
+            static_cast<size_t>(reader_offset) >= array_for_each_reader.size() ||
+            array_for_each_reader[reader_offset] == nullptr || field_offset < 0 ||
+            field_offset >= array_for_each_reader[reader_offset]->num_fields()) {
+            return Status::Invalid(
+                fmt::format("field {} refers to field {} of reader {}, which does not exist", i,
+                            field_offset, reader_offset));
+        }
+        target_sub_array_vec.push_back(array_for_each_reader[reader_offset]->field(field_offset));
+    }
+
+    // Step 3: build the output struct array and export it through the C data interface.
+    PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(
+        std::shared_ptr<arrow::Array> target_array,
+        arrow::StructArray::Make(target_sub_array_vec, read_schema_->field_names()));
+    std::unique_ptr<ArrowArray> target_c_arrow_array = std::make_unique<ArrowArray>();
+    std::unique_ptr<ArrowSchema> target_c_schema = std::make_unique<ArrowSchema>();
+    PAIMON_RETURN_NOT_OK_FROM_ARROW(
+        arrow::ExportArray(*target_array, target_c_arrow_array.get(), target_c_schema.get()));
+    auto target_batch = std::make_pair(std::move(target_c_arrow_array), std::move(target_c_schema));
+    return ReaderUtils::AddAllValidBitmap(std::move(target_batch));
+}
+
+Result<std::shared_ptr<arrow::Array>> DataEvolutionFileReader::GetOrCreateNonExistArray(
+    int32_t field_idx, int64_t array_length) {
+    // One all-null array is kept per missing field. It is rebuilt only when a longer batch is
+    // requested; shorter requests are answered with a slice of the cached array.
+    std::shared_ptr<arrow::Array>& null_array = non_exist_array_vec_[field_idx];
+    const bool too_short = (null_array == nullptr) || (null_array->length() < array_length);
+    if (too_short) {
+        PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(
+            null_array, arrow::MakeArrayOfNull(read_schema_->field(field_idx)->type(),
+                                               array_length, arrow_pool_.get()));
+    }
+    if (null_array->length() != array_length) {
+        return null_array->Slice(0, array_length);
+    }
+    return null_array;
+}
+
+int64_t DataEvolutionFileReader::CalculateCachedArrayLength(size_t reader_idx) const {
+    // Sum of the row counts of the arrays buffered for this reader by a previous call.
+    const arrow::ArrayVector& buffered = cached_array_vec_[reader_idx];
+    int64_t length_sum = 0;
+    for (size_t k = 0; k < buffered.size(); ++k) {
+        length_sum += buffered[k]->length();
+    }
+    return length_sum;
+}
+
+Result<std::shared_ptr<arrow::Array>> DataEvolutionFileReader::NextBatchForSingleReader(
+    size_t reader_idx) {
+    // Rows left over from the previous call are consumed first so that the row order of the
+    // inner reader is preserved. The leftover may itself hold read_batch_size_ rows or more
+    // (e.g. when an inner reader returns batches larger than read_batch_size_), so it goes
+    // through the same split-and-cache logic as freshly read arrays instead of being rejected.
+    arrow::ArrayVector pending_array_vec = std::move(cached_array_vec_[reader_idx]);
+    cached_array_vec_[reader_idx].clear();
+
+    arrow::ArrayVector concat_array_vec;
+    int64_t total_array_length = 0;
+    // Adds `array` to the current batch; whatever does not fit into read_batch_size_ is kept in
+    // cached_array_vec_[reader_idx] for the next call.
+    auto append_to_batch = [&](const std::shared_ptr<arrow::Array>& array) {
+        const int64_t remaining = read_batch_size_ - total_array_length;
+        if (array->length() <= remaining) {
+            concat_array_vec.push_back(array);
+            total_array_length += array->length();
+        } else if (remaining == 0) {
+            // batch already full, keep the whole array for the next call
+            cached_array_vec_[reader_idx].push_back(array);
+        } else {
+            // need truncate current array to align read_batch_size_
+            concat_array_vec.push_back(array->Slice(0, remaining));
+            cached_array_vec_[reader_idx].push_back(array->Slice(remaining));
+            total_array_length += remaining;
+        }
+    };
+
+    for (const auto& pending_array : pending_array_vec) {
+        append_to_batch(pending_array);
+    }
+    while (total_array_length < read_batch_size_) {
+        PAIMON_ASSIGN_OR_RAISE(ReadBatchWithBitmap src_array_with_bitmap,
+                               readers_[reader_idx]->NextBatchWithBitmap());
+        if (BatchReader::IsEofBatch(src_array_with_bitmap)) {
+            // read finish
+            break;
+        }
+        auto& [read_batch, bitmap] = src_array_with_bitmap;
+        auto& [c_array, c_schema] = read_batch;
+        PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr<arrow::Array> src_array,
+                                          arrow::ImportArray(c_array.get(), c_schema.get()));
+        // keep only the rows selected by the bitmap
+        PAIMON_ASSIGN_OR_RAISE(arrow::ArrayVector selected_array_vec,
+                               ReaderUtils::GenerateFilteredArrayVector(src_array, bitmap));
+        for (const auto& selected_array : selected_array_vec) {
+            append_to_batch(selected_array);
+        }
+    }
+    if (concat_array_vec.empty()) {
+        // eof and nothing buffered
+        return std::shared_ptr<arrow::Array>();
+    }
+    if (concat_array_vec.size() == 1) {
+        // avoid data copy
+        return concat_array_vec[0];
+    }
+    // TODO(xinyu.lxy) remove data copy for efficiency
+    PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr<arrow::Array> concat_array,
+                                      arrow::Concatenate(concat_array_vec, arrow_pool_.get()));
+    assert(concat_array->length() == total_array_length);
+    assert(concat_array->length() <= read_batch_size_);
+    return concat_array;
+}
+
+void DataEvolutionFileReader::Close() {
+    // Drop the buffered leftovers and cached all-null columns, then close the inner readers.
+    non_exist_array_vec_.clear();
+    cached_array_vec_.clear();
+    for (const auto& inner_reader : readers_) {
+        if (inner_reader == nullptr) {
+            // slots without a reader contribute no fields
+            continue;
+        }
+        inner_reader->Close();
+    }
+}
+
+std::shared_ptr<Metrics> DataEvolutionFileReader::GetReaderMetrics() const {
+    // The metrics of this union reader are gathered from its inner readers.
+    // NOTE(review): readers_ may contain nullptr slots; confirm CollectReadMetrics skips them.
+    auto collected = MetricsImpl::CollectReadMetrics(readers_);
+    return collected;
+}
+
+} // namespace paimon
diff --git a/src/paimon/common/reader/data_evolution_file_reader.h b/src/paimon/common/reader/data_evolution_file_reader.h
new file mode 100644
index 0000000..e1bea66
--- /dev/null
+++ b/src/paimon/common/reader/data_evolution_file_reader.h
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <utility>
+#include <vector>
+
+#include "arrow/api.h"
+#include "paimon/metrics.h"
+#include "paimon/reader/batch_reader.h"
+#include "paimon/result.h"
+
+namespace paimon {
+/// A union reader that stitches the columns produced by several inner readers into one batch.
+///
+/// Each inner reader provides a subset of the output columns for the same rows. This reader
+/// reads one batch from every inner reader, requires those batches to have the same length,
+/// and assembles the fields of `read_schema` from them.
+///
+/// For example, the expected schema for this reader is: int, int, string, int, string, int
+/// (6 fields in total), and it contains three inner readers: reader0, reader1 and reader2.
+///
+/// `reader_offsets` and `field_offsets` both have one entry per field of the read schema (6
+/// here). `reader_offsets[i]` is the index of the inner reader that field i comes from, and
+/// `field_offsets[i]` is the position of field i inside the struct batches of that reader. A
+/// reader offset of -1 marks a field that no inner reader provides; it is filled with nulls.
+///
+/// For example, if reader_offsets is {0, 2, 0, 1, 2, 1} and field_offsets is
+/// {0, 0, 1, 1, 1, 0}, it means:
+///  - The first field comes from reader0, and it is at offset 0 in reader0.
+///  - The second field comes from reader2, and it is at offset 0 in reader2.
+///  - The third field comes from reader0, and it is at offset 1 in reader0.
+///  - The fourth field comes from reader1, and it is at offset 1 in reader1.
+///  - The fifth field comes from reader2, and it is at offset 1 in reader2.
+///  - The sixth field comes from reader1, and it is at offset 0 in reader1.
+///
+/// Entries of `readers` may be nullptr for inner readers from which no field is read.
+class DataEvolutionFileReader : public BatchReader {
+ public:
+    /// Creates the union reader.
+    ///
+    /// @param readers inner readers (more than one); an entry may be nullptr when no field is
+    ///        read from it.
+    /// @param read_schema the output schema; must not be empty.
+    /// @param read_batch_size maximum number of rows per output batch.
+    /// @param reader_offsets per output field, the index into `readers` (-1: filled with nulls).
+    /// @param field_offsets per output field, the position of the field in its reader's batch.
+    /// @param pool memory pool used for concatenating batches and building all-null arrays.
+    /// @return the reader, or an Invalid status when `read_schema` is empty, the sizes of
+    ///         `read_schema`, `reader_offsets` and `field_offsets` differ, or fewer than two
+    ///         readers are given.
+    static Result<std::unique_ptr<DataEvolutionFileReader>> Create(
+        std::vector<std::unique_ptr<BatchReader>>&& readers,
+        const std::shared_ptr<arrow::Schema>& read_schema, int32_t read_batch_size,
+        const std::vector<int32_t>& reader_offsets, const std::vector<int32_t>& field_offsets,
+        const std::shared_ptr<MemoryPool>& pool);
+
+    /// Not supported: batches of this reader are only available through NextBatchWithBitmap().
+    Result<ReadBatch> NextBatch() override {
+        return Status::Invalid(
+            "paimon inner reader DataEvolutionFileReader should use NextBatchWithBitmap");
+    }
+
+    /// Reads up to `read_batch_size` rows from every inner reader and assembles them into one
+    /// struct batch of `read_schema`; the bitmap is attached via
+    /// ReaderUtils::AddAllValidBitmap. Returns an EOF batch as soon as any inner reader is
+    /// exhausted, and an Invalid status if the inner batches have different lengths.
+    Result<ReadBatchWithBitmap> NextBatchWithBitmap() override;
+
+    /// Releases buffered arrays and closes every non-null inner reader.
+    void Close() override;
+
+    /// Returns the metrics collected from the inner readers.
+    std::shared_ptr<Metrics> GetReaderMetrics() const override;
+
+ private:
+    DataEvolutionFileReader(std::vector<std::unique_ptr<BatchReader>>&& readers,
+                            const std::shared_ptr<arrow::Schema>& read_schema,
+                            int32_t read_batch_size, const std::vector<int32_t>& reader_offsets,
+                            const std::vector<int32_t>& field_offsets,
+                            const std::shared_ptr<arrow::MemoryPool>& arrow_pool)
+        : arrow_pool_(arrow_pool),
+          readers_(std::move(readers)),
+          read_schema_(read_schema),
+          read_batch_size_(read_batch_size),
+          reader_offsets_(reader_offsets),
+          field_offsets_(field_offsets),
+          cached_array_vec_(readers_.size()),
+          non_exist_array_vec_(read_schema->num_fields(), nullptr) {}
+
+    /// Total number of rows buffered for inner reader `reader_idx` by a previous call.
+    int64_t CalculateCachedArrayLength(size_t reader_idx) const;
+
+    /// Returns up to `read_batch_size_` rows of inner reader `reader_idx` (buffered rows
+    /// first), or nullptr once the reader is exhausted and nothing is buffered.
+    Result<std::shared_ptr<arrow::Array>> NextBatchForSingleReader(size_t reader_idx);
+
+    /// Returns an all-null array of `array_length` rows for output field `field_idx`, reusing
+    /// a previously built array when it is long enough.
+    Result<std::shared_ptr<arrow::Array>> GetOrCreateNonExistArray(int32_t field_idx,
+                                                                   int64_t array_length);
+
+ private:
+    std::shared_ptr<arrow::MemoryPool> arrow_pool_;
+    std::vector<std::unique_ptr<BatchReader>> readers_;
+    std::shared_ptr<arrow::Schema> read_schema_;
+    int32_t read_batch_size_;
+    std::vector<int32_t> reader_offsets_;
+    std::vector<int32_t> field_offsets_;
+    // per inner reader: rows read but not yet returned (did not fit into the last batch)
+    std::vector<arrow::ArrayVector> cached_array_vec_;
+    // per output field: cached all-null array for fields no inner reader provides
+    arrow::ArrayVector non_exist_array_vec_;
+};
+} // namespace paimon
diff --git a/src/paimon/common/reader/data_evolution_file_reader_test.cpp b/src/paimon/common/reader/data_evolution_file_reader_test.cpp
new file mode 100644
index 0000000..39afb15
--- /dev/null
+++ b/src/paimon/common/reader/data_evolution_file_reader_test.cpp
@@ -0,0 +1,635 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/reader/data_evolution_file_reader.h"
+
+#include <map>
+
+#include "arrow/api.h"
+#include "arrow/array/array_base.h"
+#include "arrow/c/abi.h"
+#include "arrow/c/bridge.h"
+#include "arrow/ipc/api.h"
+#include "arrow/util/range.h"
+#include "gtest/gtest.h"
+#include "paimon/common/utils/arrow/mem_utils.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/testing/mock/mock_file_batch_reader.h"
+#include "paimon/testing/utils/read_result_collector.h"
+#include "paimon/testing/utils/testharness.h"
+namespace paimon::test {
+
+/// Parameterized fixture for DataEvolutionFileReader. The bool test parameter is
+/// forwarded to MockFileBatchReader::EnableRandomizeBatchSize on every mock inner
+/// reader the fixture builds.
+class DataEvolutionFileReaderTest : public ::testing::Test,
+                                    public ::testing::WithParamInterface<bool> {
+ public:
+    void SetUp() override {
+        pool_ = GetDefaultPool();
+    }
+
+    void TearDown() override {
+        pool_.reset();
+    }
+
+    /// Builds one mock reader per entry of `src_array_vec`, wraps them in a
+    /// DataEvolutionFileReader, and checks its metrics and collected output.
+    ///
+    /// A nullptr entry becomes a nullptr reader, which simulates a reader from
+    /// which no field is read. The output is compared against `expected_array`.
+    /// The check is repeated for every read batch size produced by
+    /// arrow::internal::Iota(1, 10).
+    void CheckResult(const arrow::ArrayVector& src_array_vec,
+                     const std::shared_ptr<arrow::Schema>& read_schema,
+                     const std::vector<int32_t>& reader_offsets,
+                     const std::vector<int32_t>& field_offsets,
+                     const std::shared_ptr<arrow::Array>& expected_array,
+                     const std::optional<RoaringBitmap32>& selection_bitmap = std::nullopt) const {
+        for (auto batch_size : arrow::internal::Iota(1, 10)) {
+            int64_t total_row_count = 0;
+            std::vector<std::unique_ptr<BatchReader>> readers;
+            readers.reserve(src_array_vec.size());
+            for (const auto& array : src_array_vec) {
+                if (array == nullptr) {
+                    // simulate no fields read from current reader
+                    readers.push_back(nullptr);
+                    continue;
+                }
+                total_row_count += array->length();
+                readers.push_back(MakeMockReader(array, selection_bitmap, batch_size));
+            }
+            ASSERT_OK_AND_ASSIGN(
+                auto data_evolution_file_reader,
+                DataEvolutionFileReader::Create(std::move(readers), read_schema, batch_size,
+                                                reader_offsets, field_offsets, pool_));
+            // check metrics: the reported row count is the total length of all
+            // source arrays, i.e. every row of each MockFileBatchReader
+            auto metrics = data_evolution_file_reader->GetReaderMetrics();
+            ASSERT_EQ(metrics->ToString(),
+                      "{\"mock.number.of.rows\":" + std::to_string(total_row_count) + "}");
+
+            // check result array
+            ASSERT_OK_AND_ASSIGN(
+                auto result_array,
+                paimon::test::ReadResultCollector::CollectResult(data_evolution_file_reader.get()));
+            data_evolution_file_reader->Close();
+            auto expected_chunk_array = std::make_shared<arrow::ChunkedArray>(expected_array);
+            ASSERT_TRUE(result_array->Equals(expected_chunk_array));
+        }
+    }
+
+    /// Drains NextBatchForSingleReader(0) of a DataEvolutionFileReader that wraps
+    /// one mock reader with inner batch size `inner_batch_size`.
+    ///
+    /// Checks that every batch except the last holds exactly `read_batch_size`
+    /// rows, and that the concatenated output equals `expected_array`.
+    void CheckNextBatchForSingleReader(int32_t inner_batch_size, int32_t read_batch_size,
+                                       const std::shared_ptr<arrow::Array>& src_array,
+                                       const std::optional<RoaringBitmap32>& selection_bitmap,
+                                       const std::shared_ptr<arrow::Array>& expected_array) const {
+        std::vector<std::unique_ptr<BatchReader>> readers;
+        readers.push_back(MakeMockReader(src_array, selection_bitmap, inner_batch_size));
+        DataEvolutionFileReader fake_data_evolution_reader(
+            std::move(readers), /*read_schema=*/arrow::schema({}), read_batch_size,
+            /*reader_offsets=*/{}, /*field_offsets=*/{}, GetArrowPool(pool_));
+        arrow::ArrayVector result_array_vec;
+        while (true) {
+            ASSERT_OK_AND_ASSIGN(auto result_array,
+                                 fake_data_evolution_reader.NextBatchForSingleReader(0));
+            if (result_array == nullptr) {
+                break;
+            }
+            result_array_vec.push_back(result_array);
+        }
+        const int64_t expected_length = expected_array->length();
+        // integer ceil-division: avoids comparing a size_t against a double from std::ceil
+        const int64_t expected_batch_count =
+            (expected_length + read_batch_size - 1) / read_batch_size;
+        ASSERT_EQ(static_cast<int64_t>(result_array_vec.size()), expected_batch_count);
+        if (result_array_vec.empty()) {
+            // Nothing was produced (and nothing was expected). Returning here also
+            // avoids the `size() - 1` underflow and the back() call on an empty vector.
+            return;
+        }
+        // except for the last batch, each batch is expected to hold exactly read_batch_size rows
+        for (size_t i = 0; i + 1 < result_array_vec.size(); ++i) {
+            ASSERT_EQ(result_array_vec[i]->length(), read_batch_size);
+        }
+        const int64_t remainder = expected_length % read_batch_size;
+        ASSERT_EQ(result_array_vec.back()->length(),
+                  remainder == 0 ? read_batch_size : remainder);
+        auto result_chunk_array = std::make_shared<arrow::ChunkedArray>(result_array_vec);
+        auto expected_chunk_array = std::make_shared<arrow::ChunkedArray>(expected_array);
+        ASSERT_TRUE(result_chunk_array->Equals(expected_chunk_array));
+    }
+
+ protected:
+    // Protected rather than private: TEST_F/TEST_P bodies are subclasses of this
+    // fixture and pass pool_ to DataEvolutionFileReader::Create directly.
+    std::shared_ptr<MemoryPool> pool_;
+
+ private:
+    /// Creates a mock inner reader over `array`, optionally restricted to
+    /// `selection_bitmap`. Randomized batch sizes are enabled according to the
+    /// test parameter.
+    std::unique_ptr<MockFileBatchReader> MakeMockReader(
+        const std::shared_ptr<arrow::Array>& array,
+        const std::optional<RoaringBitmap32>& selection_bitmap, int32_t batch_size) const {
+        std::unique_ptr<MockFileBatchReader> file_batch_reader;
+        if (selection_bitmap) {
+            file_batch_reader = std::make_unique<MockFileBatchReader>(
+                array, array->type(), selection_bitmap.value(), batch_size);
+        } else {
+            file_batch_reader =
+                std::make_unique<MockFileBatchReader>(array, array->type(), batch_size);
+        }
+        file_batch_reader->EnableRandomizeBatchSize(GetParam());
+        return file_batch_reader;
+    }
+};
+
+TEST_F(DataEvolutionFileReaderTest, TestInvalid) {
+    {
+        // an empty read schema is rejected
+        arrow::FieldVector read_fields;
+        auto read_schema = arrow::schema(read_fields);
+        ASSERT_NOK_WITH_MSG(
+            DataEvolutionFileReader::Create({}, read_schema, /*read_batch_size=*/10, {}, {}, pool_),
+            "read schema must not be empty");
+    }
+    {
+        // 4 read fields but only 3 reader/field offsets
+        arrow::FieldVector read_fields = {
+            arrow::field("f0", arrow::int32()),
+            arrow::field("f1", arrow::int32()),
+            arrow::field("f2", arrow::utf8()),
+            arrow::field("f3", arrow::int32()),
+        };
+        auto read_schema = arrow::schema(read_fields);
+        std::vector<int32_t> reader_offsets = {0, 0, 1};
+        std::vector<int32_t> field_offsets = {0, 1, 0};
+        ASSERT_NOK_WITH_MSG(DataEvolutionFileReader::Create({}, read_schema, /*read_batch_size=*/10,
+                                                            reader_offsets, field_offsets, pool_),
+                            "read schema, row offsets and field offsets must have the same size");
+    }
+    {
+        // a single reader is rejected
+        std::vector<std::unique_ptr<BatchReader>> readers;
+        readers.push_back(nullptr);
+
+        arrow::FieldVector read_fields = {
+            arrow::field("f0", arrow::int32()),
+            arrow::field("f1", arrow::int32()),
+            arrow::field("f2", arrow::utf8()),
+            arrow::field("f3", arrow::int32()),
+        };
+        auto read_schema = arrow::schema(read_fields);
+        std::vector<int32_t> reader_offsets = {0, 0, 1, 1};
+        std::vector<int32_t> field_offsets = {0, 1, 1, 0};
+        ASSERT_NOK_WITH_MSG(
+            DataEvolutionFileReader::Create(std::move(readers), read_schema, /*read_batch_size=*/10,
+                                            reader_offsets, field_offsets, pool_),
+            "readers size is supposed to be more than 1");
+    }
+}
+
+TEST_P(DataEvolutionFileReaderTest, TestNextBatchForSingleReader) {
+    // builds an int32 array holding 0, 1, ..., array_length - 1
+    auto prepare_array = [](int64_t array_length) -> std::shared_ptr<arrow::Array> {
+        auto array_builder = std::make_shared<arrow::Int32Builder>();
+        for (int32_t i = 0; i < array_length; ++i) {
+            EXPECT_TRUE(array_builder->Append(i).ok());
+        }
+        std::shared_ptr<arrow::Array> array;
+        EXPECT_TRUE(array_builder->Finish(&array).ok());
+        return array;
+    };
+    // Builds an int32 array holding the positions set in `bitmap`. Because
+    // prepare_array stores each row's position as its value, this equals
+    // prepare_array(n) filtered by `bitmap`.
+    auto prepare_array_with_bitmap =
+        [](const RoaringBitmap32& bitmap) -> std::shared_ptr<arrow::Array> {
+        auto array_builder = std::make_shared<arrow::Int32Builder>();
+        for (auto iter = bitmap.Begin(); iter != bitmap.End(); ++iter) {
+            EXPECT_TRUE(array_builder->Append(*iter).ok());
+        }
+        std::shared_ptr<arrow::Array> array;
+        EXPECT_TRUE(array_builder->Finish(&array).ok());
+        return array;
+    };
+    {
+        // src array length = 10, read batch size = 10
+        auto src_array = prepare_array(10);
+        for (int32_t inner_batch_size : arrow::internal::Iota(1, 10)) {
+            CheckNextBatchForSingleReader(inner_batch_size, /*read_batch_size=*/10, src_array,
+                                          /*selection_bitmap=*/std::nullopt,
+                                          /*expected_array=*/src_array);
+        }
+    }
+    {
+        // src array length = 10, read batch size = 6
+        auto src_array = prepare_array(10);
+        for (int32_t inner_batch_size : arrow::internal::Iota(1, 6)) {
+            CheckNextBatchForSingleReader(inner_batch_size, /*read_batch_size=*/6, src_array,
+                                          /*selection_bitmap=*/std::nullopt,
+                                          /*expected_array=*/src_array);
+        }
+    }
+    {
+        // src array length = 10, read batch size = 15
+        auto src_array = prepare_array(10);
+        for (int32_t inner_batch_size : arrow::internal::Iota(1, 15)) {
+            CheckNextBatchForSingleReader(inner_batch_size, /*read_batch_size=*/15, src_array,
+                                          /*selection_bitmap=*/std::nullopt,
+                                          /*expected_array=*/src_array);
+        }
+    }
+    {
+        // test bulk data, src array length = 10000, read batch size = 1024
+        auto src_array = prepare_array(10000);
+        for (int32_t inner_batch_size : {1, 2, 8, 16, 20, 100, 1024}) {
+            CheckNextBatchForSingleReader(inner_batch_size, /*read_batch_size=*/1024, src_array,
+                                          /*selection_bitmap=*/std::nullopt,
+                                          /*expected_array=*/src_array);
+        }
+    }
+    {
+        // src array length = 10, selection bitmap = {1, 3, 5}
+        auto src_array = prepare_array(10);
+        RoaringBitmap32 selected_bitmap = RoaringBitmap32::From({1, 3, 5});
+        auto expected_array = prepare_array_with_bitmap(selected_bitmap);
+        for (int32_t inner_batch_size : arrow::internal::Iota(1, 15)) {
+            CheckNextBatchForSingleReader(inner_batch_size, /*read_batch_size=*/15, src_array,
+                                          selected_bitmap, expected_array);
+        }
+    }
+    {
+        // src array length = 10, selection all
+        auto src_array = prepare_array(10);
+        RoaringBitmap32 selected_bitmap = RoaringBitmap32::From({0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
+        for (int32_t inner_batch_size : arrow::internal::Iota(1, 15)) {
+            CheckNextBatchForSingleReader(inner_batch_size, /*read_batch_size=*/15, src_array,
+                                          selected_bitmap, /*expected_array=*/src_array);
+        }
+    }
+    {
+        // src array length = 10, selection = first row only
+        auto src_array = prepare_array(10);
+        RoaringBitmap32 selected_bitmap = RoaringBitmap32::From({0});
+        auto expected_array = prepare_array_with_bitmap(selected_bitmap);
+        for (int32_t inner_batch_size : arrow::internal::Iota(1, 15)) {
+            CheckNextBatchForSingleReader(inner_batch_size, /*read_batch_size=*/15, src_array,
+                                          selected_bitmap, expected_array);
+        }
+    }
+    {
+        // src array length = 10, selection = last row only
+        auto src_array = prepare_array(10);
+        RoaringBitmap32 selected_bitmap = RoaringBitmap32::From({9});
+        auto expected_array = prepare_array_with_bitmap(selected_bitmap);
+        for (int32_t inner_batch_size : arrow::internal::Iota(1, 15)) {
+            CheckNextBatchForSingleReader(inner_batch_size, /*read_batch_size=*/15, src_array,
+                                          selected_bitmap, expected_array);
+        }
+    }
+    {
+        // src array length = 10, selection consecutive positions
+        auto src_array = prepare_array(10);
+        RoaringBitmap32 selected_bitmap = RoaringBitmap32::From({2, 3, 4, 5});
+        auto expected_array = prepare_array_with_bitmap(selected_bitmap);
+        for (int32_t inner_batch_size : arrow::internal::Iota(1, 15)) {
+            CheckNextBatchForSingleReader(inner_batch_size, /*read_batch_size=*/15, src_array,
+                                          selected_bitmap, expected_array);
+        }
+    }
+    {
+        // src array length = 10, selection = every row except position 6
+        auto src_array = prepare_array(10);
+        RoaringBitmap32 selected_bitmap = RoaringBitmap32::From({0, 1, 2, 3, 4, 5, 7, 8, 9});
+        auto expected_array = prepare_array_with_bitmap(selected_bitmap);
+        // inner batch: [0, 1, 2, 3] | [4, 5] [7, 8] | [9]
+        CheckNextBatchForSingleReader(/*inner_batch_size=*/4, /*read_batch_size=*/5, src_array,
+                                      selected_bitmap, expected_array);
+    }
+    {
+        // test bulk data, src array length = 10000, read batch size = 1024
+        auto src_array = prepare_array(10000);
+        RoaringBitmap32 selected_bitmap =
+            RoaringBitmap32::From({0, 10, 1000, 2333, 4566, 7838, 8787, 9999});
+        auto expected_array = prepare_array_with_bitmap(selected_bitmap);
+        for (int32_t inner_batch_size : {1, 2, 8, 16, 20, 100, 1024}) {
+            CheckNextBatchForSingleReader(inner_batch_size, /*read_batch_size=*/1024, src_array,
+                                          selected_bitmap, expected_array);
+        }
+    }
+}
+
+TEST_P(DataEvolutionFileReaderTest, TestSimple) {
+    // Six read fields spread across three readers, two fields per reader.
+    const arrow::FieldVector read_fields = {
+        arrow::field("f0", arrow::int32()), arrow::field("f1", arrow::int32()),
+        arrow::field("f2", arrow::utf8()),  arrow::field("f3", arrow::int32()),
+        arrow::field("f4", arrow::utf8()),  arrow::field("f5", arrow::int32()),
+    };
+    const auto read_schema = arrow::schema(read_fields);
+
+    // read field i is taken from reader reader_offsets[i], column field_offsets[i]
+    const std::vector<int32_t> reader_offsets = {0, 2, 0, 1, 2, 1};
+    const std::vector<int32_t> field_offsets = {0, 0, 1, 1, 1, 0};
+
+    auto from_json = [](const std::shared_ptr<arrow::DataType>& type, const char* json) {
+        return arrow::ipc::internal::json::ArrayFromJSON(type, json).ValueOrDie();
+    };
+
+    // reader 0 holds (f0, f2)
+    const auto reader0_data = from_json(arrow::struct_({read_fields[0], read_fields[2]}), R"([
+ [0, "00"],
+ [1, "01"],
+ [2, "02"],
+ [3, "03"],
+ [4, "04"],
+ [5, "05"]
+])");
+    // reader 1 holds (f5, f3)
+    const auto reader1_data = from_json(arrow::struct_({read_fields[5], read_fields[3]}), R"([
+ [10, 110],
+ [11, 111],
+ [12, 112],
+ [13, 113],
+ [14, 114],
+ [15, 115]
+])");
+    // reader 2 holds (f1, f4)
+    const auto reader2_data = from_json(arrow::struct_({read_fields[1], read_fields[4]}), R"([
+ [20, "20"],
+ [21, "21"],
+ [22, "22"],
+ [23, "23"],
+ [24, "24"],
+ [25, "25"]
+])");
+
+    const auto merged = from_json(arrow::struct_(read_fields), R"([
+ [0, 20, "00", 110, "20", 10],
+ [1, 21, "01", 111, "21", 11],
+ [2, 22, "02", 112, "22", 12],
+ [3, 23, "03", 113, "23", 13],
+ [4, 24, "04", 114, "24", 14],
+ [5, 25, "05", 115, "25", 15]
+])");
+    CheckResult({reader0_data, reader1_data, reader2_data}, read_schema, reader_offsets,
+                field_offsets, merged);
+}
+
+TEST_P(DataEvolutionFileReaderTest, TestWithNonExistField) {
+    // Six fields served by three readers, plus one field that no reader provides.
+    const arrow::FieldVector read_fields = {
+        arrow::field("f0", arrow::int32()), arrow::field("f1", arrow::int32()),
+        arrow::field("f2", arrow::utf8()),  arrow::field("f3", arrow::int32()),
+        arrow::field("f4", arrow::utf8()),  arrow::field("f5", arrow::int32()),
+        arrow::field("non-field", arrow::int32()),
+    };
+    const auto read_schema = arrow::schema(read_fields);
+
+    // the trailing -1/-1 pair marks "non-field" as absent from every reader
+    const std::vector<int32_t> reader_offsets = {0, 2, 0, 1, 2, 1, -1};
+    const std::vector<int32_t> field_offsets = {0, 0, 1, 1, 1, 0, -1};
+
+    auto from_json = [](const std::shared_ptr<arrow::DataType>& type, const char* json) {
+        return arrow::ipc::internal::json::ArrayFromJSON(type, json).ValueOrDie();
+    };
+
+    const auto reader0_data = from_json(arrow::struct_({read_fields[0], read_fields[2]}), R"([
+ [0, "00"],
+ [1, "01"],
+ [2, "02"],
+ [3, "03"],
+ [4, "04"],
+ [5, "05"]
+])");
+    const auto reader1_data = from_json(arrow::struct_({read_fields[5], read_fields[3]}), R"([
+ [10, 110],
+ [11, 111],
+ [12, 112],
+ [13, 113],
+ [14, 114],
+ [15, 115]
+])");
+    const auto reader2_data = from_json(arrow::struct_({read_fields[1], read_fields[4]}), R"([
+ [20, "20"],
+ [21, "21"],
+ [22, "22"],
+ [23, "23"],
+ [24, "24"],
+ [25, "25"]
+])");
+
+    // the absent field is expected to read back as null in every row
+    const auto merged = from_json(arrow::struct_(read_fields), R"([
+ [0, 20, "00", 110, "20", 10, null],
+ [1, 21, "01", 111, "21", 11, null],
+ [2, 22, "02", 112, "22", 12, null],
+ [3, 23, "03", 113, "23", 13, null],
+ [4, 24, "04", 114, "24", 14, null],
+ [5, 25, "05", 115, "25", 15, null]
+])");
+    CheckResult({reader0_data, reader1_data, reader2_data}, read_schema, reader_offsets,
+                field_offsets, merged);
+}
+
+TEST_P(DataEvolutionFileReaderTest, TestReadFromPartialReaders) {
+    const arrow::FieldVector read_fields = {
+        arrow::field("f0", arrow::int32()), arrow::field("f1", arrow::int32()),
+        arrow::field("f2", arrow::utf8()),  arrow::field("f3", arrow::int32()),
+        arrow::field("f4", arrow::utf8()),  arrow::field("f5", arrow::int32()),
+    };
+    const auto read_schema = arrow::schema(read_fields);
+    // simulate reader2 has no field to read: no offset refers to reader index 2
+    const std::vector<int32_t> reader_offsets = {0, 3, 0, 1, 3, 1};
+    const std::vector<int32_t> field_offsets = {0, 0, 1, 1, 1, 0};
+
+    auto from_json = [](const std::shared_ptr<arrow::DataType>& type, const char* json) {
+        return arrow::ipc::internal::json::ArrayFromJSON(type, json).ValueOrDie();
+    };
+
+    const auto reader0_data = from_json(arrow::struct_({read_fields[0], read_fields[2]}), R"([
+ [0, "00"],
+ [1, "01"],
+ [2, "02"],
+ [3, "03"],
+ [4, "04"],
+ [5, "05"]
+])");
+    const auto reader1_data = from_json(arrow::struct_({read_fields[5], read_fields[3]}), R"([
+ [10, 110],
+ [11, 111],
+ [12, 112],
+ [13, 113],
+ [14, 114],
+ [15, 115]
+])");
+    const auto reader3_data = from_json(arrow::struct_({read_fields[1], read_fields[4]}), R"([
+ [20, "20"],
+ [21, "21"],
+ [22, "22"],
+ [23, "23"],
+ [24, "24"],
+ [25, "25"]
+])");
+
+    const auto merged = from_json(arrow::struct_(read_fields), R"([
+ [0, 20, "00", 110, "20", 10],
+ [1, 21, "01", 111, "21", 11],
+ [2, 22, "02", 112, "22", 12],
+ [3, 23, "03", 113, "23", 13],
+ [4, 24, "04", 114, "24", 14],
+ [5, 25, "05", 115, "25", 15]
+])");
+
+    // the nullptr slot becomes a nullptr reader at index 2
+    CheckResult({reader0_data, reader1_data, nullptr, reader3_data}, read_schema,
+                reader_offsets, field_offsets, merged);
+}
+
+TEST_P(DataEvolutionFileReaderTest, TestNestedType) {
+    // Nested and logical types (map, list, struct, timestamp, date, decimal) split
+    // across two readers.
+    arrow::FieldVector read_fields = {
+        arrow::field("f1", arrow::map(arrow::int8(), arrow::int16())),
+        arrow::field("f2", arrow::list(arrow::float32())),
+        arrow::field("f3", arrow::struct_({arrow::field("f0", arrow::boolean()),
+                                           arrow::field("f1", arrow::int64())})),
+        arrow::field("f4", arrow::timestamp(arrow::TimeUnit::NANO)),
+        arrow::field("f5", arrow::date32()),
+        arrow::field("f6", arrow::decimal128(2, 2))};
+    auto read_schema = arrow::schema(read_fields);
+
+    std::vector<int32_t> reader_offsets = {0, 1, 0, 1, 0, 1};
+    std::vector<int32_t> field_offsets = {2, 0, 1, 1, 0, 2};
+
+    // reader 0 holds (f5, f3, f1)
+    auto array0 = arrow::ipc::internal::json::ArrayFromJSON(
+                      arrow::struct_({read_fields[4], read_fields[2], read_fields[0]}), R"([
+ [2456, [true, 2], [[0, 0]]],
+ [24, [true, 1], [[0, 1]]],
+ [2456, [false, 12], [[10, 10]]],
+ [245, [false, 2222], [[127, 32767], [-128, -32768]]],
+ [24, [true, 2], [[1, 64], [2, 32]]],
+ [24, [true, 2], [[11, 64], [12, 32]]]
+])")
+                      .ValueOrDie();
+    // reader 1 holds (f2, f4, f6)
+    auto array1 = arrow::ipc::internal::json::ArrayFromJSON(
+                      arrow::struct_({read_fields[1], read_fields[3], read_fields[5]}), R"([
+ [[0.1, 0.2], "1970-01-01 00:02:03.123123", "0.22"],
+ [[0.1, 0.3], "1970-01-01 00:02:03.999999", "0.28"],
+ [[1.1, 1.2], "1970-01-01 00:02:03.123123", "0.22"],
+ [[1.1, 1.2], "1970-01-01 00:02:03.123123", "0.12"],
+ [[2.2, 3.2], "1970-01-01 00:00:00.0", "0.78"],
+ [[2.2, 3.2], "1970-01-01 00:00:00.123123", "0.78"]
+])")
+                      .ValueOrDie();
+
+    // Every JSON string value must stay on one line: a raw newline inside a JSON
+    // string is invalid.
+    auto expected_array =
+        arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(read_fields), R"([
+ [[[0, 0]], [0.1, 0.2], [true, 2], "1970-01-01 00:02:03.123123", 2456, "0.22"],
+ [[[0, 1]], [0.1, 0.3], [true, 1], "1970-01-01 00:02:03.999999", 24, "0.28"],
+ [[[10, 10]], [1.1, 1.2], [false, 12], "1970-01-01 00:02:03.123123", 2456, "0.22"],
+ [[[127, 32767], [-128, -32768]], [1.1, 1.2], [false, 2222], "1970-01-01 00:02:03.123123", 245, "0.12"],
+ [[[1, 64], [2, 32]], [2.2, 3.2], [true, 2], "1970-01-01 00:00:00.0", 24, "0.78"],
+ [[[11, 64], [12, 32]], [2.2, 3.2], [true, 2], "1970-01-01 00:00:00.123123", 24, "0.78"]
+])")
+            .ValueOrDie();
+    CheckResult({array0, array1}, read_schema, reader_offsets, field_offsets, expected_array);
+}
+
+TEST_P(DataEvolutionFileReaderTest, TestWithBitmap) {
+    const arrow::FieldVector read_fields = {
+        arrow::field("f0", arrow::int32()), arrow::field("f1", arrow::int32()),
+        arrow::field("f2", arrow::utf8()),  arrow::field("f3", arrow::int32()),
+        arrow::field("f4", arrow::utf8()),  arrow::field("f5", arrow::int32()),
+        arrow::field("non-exist", arrow::int32())};
+    const auto read_schema = arrow::schema(read_fields);
+
+    // "non-exist" (offsets -1/-1) comes from no reader
+    const std::vector<int32_t> reader_offsets = {0, 2, 0, 1, 2, 1, -1};
+    const std::vector<int32_t> field_offsets = {0, 0, 1, 1, 1, 0, -1};
+
+    // only rows 1, 3 and 5 are selected from every reader
+    const RoaringBitmap32 row_selection = RoaringBitmap32::From({1, 3, 5});
+
+    auto from_json = [](const std::shared_ptr<arrow::DataType>& type, const char* json) {
+        return arrow::ipc::internal::json::ArrayFromJSON(type, json).ValueOrDie();
+    };
+
+    const auto reader0_data = from_json(arrow::struct_({read_fields[0], read_fields[2]}), R"([
+ [0, "00"],
+ [1, "01"],
+ [2, "02"],
+ [3, "03"],
+ [4, "04"],
+ [5, "05"]
+])");
+    const auto reader1_data = from_json(arrow::struct_({read_fields[5], read_fields[3]}), R"([
+ [10, 110],
+ [11, 111],
+ [12, 112],
+ [13, 113],
+ [14, 114],
+ [15, 115]
+])");
+    const auto reader2_data = from_json(arrow::struct_({read_fields[1], read_fields[4]}), R"([
+ [20, "20"],
+ [21, "21"],
+ [22, "22"],
+ [23, "23"],
+ [24, "24"],
+ [25, "25"]
+])");
+
+    const auto merged = from_json(arrow::struct_(read_fields), R"([
+ [1, 21, "01", 111, "21", 11, null],
+ [3, 23, "03", 113, "23", 13, null],
+ [5, 25, "05", 115, "25", 15, null]
+])");
+    CheckResult({reader0_data, reader1_data, reader2_data}, read_schema, reader_offsets,
+                field_offsets, merged, row_selection);
+}
+
+TEST_P(DataEvolutionFileReaderTest, TestSingleReaderRowCountMismatch) {
+    const arrow::FieldVector read_fields = {
+        arrow::field("f0", arrow::int32()), arrow::field("f1", arrow::int32()),
+        arrow::field("f2", arrow::utf8()), arrow::field("f3", arrow::int32())};
+    const auto read_schema = arrow::schema(read_fields);
+
+    const std::vector<int32_t> reader_offsets = {0, 1, 0, 1};
+    const std::vector<int32_t> field_offsets = {0, 0, 1, 1};
+
+    auto from_json = [](const std::shared_ptr<arrow::DataType>& type, const char* json) {
+        return arrow::ipc::internal::json::ArrayFromJSON(type, json).ValueOrDie();
+    };
+
+    // six rows
+    const auto longer_array = from_json(arrow::struct_({read_fields[0], read_fields[2]}), R"([
+ [0, "00"],
+ [1, "01"],
+ [2, "02"],
+ [3, "03"],
+ [4, "04"],
+ [5, "05"]
+])");
+    // five rows
+    const auto shorter_array = from_json(arrow::struct_({read_fields[1], read_fields[3]}), R"([
+ [10, 110],
+ [11, 111],
+ [12, 112],
+ [13, 113],
+ [14, 114]
+])");
+
+    const bool randomize_batch_size = GetParam();
+    std::vector<std::unique_ptr<BatchReader>> readers;
+    for (const std::shared_ptr<arrow::Array>& src : arrow::ArrayVector{longer_array, shorter_array}) {
+        auto mock_reader =
+            std::make_unique<MockFileBatchReader>(src, src->type(), /*read_batch_size=*/10);
+        mock_reader->EnableRandomizeBatchSize(randomize_batch_size);
+        readers.push_back(std::move(mock_reader));
+    }
+    ASSERT_OK_AND_ASSIGN(
+        auto data_evolution_file_reader,
+        DataEvolutionFileReader::Create(std::move(readers), read_schema, /*read_batch_size=*/10,
+                                        reader_offsets, field_offsets, pool_));
+    // the mismatched row counts must surface as an error while reading
+    ASSERT_NOK_WITH_MSG(
+        paimon::test::ReadResultCollector::CollectResult(data_evolution_file_reader.get()),
+        "array for single reader length mismatch others");
+}
+
+// Run every TEST_P twice: once with randomized mock batch sizes, once without.
+INSTANTIATE_TEST_SUITE_P(EnableRandomizeBatchSize, DataEvolutionFileReaderTest,
+                         ::testing::Values(true, false));
+
+} // namespace paimon::test
diff --git a/src/paimon/common/reader/data_evolution_row.h
b/src/paimon/common/reader/data_evolution_row.h
new file mode 100644
index 0000000..93393c0
--- /dev/null
+++ b/src/paimon/common/reader/data_evolution_row.h
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cassert>
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <string_view>
+#include <vector>
+
+#include "fmt/format.h"
+#include "paimon/common/data/binary_row.h"
+#include "paimon/common/data/binary_string.h"
+#include "paimon/common/data/internal_array.h"
+#include "paimon/common/data/internal_map.h"
+#include "paimon/common/data/internal_row.h"
+#include "paimon/common/types/row_kind.h"
+#include "paimon/data/decimal.h"
+#include "paimon/data/timestamp.h"
+#include "paimon/result.h"
+
+namespace paimon {
+class Bytes;
+class InternalArray;
+class InternalMap;
+class RowKind;
+
+/// An InternalRow assembled from the fields of several BinaryRows.
+///
+/// Field `pos` of this row is read from `rows[row_offsets[pos]]` at field index
+/// `field_offsets[pos]`. A negative `row_offsets[pos]` marks a field that exists in
+/// no underlying row. IsNullAt() reports such a field as null, and no other getter
+/// may be called for it.
+class DataEvolutionRow : public InternalRow {
+ public:
+    /// @param rows          underlying rows; must not be empty.
+    /// @param row_offsets   per field, the index into `rows`, or a negative value
+    ///                      when the field exists in no underlying row.
+    /// @param field_offsets per field, its index inside the selected row; must
+    ///                      have the same length as `row_offsets`.
+    DataEvolutionRow(const std::vector<BinaryRow>& rows, const std::vector<int32_t>& row_offsets,
+                     const std::vector<int32_t>& field_offsets)
+        : rows_(rows), row_offsets_(row_offsets), field_offsets_(field_offsets) {
+        assert(!rows_.empty());
+        assert(row_offsets_.size() == field_offsets_.size());
+    }
+
+    /// Returns the row kind set via SetRowKind(). If none is set, returns the row
+    /// kind of the first underlying row, fetched lazily and cached.
+    Result<const RowKind*> GetRowKind() const override {
+        if (!row_kind_) {
+            PAIMON_ASSIGN_OR_RAISE(row_kind_, rows_[0].GetRowKind());
+        }
+        return row_kind_;
+    }
+
+    void SetRowKind(const RowKind* kind) override {
+        row_kind_ = kind;
+    }
+
+    int32_t GetFieldCount() const override {
+        return static_cast<int32_t>(field_offsets_.size());
+    }
+
+    bool IsNullAt(int32_t pos) const override {
+        assert(pos >= 0 && static_cast<size_t>(pos) < row_offsets_.size());
+        if (row_offsets_[pos] < 0) {
+            // the field exists in no underlying row, so it is always null
+            return true;
+        }
+        return RowAt(pos).IsNullAt(field_offsets_[pos]);
+    }
+
+    bool GetBoolean(int32_t pos) const override {
+        return RowAt(pos).GetBoolean(field_offsets_[pos]);
+    }
+
+    char GetByte(int32_t pos) const override {
+        return RowAt(pos).GetByte(field_offsets_[pos]);
+    }
+
+    int16_t GetShort(int32_t pos) const override {
+        return RowAt(pos).GetShort(field_offsets_[pos]);
+    }
+
+    int32_t GetInt(int32_t pos) const override {
+        return RowAt(pos).GetInt(field_offsets_[pos]);
+    }
+
+    int32_t GetDate(int32_t pos) const override {
+        return RowAt(pos).GetDate(field_offsets_[pos]);
+    }
+
+    int64_t GetLong(int32_t pos) const override {
+        return RowAt(pos).GetLong(field_offsets_[pos]);
+    }
+
+    float GetFloat(int32_t pos) const override {
+        return RowAt(pos).GetFloat(field_offsets_[pos]);
+    }
+
+    double GetDouble(int32_t pos) const override {
+        return RowAt(pos).GetDouble(field_offsets_[pos]);
+    }
+
+    BinaryString GetString(int32_t pos) const override {
+        return RowAt(pos).GetString(field_offsets_[pos]);
+    }
+
+    std::shared_ptr<Bytes> GetBinary(int32_t pos) const override {
+        return RowAt(pos).GetBinary(field_offsets_[pos]);
+    }
+
+    std::string_view GetStringView(int32_t pos) const override {
+        return RowAt(pos).GetStringView(field_offsets_[pos]);
+    }
+
+    Timestamp GetTimestamp(int32_t pos, int32_t precision) const override {
+        return RowAt(pos).GetTimestamp(field_offsets_[pos], precision);
+    }
+
+    Decimal GetDecimal(int32_t pos, int32_t precision, int32_t scale) const override {
+        return RowAt(pos).GetDecimal(field_offsets_[pos], precision, scale);
+    }
+
+    std::shared_ptr<InternalRow> GetRow(int32_t pos, int32_t num_fields) const override {
+        return RowAt(pos).GetRow(field_offsets_[pos], num_fields);
+    }
+
+    std::shared_ptr<InternalArray> GetArray(int32_t pos) const override {
+        return RowAt(pos).GetArray(field_offsets_[pos]);
+    }
+
+    std::shared_ptr<InternalMap> GetMap(int32_t pos) const override {
+        return RowAt(pos).GetMap(field_offsets_[pos]);
+    }
+
+    std::string ToString() const override {
+        return "DataEvolutionRow";
+    }
+
+ private:
+    /// Returns the underlying row that holds field `pos`.
+    ///
+    /// The field must exist in some underlying row. Reading a field whose row
+    /// offset is negative would index rows_ out of range; callers avoid that by
+    /// checking IsNullAt() first. Debug builds assert on it.
+    const BinaryRow& RowAt(int32_t pos) const {
+        assert(pos >= 0 && static_cast<size_t>(pos) < row_offsets_.size());
+        assert(row_offsets_[pos] >= 0 && "field exists in no underlying row; check IsNullAt()");
+        assert(static_cast<size_t>(row_offsets_[pos]) < rows_.size());
+        return rows_[row_offsets_[pos]];
+    }
+
+    mutable const RowKind* row_kind_ = nullptr;
+    std::vector<BinaryRow> rows_;
+    std::vector<int32_t> row_offsets_;
+    std::vector<int32_t> field_offsets_;
+};
+} // namespace paimon
diff --git a/src/paimon/common/reader/data_evolution_row_test.cpp
b/src/paimon/common/reader/data_evolution_row_test.cpp
new file mode 100644
index 0000000..47b7f54
--- /dev/null
+++ b/src/paimon/common/reader/data_evolution_row_test.cpp
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/reader/data_evolution_row.h"
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <variant>
+#include <vector>
+
+#include "gtest/gtest.h"
+#include "paimon/common/data/binary_array_writer.h"
+#include "paimon/common/data/binary_map.h"
+#include "paimon/common/data/binary_row_writer.h"
+#include "paimon/common/data/data_define.h"
+#include "paimon/common/types/row_kind.h"
+#include "paimon/data/decimal.h"
+#include "paimon/data/timestamp.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/status.h"
+#include "paimon/testing/utils/binary_row_generator.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+TEST(DataEvolutionRowTest, TestSimple) {
+ // Logical schema assembled from three backing rows (row_offsets / field_offsets):
+ // f0:int32 <- row1[0]
+ // f1:int32 <- row3[0]
+ // f2:string <- row1[1]
+ // f3:int32 <- row2[1]
+ // f4:string <- row3[1]
+ // f5:int32 <- row2[0]
+ std::vector<int32_t> row_offsets = {0, 2, 0, 1, 2, 1};
+ std::vector<int32_t> field_offsets = {0, 0, 1, 1, 1, 0};
+
+ auto pool = GetDefaultPool();
+ BinaryRow row1 = BinaryRowGenerator::GenerateRow({0, std::string("00")}, pool.get());
+ BinaryRow row2 = BinaryRowGenerator::GenerateRow({10, 110}, pool.get());
+ BinaryRow row3 = BinaryRowGenerator::GenerateRow({20, std::string("20")}, pool.get());
+
+ DataEvolutionRow row({row1, row2, row3}, row_offsets, field_offsets);
+ ASSERT_EQ(row.GetFieldCount(), 6);
+ // Every backing value is non-null, so no logical field may report null.
+ for (int32_t pos = 0; pos < 6; ++pos) {
+ ASSERT_FALSE(row.IsNullAt(pos)) << "pos " << pos;
+ }
+ ASSERT_EQ(row.GetInt(0), 0);
+ ASSERT_EQ(row.GetInt(1), 20);
+ ASSERT_EQ(row.GetString(2).ToString(), "00");
+ ASSERT_EQ(row.GetInt(3), 110);
+ ASSERT_EQ(row.GetString(4).ToString(), "20");
+ ASSERT_EQ(row.GetInt(5), 10);
+
+ // test set and get row kind
+ ASSERT_OK_AND_ASSIGN(const RowKind* row_kind, row.GetRowKind());
+ ASSERT_EQ(*row_kind, *RowKind::Insert());
+ row.SetRowKind(RowKind::UpdateAfter());
+ ASSERT_OK_AND_ASSIGN(const RowKind* new_row_kind, row.GetRowKind());
+ ASSERT_EQ(*new_row_kind, *RowKind::UpdateAfter());
+ ASSERT_EQ(row.ToString(), "DataEvolutionRow");
+}
+
+TEST(DataEvolutionRowTest, TestNull) {
+ // Logical schema assembled from three backing rows (row_offsets / field_offsets):
+ // f0:int32 <- row1[0]
+ // f1:int32 <- row3[0]
+ // f2:string <- row1[1]
+ // f3:int32 <- row2[1]
+ // f4:string <- row3[1], which holds a null value
+ // f5:int32 <- row2[0]
+ // f6, f7: not present in any backing row (negative row offsets -1 and -2)
+ std::vector<int32_t> row_offsets = {0, 2, 0, 1, 2, 1, -1, -2};
+ std::vector<int32_t> field_offsets = {0, 0, 1, 1, 1, 0, -1, -1};
+
+ auto pool = GetDefaultPool();
+ BinaryRow row1 = BinaryRowGenerator::GenerateRow({0, std::string("00")}, pool.get());
+ BinaryRow row2 = BinaryRowGenerator::GenerateRow({10, 110}, pool.get());
+ BinaryRow row3 = BinaryRowGenerator::GenerateRow({20, NullType()}, pool.get());
+
+ DataEvolutionRow row({row1, row2, row3}, row_offsets, field_offsets);
+ ASSERT_EQ(row.GetFieldCount(), 8);
+ // Fields backed by non-null values must not report null.
+ for (int32_t pos : {0, 1, 2, 3, 5}) {
+ ASSERT_FALSE(row.IsNullAt(pos)) << "pos " << pos;
+ }
+ ASSERT_EQ(row.GetInt(0), 0);
+ ASSERT_EQ(row.GetInt(1), 20);
+ ASSERT_EQ(row.GetString(2).ToString(), "00");
+ ASSERT_EQ(row.GetInt(3), 110);
+ // A null stored in the backing row is reported through the evolution row.
+ ASSERT_TRUE(row.IsNullAt(4));
+ ASSERT_EQ(row.GetInt(5), 10);
+ // Both negative row offsets (-1 and -2) must report null.
+ ASSERT_TRUE(row.IsNullAt(6));
+ ASSERT_TRUE(row.IsNullAt(7));
+}
+
+TEST(DataEvolutionRowTest, TestAllFieldTypes) {
+ // Exercises every typed getter of DataEvolutionRow against values spread over
+ // six backing rows, including nested array/map/row fields.
+ auto pool = GetDefaultPool();
+
+ // Row0: boolean(true), byte(42), short(1024), int(100000), date(19000)
+ BinaryRow row0 =
+ BinaryRowGenerator::GenerateRow({true, static_cast<int8_t>(42), static_cast<int16_t>(1024),
+ static_cast<int32_t>(100000), static_cast<int32_t>(19000)},
+ pool.get());
+
+ // Row1: long(-987654321), float(3.14f), double(-9.87654321)
+ // The int64_t cast pins the exact type (long long and int64_t may be distinct types);
+ // the float and double literals already carry their exact types.
+ BinaryRow row1 = BinaryRowGenerator::GenerateRow(
+ {static_cast<int64_t>(-987654321LL), 3.14f, -9.87654321}, pool.get());
+
+ // Row2: string("hello"), string("world")
+ BinaryRow row2 =
+ BinaryRowGenerator::GenerateRow({std::string("hello"), std::string("world")}, pool.get());
+
+ // Row3: decimal(10,2,67890), timestamp(1000,500) with precision 9
+ BinaryRow row3 = BinaryRowGenerator::GenerateRow(
+ {Decimal(10, 2, 67890), TimestampType(Timestamp(1000, 500), 9)}, pool.get());
+
+ // Row4: binary("fghij")
+ auto bytes_val = std::make_shared<Bytes>("fghij", pool.get());
+ BinaryRow row4 = BinaryRowGenerator::GenerateRow({bytes_val}, pool.get());
+
+ // Row5: nested array [10,20], nested map {1:100, 2:200}, nested row ("Alice", 30)
+ // Built manually via BinaryRowWriter since BinaryRowGenerator doesn't support nested types
+ BinaryRow row5(3);
+ {
+ BinaryRowWriter writer(&row5, 0, pool.get());
+
+ // field 0: array [10, 20]
+ auto inner_array = BinaryArray::FromIntArray({10, 20}, pool.get());
+ writer.WriteArray(0, inner_array);
+
+ // field 1: map {1->100, 2->200}
+ auto map_key = BinaryArray::FromIntArray({1, 2}, pool.get());
+ auto map_val = BinaryArray::FromLongArray({100LL, 200LL}, pool.get());
+ auto map_obj = BinaryMap::ValueOf(map_key, map_val, pool.get());
+ writer.WriteMap(1, *map_obj);
+
+ // field 2: row ("Alice", 30)
+ BinaryRow inner_row = BinaryRowGenerator::GenerateRow(
+ {std::string("Alice"), static_cast<int32_t>(30)}, pool.get());
+ writer.WriteRow(2, inner_row);
+
+ writer.Complete();
+ }
+
+ // Build the DataEvolutionRow that picks specific fields from each row.
+ // evolution field -> (row_index, field_index_within_row)
+ // pos 0: boolean -> row0[0] = true
+ // pos 1: byte -> row0[1] = 42
+ // pos 2: short -> row0[2] = 1024
+ // pos 3: int -> row0[3] = 100000
+ // pos 4: date -> row0[4] = 19000
+ // pos 5: long -> row1[0] = -987654321
+ // pos 6: float -> row1[1] = 3.14f
+ // pos 7: double -> row1[2] = -9.87654321
+ // pos 8: string -> row2[0] = "hello"
+ // pos 9: stringview-> row2[1] = "world"
+ // pos 10: decimal -> row3[0] = Decimal(10,2,67890)
+ // pos 11: timestamp -> row3[1] = Timestamp(1000,500)
+ // pos 12: binary -> row4[0] = "fghij"
+ // pos 13: array -> row5[0] = [10,20]
+ // pos 14: map -> row5[1] = {1:100, 2:200}
+ // pos 15: row -> row5[2] = ("Alice", 30)
+ std::vector<int32_t> row_offsets = {0, 0, 0, 0, 0, 1, 1, 1, 2, 2, 3, 3, 4, 5, 5, 5};
+ std::vector<int32_t> field_offsets = {0, 1, 2, 3, 4, 0, 1, 2, 0, 1, 0, 1, 0, 0, 1, 2};
+
+ DataEvolutionRow evolution({row0, row1, row2, row3, row4, row5}, row_offsets, field_offsets);
+
+ ASSERT_EQ(evolution.GetFieldCount(), 16);
+
+ // Every backing value is non-null, so no logical field may report null.
+ for (int32_t pos = 0; pos < 16; ++pos) {
+ ASSERT_FALSE(evolution.IsNullAt(pos)) << "pos " << pos;
+ }
+
+ // Boolean
+ ASSERT_TRUE(evolution.GetBoolean(0));
+
+ // Byte
+ ASSERT_EQ(evolution.GetByte(1), 42);
+
+ // Short
+ ASSERT_EQ(evolution.GetShort(2), 1024);
+
+ // Int
+ ASSERT_EQ(evolution.GetInt(3), 100000);
+
+ // Date
+ ASSERT_EQ(evolution.GetDate(4), 19000);
+
+ // Long
+ ASSERT_EQ(evolution.GetLong(5), -987654321LL);
+
+ // Float
+ ASSERT_FLOAT_EQ(evolution.GetFloat(6), 3.14f);
+
+ // Double
+ ASSERT_DOUBLE_EQ(evolution.GetDouble(7), -9.87654321);
+
+ // String
+ ASSERT_EQ(evolution.GetString(8).ToString(), "hello");
+
+ // StringView
+ ASSERT_EQ(evolution.GetStringView(9), "world");
+
+ // Decimal
+ ASSERT_EQ(evolution.GetDecimal(10, 10, 2), Decimal(10, 2, 67890));
+
+ // Timestamp
+ ASSERT_EQ(evolution.GetTimestamp(11, 9), Timestamp(1000, 500));
+
+ // Binary
+ auto retrieved_binary = evolution.GetBinary(12);
+ Bytes expected_binary("fghij", pool.get());
+ ASSERT_EQ(*retrieved_binary, expected_binary);
+
+ // Nested Array
+ auto retrieved_array = evolution.GetArray(13);
+ ASSERT_OK_AND_ASSIGN(auto inner_values, retrieved_array->ToIntArray());
+ ASSERT_EQ(inner_values, std::vector<int32_t>({10, 20}));
+
+ // Map
+ auto retrieved_map = evolution.GetMap(14);
+ ASSERT_EQ(retrieved_map->Size(), 2);
+
+ // Nested Row
+ auto retrieved_row = evolution.GetRow(15, 2);
+ ASSERT_EQ(retrieved_row->GetString(0).ToString(), "Alice");
+ ASSERT_EQ(retrieved_row->GetInt(1), 30);
+
+ // ToString
+ ASSERT_EQ(evolution.ToString(), "DataEvolutionRow");
+}
+
+} // namespace paimon::test