This is an automated email from the ASF dual-hosted git repository.
leaves12138 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 1966faf feat: migrate common/data/columnar module (#24)
1966faf is described below
commit 1966faf5ddbaaee06a403b842f19870573189d8d
Author: lxy <[email protected]>
AuthorDate: Fri May 29 10:36:13 2026 +0800
feat: migrate common/data/columnar module (#24)
Squash merge PR #24.
---
src/paimon/common/data/columnar/columnar_array.cpp | 160 ++++++++
src/paimon/common/data/columnar/columnar_array.h | 154 +++++++
.../common/data/columnar/columnar_array_test.cpp | 312 ++++++++++++++
.../common/data/columnar/columnar_batch_context.h | 40 ++
src/paimon/common/data/columnar/columnar_map.cpp | 45 ++
src/paimon/common/data/columnar/columnar_map.h | 53 +++
src/paimon/common/data/columnar/columnar_row.cpp | 89 ++++
src/paimon/common/data/columnar/columnar_row.h | 160 ++++++++
.../common/data/columnar/columnar_row_ref.cpp | 87 ++++
src/paimon/common/data/columnar/columnar_row_ref.h | 136 ++++++
.../common/data/columnar/columnar_row_test.cpp | 455 +++++++++++++++++++++
src/paimon/common/data/columnar/columnar_utils.h | 116 ++++++
.../common/data/columnar/columnar_utils_test.cpp | 56 +++
13 files changed, 1863 insertions(+)
diff --git a/src/paimon/common/data/columnar/columnar_array.cpp b/src/paimon/common/data/columnar/columnar_array.cpp
new file mode 100644
index 0000000..7a96106
--- /dev/null
+++ b/src/paimon/common/data/columnar/columnar_array.cpp
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "paimon/common/data/columnar/columnar_array.h"
+
+#include <utility>
+
+#include "arrow/api.h"
+#include "arrow/array/array_decimal.h"
+#include "arrow/array/array_nested.h"
+#include "arrow/array/array_primitive.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/decimal.h"
+#include "fmt/format.h"
+#include "paimon/common/data/columnar/columnar_batch_context.h"
+#include "paimon/common/data/columnar/columnar_map.h"
+#include "paimon/common/data/columnar/columnar_row_ref.h"
+#include "paimon/common/utils/date_time_utils.h"
+
+namespace paimon {
+Status ColumnarArray::CheckNoNull() const {  // Returns OK only if none of the length_ elements of this view is null.
+ for (int32_t i = 0; i < length_; i++) {
+ if (IsNullAt(i)) {
+ return Status::Invalid(fmt::format("row {} is null", i));  // i is relative to the view, not to the underlying arrow array
+ }
+ }
+ return Status::OK();
+}
+
+Decimal ColumnarArray::GetDecimal(int32_t pos, int32_t precision, int32_t
scale) const {  // Reads element `pos` of the view as an arrow Decimal128 and wraps it in a Decimal with the caller's precision and scale.
+ using ArrayType = typename
arrow::TypeTraits<arrow::Decimal128Type>::ArrayType;
+ auto array = arrow::internal::checked_cast<const ArrayType*>(array_);  // NOTE(review): only a non-null assert follows the cast; assumes array_ really holds Decimal128 - confirm callers guarantee it
+ assert(array);
+ arrow::Decimal128 decimal(array->GetValue(offset_ + pos));  // offset_ + pos translates the view index into an index of array_
+ return Decimal(
+ precision, scale,
+ static_cast<Decimal::int128_t>(
+
static_cast<Decimal::uint128_t>(static_cast<uint64_t>(decimal.high_bits())) <<
64 |
+ decimal.low_bits()));  // (high << 64) | low, combined as unsigned 128-bit and then cast to the signed 128-bit type
+}
+
+Timestamp ColumnarArray::GetTimestamp(int32_t pos, int32_t precision) const {  // Reads element `pos` of the view as a Timestamp; `precision` is not used in this body.
+ using ArrayType = typename
arrow::TypeTraits<arrow::TimestampType>::ArrayType;
+ auto array = arrow::internal::checked_cast<const ArrayType*>(array_);
+ assert(array);
+ int64_t data = array->Value(offset_ + pos);  // raw stored value, expressed in the unit of the arrow timestamp type
+ auto timestamp_type =
+
arrow::internal::checked_pointer_cast<arrow::TimestampType>(array->type());
+ // for orc format, data is saved as nano, therefore, Timestamp convert
should consider precision
+ // in arrow array rather than input precision
+ DateTimeUtils::TimeType time_type =
DateTimeUtils::GetTimeTypeFromArrowType(timestamp_type);
+ auto [milli, nano] = DateTimeUtils::TimestampConverter(
+ data, time_type, DateTimeUtils::TimeType::MILLISECOND,
DateTimeUtils::TimeType::NANOSECOND);  // NOTE(review): presumably splits `data` into whole milliseconds plus a sub-millisecond nanosecond remainder - confirm in DateTimeUtils
+ return Timestamp(milli, nano);
+}
+
+std::shared_ptr<InternalArray> ColumnarArray::GetArray(int32_t pos) const {  // Treats array_ as a ListArray and returns a view over the child values of list element `pos`.
+ auto list_array = arrow::internal::checked_cast<const
arrow::ListArray*>(array_);
+ assert(list_array);
+ int32_t offset = list_array->value_offset(offset_ + pos);  // start of this element inside the child values array
+ int32_t length = list_array->value_length(offset_ + pos);
+ return std::make_shared<ColumnarArray>(list_array->values().get(), pool_,
offset, length);  // only a raw pointer is handed over: the returned view does not keep the values array alive
+}
+
+std::shared_ptr<InternalMap> ColumnarArray::GetMap(int32_t pos) const {  // Treats array_ as a MapArray and returns a ColumnarMap over the entries of map element `pos`.
+ auto map_array = arrow::internal::checked_cast<const
arrow::MapArray*>(array_);
+ assert(map_array);
+ int32_t offset = map_array->value_offset(offset_ + pos);  // the same offset/length window is applied to both keys and items
+ int32_t length = map_array->value_length(offset_ + pos);
+ return std::make_shared<ColumnarMap>(map_array->keys(),
map_array->items(), pool_, offset,
+ length);  // keys()/items() are passed as shared_ptr, which ColumnarMap stores
+}
+
+std::shared_ptr<InternalRow> ColumnarArray::GetRow(int32_t pos, int32_t
num_fields) const {  // Treats array_ as a StructArray and returns a row reference for struct element `pos`; `num_fields` is not used in this body.
+ auto struct_array = arrow::internal::checked_cast<const
arrow::StructArray*>(array_);
+ assert(struct_array);
+ auto row_ctx =
std::make_shared<ColumnarBatchContext>(struct_array->fields(), pool_);  // a new context holding the struct's field arrays is built on every call
+ return std::make_shared<ColumnarRowRef>(std::move(row_ctx), offset_ + pos);  // row index is given relative to array_, not to the view
+}
+
+Result<std::vector<char>> ColumnarArray::ToBooleanArray() const {  // Copies the view into a vector<char> holding 1 for true and 0 for false.
+ PAIMON_RETURN_NOT_OK(CheckNoNull());  // NOTE(review): presumably returns the CheckNoNull error early when any element is null - confirm macro definition
+ std::vector<char> res(length_);
+ for (int32_t i = 0; i < length_; i++) {
+ bool element = GetBoolean(i);
+ res[i] = element ? static_cast<char>(1) : static_cast<char>(0);
+ }
+ return res;
+}
+
+Result<std::vector<char>> ColumnarArray::ToByteArray() const {  // Copies the view into a vector<char>, one GetByte() value per element.
+ PAIMON_RETURN_NOT_OK(CheckNoNull());  // NOTE(review): presumably returns the CheckNoNull error early when any element is null - confirm macro definition
+ std::vector<char> res(length_);
+ for (int32_t i = 0; i < length_; i++) {
+ res[i] = GetByte(i);
+ }
+ return res;
+}
+
+Result<std::vector<int16_t>> ColumnarArray::ToShortArray() const {  // Copies the view into a vector<int16_t>, one GetShort() value per element.
+ PAIMON_RETURN_NOT_OK(CheckNoNull());  // NOTE(review): presumably returns the CheckNoNull error early when any element is null - confirm macro definition
+ std::vector<int16_t> res(length_);
+ for (int32_t i = 0; i < length_; i++) {
+ res[i] = GetShort(i);
+ }
+ return res;
+}
+
+Result<std::vector<int32_t>> ColumnarArray::ToIntArray() const {  // Copies the view into a vector<int32_t>, one GetInt() value per element.
+ PAIMON_RETURN_NOT_OK(CheckNoNull());  // NOTE(review): presumably returns the CheckNoNull error early when any element is null - confirm macro definition
+ std::vector<int32_t> res(length_);
+ for (int32_t i = 0; i < length_; i++) {
+ res[i] = GetInt(i);
+ }
+ return res;
+}
+
+Result<std::vector<int64_t>> ColumnarArray::ToLongArray() const {  // Copies the view into a vector<int64_t>, one GetLong() value per element.
+ PAIMON_RETURN_NOT_OK(CheckNoNull());  // NOTE(review): presumably returns the CheckNoNull error early when any element is null - confirm macro definition
+ std::vector<int64_t> res(length_);
+ for (int32_t i = 0; i < length_; i++) {
+ res[i] = GetLong(i);
+ }
+ return res;
+}
+
+Result<std::vector<float>> ColumnarArray::ToFloatArray() const {  // Copies the view into a vector<float>, one GetFloat() value per element.
+ PAIMON_RETURN_NOT_OK(CheckNoNull());  // NOTE(review): presumably returns the CheckNoNull error early when any element is null - confirm macro definition
+ std::vector<float> res(length_);
+ for (int32_t i = 0; i < length_; i++) {
+ res[i] = GetFloat(i);
+ }
+ return res;
+}
+
+Result<std::vector<double>> ColumnarArray::ToDoubleArray() const {  // Copies the view into a vector<double>, one GetDouble() value per element.
+ PAIMON_RETURN_NOT_OK(CheckNoNull());  // NOTE(review): presumably returns the CheckNoNull error early when any element is null - confirm macro definition
+ std::vector<double> res(length_);
+ for (int32_t i = 0; i < length_; i++) {
+ res[i] = GetDouble(i);
+ }
+ return res;
+}
+
+} // namespace paimon
diff --git a/src/paimon/common/data/columnar/columnar_array.h b/src/paimon/common/data/columnar/columnar_array.h
new file mode 100644
index 0000000..773e46f
--- /dev/null
+++ b/src/paimon/common/data/columnar/columnar_array.h
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+#include <cassert>
+#include <cstdint>
+#include <memory>
+#include <string_view>
+#include <vector>
+
+#include "arrow/array/array_base.h"
+#include "paimon/common/data/binary_string.h"
+#include "paimon/common/data/columnar/columnar_utils.h"
+#include "paimon/common/data/internal_array.h"
+#include "paimon/common/data/internal_map.h"
+#include "paimon/common/data/internal_row.h"
+#include "paimon/data/decimal.h"
+#include "paimon/data/timestamp.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+
+namespace arrow {
+class BinaryType;
+class BooleanType;
+class Date32Type;
+class DoubleType;
+class FloatType;
+class Int16Type;
+class Int32Type;
+class Int64Type;
+class Int8Type;
+class StringType;
+} // namespace arrow
+
+namespace paimon {
+class Bytes;
+class MemoryPool;
+
+/// Columnar array to support access to vector column data.
+///
+/// NOTE: This class holds a non-owning raw pointer to the underlying
arrow::Array for efficiency.
+/// The caller must ensure that the pointed-to Array outlives this
ColumnarArray instance.
+/// Typically, lifetime is guaranteed by the owning ColumnarBatchContext or
the parent
+/// arrow container (e.g., ListArray, MapArray) that holds the shared_ptr.
+class ColumnarArray : public InternalArray {  // InternalArray view over elements [offset, offset + length) of a single arrow::Array
+ public:
+ ColumnarArray(const arrow::Array* array, const
std::shared_ptr<MemoryPool>& pool,
+ int32_t offset, int32_t length)
+ : pool_(pool), array_(array), offset_(offset), length_(length) {
+ assert(array_);
+ assert(array_->length() >= offset + length);  // the window must fit inside the array; checked only when asserts are enabled
+ }
+
+ int32_t Size() const override {  // number of elements in the view, not in the underlying array
+ return length_;
+ }
+
+ bool IsNullAt(int32_t pos) const override {
+ return array_->IsNull(offset_ + pos);  // like every accessor in this class body, shifts the view-relative pos by offset_
+ }
+
+ bool GetBoolean(int32_t pos) const override {  // NOTE(review): GetGenericValue presumably casts array_ to the named arrow type's array and reads one value - confirm in columnar_utils.h
+ return ColumnarUtils::GetGenericValue<arrow::BooleanType,
bool>(array_, offset_ + pos);
+ }
+
+ char GetByte(int32_t pos) const override {  // Int8 element returned as char
+ return ColumnarUtils::GetGenericValue<arrow::Int8Type, char>(array_,
offset_ + pos);
+ }
+
+ int16_t GetShort(int32_t pos) const override {
+ return ColumnarUtils::GetGenericValue<arrow::Int16Type,
int16_t>(array_, offset_ + pos);
+ }
+
+ int32_t GetInt(int32_t pos) const override {
+ return ColumnarUtils::GetGenericValue<arrow::Int32Type,
int32_t>(array_, offset_ + pos);
+ }
+
+ int32_t GetDate(int32_t pos) const override {  // Date32 element returned as its raw int32 value
+ return ColumnarUtils::GetGenericValue<arrow::Date32Type,
int32_t>(array_, offset_ + pos);
+ }
+
+ int64_t GetLong(int32_t pos) const override {
+ return ColumnarUtils::GetGenericValue<arrow::Int64Type,
int64_t>(array_, offset_ + pos);
+ }
+
+ float GetFloat(int32_t pos) const override {
+ return ColumnarUtils::GetGenericValue<arrow::FloatType, float>(array_,
offset_ + pos);
+ }
+
+ double GetDouble(int32_t pos) const override {
+ return ColumnarUtils::GetGenericValue<arrow::DoubleType,
double>(array_, offset_ + pos);
+ }
+
+ BinaryString GetString(int32_t pos) const override {  // NOTE(review): GetBytes receives pool_, so it presumably allocates a copy of the value - confirm in columnar_utils.h
+ auto bytes = ColumnarUtils::GetBytes<arrow::StringType>(array_,
offset_ + pos, pool_.get());
+ return BinaryString::FromBytes(bytes);
+ }
+
+ std::string_view GetStringView(int32_t pos) const override {  // NOTE(review): no pool is passed, so this presumably views arrow-owned memory and is valid only while array_ is alive - confirm
+ return ColumnarUtils::GetView(array_, offset_ + pos);
+ }
+
+ Decimal GetDecimal(int32_t pos, int32_t precision, int32_t scale) const
override;
+
+ Timestamp GetTimestamp(int32_t pos, int32_t precision) const override;
+
+ std::shared_ptr<Bytes> GetBinary(int32_t pos) const override {
+ return ColumnarUtils::GetBytes<arrow::BinaryType>(array_, offset_ +
pos, pool_.get());
+ }
+
+ std::shared_ptr<InternalArray> GetArray(int32_t pos) const override;
+
+ std::shared_ptr<InternalMap> GetMap(int32_t pos) const override;
+
+ std::shared_ptr<InternalRow> GetRow(int32_t pos, int32_t num_fields) const
override;
+
+ Result<std::vector<char>> ToBooleanArray() const override;  // the To*Array() methods copy the whole view into a std::vector
+
+ Result<std::vector<char>> ToByteArray() const override;
+
+ Result<std::vector<int16_t>> ToShortArray() const override;
+
+ Result<std::vector<int32_t>> ToIntArray() const override;
+
+ Result<std::vector<int64_t>> ToLongArray() const override;
+
+ Result<std::vector<float>> ToFloatArray() const override;
+
+ Result<std::vector<double>> ToDoubleArray() const override;
+
+ private:
+ Status CheckNoNull() const;  // non-OK status if any element of the view is null
+
+ private:
+ std::shared_ptr<MemoryPool> pool_;  // handed to GetBytes and to the nested views created by GetArray/GetMap/GetRow
+ const arrow::Array* array_;  // non-owning; must outlive this object
+ int32_t offset_;  // index inside array_ of the view's first element
+ int32_t length_;  // number of elements in the view
+};
+} // namespace paimon
diff --git a/src/paimon/common/data/columnar/columnar_array_test.cpp b/src/paimon/common/data/columnar/columnar_array_test.cpp
new file mode 100644
index 0000000..650d7eb
--- /dev/null
+++ b/src/paimon/common/data/columnar/columnar_array_test.cpp
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "paimon/common/data/columnar/columnar_array.h"
+
+#include <string>
+#include <utility>
+
+#include "arrow/api.h"
+#include "arrow/array/array_nested.h"
+#include "arrow/ipc/json_simple.h"
+#include "arrow/util/checked_cast.h"
+#include "gtest/gtest.h"
+#include "paimon/common/data/internal_map.h"
+#include "paimon/common/data/internal_row.h"
+#include "paimon/common/utils/date_time_utils.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+TEST(ColumnarArrayTest, TestSimple) {  // primitive and string element types; every case builds a list array from JSON and wraps its child values() array
+ auto pool = GetDefaultPool();
+ {  // boolean values, view [2, 3)
+ auto f1 =
+ arrow::ipc::internal::json::ArrayFromJSON(
+ arrow::list(arrow::boolean()), "[[true, false], [true],
[false], [false, true]]")
+ .ValueOrDie();
+ auto list_array =
arrow::internal::checked_pointer_cast<arrow::ListArray>(f1);
+ auto array = ColumnarArray(list_array->values().get(), pool,
/*offset=*/2, 1);
+ ASSERT_EQ(array.Size(), 1);
+ ASSERT_EQ(array.GetBoolean(0), true);
+ std::vector<char> expected_array = {static_cast<char>(1)};
+ ASSERT_EQ(array.ToBooleanArray().value(), expected_array);
+ }
+ {  // int8 values, view [5, 6)
+ auto f1 =
arrow::ipc::internal::json::ArrayFromJSON(arrow::list(arrow::int8()),
+ "[[1, 1, 2], [3],
[2], [2]]")
+ .ValueOrDie();
+ auto list_array =
arrow::internal::checked_pointer_cast<arrow::ListArray>(f1);
+ auto array = ColumnarArray(list_array->values().get(), pool,
/*offset=*/5, 1);
+ ASSERT_EQ(array.GetByte(0), 2);
+ std::vector<char> expected_array = {static_cast<char>(2)};
+ ASSERT_EQ(array.ToByteArray().value(), expected_array);
+ }
+ {  // int16 values, view [0, 3)
+ auto f1 =
arrow::ipc::internal::json::ArrayFromJSON(arrow::list(arrow::int16()),
+ "[[1, 1, 2], [3],
[2], [-4]]")
+ .ValueOrDie();
+ auto list_array =
arrow::internal::checked_pointer_cast<arrow::ListArray>(f1);
+ auto array = ColumnarArray(list_array->values().get(), pool,
/*offset=*/0, 3);
+ ASSERT_EQ(array.GetShort(0), 1);
+ ASSERT_EQ(array.GetShort(1), 1);
+ ASSERT_EQ(array.GetShort(2), 2);
+ std::vector<int16_t> expected_array = {1, 1, 2};
+ ASSERT_EQ(array.ToShortArray().value(), expected_array);
+ }
+ {  // int32 values, view [3, 4)
+ auto f1 =
arrow::ipc::internal::json::ArrayFromJSON(arrow::list(arrow::int32()),
+ "[[1, 1, 2], [3],
[2], [-4]]")
+ .ValueOrDie();
+ auto list_array =
arrow::internal::checked_pointer_cast<arrow::ListArray>(f1);
+ auto array = ColumnarArray(list_array->values().get(), pool,
/*offset=*/3, 1);
+ ASSERT_EQ(array.GetInt(0), 3);
+ std::vector<int32_t> expected_array = {3};
+ ASSERT_EQ(array.ToIntArray().value(), expected_array);
+ }
+ {  // int64 values, view [4, 5)
+ auto f1 =
arrow::ipc::internal::json::ArrayFromJSON(arrow::list(arrow::int64()),
+ "[[1, 1, 2], [3],
[2], [-4]]")
+ .ValueOrDie();
+ auto list_array =
arrow::internal::checked_pointer_cast<arrow::ListArray>(f1);
+ auto array = ColumnarArray(list_array->values().get(), pool,
/*offset=*/4, 1);
+ ASSERT_EQ(array.GetLong(0), 2);
+ std::vector<int64_t> expected_array = {2};
+ ASSERT_EQ(array.ToLongArray().value(), expected_array);
+ }
+ {  // int64 values where the viewed element is null: ToLongArray() must fail
+ auto f1 =
arrow::ipc::internal::json::ArrayFromJSON(arrow::list(arrow::int64()),
+ "[[1, 1, 2], [3],
[null], null]")
+ .ValueOrDie();
+ auto list_array =
arrow::internal::checked_pointer_cast<arrow::ListArray>(f1);
+ auto array = ColumnarArray(list_array->values().get(), pool,
/*offset=*/4, 1);
+ ASSERT_NOK_WITH_MSG(array.ToLongArray(), "is null");
+ }
+ {  // float32 values, view [0, 3)
+ auto f1 = arrow::ipc::internal::json::ArrayFromJSON(
+ arrow::list(arrow::float32()), "[[0.0, 1.1, 2.2], [3.3],
[4.4], [5.5]]")
+ .ValueOrDie();
+ auto list_array =
arrow::internal::checked_pointer_cast<arrow::ListArray>(f1);
+ auto array = ColumnarArray(list_array->values().get(), pool,
/*offset=*/0, 3);
+ ASSERT_NEAR(array.GetFloat(1), 1.1, 0.001);
+ std::vector<float> expected_array = {0.0, 1.1, 2.2};
+ ASSERT_EQ(array.ToFloatArray().value(), expected_array);
+ }
+ {  // float64 values, view [3, 4)
+ auto f1 = arrow::ipc::internal::json::ArrayFromJSON(
+ arrow::list(arrow::float64()), "[[0.0, 1.1, 2.2], [3.3],
[4.4], [5.5]]")
+ .ValueOrDie();
+ auto list_array =
arrow::internal::checked_pointer_cast<arrow::ListArray>(f1);
+ auto array = ColumnarArray(list_array->values().get(), pool,
/*offset=*/3, 1);
+ ASSERT_NEAR(array.GetDouble(0), 3.3, 0.001);
+ std::vector<double> expected_array = {3.3};
+ ASSERT_EQ(array.ToDoubleArray().value(), expected_array);
+ }
+ {  // utf8 values, view [4, 5): checks both GetString and GetStringView
+ auto f1 = arrow::ipc::internal::json::ArrayFromJSON(
+ arrow::list(arrow::utf8()), R"([["abc", "def"], ["efg"],
["hello"], ["hi"]])")
+ .ValueOrDie();
+ auto list_array =
arrow::internal::checked_pointer_cast<arrow::ListArray>(f1);
+ auto array = ColumnarArray(list_array->values().get(), pool,
/*offset=*/4, 1);
+ ASSERT_EQ(array.GetString(0).ToString(), "hi");
+ ASSERT_EQ(std::string(array.GetStringView(0)), "hi");
+ }
+}
+
+TEST(ColumnarArrayTest, TestComplexAndNestedType) {  // date, decimal, timestamp, binary, struct, nested list and map element types
+ auto pool = GetDefaultPool();
+ {  // date32 values, view [3, 4)
+ auto f1 =
arrow::ipc::internal::json::ArrayFromJSON(arrow::list(arrow::date32()),
+ "[[1, 1, 2], [3],
[2], [-4]]")
+ .ValueOrDie();
+ auto list_array =
arrow::internal::checked_pointer_cast<arrow::ListArray>(f1);
+ auto array = ColumnarArray(list_array->values().get(), pool,
/*offset=*/3, 1);
+ ASSERT_EQ(array.GetDate(0), 3);
+ }
+ {  // decimal128(10, 3) values: "1.234" is expected as unscaled 1234
+ auto f1 = arrow::ipc::internal::json::ArrayFromJSON(
+ arrow::list(arrow::decimal128(10, 3)),
+ R"([["1.234", "1234.000"], ["-9876.543"], ["666.888"]])")
+ .ValueOrDie();
+ auto list_array =
arrow::internal::checked_pointer_cast<arrow::ListArray>(f1);
+ auto array = ColumnarArray(list_array->values().get(), pool,
/*offset=*/0, 2);
+ ASSERT_EQ(array.GetDecimal(0, 10, 3), Decimal(10, 3, 1234));
+ }
+ {  // nanosecond timestamps: 59 s after the epoch is expected as 59000 ms and 0 ns
+ auto f1 = arrow::ipc::internal::json::ArrayFromJSON(
+ arrow::list(arrow::timestamp(arrow::TimeUnit::NANO)),
+ R"([["1970-01-01T00:00:59"],["2000-02-29T23:23:23",
+ "1899-01-01T00:59:20"],["2033-05-18T03:33:20"]])")
+ .ValueOrDie();
+ auto list_array =
arrow::internal::checked_pointer_cast<arrow::ListArray>(f1);
+ auto array = ColumnarArray(list_array->values().get(), pool,
/*offset=*/0, 1);
+ auto ts = array.GetTimestamp(0, 9);
+ ASSERT_EQ(ts, Timestamp(59000, 0));
+ }
+ {  // binary values: checks both GetBinary and GetStringView
+ auto f1 =
arrow::ipc::internal::json::ArrayFromJSON(arrow::list(arrow::binary()),
+ R"([["aaa", "bb"],
["ccc"], ["bbb"]])")
+ .ValueOrDie();
+ auto list_array =
arrow::internal::checked_pointer_cast<arrow::ListArray>(f1);
+ auto array = ColumnarArray(list_array->values().get(), pool,
/*offset=*/0, 2);
+ ASSERT_EQ(*array.GetBinary(1), Bytes("bb", pool.get()));
+ ASSERT_EQ(std::string(array.GetStringView(1)), "bb");
+ }
+ {  // struct values with four int64 fields: GetRow(1, 4) must expose the second struct
+ auto f1 =
arrow::ipc::internal::json::ArrayFromJSON(arrow::list(arrow::struct_({
+ field("sub1",
arrow::int64()),
+ field("sub2",
arrow::int64()),
+ field("sub3",
arrow::int64()),
+ field("sub4",
arrow::int64()),
+ })),
+ R"([
+ [[1, 3, 2, 5],
+ [2, 2, 1, 3]],
+ [[3, 2, 1, 3]],
+ [[4, 1, 0, 2]]
+ ])")
+ .ValueOrDie();
+ auto list_array =
arrow::internal::checked_pointer_cast<arrow::ListArray>(f1);
+ auto array = ColumnarArray(list_array->values().get(), pool,
/*offset=*/0, 2);
+ auto result_row = array.GetRow(1, 4);
+ ASSERT_EQ(result_row->GetLong(0), 2);
+ ASSERT_EQ(result_row->GetLong(1), 2);
+ ASSERT_EQ(result_row->GetLong(2), 1);
+ ASSERT_EQ(result_row->GetLong(3), 3);
+ }
+ {  // list<list<int64>>: GetArray(0) on the inner-list values must yield {1, 2, 3}
+ auto f1 = arrow::ipc::internal::json::ArrayFromJSON(
+ arrow::list(arrow::list(arrow::int64())), "[[[1, 2, 3],
[4, 5, 6]], []]")
+ .ValueOrDie();
+ auto list_array =
arrow::internal::checked_pointer_cast<arrow::ListArray>(f1);
+ auto array = ColumnarArray(list_array->values().get(), pool,
/*offset=*/0, 1);
+ auto result_array = array.GetArray(0);  // NOTE(review): result_array is never used below; the next line repeats the same call
+ auto inner_result_array = array.GetArray(0);
+ std::vector<int64_t> values = {1, 2, 3};
+ ASSERT_EQ(inner_result_array->ToLongArray().value(), values);
+ }
+ {  // list<map<int32, int64>>: GetMap(0) must expose keys {1, 4} and values {3, 4}
+ auto f1 = arrow::ipc::internal::json::ArrayFromJSON(
+ arrow::list(arrow::map(arrow::int32(), arrow::int64())),
+ R"([
+ [[[1, 3], [4, 4]]], []
+ ])")
+ .ValueOrDie();
+ auto list_array =
arrow::internal::checked_pointer_cast<arrow::ListArray>(f1);
+ ASSERT_TRUE(list_array);
+ auto array = ColumnarArray(list_array->values().get(), pool,
/*offset=*/0, 1);
+ auto result_key = array.GetMap(0)->KeyArray();
+ auto result_value = array.GetMap(0)->ValueArray();
+ ASSERT_EQ(result_key->ToIntArray().value(), std::vector<int32_t>({1,
4}));
+ ASSERT_EQ(result_value->ToLongArray().value(),
std::vector<int64_t>({3, 4}));
+ }
+}
+TEST(ColumnarArrayTest, TestTimestampType) {  // GetTimestamp for each arrow time unit, first without and then with a timezone on the arrow type
+ auto pool = GetDefaultPool();
+ auto timezone = DateTimeUtils::GetLocalTimezoneName();
+ {  // SECOND unit, no timezone
+ auto f1 = arrow::ipc::internal::json::ArrayFromJSON(
+ arrow::list(arrow::timestamp(arrow::TimeUnit::SECOND)),
+ R"([["1970-01-01T00:00:59"],["2000-02-29T23:23:23",
+ "1899-01-01T00:59:20"],["2033-05-18T03:33:20"]])")
+ .ValueOrDie();
+ auto list_array =
arrow::internal::checked_pointer_cast<arrow::ListArray>(f1);
+ auto array = ColumnarArray(list_array->values().get(), pool,
/*offset=*/1, 2);
+ auto ts = array.GetTimestamp(0, 0);
+ ASSERT_EQ(ts, Timestamp(951866603000, 0)) << ts.GetMillisecond();
+ }
+ {  // MILLI unit, no timezone
+ auto f1 = arrow::ipc::internal::json::ArrayFromJSON(
+ arrow::list(arrow::timestamp(arrow::TimeUnit::MILLI)),
+ R"([["1970-01-01T00:00:59"],["2000-02-29T23:23:23.001",
+ "1899-01-01T00:59:20"],["2033-05-18T03:33:20"]])")
+ .ValueOrDie();
+ auto list_array =
arrow::internal::checked_pointer_cast<arrow::ListArray>(f1);
+ auto array = ColumnarArray(list_array->values().get(), pool,
/*offset=*/1, 2);
+ auto ts = array.GetTimestamp(0, 3);
+ ASSERT_EQ(ts, Timestamp(951866603001, 0)) << ts.GetMillisecond();
+ }
+ {  // MICRO unit, no timezone: the extra microsecond is expected as 1000 ns
+ auto f1 = arrow::ipc::internal::json::ArrayFromJSON(
+ arrow::list(arrow::timestamp(arrow::TimeUnit::MICRO)),
+
R"([["1970-01-01T00:00:59"],["2000-02-29T23:23:23.001001",
+ "1899-01-01T00:59:20"],["2033-05-18T03:33:20"]])")
+ .ValueOrDie();
+ auto list_array =
arrow::internal::checked_pointer_cast<arrow::ListArray>(f1);
+ auto array = ColumnarArray(list_array->values().get(), pool,
/*offset=*/1, 2);
+ auto ts = array.GetTimestamp(0, 6);
+ ASSERT_EQ(ts, Timestamp(951866603001, 1000)) << ts.GetMillisecond();
+ }
+ {  // NANO unit, no timezone: the sub-millisecond part is expected as 1001 ns
+ auto f1 = arrow::ipc::internal::json::ArrayFromJSON(
+ arrow::list(arrow::timestamp(arrow::TimeUnit::NANO)),
+
R"([["1970-01-01T00:00:59"],["2000-02-29T23:23:23.001001001",
+ "1899-01-01T00:59:20"],["2033-05-18T03:33:20"]])")
+ .ValueOrDie();
+ auto list_array =
arrow::internal::checked_pointer_cast<arrow::ListArray>(f1);
+ auto array = ColumnarArray(list_array->values().get(), pool,
/*offset=*/1, 2);
+ auto ts = array.GetTimestamp(0, 9);
+ ASSERT_EQ(ts, Timestamp(951866603001, 1001)) << ts.GetMillisecond();
+ }
+ {  // SECOND unit with the local timezone name: same expected value as without timezone
+ auto f1 = arrow::ipc::internal::json::ArrayFromJSON(
+ arrow::list(arrow::timestamp(arrow::TimeUnit::SECOND,
timezone)),
+ R"([["1970-01-01T00:00:59"],["2000-02-29T23:23:23",
+ "1899-01-01T00:59:20"],["2033-05-18T03:33:20"]])")
+ .ValueOrDie();
+ auto list_array =
arrow::internal::checked_pointer_cast<arrow::ListArray>(f1);
+ auto array = ColumnarArray(list_array->values().get(), pool,
/*offset=*/1, 2);
+ auto ts = array.GetTimestamp(0, 0);
+ ASSERT_EQ(ts, Timestamp(951866603000, 0)) << ts.GetMillisecond();
+ }
+ {  // MILLI unit with the local timezone name
+ auto f1 = arrow::ipc::internal::json::ArrayFromJSON(
+ arrow::list(arrow::timestamp(arrow::TimeUnit::MILLI,
timezone)),
+ R"([["1970-01-01T00:00:59"],["2000-02-29T23:23:23.001",
+ "1899-01-01T00:59:20"],["2033-05-18T03:33:20"]])")
+ .ValueOrDie();
+ auto list_array =
arrow::internal::checked_pointer_cast<arrow::ListArray>(f1);
+ auto array = ColumnarArray(list_array->values().get(), pool,
/*offset=*/1, 2);
+ auto ts = array.GetTimestamp(0, 3);
+ ASSERT_EQ(ts, Timestamp(951866603001, 0)) << ts.GetMillisecond();
+ }
+ {  // MICRO unit with the local timezone name
+ auto f1 = arrow::ipc::internal::json::ArrayFromJSON(
+ arrow::list(arrow::timestamp(arrow::TimeUnit::MICRO,
timezone)),
+
R"([["1970-01-01T00:00:59"],["2000-02-29T23:23:23.001001",
+ "1899-01-01T00:59:20"],["2033-05-18T03:33:20"]])")
+ .ValueOrDie();
+ auto list_array =
arrow::internal::checked_pointer_cast<arrow::ListArray>(f1);
+ auto array = ColumnarArray(list_array->values().get(), pool,
/*offset=*/1, 2);
+ auto ts = array.GetTimestamp(0, 6);
+ ASSERT_EQ(ts, Timestamp(951866603001, 1000)) << ts.GetMillisecond();
+ }
+ {  // NANO unit with the local timezone name
+ auto f1 = arrow::ipc::internal::json::ArrayFromJSON(
+ arrow::list(arrow::timestamp(arrow::TimeUnit::NANO,
timezone)),
+
R"([["1970-01-01T00:00:59"],["2000-02-29T23:23:23.001001001",
+ "1899-01-01T00:59:20"],["2033-05-18T03:33:20"]])")
+ .ValueOrDie();
+ auto list_array =
arrow::internal::checked_pointer_cast<arrow::ListArray>(f1);
+ auto array = ColumnarArray(list_array->values().get(), pool,
/*offset=*/1, 2);
+ auto ts = array.GetTimestamp(0, 9);
+ ASSERT_EQ(ts, Timestamp(951866603001, 1001)) << ts.GetMillisecond();
+ }
+}
+
+} // namespace paimon::test
diff --git a/src/paimon/common/data/columnar/columnar_batch_context.h b/src/paimon/common/data/columnar/columnar_batch_context.h
new file mode 100644
index 0000000..2d35c0d
--- /dev/null
+++ b/src/paimon/common/data/columnar/columnar_batch_context.h
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <memory>
+#include <vector>
+
+#include "arrow/array/array_base.h"
+
+namespace arrow {
+class StructArray;
+} // namespace arrow
+
+namespace paimon {
+class MemoryPool;
+
+struct ColumnarBatchContext {  // Bundles a vector of arrow arrays with the memory pool used alongside them.
+ ColumnarBatchContext(const arrow::ArrayVector& array_vec_in,
+ const std::shared_ptr<MemoryPool>& pool_in)
+ : pool(pool_in), array_vec(array_vec_in) {}  // both arguments are copied into the members
+
+ std::shared_ptr<MemoryPool> pool;
+ arrow::ArrayVector array_vec;  // stored by value, so the context holds its own copy of the vector
+};
+} // namespace paimon
diff --git a/src/paimon/common/data/columnar/columnar_map.cpp b/src/paimon/common/data/columnar/columnar_map.cpp
new file mode 100644
index 0000000..13351ad
--- /dev/null
+++ b/src/paimon/common/data/columnar/columnar_map.cpp
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "paimon/common/data/columnar/columnar_map.h"
+
+#include "paimon/common/data/columnar/columnar_array.h"
+
+namespace arrow {
+class Array;
+} // namespace arrow
+
+namespace paimon {
+class MemoryPool;
+
+ColumnarMap::ColumnarMap(const std::shared_ptr<arrow::Array>& key_array,  // Stores the key/value arrays, the pool and the [offset, offset + length) entry window; nothing is validated here.
+ const std::shared_ptr<arrow::Array>& value_array,
+ const std::shared_ptr<MemoryPool>& pool, int32_t
offset, int32_t length)
+ : pool_(pool),
+ key_array_(key_array),
+ value_array_(value_array),
+ offset_(offset),
+ length_(length) {}
+
+std::shared_ptr<InternalArray> ColumnarMap::KeyArray() const {  // Returns a new ColumnarArray view over this map's window of the key array.
+ return std::make_shared<ColumnarArray>(key_array_.get(), pool_, offset_,
length_);  // only a raw pointer is handed over: the view does not keep key_array_ alive by itself
+}
+std::shared_ptr<InternalArray> ColumnarMap::ValueArray() const {  // Returns a new ColumnarArray view over this map's window of the value array.
+ return std::make_shared<ColumnarArray>(value_array_.get(), pool_, offset_,
length_);  // only a raw pointer is handed over: the view does not keep value_array_ alive by itself
+}
+
+} // namespace paimon
diff --git a/src/paimon/common/data/columnar/columnar_map.h b/src/paimon/common/data/columnar/columnar_map.h
new file mode 100644
index 0000000..549a75a
--- /dev/null
+++ b/src/paimon/common/data/columnar/columnar_map.h
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <cstdint>
+#include <memory>
+
+#include "paimon/common/data/internal_array.h"
+#include "paimon/common/data/internal_map.h"
+
+namespace arrow {
+class Array;
+} // namespace arrow
+
+namespace paimon {
+class MemoryPool;
+
+/// Columnar map to support access to vector column data.
+class ColumnarMap : public InternalMap {  // InternalMap over entries [offset, offset + length) of a key array and a value array
+ public:
+ ColumnarMap(const std::shared_ptr<arrow::Array>& key_array,
+ const std::shared_ptr<arrow::Array>& value_array,
+ const std::shared_ptr<MemoryPool>& pool, int32_t offset,
int32_t length);
+
+ int32_t Size() const override {  // number of entries in the window
+ return length_;
+ }
+ std::shared_ptr<InternalArray> KeyArray() const override;
+ std::shared_ptr<InternalArray> ValueArray() const override;
+
+ private:
+ std::shared_ptr<MemoryPool> pool_;
+ std::shared_ptr<arrow::Array> key_array_;  // shared ownership, unlike the raw pointer held by ColumnarArray
+ std::shared_ptr<arrow::Array> value_array_;
+ int32_t offset_;  // index of the first entry in both arrays
+ int32_t length_;  // number of entries
+};
+} // namespace paimon
diff --git a/src/paimon/common/data/columnar/columnar_row.cpp b/src/paimon/common/data/columnar/columnar_row.cpp
new file mode 100644
index 0000000..bc7b150
--- /dev/null
+++ b/src/paimon/common/data/columnar/columnar_row.cpp
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "paimon/common/data/columnar/columnar_row.h"
+
+#include <cassert>
+
+#include "arrow/array/array_base.h"
+#include "arrow/array/array_decimal.h"
+#include "arrow/array/array_nested.h"
+#include "arrow/array/array_primitive.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/decimal.h"
+#include "paimon/common/data/columnar/columnar_array.h"
+#include "paimon/common/data/columnar/columnar_batch_context.h"
+#include "paimon/common/data/columnar/columnar_map.h"
+#include "paimon/common/data/columnar/columnar_row_ref.h"
+#include "paimon/common/utils/date_time_utils.h"
+
+namespace paimon {
+/// Reads the Decimal128 cell of column `pos` at this row and repackages its two 64-bit halves
+/// as a paimon `Decimal` that carries the caller-supplied `precision` and `scale`.
+Decimal ColumnarRow::GetDecimal(int32_t pos, int32_t precision, int32_t scale) const {
+    const auto* decimal_column =
+        arrow::internal::checked_cast<const arrow::Decimal128Array*>(array_vec_[pos]);
+    assert(decimal_column);
+    const arrow::Decimal128 raw(decimal_column->GetValue(row_id_));
+    // Widen both halves as unsigned 128-bit values before combining them, then reinterpret
+    // the combined bit pattern as a signed 128-bit integer.
+    const auto upper =
+        static_cast<Decimal::uint128_t>(static_cast<uint64_t>(raw.high_bits()));
+    const auto lower = static_cast<Decimal::uint128_t>(raw.low_bits());
+    return Decimal(precision, scale, static_cast<Decimal::int128_t>((upper << 64) | lower));
+}
+
+/// Reads the timestamp cell of column `pos` at this row and converts it into a
+/// (millisecond, nanosecond) pair wrapped in a paimon `Timestamp`.
+///
+/// @note `precision` is not consulted here: the unit of the stored value is taken from the
+/// Arrow column type instead.
+Timestamp ColumnarRow::GetTimestamp(int32_t pos, int32_t precision) const {
+    const auto* ts_column =
+        arrow::internal::checked_cast<const arrow::TimestampArray*>(array_vec_[pos]);
+    assert(ts_column);
+    // For the ORC format the data is stored as nanoseconds, so the conversion has to follow
+    // the unit recorded in the Arrow array rather than the precision passed by the caller.
+    auto arrow_ts_type =
+        arrow::internal::checked_pointer_cast<arrow::TimestampType>(ts_column->type());
+    DateTimeUtils::TimeType source_unit = DateTimeUtils::GetTimeTypeFromArrowType(arrow_ts_type);
+    int64_t raw_value = ts_column->Value(row_id_);
+    auto [milli, nano] =
+        DateTimeUtils::TimestampConverter(raw_value, source_unit,
+                                          DateTimeUtils::TimeType::MILLISECOND,
+                                          DateTimeUtils::TimeType::NANOSECOND);
+    return Timestamp(milli, nano);
+}
+
+/// Returns a row view over the struct column at `pos`, positioned at the same `row_id_` as
+/// this row. `num_fields` is accepted for interface compatibility and is not used here.
+///
+/// @warning For performance, the returned nested row does NOT share ownership of the parent
+/// StructArray. The caller must keep the parent ColumnarRow (or its underlying RecordBatch)
+/// alive for as long as the returned row is used, otherwise its pointers dangle.
+std::shared_ptr<InternalRow> ColumnarRow::GetRow(int32_t pos, int32_t num_fields) const {
+    const auto* nested_struct =
+        arrow::internal::checked_cast<const arrow::StructArray*>(array_vec_[pos]);
+    assert(nested_struct);
+    const arrow::ArrayVector& child_columns = nested_struct->fields();
+    return std::make_shared<ColumnarRow>(child_columns, pool_, row_id_);
+}
+
+/// Returns an array view over the list cell of column `pos` at this row: the list's child
+/// values array together with this row's value offset and value length.
+std::shared_ptr<InternalArray> ColumnarRow::GetArray(int32_t pos) const {
+    const auto* list_column =
+        arrow::internal::checked_cast<const arrow::ListArray*>(array_vec_[pos]);
+    assert(list_column);
+    const int32_t first = list_column->value_offset(row_id_);
+    const int32_t count = list_column->value_length(row_id_);
+    auto* child_values = list_column->values().get();
+    return std::make_shared<ColumnarArray>(child_values, pool_, first, count);
+}
+
+/// Returns a map view over the map cell of column `pos` at this row: the map's key and item
+/// arrays together with this row's value offset and value length.
+std::shared_ptr<InternalMap> ColumnarRow::GetMap(int32_t pos) const {
+    const auto* map_column =
+        arrow::internal::checked_cast<const arrow::MapArray*>(array_vec_[pos]);
+    assert(map_column);
+    const int32_t first = map_column->value_offset(row_id_);
+    const int32_t count = map_column->value_length(row_id_);
+    return std::make_shared<ColumnarMap>(map_column->keys(), map_column->items(), pool_, first,
+                                         count);
+}
+
+} // namespace paimon
diff --git a/src/paimon/common/data/columnar/columnar_row.h
b/src/paimon/common/data/columnar/columnar_row.h
new file mode 100644
index 0000000..2156c81
--- /dev/null
+++ b/src/paimon/common/data/columnar/columnar_row.h
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <string_view>
+#include <vector>
+
+#include "arrow/api.h"
+#include "arrow/array/array_base.h"
+#include "fmt/format.h"
+#include "paimon/common/data/binary_string.h"
+#include "paimon/common/data/columnar/columnar_utils.h"
+#include "paimon/common/data/internal_array.h"
+#include "paimon/common/data/internal_map.h"
+#include "paimon/common/data/internal_row.h"
+#include "paimon/common/types/row_kind.h"
+#include "paimon/data/decimal.h"
+#include "paimon/data/timestamp.h"
+#include "paimon/result.h"
+
+namespace arrow {
+class StructArray;
+} // namespace arrow
+
+namespace paimon {
+class Bytes;
+class MemoryPool;
+
+/// Columnar row to support access to vector column data. It is a row view in arrow::Array:
+/// every getter picks the column at `pos` and reads the value stored at `row_id_`.
+class ColumnarRow : public InternalRow {
+ public:
+    /// @brief Construct a ColumnarRow without holding ownership of the underlying arrays.
+    /// @warning The caller MUST ensure the data source (e.g., RecordBatch or parent StructArray)
+    /// outlives this ColumnarRow. The internal array_vec_ stores raw pointers only; if the
+    /// source is freed first, these pointers will dangle. This design is intentional for
+    /// performance—avoiding per-row shared_ptr ref-count overhead on the hot read path.
+    ColumnarRow(const arrow::ArrayVector& array_vec, const std::shared_ptr<MemoryPool>& pool,
+                int64_t row_id)
+        : ColumnarRow(/*struct_array holder*/ nullptr, array_vec, pool, row_id) {}
+
+    /// @brief Construct a ColumnarRow that holds shared ownership of a StructArray.
+    /// @note When struct_array is non-null it keeps the underlying buffers alive, making it safe
+    /// to outlive the original batch. Prefer this overload when the row may escape the scope of
+    /// its parent container.
+    ColumnarRow(const std::shared_ptr<arrow::StructArray>& struct_array,
+                const arrow::ArrayVector& array_vec, const std::shared_ptr<MemoryPool>& pool,
+                int64_t row_id)
+        : struct_array_(struct_array), pool_(pool), row_id_(row_id) {
+        // Only raw pointers are kept per column; see the lifetime notes on the constructors.
+        array_vec_.reserve(array_vec.size());
+        for (const auto& array : array_vec) {
+            array_vec_.push_back(array.get());
+        }
+    }
+
+    /// @return the row kind last set via SetRowKind(); RowKind::Insert() by default.
+    Result<const RowKind*> GetRowKind() const override {
+        return row_kind_;
+    }
+
+    void SetRowKind(const RowKind* kind) override {
+        row_kind_ = kind;
+    }
+
+    /// @return the number of columns this row was constructed with.
+    int32_t GetFieldCount() const override {
+        // size() is size_t while the interface returns int32_t: make the narrowing explicit,
+        // matching ColumnarRowRef::GetFieldCount().
+        return static_cast<int32_t>(array_vec_.size());
+    }
+
+    bool IsNullAt(int32_t pos) const override {
+        return array_vec_[pos]->IsNull(row_id_);
+    }
+
+    bool GetBoolean(int32_t pos) const override {
+        return ColumnarUtils::GetGenericValue<arrow::BooleanType, bool>(array_vec_[pos], row_id_);
+    }
+
+    char GetByte(int32_t pos) const override {
+        return ColumnarUtils::GetGenericValue<arrow::Int8Type, char>(array_vec_[pos], row_id_);
+    }
+
+    int16_t GetShort(int32_t pos) const override {
+        return ColumnarUtils::GetGenericValue<arrow::Int16Type, int16_t>(array_vec_[pos], row_id_);
+    }
+
+    int32_t GetInt(int32_t pos) const override {
+        return ColumnarUtils::GetGenericValue<arrow::Int32Type, int32_t>(array_vec_[pos], row_id_);
+    }
+
+    int32_t GetDate(int32_t pos) const override {
+        return ColumnarUtils::GetGenericValue<arrow::Date32Type, int32_t>(array_vec_[pos], row_id_);
+    }
+
+    int64_t GetLong(int32_t pos) const override {
+        return ColumnarUtils::GetGenericValue<arrow::Int64Type, int64_t>(array_vec_[pos], row_id_);
+    }
+
+    float GetFloat(int32_t pos) const override {
+        return ColumnarUtils::GetGenericValue<arrow::FloatType, float>(array_vec_[pos], row_id_);
+    }
+
+    double GetDouble(int32_t pos) const override {
+        return ColumnarUtils::GetGenericValue<arrow::DoubleType, double>(array_vec_[pos], row_id_);
+    }
+
+    /// @note `GetString()` and `GetBinary()` will deep copy string data to pool, use
+    /// `GetStringView()` to avoid copy
+    BinaryString GetString(int32_t pos) const override {
+        auto bytes =
+            ColumnarUtils::GetBytes<arrow::StringType>(array_vec_[pos], row_id_, pool_.get());
+        return BinaryString::FromBytes(bytes);
+    }
+
+    std::string_view GetStringView(int32_t pos) const override {
+        return ColumnarUtils::GetView(array_vec_[pos], row_id_);
+    }
+
+    Decimal GetDecimal(int32_t pos, int32_t precision, int32_t scale) const override;
+
+    Timestamp GetTimestamp(int32_t pos, int32_t precision) const override;
+
+    std::shared_ptr<Bytes> GetBinary(int32_t pos) const override {
+        return ColumnarUtils::GetBytes<arrow::BinaryType>(array_vec_[pos], row_id_, pool_.get());
+    }
+
+    std::shared_ptr<InternalRow> GetRow(int32_t pos, int32_t num_fields) const override;
+
+    std::shared_ptr<InternalArray> GetArray(int32_t pos) const override;
+
+    std::shared_ptr<InternalMap> GetMap(int32_t pos) const override;
+
+    std::string ToString() const override {
+        return fmt::format("ColumnarRow, row_id {}", row_id_);
+    }
+
+ private:
+    /// @note `struct_array_` is the data holder for columnar row, ensuring that the data life
+    /// cycle is consistent with the columnar row. `array_vec_` may be a subset of
+    /// `struct_array_`, so `struct_array_` cannot be used for `GetXXX()`
+    std::shared_ptr<arrow::StructArray> struct_array_;
+    std::shared_ptr<MemoryPool> pool_;
+    std::vector<const arrow::Array*> array_vec_;
+    const RowKind* row_kind_ = RowKind::Insert();
+    int64_t row_id_;
+};
+} // namespace paimon
diff --git a/src/paimon/common/data/columnar/columnar_row_ref.cpp
b/src/paimon/common/data/columnar/columnar_row_ref.cpp
new file mode 100644
index 0000000..9aaba46
--- /dev/null
+++ b/src/paimon/common/data/columnar/columnar_row_ref.cpp
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "paimon/common/data/columnar/columnar_row_ref.h"
+
+#include <cassert>
+
+#include "arrow/array/array_decimal.h"
+#include "arrow/array/array_nested.h"
+#include "arrow/array/array_primitive.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/decimal.h"
+#include "paimon/common/data/columnar/columnar_array.h"
+#include "paimon/common/data/columnar/columnar_map.h"
+#include "paimon/common/utils/date_time_utils.h"
+
+namespace paimon {
+/// Reads the Decimal128 cell of column `pos` at this row and repackages its two 64-bit halves
+/// as a paimon `Decimal` that carries the caller-supplied `precision` and `scale`.
+Decimal ColumnarRowRef::GetDecimal(int32_t pos, int32_t precision, int32_t scale) const {
+    const auto* decimal_column = arrow::internal::checked_cast<const arrow::Decimal128Array*>(
+        ctx_->array_vec[pos].get());
+    assert(decimal_column);
+    const arrow::Decimal128 raw(decimal_column->GetValue(row_id_));
+    // Widen both halves as unsigned 128-bit values before combining them, then reinterpret
+    // the combined bit pattern as a signed 128-bit integer.
+    const auto upper =
+        static_cast<Decimal::uint128_t>(static_cast<uint64_t>(raw.high_bits()));
+    const auto lower = static_cast<Decimal::uint128_t>(raw.low_bits());
+    return Decimal(precision, scale, static_cast<Decimal::int128_t>((upper << 64) | lower));
+}
+
+/// Reads the timestamp cell of column `pos` at this row and converts it into a
+/// (millisecond, nanosecond) pair wrapped in a paimon `Timestamp`.
+///
+/// @note `precision` is not consulted here: the unit of the stored value is taken from the
+/// Arrow column type instead.
+Timestamp ColumnarRowRef::GetTimestamp(int32_t pos, int32_t precision) const {
+    const auto* ts_column = arrow::internal::checked_cast<const arrow::TimestampArray*>(
+        ctx_->array_vec[pos].get());
+    assert(ts_column);
+    // For the ORC format the data is stored as nanoseconds, so the conversion has to follow
+    // the unit recorded in the Arrow array rather than the precision passed by the caller.
+    auto arrow_ts_type =
+        arrow::internal::checked_pointer_cast<arrow::TimestampType>(ts_column->type());
+    DateTimeUtils::TimeType source_unit = DateTimeUtils::GetTimeTypeFromArrowType(arrow_ts_type);
+    int64_t raw_value = ts_column->Value(row_id_);
+    auto [milli, nano] =
+        DateTimeUtils::TimestampConverter(raw_value, source_unit,
+                                          DateTimeUtils::TimeType::MILLISECOND,
+                                          DateTimeUtils::TimeType::NANOSECOND);
+    return Timestamp(milli, nano);
+}
+
+/// Returns a row view over the struct column at `pos`, positioned at the same `row_id_` as
+/// this row. A fresh ColumnarBatchContext is built from the struct's child arrays and this
+/// row's pool. `num_fields` is accepted for interface compatibility and is not used here.
+std::shared_ptr<InternalRow> ColumnarRowRef::GetRow(int32_t pos, int32_t num_fields) const {
+    const auto* nested_struct = arrow::internal::checked_cast<const arrow::StructArray*>(
+        ctx_->array_vec[pos].get());
+    assert(nested_struct);
+    auto child_ctx =
+        std::make_shared<ColumnarBatchContext>(nested_struct->fields(), ctx_->pool);
+    return std::make_shared<ColumnarRowRef>(std::move(child_ctx), row_id_);
+}
+
+/// Returns an array view over the list cell of column `pos` at this row: the list's child
+/// values array together with this row's value offset and value length.
+std::shared_ptr<InternalArray> ColumnarRowRef::GetArray(int32_t pos) const {
+    const auto* list_column = arrow::internal::checked_cast<const arrow::ListArray*>(
+        ctx_->array_vec[pos].get());
+    assert(list_column);
+    const int32_t first = list_column->value_offset(row_id_);
+    const int32_t count = list_column->value_length(row_id_);
+    auto* child_values = list_column->values().get();
+    return std::make_shared<ColumnarArray>(child_values, ctx_->pool, first, count);
+}
+
+/// Returns a map view over the map cell of column `pos` at this row: the map's key and item
+/// arrays together with this row's value offset and value length.
+std::shared_ptr<InternalMap> ColumnarRowRef::GetMap(int32_t pos) const {
+    const auto* map_column = arrow::internal::checked_cast<const arrow::MapArray*>(
+        ctx_->array_vec[pos].get());
+    assert(map_column);
+    const int32_t first = map_column->value_offset(row_id_);
+    const int32_t count = map_column->value_length(row_id_);
+    return std::make_shared<ColumnarMap>(map_column->keys(), map_column->items(), ctx_->pool,
+                                         first, count);
+}
+
+} // namespace paimon
diff --git a/src/paimon/common/data/columnar/columnar_row_ref.h
b/src/paimon/common/data/columnar/columnar_row_ref.h
new file mode 100644
index 0000000..b08fdf6
--- /dev/null
+++ b/src/paimon/common/data/columnar/columnar_row_ref.h
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <string_view>
+
+#include "fmt/format.h"
+#include "paimon/common/data/binary_string.h"
+#include "paimon/common/data/columnar/columnar_batch_context.h"
+#include "paimon/common/data/columnar/columnar_utils.h"
+#include "paimon/common/data/internal_array.h"
+#include "paimon/common/data/internal_map.h"
+#include "paimon/common/data/internal_row.h"
+#include "paimon/common/types/row_kind.h"
+#include "paimon/data/decimal.h"
+#include "paimon/data/timestamp.h"
+#include "paimon/result.h"
+
+namespace paimon {
+class Bytes;
+
+/// Columnar row view which shares batch-level context to reduce per-row overhead.
+///
+/// The column arrays and the memory pool live in a shared ColumnarBatchContext; each row
+/// only adds its own `row_id_` and row kind on top of that context.
+class ColumnarRowRef : public InternalRow {
+ public:
+    /// @param ctx    shared context providing `array_vec` (the columns) and `pool`.
+    /// @param row_id position read from every column by the getters below.
+    ColumnarRowRef(std::shared_ptr<ColumnarBatchContext> ctx, int64_t row_id)
+        : ctx_(std::move(ctx)), row_id_(row_id) {}
+
+    /// @return the row kind last set via SetRowKind(); RowKind::Insert() by default.
+    Result<const RowKind*> GetRowKind() const override {
+        return row_kind_;
+    }
+
+    void SetRowKind(const RowKind* kind) override {
+        row_kind_ = kind;
+    }
+
+    /// @return the number of columns held by the shared context.
+    int32_t GetFieldCount() const override {
+        const auto column_count = ctx_->array_vec.size();
+        return static_cast<int32_t>(column_count);
+    }
+
+    bool IsNullAt(int32_t pos) const override {
+        const auto& column = ctx_->array_vec[pos];
+        return column->IsNull(row_id_);
+    }
+
+    bool GetBoolean(int32_t pos) const override {
+        const auto* column = ctx_->array_vec[pos].get();
+        return ColumnarUtils::GetGenericValue<arrow::BooleanType, bool>(column, row_id_);
+    }
+
+    char GetByte(int32_t pos) const override {
+        const auto* column = ctx_->array_vec[pos].get();
+        return ColumnarUtils::GetGenericValue<arrow::Int8Type, char>(column, row_id_);
+    }
+
+    int16_t GetShort(int32_t pos) const override {
+        const auto* column = ctx_->array_vec[pos].get();
+        return ColumnarUtils::GetGenericValue<arrow::Int16Type, int16_t>(column, row_id_);
+    }
+
+    int32_t GetInt(int32_t pos) const override {
+        const auto* column = ctx_->array_vec[pos].get();
+        return ColumnarUtils::GetGenericValue<arrow::Int32Type, int32_t>(column, row_id_);
+    }
+
+    int32_t GetDate(int32_t pos) const override {
+        const auto* column = ctx_->array_vec[pos].get();
+        return ColumnarUtils::GetGenericValue<arrow::Date32Type, int32_t>(column, row_id_);
+    }
+
+    int64_t GetLong(int32_t pos) const override {
+        const auto* column = ctx_->array_vec[pos].get();
+        return ColumnarUtils::GetGenericValue<arrow::Int64Type, int64_t>(column, row_id_);
+    }
+
+    float GetFloat(int32_t pos) const override {
+        const auto* column = ctx_->array_vec[pos].get();
+        return ColumnarUtils::GetGenericValue<arrow::FloatType, float>(column, row_id_);
+    }
+
+    double GetDouble(int32_t pos) const override {
+        const auto* column = ctx_->array_vec[pos].get();
+        return ColumnarUtils::GetGenericValue<arrow::DoubleType, double>(column, row_id_);
+    }
+
+    /// Builds a BinaryString from the bytes obtained for the string cell, using the
+    /// context's pool.
+    BinaryString GetString(int32_t pos) const override {
+        const auto* column = ctx_->array_vec[pos].get();
+        auto bytes = ColumnarUtils::GetBytes<arrow::StringType>(column, row_id_, ctx_->pool.get());
+        return BinaryString::FromBytes(bytes);
+    }
+
+    std::string_view GetStringView(int32_t pos) const override {
+        const auto* column = ctx_->array_vec[pos].get();
+        return ColumnarUtils::GetView(column, row_id_);
+    }
+
+    Decimal GetDecimal(int32_t pos, int32_t precision, int32_t scale) const override;
+
+    Timestamp GetTimestamp(int32_t pos, int32_t precision) const override;
+
+    std::shared_ptr<Bytes> GetBinary(int32_t pos) const override {
+        const auto* column = ctx_->array_vec[pos].get();
+        return ColumnarUtils::GetBytes<arrow::BinaryType>(column, row_id_, ctx_->pool.get());
+    }
+
+    std::shared_ptr<InternalRow> GetRow(int32_t pos, int32_t num_fields) const override;
+
+    std::shared_ptr<InternalArray> GetArray(int32_t pos) const override;
+
+    std::shared_ptr<InternalMap> GetMap(int32_t pos) const override;
+
+    std::string ToString() const override {
+        return fmt::format("ColumnarRowRef, row_id {}", row_id_);
+    }
+
+ private:
+    std::shared_ptr<ColumnarBatchContext> ctx_;
+    const RowKind* row_kind_ = RowKind::Insert();
+    int64_t row_id_;
+};
+} // namespace paimon
diff --git a/src/paimon/common/data/columnar/columnar_row_test.cpp
b/src/paimon/common/data/columnar/columnar_row_test.cpp
new file mode 100644
index 0000000..6301cd2
--- /dev/null
+++ b/src/paimon/common/data/columnar/columnar_row_test.cpp
@@ -0,0 +1,455 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "paimon/common/data/columnar/columnar_row.h"
+
+#include <utility>
+
+#include "arrow/api.h"
+#include "arrow/array/array_base.h"
+#include "arrow/array/array_dict.h"
+#include "arrow/array/array_nested.h"
+#include "arrow/ipc/json_simple.h"
+#include "gtest/gtest.h"
+#include "paimon/common/data/columnar/columnar_row_ref.h"
+#include "paimon/common/utils/date_time_utils.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/memory/memory_pool.h"
+
+namespace paimon::test {
+TEST(ColumnarRowTest, TestSimple) {  // Typed getters on row 0 of an 8-column struct array.
+ auto pool = GetDefaultPool();
+ std::shared_ptr<arrow::DataType> target_type =
+ arrow::struct_({arrow::field("f1", arrow::boolean()),
arrow::field("f2", arrow::int8()),
+ arrow::field("f3", arrow::int16()), arrow::field("f4",
arrow::int32()),
+ arrow::field("f5", arrow::int64()), arrow::field("f6",
arrow::float32()),
+ arrow::field("f7", arrow::float64()),
arrow::field("f8", arrow::utf8())});
+ auto f1 =
+ arrow::ipc::internal::json::ArrayFromJSON(arrow::boolean(), R"([true,
false, false, true])")
+ .ValueOrDie();
+ auto f2 =
+ arrow::ipc::internal::json::ArrayFromJSON(arrow::int8(), R"([0, 1, 2,
3])").ValueOrDie();
+ auto f3 =
+ arrow::ipc::internal::json::ArrayFromJSON(arrow::int16(), R"([4, 5, 6,
7])").ValueOrDie();
+ auto f4 = arrow::ipc::internal::json::ArrayFromJSON(arrow::int32(),
R"([10, 11, 12, 13])")
+ .ValueOrDie();
+ auto f5 = arrow::ipc::internal::json::ArrayFromJSON(arrow::int64(),
R"([15, 16, 17, 18])")
+ .ValueOrDie();
+ auto f6 = arrow::ipc::internal::json::ArrayFromJSON(arrow::float32(),
R"([0.0, 1.1, 2.2, 3.3])")
+ .ValueOrDie();
+ auto f7 = arrow::ipc::internal::json::ArrayFromJSON(arrow::float64(),
R"([5.5, 6.6, 7.7, 8.8])")
+ .ValueOrDie();
+ auto f8 = arrow::ipc::internal::json::ArrayFromJSON(arrow::utf8(),
+ R"(["Hello", "World",
"HELLO", "WORLD"])")
+ .ValueOrDie();
+ auto data = arrow::StructArray::Make({f1, f2, f3, f4, f5, f6, f7, f8},
target_type->fields())
+ .ValueOrDie();
+ // Row view at row_id 0: each getter must return the first element of its column.
+ auto row = ColumnarRow(data->fields(), pool, 0);
+ ASSERT_EQ(row.GetFieldCount(), 8);
+ ASSERT_EQ(row.GetBoolean(0), true);
+ ASSERT_EQ(row.GetByte(1), 0);
+ ASSERT_EQ(row.GetShort(2), 4);
+ ASSERT_EQ(row.GetInt(3), 10);
+ ASSERT_EQ(row.GetLong(4), 15);
+ ASSERT_EQ(row.GetFloat(5), 0.0);
+ ASSERT_EQ(row.GetDouble(6), 5.5);
+ ASSERT_EQ(row.GetString(7).ToString(), "Hello");  // copying accessor
+ ASSERT_EQ(std::string(row.GetStringView(7)), "Hello");  // view accessor, same column
+}
+
+TEST(ColumnarRowRefTest, TestSimple) {  // ColumnarRowRef getters and row-kind round trip.
+ auto pool = GetDefaultPool();
+ std::shared_ptr<arrow::DataType> target_type =
+ arrow::struct_({arrow::field("f1", arrow::int32()), arrow::field("f2",
arrow::utf8())});
+ auto f1 =
+ arrow::ipc::internal::json::ArrayFromJSON(arrow::int32(), R"([1, 2,
3])").ValueOrDie();
+ auto f2 =
+ arrow::ipc::internal::json::ArrayFromJSON(arrow::utf8(), R"(["alpha",
"beta", "gamma"])")
+ .ValueOrDie();
+ auto data = arrow::StructArray::Make({f1, f2},
target_type->fields()).ValueOrDie();
+ // The columns and pool go into a shared context; the row itself only adds row_id 1.
+ auto ctx = std::make_shared<ColumnarBatchContext>(data->fields(), pool);
+ ColumnarRowRef row(ctx, 1);
+ ASSERT_EQ(row.GetFieldCount(), 2);
+ ASSERT_EQ(row.GetInt(0), 2);
+ ASSERT_EQ(std::string(row.GetStringView(1)), "beta");
+ // Row kind defaults to Insert and reflects the value passed to SetRowKind afterwards.
+ auto row_kind = row.GetRowKind();
+ ASSERT_TRUE(row_kind.ok());
+ ASSERT_EQ(row_kind.value(), RowKind::Insert());
+ row.SetRowKind(RowKind::Delete());
+ auto updated_kind = row.GetRowKind();
+ ASSERT_TRUE(updated_kind.ok());
+ ASSERT_EQ(updated_kind.value(), RowKind::Delete());
+}
+
+TEST(ColumnarRowTest, TestComplexAndNestedType) {  // date/decimal/timestamp/binary/struct/list/map getters on row 0.
+ auto pool = GetDefaultPool();
+ std::shared_ptr<arrow::DataType> target_type = arrow::struct_({
+ arrow::field("f0", arrow::date32()),
+ arrow::field("f1", arrow::decimal128(10, 3)),
+ arrow::field("f2", arrow::timestamp(arrow::TimeUnit::NANO)),
+ arrow::field("f3", arrow::binary()),
+ arrow::field(
+ "f4", arrow::struct_({field("sub1", arrow::int64()), field("sub2",
arrow::int64()),
+ field("sub3", arrow::int64()), field("sub4",
arrow::int64())})),
+ arrow::field("f5", arrow::list(arrow::int64())),
+ arrow::field("f6", arrow::map(arrow::int32(), arrow::int64())),
+ arrow::field("f7", arrow::map(arrow::int32(),
arrow::list(arrow::int64()))),
+ arrow::field("f8", arrow::timestamp(arrow::TimeUnit::SECOND)),
+ arrow::field("f9", arrow::timestamp(arrow::TimeUnit::MILLI)),
+ arrow::field("f10", arrow::timestamp(arrow::TimeUnit::MICRO)),
+ });
+ auto f0 =
+ arrow::ipc::internal::json::ArrayFromJSON(arrow::date32(), R"([109,
1000, -1000, 555])")
+ .ValueOrDie();
+ auto f1 = arrow::ipc::internal::json::ArrayFromJSON(
+ arrow::decimal128(10, 3), R"(["1.234", "1234.000",
"-9876.543", "666.888"])")
+ .ValueOrDie();
+ auto f2 =
+
arrow::ipc::internal::json::ArrayFromJSON(arrow::timestamp(arrow::TimeUnit::NANO),
+
R"(["1970-01-01T00:00:59","2000-02-29T23:23:23",
+ "1899-01-01T00:59:20","2033-05-18T03:33:20"])")
+ .ValueOrDie();
+ auto f3 =
+ arrow::ipc::internal::json::ArrayFromJSON(arrow::binary(), R"(["aaa",
"bb", "ccc", "bbb"])")
+ .ValueOrDie();
+ auto f4 = arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_({
+ field("sub1",
arrow::int64()),
+ field("sub2",
arrow::int64()),
+ field("sub3",
arrow::int64()),
+ field("sub4",
arrow::int64()),
+ }),
+ R"([
+ [1, 3, 2, 5],
+ [2, 2, 1, 3],
+ [3, 2, 1, 3],
+ [4, 1, 0, 2]
+ ])")
+ .ValueOrDie();
+ auto f5 =
arrow::ipc::internal::json::ArrayFromJSON(arrow::list(arrow::int64()),
+ "[[1, 1, 2], [3], [2],
[-4]]")
+ .ValueOrDie();
+ auto f6 =
arrow::ipc::internal::json::ArrayFromJSON(arrow::map(arrow::int32(),
arrow::int64()),
+ R"([[[1, 3], [4, 4]],
+ [[1, 5], [7, 6],
[100, 7]],
+ [[0, 9]],
+ null])")
+ .ValueOrDie();
+ auto f7 = arrow::ipc::internal::json::ArrayFromJSON(
+ arrow::map(arrow::int32(), arrow::list(arrow::int64())),
+ R"([[[1, [10, 20]], [4, [40, 50, 100]]],
+ [[1, [1, 2]], [7, [6]], [100, [8]]],
+ [[0, [9]]],
+ null])")
+ .ValueOrDie();
+ auto f8 =
+
arrow::ipc::internal::json::ArrayFromJSON(arrow::timestamp(arrow::TimeUnit::SECOND),
+
R"(["1970-01-01T00:00:59","2000-02-29T23:23:23",
+
"1899-01-01T00:59:20","2033-05-18T03:33:20"])")
+ .ValueOrDie();
+ auto f9 = arrow::ipc::internal::json::ArrayFromJSON(
+ arrow::timestamp(arrow::TimeUnit::MILLI),
+ R"(["1970-01-01T00:00:59.001","2000-02-29T23:23:23",
+
"1899-01-01T00:59:20","2033-05-18T03:33:20"])")
+ .ValueOrDie();
+ auto f10 = arrow::ipc::internal::json::ArrayFromJSON(
+ arrow::timestamp(arrow::TimeUnit::MICRO),
+ R"(["1970-01-01T00:00:59.000001","2000-02-29T23:23:23",
+
"1899-01-01T00:59:20","2033-05-18T03:33:20"])")
+ .ValueOrDie();
+
+ auto data = arrow::StructArray::Make({f0, f1, f2, f3, f4, f5, f6, f7, f8,
f9, f10},
+ target_type->fields())
+ .ValueOrDie();
+ // All assertions below read row_id 0, i.e. the first element of every column.
+ auto row = ColumnarRow(data->fields(), pool, 0);
+ ASSERT_EQ(row.GetFieldCount(), 11);
+ ASSERT_EQ(row.GetDate(0), 109);
+ ASSERT_EQ(row.GetDecimal(1, 10, 3), Decimal(10, 3, 1234));
+ // Nanosecond column holding 59 s: expected as 59000 ms with 0 leftover nanoseconds.
+ auto ts = row.GetTimestamp(2, /*precision=*/9);
+ ASSERT_EQ(ts, Timestamp(59000, 0));
+
+ ASSERT_EQ(*row.GetBinary(3), Bytes("aaa", pool.get()));
+ ASSERT_EQ(std::string(row.GetStringView(3)), "aaa");
+ // Nested struct column: the returned row exposes the struct's four child fields.
+ auto result_row = row.GetRow(4, 4);
+ ASSERT_EQ(result_row->GetLong(0), 1);
+ ASSERT_EQ(result_row->GetLong(1), 3);
+ ASSERT_EQ(result_row->GetLong(2), 2);
+ ASSERT_EQ(result_row->GetLong(3), 5);
+ // List column: row 0 is the list [1, 1, 2].
+ std::vector<int64_t> values = {1, 1, 2};
+ auto result_array = row.GetArray(5);
+ ASSERT_EQ(result_array->ToLongArray().value(), values);
+ // Map<int32, int64> column: row 0 is {1: 3, 4: 4}.
+ auto result_key = row.GetMap(6)->KeyArray();
+ auto result_value = row.GetMap(6)->ValueArray();
+ ASSERT_EQ(result_key->ToIntArray().value(), std::vector<int32_t>({1, 4}));
+ ASSERT_EQ(result_value->ToLongArray().value(), std::vector<int64_t>({3,
4}));
+ // Map<int32, list<int64>> column: the values are themselves arrays.
+ result_key = row.GetMap(7)->KeyArray();
+ result_value = row.GetMap(7)->ValueArray();
+
+ ASSERT_EQ(result_key->ToIntArray().value(), std::vector<int32_t>({1, 4}));
+ ASSERT_EQ(2, result_value->Size());
+ ASSERT_EQ(result_value->GetArray(0)->ToLongArray().value(),
std::vector<int64_t>({10, 20}));
+ ASSERT_EQ(result_value->GetArray(1)->ToLongArray().value(),
+ std::vector<int64_t>({40, 50, 100}));
+ // Second/milli/micro columns: the result follows the Arrow unit of each column.
+ auto ts_second = row.GetTimestamp(8, /*precision=*/0);
+ ASSERT_EQ(ts_second, Timestamp(59000, 0));
+
+ auto ts_milli = row.GetTimestamp(9, /*precision=*/3);
+ ASSERT_EQ(ts_milli, Timestamp(59001, 0));
+
+ auto ts_micro = row.GetTimestamp(10, /*precision=*/6);
+ ASSERT_EQ(ts_micro, Timestamp(59000, 1000));  // 1 microsecond -> 1000 ns
+}
+
+TEST(ColumnarRowTest, TestTimestampType) {  // GetTimestamp over all four Arrow time units, with and without a timezone.
+ auto pool = GetDefaultPool();
+ auto timezone = DateTimeUtils::GetLocalTimezoneName();
+ arrow::FieldVector fields = {
+ arrow::field("ts_sec", arrow::timestamp(arrow::TimeUnit::SECOND)),
+ arrow::field("ts_milli", arrow::timestamp(arrow::TimeUnit::MILLI)),
+ arrow::field("ts_micro", arrow::timestamp(arrow::TimeUnit::MICRO)),
+ arrow::field("ts_nano", arrow::timestamp(arrow::TimeUnit::NANO)),
+ arrow::field("ts_tz_sec", arrow::timestamp(arrow::TimeUnit::SECOND,
timezone)),
+ arrow::field("ts_tz_milli", arrow::timestamp(arrow::TimeUnit::MILLI,
timezone)),
+ arrow::field("ts_tz_micro", arrow::timestamp(arrow::TimeUnit::MICRO,
timezone)),
+ arrow::field("ts_tz_nano", arrow::timestamp(arrow::TimeUnit::NANO,
timezone)),
+ };
+ auto array = std::dynamic_pointer_cast<arrow::StructArray>(
+ arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(fields), R"([
+["1970-01-01 00:00:01", "1970-01-01 00:00:00.001", "1970-01-01
00:00:00.000001", "1970-01-01 00:00:00.000000001", "1970-01-01 00:00:02",
"1970-01-01 00:00:00.002", "1970-01-01 00:00:00.000002", "1970-01-01
00:00:00.000000002"],
+["1970-01-01 00:00:03", "1970-01-01 00:00:00.003", null, "1970-01-01
00:00:00.000000003", "1970-01-01 00:00:04", "1970-01-01 00:00:00.004",
"1970-01-01 00:00:00.000004", "1970-01-01 00:00:00.000000004"],
+["1970-01-01 00:00:05", "1970-01-01 00:00:00.005", null, null, "1970-01-01
00:00:06", null, "1970-01-01 00:00:00.000006", null]
+ ])")
+ .ValueOrDie());
+ {  // Row 0: every column is populated; each value is one or two ticks of its own unit.
+ auto row = ColumnarRow(array->fields(), pool, 0);
+ ASSERT_EQ(row.GetFieldCount(), 8);
+ ASSERT_EQ(row.GetTimestamp(0, /*precision=*/0), Timestamp(1000, 0));
+ ASSERT_EQ(row.GetTimestamp(1, /*precision=*/3), Timestamp(1, 0));
+ ASSERT_EQ(row.GetTimestamp(2, /*precision=*/6), Timestamp(0, 1000));
+ ASSERT_EQ(row.GetTimestamp(3, /*precision=*/9), Timestamp(0, 1));
+ ASSERT_EQ(row.GetTimestamp(4, /*precision=*/0), Timestamp(2000, 0));
+ ASSERT_EQ(row.GetTimestamp(5, /*precision=*/3), Timestamp(2, 0));
+ ASSERT_EQ(row.GetTimestamp(6, /*precision=*/6), Timestamp(0, 2000));
+ ASSERT_EQ(row.GetTimestamp(7, /*precision=*/9), Timestamp(0, 2));
+ }
+ {  // Row 2: mixes nulls with values; null cells are only checked through IsNullAt.
+ auto row = ColumnarRow(array->fields(), pool, 2);
+ ASSERT_EQ(row.GetFieldCount(), 8);
+ ASSERT_EQ(row.GetTimestamp(0, /*precision=*/0), Timestamp(5000, 0));
+ ASSERT_EQ(row.GetTimestamp(1, /*precision=*/3), Timestamp(5, 0));
+ ASSERT_TRUE(row.IsNullAt(2));
+ ASSERT_TRUE(row.IsNullAt(3));
+ ASSERT_EQ(row.GetTimestamp(4, /*precision=*/0), Timestamp(6000, 0));
+ ASSERT_TRUE(row.IsNullAt(5));
+ ASSERT_EQ(row.GetTimestamp(6, /*precision=*/6), Timestamp(0, 6000));
+ }
+}
+
+TEST(ColumnarRowTest, TestNull) {  // IsNullAt reports null cells; row kind survives SetRowKind.
+ auto pool = GetDefaultPool();
+ std::shared_ptr<arrow::DataType> target_type =
+ arrow::struct_({arrow::field("f1", arrow::boolean()),
arrow::field("f2", arrow::int8())});
+ auto f1 =
+ arrow::ipc::internal::json::ArrayFromJSON(arrow::boolean(), R"([null,
false, false, true])")
+ .ValueOrDie();
+ auto f2 =
+ arrow::ipc::internal::json::ArrayFromJSON(arrow::int8(), R"([null, 1,
2, 3])").ValueOrDie();
+ auto data = arrow::StructArray::Make({f1, f2},
target_type->fields()).ValueOrDie();
+ // Row 0 is the only row where both columns hold null.
+ auto row = ColumnarRow(data->fields(), pool, 0);
+ row.SetRowKind(RowKind::Insert());
+ ASSERT_EQ(row.GetFieldCount(), 2);
+ ASSERT_EQ(row.GetRowKind().value(), RowKind::Insert());
+ ASSERT_TRUE(row.IsNullAt(0));
+ ASSERT_TRUE(row.IsNullAt(1));
+}
+
+TEST(ColumnarRowTest, TestDictionary) {  // Reads dictionary-encoded strings through ColumnarRow, once inside a list field and once inside a nested struct field.
+ auto pool = GetDefaultPool();
+ auto dict = arrow::ipc::internal::json::ArrayFromJSON(arrow::utf8(),
R"(["foo", "bar", "baz"])")
+ .ValueOrDie();
+ auto dict_type = arrow::dictionary(arrow::int32(), arrow::utf8());  // int32 indices into a utf8 dictionary
+ auto indices =
+ arrow::ipc::internal::json::ArrayFromJSON(arrow::int32(), "[1, 2, 0,
2, 0]").ValueOrDie();
+ std::shared_ptr<arrow::DictionaryArray> dict_array =
+ std::make_shared<arrow::DictionaryArray>(dict_type, indices, dict);
+
+ auto f0 = arrow::field("f0", arrow::list(dict_type));  // f0: list of dictionary-encoded strings
+ auto f1 = arrow::field("f1", arrow::struct_({arrow::field("sub1",
arrow::int64()),
+ arrow::field("sub2",
arrow::binary()),
+ arrow::field("sub3",
dict_type)}));  // f1: struct of (sub1 int64, sub2 binary, sub3 dictionary)
+
+ auto list_offsets =
+ arrow::ipc::internal::json::ArrayFromJSON(arrow::int32(), R"([0,
5])").ValueOrDie();  // offsets [0, 5]: a single list spanning all five dictionary entries
+ auto f0_array = arrow::ListArray::FromArrays(*list_offsets,
*dict_array).ValueOrDie();
+
+ auto sub1_array =
+ arrow::ipc::internal::json::ArrayFromJSON(arrow::int64(),
R"([1])").ValueOrDie();
+ auto sub2_array =
+ arrow::ipc::internal::json::ArrayFromJSON(arrow::binary(),
R"(["apple"])").ValueOrDie();
+ auto sub3_array =
+ std::make_shared<arrow::DictionaryArray>(dict_type, indices->Slice(0,
1), dict);  // sub3 keeps only the first index (1, i.e. "bar")
+ auto f1_array = std::make_shared<arrow::StructArray>(
+ f1->type(), /*length=*/1, arrow::ArrayVector({sub1_array, sub2_array,
sub3_array}));
+
+ auto struct_type = arrow::struct_({f0, f1});
+ // data: [["bar", "baz", "foo", "baz", "foo"], [1, "apple", "bar"]]
+ auto data = std::make_shared<arrow::StructArray>(struct_type, /*length=*/1,
+
arrow::ArrayVector({f0_array, f1_array}));
+
+ auto row = ColumnarRow(data->fields(), pool, 0);
+ ASSERT_FALSE(row.IsNullAt(0));
+ auto internal_array = row.GetArray(0);  // list field: elements are resolved through the dictionary
+ ASSERT_TRUE(internal_array);
+ ASSERT_EQ(5, internal_array->Size());
+ ASSERT_EQ("bar", internal_array->GetString(0).ToString());
+ ASSERT_EQ("baz", internal_array->GetString(1).ToString());
+ ASSERT_EQ("foo", internal_array->GetString(2).ToString());
+ ASSERT_EQ("baz", internal_array->GetString(3).ToString());
+ ASSERT_EQ("foo", internal_array->GetString(4).ToString());
+
+ ASSERT_EQ("bar", std::string(internal_array->GetStringView(0)));  // same values again, this time through the string_view accessor
+ ASSERT_EQ("baz", std::string(internal_array->GetStringView(1)));
+ ASSERT_EQ("foo", std::string(internal_array->GetStringView(2)));
+ ASSERT_EQ("baz", std::string(internal_array->GetStringView(3)));
+ ASSERT_EQ("foo", std::string(internal_array->GetStringView(4)));
+
+ ASSERT_FALSE(row.IsNullAt(1));
+ auto internal_row = row.GetRow(1, 3);  // NOTE(review): the 3 presumably is the sub-field count of f1 - confirm against GetRow
+ ASSERT_TRUE(internal_row);
+ ASSERT_EQ(1, internal_row->GetLong(0));
+ auto bytes = internal_row->GetBinary(1);
+ ASSERT_EQ("apple", std::string(bytes->data(), bytes->size()));
+ ASSERT_EQ("apple", std::string(internal_row->GetStringView(1)));  // the binary field is also readable as a string view
+ ASSERT_EQ("bar", internal_row->GetString(2).ToString());
+ ASSERT_EQ("bar", std::string(internal_row->GetStringView(2)));
+}
+
+TEST(ColumnarRowTest, TestDataLifeCycle) {  // Checks that a ColumnarRow constructed with the struct array can still be read after the test drops its own references to the data.
+ auto pool = GetDefaultPool();
+ std::shared_ptr<arrow::DataType> target_type =
arrow::struct_({arrow::field(
+ "f0", arrow::struct_({field("sub1", arrow::int64()), field("sub2",
arrow::int64()),
+ field("sub3", arrow::int64()), field("sub4",
arrow::int64())}))});
+ auto f0 = arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_({
+ field("sub1",
arrow::int64()),
+ field("sub2",
arrow::int64()),
+ field("sub3",
arrow::int64()),
+ field("sub4",
arrow::int64()),
+ }),
+ R"([
+ [1, 3, 2, 5],
+ [2, 2, 1, 3],
+ [3, 2, 1, 3],
+ [4, 1, 0, 2]
+ ])")
+ .ValueOrDie();
+ auto data = arrow::StructArray::Make({f0},
target_type->fields()).ValueOrDie();
+ auto row = std::make_unique<ColumnarRow>(data, data->fields(), pool, 0);  // this constructor also receives the StructArray itself, not only its fields
+ data.reset();  // release the test's own references before reading through the row
+ f0.reset();
+ // array data is only held by columnar row
+ ASSERT_EQ(1, row->struct_array_.use_count());
+
+ ASSERT_EQ(row->GetFieldCount(), 1);
+ ASSERT_FALSE(row->IsNullAt(0));
+ auto result_row = row->GetRow(0, 4);  // row 0 of f0 is [1, 3, 2, 5]; NOTE(review): the 4 presumably is the sub-field count - confirm against GetRow
+
+ ASSERT_FALSE(result_row->IsNullAt(0));
+ ASSERT_EQ(result_row->GetLong(0), 1);
+ ASSERT_FALSE(result_row->IsNullAt(1));
+ ASSERT_EQ(result_row->GetLong(1), 3);
+ ASSERT_FALSE(result_row->IsNullAt(2));
+ ASSERT_EQ(result_row->GetLong(2), 2);
+ ASSERT_FALSE(result_row->IsNullAt(3));
+ ASSERT_EQ(result_row->GetLong(3), 5);
+}
+
+TEST(ColumnarRowTest, TestColumnarRowRefGetBinary) {
+    // GetBinary() on a ColumnarRowRef must yield the bytes of the addressed row, including an
+    // empty but non-null value; a null cell is reported through IsNullAt().
+    auto memory_pool = GetDefaultPool();
+
+    auto nullable_column = arrow::ipc::internal::json::ArrayFromJSON(
+                               arrow::binary(), R"(["hello", "world", null])")
+                               .ValueOrDie();
+    auto dense_column =
+        arrow::ipc::internal::json::ArrayFromJSON(arrow::binary(), R"(["abc", "", "xyz"])")
+            .ValueOrDie();
+
+    std::shared_ptr<arrow::DataType> row_type = arrow::struct_({
+        arrow::field("f0", arrow::binary()),
+        arrow::field("f1", arrow::binary()),
+    });
+    auto batch =
+        arrow::StructArray::Make({nullable_column, dense_column}, row_type->fields()).ValueOrDie();
+    auto context = std::make_shared<ColumnarBatchContext>(batch->fields(), memory_pool);
+
+    {
+        // Row 0: both cells carry data.
+        ColumnarRowRef row_ref(context, 0);
+        auto first_cell = row_ref.GetBinary(0);
+        ASSERT_TRUE(first_cell);
+        ASSERT_EQ(std::string(first_cell->data(), first_cell->size()), "hello");
+
+        auto second_cell = row_ref.GetBinary(1);
+        ASSERT_TRUE(second_cell);
+        ASSERT_EQ(std::string(second_cell->data(), second_cell->size()), "abc");
+    }
+    {
+        // Row 1: the second cell is an empty value, which still yields a Bytes object.
+        ColumnarRowRef row_ref(context, 1);
+        auto first_cell = row_ref.GetBinary(0);
+        ASSERT_TRUE(first_cell);
+        ASSERT_EQ(std::string(first_cell->data(), first_cell->size()), "world");
+
+        auto second_cell = row_ref.GetBinary(1);
+        ASSERT_TRUE(second_cell);
+        ASSERT_EQ(second_cell->size(), 0);
+    }
+    {
+        // Row 2: the first cell is null, the second one is readable.
+        ColumnarRowRef row_ref(context, 2);
+        ASSERT_TRUE(row_ref.IsNullAt(0));
+
+        auto second_cell = row_ref.GetBinary(1);
+        ASSERT_TRUE(second_cell);
+        ASSERT_EQ(std::string(second_cell->data(), second_cell->size()), "xyz");
+    }
+}
+
+TEST(ColumnarRowTest, TestColumnarRowRefToString) {
+    // ToString() of a row reference is expected to print the row id it points at.
+    auto memory_pool = GetDefaultPool();
+
+    auto int_column =
+        arrow::ipc::internal::json::ArrayFromJSON(arrow::int32(), R"([1, 2, 3])").ValueOrDie();
+    std::shared_ptr<arrow::DataType> row_type =
+        arrow::struct_({arrow::field("f0", arrow::int32())});
+    auto batch = arrow::StructArray::Make({int_column}, row_type->fields()).ValueOrDie();
+    auto context = std::make_shared<ColumnarBatchContext>(batch->fields(), memory_pool);
+
+    {
+        // First row of the batch.
+        ColumnarRowRef first_ref(context, 0);
+        ASSERT_EQ(first_ref.ToString(), "ColumnarRowRef, row_id 0");
+    }
+    {
+        // Last row of the batch.
+        ColumnarRowRef last_ref(context, 2);
+        ASSERT_EQ(last_ref.ToString(), "ColumnarRowRef, row_id 2");
+    }
+}
+
+} // namespace paimon::test
diff --git a/src/paimon/common/data/columnar/columnar_utils.h
b/src/paimon/common/data/columnar/columnar_utils.h
new file mode 100644
index 0000000..ca9f58e
--- /dev/null
+++ b/src/paimon/common/data/columnar/columnar_utils.h
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <cassert>
+#include <cstdint>
+#include <cstring>
+#include <memory>
+#include <string_view>
+
+#include "arrow/api.h"
+#include "arrow/array/array_base.h"
+#include "arrow/array/array_binary.h"
+#include "arrow/array/array_dict.h"
+#include "arrow/array/array_primitive.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/checked_cast.h"
+#include "paimon/memory/bytes.h"
+
+namespace paimon {
+class MemoryPool;
+
+/// Static helpers for reading single values out of Arrow arrays.
+///
+/// The helpers themselves perform no bounds or null checks on `pos`; callers are expected to
+/// pass a valid position.
+class ColumnarUtils {
+ public:
+    ColumnarUtils() = delete;
+    ~ColumnarUtils() = delete;
+
+    /// Returns the value stored at `pos`.
+    ///
+    /// @tparam DataType  Arrow data type whose `TypeTraits<>::ArrayType` matches `array`.
+    /// @tparam ValueType type the stored value is returned as.
+    template <typename DataType, typename ValueType>
+    static ValueType GetGenericValue(const arrow::Array* array, int32_t pos) {
+        using ArrayType = typename arrow::TypeTraits<DataType>::ArrayType;
+        const auto* typed_array = arrow::internal::checked_cast<const ArrayType*>(array);
+        assert(typed_array);
+        return typed_array->Value(pos);
+    }
+
+    /// Returns a non-owning view of the string/binary value at `pos`.
+    ///
+    /// Accepts plain binary-like arrays (32-bit or 64-bit offsets) as well as dictionary
+    /// arrays whose dictionary holds such values. The view points into the array's own
+    /// buffers, so it must not outlive `array`.
+    static std::string_view GetView(const arrow::Array* array, int32_t pos) {
+        switch (array->type_id()) {
+            case arrow::Type::type::DICTIONARY:
+                return GetDictionaryView(array, pos);
+            case arrow::Type::type::LARGE_STRING:
+            case arrow::Type::type::LARGE_BINARY: {
+                // 64-bit offsets: must not be read through the 32-bit BinaryArray layout.
+                const auto* typed_array =
+                    arrow::internal::checked_cast<const arrow::LargeBinaryArray*>(array);
+                assert(typed_array);
+                return typed_array->GetView(pos);
+            }
+            default: {
+                const auto* typed_array =
+                    arrow::internal::checked_cast<const arrow::BinaryArray*>(array);
+                assert(typed_array);
+                return typed_array->GetView(pos);
+            }
+        }
+    }
+
+    /// Copies the string/binary value at `pos` into a newly allocated `Bytes`.
+    ///
+    /// @tparam DataType unused by the implementation; kept so existing call sites compile.
+    /// @param pool memory pool handed to `Bytes::AllocateBytes`.
+    template <typename DataType>
+    static std::shared_ptr<Bytes> GetBytes(const arrow::Array* array, int32_t pos,
+                                           MemoryPool* pool) {
+        const std::string_view view = GetView(array, pos);
+        std::shared_ptr<Bytes> bytes = Bytes::AllocateBytes(view.size(), pool);
+        // memcpy requires valid pointers even for a zero-length copy, and either side may be
+        // null for an empty value, so skip the call in that case.
+        if (!view.empty()) {
+            memcpy(bytes->data(), view.data(), view.size());
+        }
+        return bytes;
+    }
+
+ private:
+    /// Resolves position `pos` of a dictionary array to the dictionary entry it refers to.
+    static std::string_view GetDictionaryView(const arrow::Array* array, int32_t pos) {
+        const auto* dict_array =
+            arrow::internal::checked_cast<const arrow::DictionaryArray*>(array);
+        assert(dict_array);
+        // GetValueIndex dispatches on the index type itself, which also covers the unsigned
+        // index widths Arrow allows for dictionaries.
+        const int64_t dict_index = dict_array->GetValueIndex(pos);
+        assert(dict_index >= 0);
+
+        const arrow::Array* dictionary = dict_array->dictionary().get();
+        assert(dictionary);
+        switch (dict_array->dict_type()->value_type()->id()) {
+            case arrow::Type::type::STRING:
+            case arrow::Type::type::BINARY:
+                // StringArray derives from BinaryArray, so one cast serves both.
+                return arrow::internal::checked_cast<const arrow::BinaryArray*>(dictionary)
+                    ->GetView(dict_index);
+            case arrow::Type::type::LARGE_STRING:
+            case arrow::Type::type::LARGE_BINARY:
+                return arrow::internal::checked_cast<const arrow::LargeBinaryArray*>(dictionary)
+                    ->GetView(dict_index);
+            default:
+                // Unsupported dictionary value type.
+                assert(false);
+                return std::string_view();
+        }
+    }
+};
+} // namespace paimon
diff --git a/src/paimon/common/data/columnar/columnar_utils_test.cpp
b/src/paimon/common/data/columnar/columnar_utils_test.cpp
new file mode 100644
index 0000000..7eb8187
--- /dev/null
+++ b/src/paimon/common/data/columnar/columnar_utils_test.cpp
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "paimon/common/data/columnar/columnar_utils.h"
+
+#include <string>
+
+#include "arrow/api.h"
+#include "arrow/array/array_dict.h"
+#include "arrow/ipc/json_simple.h"
+#include "gtest/gtest.h"
+#include "paimon/memory/memory_pool.h"
+
+namespace paimon::test {
+TEST(ColumnarUtilsTest, TestGetViewAndBytes) {
+    auto memory_pool = GetDefaultPool();
+    auto strings = arrow::ipc::internal::json::ArrayFromJSON(arrow::utf8(),
+                                                             R"(["abc", "def", "hi"])")
+                       .ValueOrDie();
+
+    // GetView: read the last element as a string_view.
+    std::string_view last_value = ColumnarUtils::GetView(strings.get(), 2);
+    ASSERT_EQ(std::string(last_value), "hi");
+
+    // GetBytes: the middle element must compare equal to a Bytes built from the same text.
+    auto copied = ColumnarUtils::GetBytes<arrow::BinaryType>(strings.get(), 1, memory_pool.get());
+    auto expected = std::make_shared<Bytes>("def", memory_pool.get());
+    ASSERT_EQ(*expected, *copied);
+}
+
+TEST(ColumnarUtilsTest, TestGetViewAndBytesOfDict) {
+    auto memory_pool = GetDefaultPool();
+
+    // Dictionary ["foo", "bar", "baz"] addressed by int32 indices [1, 2, 0, 2, 0].
+    auto dictionary_values = arrow::ipc::internal::json::ArrayFromJSON(
+                                 arrow::utf8(), R"(["foo", "bar", "baz"])")
+                                 .ValueOrDie();
+    auto index_values =
+        arrow::ipc::internal::json::ArrayFromJSON(arrow::int32(), "[1, 2, 0, 2, 0]").ValueOrDie();
+    auto encoded_type = arrow::dictionary(arrow::int32(), arrow::utf8());
+    std::shared_ptr<arrow::DictionaryArray> encoded = std::make_shared<arrow::DictionaryArray>(
+        encoded_type, index_values, dictionary_values);
+
+    // Each position must resolve to the dictionary entry its index selects.
+    const std::string decoded[] = {"bar", "baz", "foo", "baz", "foo"};
+    for (int32_t pos = 0; pos < 5; ++pos) {
+        ASSERT_EQ(decoded[pos], std::string(ColumnarUtils::GetView(encoded.get(), pos)));
+    }
+}
+
+} // namespace paimon::test