This is an automated email from the ASF dual-hosted git repository.

leaves12138 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 1966faf  feat: migrate common/data/columnar module (#24)
1966faf is described below

commit 1966faf5ddbaaee06a403b842f19870573189d8d
Author: lxy <[email protected]>
AuthorDate: Fri May 29 10:36:13 2026 +0800

    feat: migrate common/data/columnar module (#24)
    
    Squash merge PR #24.
---
 src/paimon/common/data/columnar/columnar_array.cpp | 160 ++++++++
 src/paimon/common/data/columnar/columnar_array.h   | 154 +++++++
 .../common/data/columnar/columnar_array_test.cpp   | 312 ++++++++++++++
 .../common/data/columnar/columnar_batch_context.h  |  40 ++
 src/paimon/common/data/columnar/columnar_map.cpp   |  45 ++
 src/paimon/common/data/columnar/columnar_map.h     |  53 +++
 src/paimon/common/data/columnar/columnar_row.cpp   |  89 ++++
 src/paimon/common/data/columnar/columnar_row.h     | 160 ++++++++
 .../common/data/columnar/columnar_row_ref.cpp      |  87 ++++
 src/paimon/common/data/columnar/columnar_row_ref.h | 136 ++++++
 .../common/data/columnar/columnar_row_test.cpp     | 455 +++++++++++++++++++++
 src/paimon/common/data/columnar/columnar_utils.h   | 116 ++++++
 .../common/data/columnar/columnar_utils_test.cpp   |  56 +++
 13 files changed, 1863 insertions(+)

diff --git a/src/paimon/common/data/columnar/columnar_array.cpp 
b/src/paimon/common/data/columnar/columnar_array.cpp
new file mode 100644
index 0000000..7a96106
--- /dev/null
+++ b/src/paimon/common/data/columnar/columnar_array.cpp
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "paimon/common/data/columnar/columnar_array.h"
+
+#include <utility>
+
+#include "arrow/api.h"
+#include "arrow/array/array_decimal.h"
+#include "arrow/array/array_nested.h"
+#include "arrow/array/array_primitive.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/decimal.h"
+#include "fmt/format.h"
+#include "paimon/common/data/columnar/columnar_batch_context.h"
+#include "paimon/common/data/columnar/columnar_map.h"
+#include "paimon/common/data/columnar/columnar_row_ref.h"
+#include "paimon/common/utils/date_time_utils.h"
+
+namespace paimon {
+/// Scans the `length_` elements of this view and rejects the first null one.
+///
+/// @return Status::OK() when no element is null; otherwise Status::Invalid whose message carries
+///         the view-relative position of the first null element.
+Status ColumnarArray::CheckNoNull() const {
+    int32_t idx = 0;
+    while (idx < length_) {
+        if (IsNullAt(idx)) {
+            // Stop at the first null; later positions are not inspected.
+            return Status::Invalid(fmt::format("row {} is null", idx));
+        }
+        ++idx;
+    }
+    return Status::OK();
+}
+
+/// Reads the element at `pos` as an arrow Decimal128 and repackages it as a paimon Decimal.
+///
+/// @param pos       View-relative element index.
+/// @param precision Forwarded unchanged to the Decimal constructor.
+/// @param scale     Forwarded unchanged to the Decimal constructor.
+Decimal ColumnarArray::GetDecimal(int32_t pos, int32_t precision, int32_t scale) const {
+    using DecimalArray = typename arrow::TypeTraits<arrow::Decimal128Type>::ArrayType;
+    auto decimal_array = arrow::internal::checked_cast<const DecimalArray*>(array_);
+    assert(decimal_array);
+    arrow::Decimal128 raw(decimal_array->GetValue(offset_ + pos));
+    // Rebuild the 128-bit value from its two 64-bit halves. The high half is routed through
+    // uint64_t first so the shift is performed on an unsigned quantity.
+    const auto upper =
+        static_cast<Decimal::uint128_t>(static_cast<uint64_t>(raw.high_bits())) << 64;
+    const auto combined = upper | raw.low_bits();
+    return Decimal(precision, scale, static_cast<Decimal::int128_t>(combined));
+}
+
+/// Reads the timestamp element at `pos` and converts it to a (millisecond, nanosecond) pair.
+///
+/// @param pos       View-relative element index.
+/// @param precision Not consulted here: the unit is taken from the arrow array's own type.
+Timestamp ColumnarArray::GetTimestamp(int32_t pos, int32_t precision) const {
+    using TimestampArray = typename arrow::TypeTraits<arrow::TimestampType>::ArrayType;
+    auto ts_array = arrow::internal::checked_cast<const TimestampArray*>(array_);
+    assert(ts_array);
+    // For the ORC format the data is stored as nanoseconds, so the conversion must follow the
+    // unit recorded in the arrow array type rather than the precision passed by the caller.
+    auto arrow_ts_type =
+        arrow::internal::checked_pointer_cast<arrow::TimestampType>(ts_array->type());
+    DateTimeUtils::TimeType source_unit = DateTimeUtils::GetTimeTypeFromArrowType(arrow_ts_type);
+    int64_t raw_value = ts_array->Value(offset_ + pos);
+    auto [milli, nano] = DateTimeUtils::TimestampConverter(
+        raw_value, source_unit, DateTimeUtils::TimeType::MILLISECOND,
+        DateTimeUtils::TimeType::NANOSECOND);
+    return Timestamp(milli, nano);
+}
+
+/// Returns a ColumnarArray view over the child values of the list element at `pos`.
+///
+/// The new view borrows the list's values array (raw pointer) and shares this view's pool.
+std::shared_ptr<InternalArray> ColumnarArray::GetArray(int32_t pos) const {
+    auto list = arrow::internal::checked_cast<const arrow::ListArray*>(array_);
+    assert(list);
+    const int32_t row = offset_ + pos;
+    // The list offsets give where this element's children start and how many there are.
+    const int32_t child_begin = list->value_offset(row);
+    const int32_t child_count = list->value_length(row);
+    const arrow::Array* child_values = list->values().get();
+    return std::make_shared<ColumnarArray>(child_values, pool_, child_begin, child_count);
+}
+
+/// Returns a ColumnarMap view over the key/item entries of the map element at `pos`.
+///
+/// The map's keys and items arrays are passed as shared pointers together with the entry window
+/// (start offset and entry count) read from the map offsets.
+std::shared_ptr<InternalMap> ColumnarArray::GetMap(int32_t pos) const {
+    auto entries = arrow::internal::checked_cast<const arrow::MapArray*>(array_);
+    assert(entries);
+    const int32_t row = offset_ + pos;
+    const int32_t entry_begin = entries->value_offset(row);
+    const int32_t entry_count = entries->value_length(row);
+    return std::make_shared<ColumnarMap>(entries->keys(), entries->items(), pool_, entry_begin,
+                                         entry_count);
+}
+
+/// Returns a ColumnarRowRef for the struct element at `pos`.
+///
+/// A fresh ColumnarBatchContext is built from the struct array's field arrays and this view's
+/// pool; the row reference then addresses row `offset_ + pos` inside that context.
+///
+/// @param pos        View-relative element index.
+/// @param num_fields Not consulted here; the field arrays come from the struct array itself.
+std::shared_ptr<InternalRow> ColumnarArray::GetRow(int32_t pos, int32_t num_fields) const {
+    auto structs = arrow::internal::checked_cast<const arrow::StructArray*>(array_);
+    assert(structs);
+    return std::make_shared<ColumnarRowRef>(
+        std::make_shared<ColumnarBatchContext>(structs->fields(), pool_), offset_ + pos);
+}
+
+/// Copies every element of this view into a vector, encoding true as 1 and false as 0.
+///
+/// @return The encoded values, or the error from CheckNoNull() when any element is null.
+Result<std::vector<char>> ColumnarArray::ToBooleanArray() const {
+    PAIMON_RETURN_NOT_OK(CheckNoNull());
+    std::vector<char> flags(length_);
+    int32_t idx = 0;
+    for (char& slot : flags) {
+        slot = GetBoolean(idx++) ? static_cast<char>(1) : static_cast<char>(0);
+    }
+    return flags;
+}
+
+/// Copies every element of this view into a vector of bytes via GetByte().
+///
+/// @return The copied values, or the error from CheckNoNull() when any element is null.
+Result<std::vector<char>> ColumnarArray::ToByteArray() const {
+    PAIMON_RETURN_NOT_OK(CheckNoNull());
+    std::vector<char> values(length_);
+    int32_t idx = 0;
+    for (char& slot : values) {
+        slot = GetByte(idx++);
+    }
+    return values;
+}
+
+/// Copies every element of this view into a vector of int16_t via GetShort().
+///
+/// @return The copied values, or the error from CheckNoNull() when any element is null.
+Result<std::vector<int16_t>> ColumnarArray::ToShortArray() const {
+    PAIMON_RETURN_NOT_OK(CheckNoNull());
+    std::vector<int16_t> values(length_);
+    int32_t idx = 0;
+    for (int16_t& slot : values) {
+        slot = GetShort(idx++);
+    }
+    return values;
+}
+
+/// Copies every element of this view into a vector of int32_t via GetInt().
+///
+/// @return The copied values, or the error from CheckNoNull() when any element is null.
+Result<std::vector<int32_t>> ColumnarArray::ToIntArray() const {
+    PAIMON_RETURN_NOT_OK(CheckNoNull());
+    std::vector<int32_t> values(length_);
+    int32_t idx = 0;
+    for (int32_t& slot : values) {
+        slot = GetInt(idx++);
+    }
+    return values;
+}
+
+/// Copies every element of this view into a vector of int64_t via GetLong().
+///
+/// @return The copied values, or the error from CheckNoNull() when any element is null.
+Result<std::vector<int64_t>> ColumnarArray::ToLongArray() const {
+    PAIMON_RETURN_NOT_OK(CheckNoNull());
+    std::vector<int64_t> values(length_);
+    int32_t idx = 0;
+    for (int64_t& slot : values) {
+        slot = GetLong(idx++);
+    }
+    return values;
+}
+
+/// Copies every element of this view into a vector of float via GetFloat().
+///
+/// @return The copied values, or the error from CheckNoNull() when any element is null.
+Result<std::vector<float>> ColumnarArray::ToFloatArray() const {
+    PAIMON_RETURN_NOT_OK(CheckNoNull());
+    std::vector<float> values(length_);
+    int32_t idx = 0;
+    for (float& slot : values) {
+        slot = GetFloat(idx++);
+    }
+    return values;
+}
+
+/// Copies every element of this view into a vector of double via GetDouble().
+///
+/// @return The copied values, or the error from CheckNoNull() when any element is null.
+Result<std::vector<double>> ColumnarArray::ToDoubleArray() const {
+    PAIMON_RETURN_NOT_OK(CheckNoNull());
+    std::vector<double> values(length_);
+    int32_t idx = 0;
+    for (double& slot : values) {
+        slot = GetDouble(idx++);
+    }
+    return values;
+}
+
+}  // namespace paimon
diff --git a/src/paimon/common/data/columnar/columnar_array.h 
b/src/paimon/common/data/columnar/columnar_array.h
new file mode 100644
index 0000000..773e46f
--- /dev/null
+++ b/src/paimon/common/data/columnar/columnar_array.h
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+#include <cassert>
+#include <cstdint>
+#include <memory>
+#include <string_view>
+#include <vector>
+
+#include "arrow/array/array_base.h"
+#include "paimon/common/data/binary_string.h"
+#include "paimon/common/data/columnar/columnar_utils.h"
+#include "paimon/common/data/internal_array.h"
+#include "paimon/common/data/internal_map.h"
+#include "paimon/common/data/internal_row.h"
+#include "paimon/data/decimal.h"
+#include "paimon/data/timestamp.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+
+namespace arrow {
+class BinaryType;
+class BooleanType;
+class Date32Type;
+class DoubleType;
+class FloatType;
+class Int16Type;
+class Int32Type;
+class Int64Type;
+class Int8Type;
+class StringType;
+}  // namespace arrow
+
+namespace paimon {
+class Bytes;
+class MemoryPool;
+
+/// Columnar array to support access to vector column data.
+///
+/// A ColumnarArray is a window of `length` elements over an arrow::Array, starting at `offset`.
+/// Every accessor translates its view-relative `pos` to `offset + pos` before touching the
+/// underlying arrow array.
+///
+/// NOTE: This class holds a non-owning raw pointer to the underlying arrow::Array for efficiency.
+/// The caller must ensure that the pointed-to Array outlives this ColumnarArray instance.
+/// Typically, lifetime is guaranteed by the owning ColumnarBatchContext or the parent
+/// arrow container (e.g., ListArray, MapArray) that holds the shared_ptr.
+class ColumnarArray : public InternalArray {
+ public:
+    /// @param array  Non-owning pointer to the backing arrow array; must not be null.
+    /// @param pool   Memory pool passed on to GetString()/GetBinary() and to nested views.
+    /// @param offset Index inside `array` of the first element exposed by this view.
+    /// @param length Number of elements exposed by this view.
+    ColumnarArray(const arrow::Array* array, const std::shared_ptr<MemoryPool>& pool,
+                  int32_t offset, int32_t length)
+        : pool_(pool), array_(array), offset_(offset), length_(length) {
+        assert(array_);
+        // Widen before adding so the bounds check itself cannot overflow int32_t.
+        assert(array_->length() >= static_cast<int64_t>(offset) + length);
+    }
+
+    /// Number of elements exposed by this view.
+    int32_t Size() const override {
+        return length_;
+    }
+
+    /// Whether the element at `pos` is null in the underlying arrow array.
+    bool IsNullAt(int32_t pos) const override {
+        return array_->IsNull(offset_ + pos);
+    }
+
+    // Primitive accessors. Each one reads element `offset_ + pos` of `array_` through
+    // ColumnarUtils::GetGenericValue instantiated with the arrow type named in the call.
+
+    bool GetBoolean(int32_t pos) const override {
+        return ColumnarUtils::GetGenericValue<arrow::BooleanType, bool>(array_, offset_ + pos);
+    }
+
+    char GetByte(int32_t pos) const override {
+        return ColumnarUtils::GetGenericValue<arrow::Int8Type, char>(array_, offset_ + pos);
+    }
+
+    int16_t GetShort(int32_t pos) const override {
+        return ColumnarUtils::GetGenericValue<arrow::Int16Type, int16_t>(array_, offset_ + pos);
+    }
+
+    int32_t GetInt(int32_t pos) const override {
+        return ColumnarUtils::GetGenericValue<arrow::Int32Type, int32_t>(array_, offset_ + pos);
+    }
+
+    int32_t GetDate(int32_t pos) const override {
+        return ColumnarUtils::GetGenericValue<arrow::Date32Type, int32_t>(array_, offset_ + pos);
+    }
+
+    int64_t GetLong(int32_t pos) const override {
+        return ColumnarUtils::GetGenericValue<arrow::Int64Type, int64_t>(array_, offset_ + pos);
+    }
+
+    float GetFloat(int32_t pos) const override {
+        return ColumnarUtils::GetGenericValue<arrow::FloatType, float>(array_, offset_ + pos);
+    }
+
+    double GetDouble(int32_t pos) const override {
+        return ColumnarUtils::GetGenericValue<arrow::DoubleType, double>(array_, offset_ + pos);
+    }
+
+    /// Returns the string element at `pos` as a BinaryString built from bytes obtained with the
+    /// view's memory pool.
+    BinaryString GetString(int32_t pos) const override {
+        auto bytes = ColumnarUtils::GetBytes<arrow::StringType>(array_, offset_ + pos, pool_.get());
+        return BinaryString::FromBytes(bytes);
+    }
+
+    /// Returns the element at `pos` as a std::string_view obtained from ColumnarUtils::GetView.
+    std::string_view GetStringView(int32_t pos) const override {
+        return ColumnarUtils::GetView(array_, offset_ + pos);
+    }
+
+    /// Returns the Decimal128 element at `pos`, tagged with the given precision and scale.
+    Decimal GetDecimal(int32_t pos, int32_t precision, int32_t scale) const override;
+
+    /// Returns the timestamp element at `pos`.
+    Timestamp GetTimestamp(int32_t pos, int32_t precision) const override;
+
+    /// Returns the binary element at `pos` as Bytes obtained with the view's memory pool.
+    std::shared_ptr<Bytes> GetBinary(int32_t pos) const override {
+        return ColumnarUtils::GetBytes<arrow::BinaryType>(array_, offset_ + pos, pool_.get());
+    }
+
+    /// Returns a view over the list element at `pos`.
+    std::shared_ptr<InternalArray> GetArray(int32_t pos) const override;
+
+    /// Returns a view over the map element at `pos`.
+    std::shared_ptr<InternalMap> GetMap(int32_t pos) const override;
+
+    /// Returns a row view over the struct element at `pos`.
+    std::shared_ptr<InternalRow> GetRow(int32_t pos, int32_t num_fields) const override;
+
+    // Bulk conversions. Each returns an error Status instead of values when the view contains a
+    // null element.
+
+    Result<std::vector<char>> ToBooleanArray() const override;
+
+    Result<std::vector<char>> ToByteArray() const override;
+
+    Result<std::vector<int16_t>> ToShortArray() const override;
+
+    Result<std::vector<int32_t>> ToIntArray() const override;
+
+    Result<std::vector<int64_t>> ToLongArray() const override;
+
+    Result<std::vector<float>> ToFloatArray() const override;
+
+    Result<std::vector<double>> ToDoubleArray() const override;
+
+ private:
+    /// Returns Status::OK() when no element of the view is null, an error Status otherwise.
+    Status CheckNoNull() const;
+
+    std::shared_ptr<MemoryPool> pool_;
+    /// Non-owning; see the lifetime note on the class.
+    const arrow::Array* array_;
+    /// Index of the first visible element inside `array_`.
+    int32_t offset_;
+    /// Number of visible elements.
+    int32_t length_;
+};
+}  // namespace paimon
diff --git a/src/paimon/common/data/columnar/columnar_array_test.cpp 
b/src/paimon/common/data/columnar/columnar_array_test.cpp
new file mode 100644
index 0000000..650d7eb
--- /dev/null
+++ b/src/paimon/common/data/columnar/columnar_array_test.cpp
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "paimon/common/data/columnar/columnar_array.h"
+
+#include <string>
+#include <utility>
+
+#include "arrow/api.h"
+#include "arrow/array/array_nested.h"
+#include "arrow/ipc/json_simple.h"
+#include "arrow/util/checked_cast.h"
+#include "gtest/gtest.h"
+#include "paimon/common/data/internal_map.h"
+#include "paimon/common/data/internal_row.h"
+#include "paimon/common/utils/date_time_utils.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+// Exercises the primitive getters and the To*Array conversions. Every case parses a list array
+// from JSON and wraps a window of its flattened child values in a ColumnarArray.
+TEST(ColumnarArrayTest, TestSimple) {
+    auto pool = GetDefaultPool();
+    // Parses `json` as an array of `type` and returns it downcast to arrow::ListArray.
+    const auto make_list = [](const std::shared_ptr<arrow::DataType>& type,
+                              const std::string& json) {
+        auto parsed = arrow::ipc::internal::json::ArrayFromJSON(type, json).ValueOrDie();
+        return arrow::internal::checked_pointer_cast<arrow::ListArray>(parsed);
+    };
+    {
+        // boolean
+        auto list = make_list(arrow::list(arrow::boolean()),
+                              "[[true, false], [true], [false], [false, true]]");
+        ColumnarArray array(list->values().get(), pool, /*offset=*/2, /*length=*/1);
+        ASSERT_EQ(array.Size(), 1);
+        ASSERT_EQ(array.GetBoolean(0), true);
+        const std::vector<char> expected = {static_cast<char>(1)};
+        ASSERT_EQ(array.ToBooleanArray().value(), expected);
+    }
+    {
+        // int8
+        auto list = make_list(arrow::list(arrow::int8()), "[[1, 1, 2], [3], [2], [2]]");
+        ColumnarArray array(list->values().get(), pool, /*offset=*/5, /*length=*/1);
+        ASSERT_EQ(array.GetByte(0), 2);
+        const std::vector<char> expected = {static_cast<char>(2)};
+        ASSERT_EQ(array.ToByteArray().value(), expected);
+    }
+    {
+        // int16
+        auto list = make_list(arrow::list(arrow::int16()), "[[1, 1, 2], [3], [2], [-4]]");
+        ColumnarArray array(list->values().get(), pool, /*offset=*/0, /*length=*/3);
+        ASSERT_EQ(array.GetShort(0), 1);
+        ASSERT_EQ(array.GetShort(1), 1);
+        ASSERT_EQ(array.GetShort(2), 2);
+        const std::vector<int16_t> expected = {1, 1, 2};
+        ASSERT_EQ(array.ToShortArray().value(), expected);
+    }
+    {
+        // int32
+        auto list = make_list(arrow::list(arrow::int32()), "[[1, 1, 2], [3], [2], [-4]]");
+        ColumnarArray array(list->values().get(), pool, /*offset=*/3, /*length=*/1);
+        ASSERT_EQ(array.GetInt(0), 3);
+        const std::vector<int32_t> expected = {3};
+        ASSERT_EQ(array.ToIntArray().value(), expected);
+    }
+    {
+        // int64
+        auto list = make_list(arrow::list(arrow::int64()), "[[1, 1, 2], [3], [2], [-4]]");
+        ColumnarArray array(list->values().get(), pool, /*offset=*/4, /*length=*/1);
+        ASSERT_EQ(array.GetLong(0), 2);
+        const std::vector<int64_t> expected = {2};
+        ASSERT_EQ(array.ToLongArray().value(), expected);
+    }
+    {
+        // int64 with a null child value inside the window: conversion must fail
+        auto list = make_list(arrow::list(arrow::int64()), "[[1, 1, 2], [3], [null], null]");
+        ColumnarArray array(list->values().get(), pool, /*offset=*/4, /*length=*/1);
+        ASSERT_NOK_WITH_MSG(array.ToLongArray(), "is null");
+    }
+    {
+        // float32
+        auto list =
+            make_list(arrow::list(arrow::float32()), "[[0.0, 1.1, 2.2], [3.3], [4.4], [5.5]]");
+        ColumnarArray array(list->values().get(), pool, /*offset=*/0, /*length=*/3);
+        ASSERT_NEAR(array.GetFloat(1), 1.1, 0.001);
+        const std::vector<float> expected = {0.0, 1.1, 2.2};
+        ASSERT_EQ(array.ToFloatArray().value(), expected);
+    }
+    {
+        // float64
+        auto list =
+            make_list(arrow::list(arrow::float64()), "[[0.0, 1.1, 2.2], [3.3], [4.4], [5.5]]");
+        ColumnarArray array(list->values().get(), pool, /*offset=*/3, /*length=*/1);
+        ASSERT_NEAR(array.GetDouble(0), 3.3, 0.001);
+        const std::vector<double> expected = {3.3};
+        ASSERT_EQ(array.ToDoubleArray().value(), expected);
+    }
+    {
+        // utf8
+        auto list = make_list(arrow::list(arrow::utf8()),
+                              R"([["abc", "def"], ["efg"], ["hello"], ["hi"]])");
+        ColumnarArray array(list->values().get(), pool, /*offset=*/4, /*length=*/1);
+        ASSERT_EQ(array.GetString(0).ToString(), "hi");
+        ASSERT_EQ(std::string(array.GetStringView(0)), "hi");
+    }
+}
+
+// Exercises the date, decimal, timestamp, binary, struct, nested-list and map accessors. Every
+// case parses a list array from JSON and wraps a window of its flattened child values in a
+// ColumnarArray.
+TEST(ColumnarArrayTest, TestComplexAndNestedType) {
+    auto pool = GetDefaultPool();
+    {
+        // date32
+        auto f1 = arrow::ipc::internal::json::ArrayFromJSON(arrow::list(arrow::date32()),
+                                                            "[[1, 1, 2], [3], [2], [-4]]")
+                      .ValueOrDie();
+        auto list_array = arrow::internal::checked_pointer_cast<arrow::ListArray>(f1);
+        auto array = ColumnarArray(list_array->values().get(), pool, /*offset=*/3, 1);
+        ASSERT_EQ(array.GetDate(0), 3);
+    }
+    {
+        // decimal128(10, 3)
+        auto f1 = arrow::ipc::internal::json::ArrayFromJSON(
+                      arrow::list(arrow::decimal128(10, 3)),
+                      R"([["1.234", "1234.000"], ["-9876.543"], ["666.888"]])")
+                      .ValueOrDie();
+        auto list_array = arrow::internal::checked_pointer_cast<arrow::ListArray>(f1);
+        auto array = ColumnarArray(list_array->values().get(), pool, /*offset=*/0, 2);
+        ASSERT_EQ(array.GetDecimal(0, 10, 3), Decimal(10, 3, 1234));
+    }
+    {
+        // timestamp stored in nanoseconds
+        auto f1 = arrow::ipc::internal::json::ArrayFromJSON(
+                      arrow::list(arrow::timestamp(arrow::TimeUnit::NANO)),
+                      R"([["1970-01-01T00:00:59"],["2000-02-29T23:23:23",
+          "1899-01-01T00:59:20"],["2033-05-18T03:33:20"]])")
+                      .ValueOrDie();
+        auto list_array = arrow::internal::checked_pointer_cast<arrow::ListArray>(f1);
+        auto array = ColumnarArray(list_array->values().get(), pool, /*offset=*/0, 1);
+        auto ts = array.GetTimestamp(0, 9);
+        ASSERT_EQ(ts, Timestamp(59000, 0));
+    }
+    {
+        // binary, read both as Bytes and as a string view
+        auto f1 = arrow::ipc::internal::json::ArrayFromJSON(arrow::list(arrow::binary()),
+                                                            R"([["aaa", "bb"], ["ccc"], ["bbb"]])")
+                      .ValueOrDie();
+        auto list_array = arrow::internal::checked_pointer_cast<arrow::ListArray>(f1);
+        auto array = ColumnarArray(list_array->values().get(), pool, /*offset=*/0, 2);
+        ASSERT_EQ(*array.GetBinary(1), Bytes("bb", pool.get()));
+        ASSERT_EQ(std::string(array.GetStringView(1)), "bb");
+    }
+    {
+        // struct with four int64 fields
+        auto f1 = arrow::ipc::internal::json::ArrayFromJSON(arrow::list(arrow::struct_({
+                                                                field("sub1", arrow::int64()),
+                                                                field("sub2", arrow::int64()),
+                                                                field("sub3", arrow::int64()),
+                                                                field("sub4", arrow::int64()),
+                                                            })),
+                                                            R"([
+      [[1, 3, 2, 5],
+      [2, 2, 1, 3]],
+      [[3, 2, 1, 3]],
+      [[4, 1, 0, 2]]
+    ])")
+                      .ValueOrDie();
+        auto list_array = arrow::internal::checked_pointer_cast<arrow::ListArray>(f1);
+        auto array = ColumnarArray(list_array->values().get(), pool, /*offset=*/0, 2);
+        auto result_row = array.GetRow(1, 4);
+        ASSERT_EQ(result_row->GetLong(0), 2);
+        ASSERT_EQ(result_row->GetLong(1), 2);
+        ASSERT_EQ(result_row->GetLong(2), 1);
+        ASSERT_EQ(result_row->GetLong(3), 3);
+    }
+    {
+        // list<list<int64>>: `array` wraps the list<int64> child values, so GetArray(0) is the
+        // innermost list [1, 2, 3].
+        auto f1 = arrow::ipc::internal::json::ArrayFromJSON(
+                      arrow::list(arrow::list(arrow::int64())), "[[[1, 2, 3], [4, 5, 6]], []]")
+                      .ValueOrDie();
+        auto list_array = arrow::internal::checked_pointer_cast<arrow::ListArray>(f1);
+        auto array = ColumnarArray(list_array->values().get(), pool, /*offset=*/0, 1);
+        auto inner_result_array = array.GetArray(0);
+        std::vector<int64_t> values = {1, 2, 3};
+        ASSERT_EQ(inner_result_array->ToLongArray().value(), values);
+    }
+    {
+        // list<map<int32, int64>>
+        auto f1 = arrow::ipc::internal::json::ArrayFromJSON(
+                      arrow::list(arrow::map(arrow::int32(), arrow::int64())),
+                      R"([
+                       [[[1, 3], [4, 4]]], []
+                      ])")
+                      .ValueOrDie();
+        auto list_array = arrow::internal::checked_pointer_cast<arrow::ListArray>(f1);
+        ASSERT_TRUE(list_array);
+        auto array = ColumnarArray(list_array->values().get(), pool, /*offset=*/0, 1);
+        auto result_map = array.GetMap(0);
+        auto result_key = result_map->KeyArray();
+        auto result_value = result_map->ValueArray();
+        ASSERT_EQ(result_key->ToIntArray().value(), std::vector<int32_t>({1, 4}));
+        ASSERT_EQ(result_value->ToLongArray().value(), std::vector<int64_t>({3, 4}));
+    }
+}
+// Reads the same instant (2000-02-29T23:23:23 plus a unit-dependent fraction) from timestamp
+// arrays stored in each arrow time unit, with and without a timezone on the arrow type.
+TEST(ColumnarArrayTest, TestTimestampType) {
+    auto pool = GetDefaultPool();
+    auto timezone = DateTimeUtils::GetLocalTimezoneName();
+
+    // One JSON fixture per time unit; the timezone and no-timezone cases share them.
+    const std::string second_json = R"([["1970-01-01T00:00:59"],["2000-02-29T23:23:23",
+          "1899-01-01T00:59:20"],["2033-05-18T03:33:20"]])";
+    const std::string milli_json = R"([["1970-01-01T00:00:59"],["2000-02-29T23:23:23.001",
+          "1899-01-01T00:59:20"],["2033-05-18T03:33:20"]])";
+    const std::string micro_json = R"([["1970-01-01T00:00:59"],["2000-02-29T23:23:23.001001",
+          "1899-01-01T00:59:20"],["2033-05-18T03:33:20"]])";
+    const std::string nano_json = R"([["1970-01-01T00:00:59"],["2000-02-29T23:23:23.001001001",
+          "1899-01-01T00:59:20"],["2033-05-18T03:33:20"]])";
+
+    // Parses `json` as list<ts_type>, wraps child values [1, 3) in a ColumnarArray and returns
+    // the first timestamp of that window.
+    const auto read_first_ts = [&pool](const std::shared_ptr<arrow::DataType>& ts_type,
+                                       const std::string& json, int32_t precision) {
+        auto parsed =
+            arrow::ipc::internal::json::ArrayFromJSON(arrow::list(ts_type), json).ValueOrDie();
+        auto list = arrow::internal::checked_pointer_cast<arrow::ListArray>(parsed);
+        ColumnarArray array(list->values().get(), pool, /*offset=*/1, /*length=*/2);
+        return array.GetTimestamp(0, precision);
+    };
+
+    // Without timezone.
+    {
+        auto ts = read_first_ts(arrow::timestamp(arrow::TimeUnit::SECOND), second_json, 0);
+        ASSERT_EQ(ts, Timestamp(951866603000, 0)) << ts.GetMillisecond();
+    }
+    {
+        auto ts = read_first_ts(arrow::timestamp(arrow::TimeUnit::MILLI), milli_json, 3);
+        ASSERT_EQ(ts, Timestamp(951866603001, 0)) << ts.GetMillisecond();
+    }
+    {
+        auto ts = read_first_ts(arrow::timestamp(arrow::TimeUnit::MICRO), micro_json, 6);
+        ASSERT_EQ(ts, Timestamp(951866603001, 1000)) << ts.GetMillisecond();
+    }
+    {
+        auto ts = read_first_ts(arrow::timestamp(arrow::TimeUnit::NANO), nano_json, 9);
+        ASSERT_EQ(ts, Timestamp(951866603001, 1001)) << ts.GetMillisecond();
+    }
+
+    // With the local timezone attached to the arrow type; expected values are unchanged.
+    {
+        auto ts =
+            read_first_ts(arrow::timestamp(arrow::TimeUnit::SECOND, timezone), second_json, 0);
+        ASSERT_EQ(ts, Timestamp(951866603000, 0)) << ts.GetMillisecond();
+    }
+    {
+        auto ts = read_first_ts(arrow::timestamp(arrow::TimeUnit::MILLI, timezone), milli_json, 3);
+        ASSERT_EQ(ts, Timestamp(951866603001, 0)) << ts.GetMillisecond();
+    }
+    {
+        auto ts = read_first_ts(arrow::timestamp(arrow::TimeUnit::MICRO, timezone), micro_json, 6);
+        ASSERT_EQ(ts, Timestamp(951866603001, 1000)) << ts.GetMillisecond();
+    }
+    {
+        auto ts = read_first_ts(arrow::timestamp(arrow::TimeUnit::NANO, timezone), nano_json, 9);
+        ASSERT_EQ(ts, Timestamp(951866603001, 1001)) << ts.GetMillisecond();
+    }
+}
+
+}  // namespace paimon::test
diff --git a/src/paimon/common/data/columnar/columnar_batch_context.h 
b/src/paimon/common/data/columnar/columnar_batch_context.h
new file mode 100644
index 0000000..2d35c0d
--- /dev/null
+++ b/src/paimon/common/data/columnar/columnar_batch_context.h
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <memory>
+#include <vector>
+
+#include "arrow/array/array_base.h"
+
+namespace arrow {
+class StructArray;
+}  // namespace arrow
+
+namespace paimon {
+class MemoryPool;
+
+/// Bundles the column arrays of a batch with the memory pool used when reading them.
+///
+/// Both members are stored by value: the vector of shared array pointers is copied, so the
+/// context shares ownership of every column array it was constructed from.
+struct ColumnarBatchContext {
+    /// Memory pool shared with the readers created from this context.
+    std::shared_ptr<MemoryPool> pool;
+    /// One arrow array per column.
+    arrow::ArrayVector array_vec;
+
+    /// @param array_vec_in Column arrays to copy into `array_vec`.
+    /// @param pool_in      Pool to copy into `pool`.
+    ColumnarBatchContext(const arrow::ArrayVector& array_vec_in,
+                         const std::shared_ptr<MemoryPool>& pool_in)
+        : pool(pool_in), array_vec(array_vec_in) {
+    }
+};
+}  // namespace paimon
diff --git a/src/paimon/common/data/columnar/columnar_map.cpp 
b/src/paimon/common/data/columnar/columnar_map.cpp
new file mode 100644
index 0000000..13351ad
--- /dev/null
+++ b/src/paimon/common/data/columnar/columnar_map.cpp
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "paimon/common/data/columnar/columnar_map.h"
+
+#include "paimon/common/data/columnar/columnar_array.h"
+
+namespace arrow {
+class Array;
+}  // namespace arrow
+
+namespace paimon {
+class MemoryPool;
+
+ColumnarMap::ColumnarMap(const std::shared_ptr<arrow::Array>& key_array,
+                         const std::shared_ptr<arrow::Array>& value_array,
+                         const std::shared_ptr<MemoryPool>& pool, int32_t 
offset, int32_t length)
+    : pool_(pool),
+      key_array_(key_array),  // shared handle: the map co-owns the key array
+      value_array_(value_array),  // shared handle: the map co-owns the value array
+      offset_(offset),  // forwarded unchanged to the views built by KeyArray()/ValueArray()
+      length_(length) {}  // forwarded to those views as well; also the value Size() returns
+
+std::shared_ptr<InternalArray> ColumnarMap::KeyArray() const {  // new view object on each call
+    return std::make_shared<ColumnarArray>(key_array_.get(), pool_, offset_, 
length_);  // key_array_ is passed as a raw pointer; this map keeps the owning handle
+}
+std::shared_ptr<InternalArray> ColumnarMap::ValueArray() const {  // new view object on each call
+    return std::make_shared<ColumnarArray>(value_array_.get(), pool_, offset_, 
length_);  // value_array_ is passed as a raw pointer; this map keeps the owning handle
+}
+
+}  // namespace paimon
diff --git a/src/paimon/common/data/columnar/columnar_map.h 
b/src/paimon/common/data/columnar/columnar_map.h
new file mode 100644
index 0000000..549a75a
--- /dev/null
+++ b/src/paimon/common/data/columnar/columnar_map.h
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <cstdint>
+#include <memory>
+
+#include "paimon/common/data/internal_array.h"
+#include "paimon/common/data/internal_map.h"
+
+namespace arrow {
+class Array;
+}  // namespace arrow
+
+namespace paimon {
+class MemoryPool;
+
+/// Columnar map to support access to vector column data: a key array and a value array plus an offset/length pair.
+class ColumnarMap : public InternalMap {
+ public:
+    ColumnarMap(const std::shared_ptr<arrow::Array>& key_array,
+                const std::shared_ptr<arrow::Array>& value_array,
+                const std::shared_ptr<MemoryPool>& pool, int32_t offset, 
int32_t length);  // offset/length are stored as given and applied to both arrays
+
+    int32_t Size() const override {
+        return length_;  // the length passed at construction
+    }
+    std::shared_ptr<InternalArray> KeyArray() const override;  // ColumnarArray over key_array_
+    std::shared_ptr<InternalArray> ValueArray() const override;  // ColumnarArray over value_array_
+
+ private:
+    std::shared_ptr<MemoryPool> pool_;  // passed on to the key/value views
+    std::shared_ptr<arrow::Array> key_array_;  // shared ownership of the keys
+    std::shared_ptr<arrow::Array> value_array_;  // shared ownership of the values
+    int32_t offset_;  // NOTE(review): presumably the first entry of this map in both arrays - confirm
+    int32_t length_;  // entry count reported by Size()
+};
+}  // namespace paimon
diff --git a/src/paimon/common/data/columnar/columnar_row.cpp 
b/src/paimon/common/data/columnar/columnar_row.cpp
new file mode 100644
index 0000000..bc7b150
--- /dev/null
+++ b/src/paimon/common/data/columnar/columnar_row.cpp
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "paimon/common/data/columnar/columnar_row.h"
+
+#include <cassert>
+
+#include "arrow/array/array_base.h"
+#include "arrow/array/array_decimal.h"
+#include "arrow/array/array_nested.h"
+#include "arrow/array/array_primitive.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/decimal.h"
+#include "paimon/common/data/columnar/columnar_array.h"
+#include "paimon/common/data/columnar/columnar_batch_context.h"
+#include "paimon/common/data/columnar/columnar_map.h"
+#include "paimon/common/data/columnar/columnar_row_ref.h"
+#include "paimon/common/utils/date_time_utils.h"
+
+namespace paimon {
+Decimal ColumnarRow::GetDecimal(int32_t pos, int32_t precision, int32_t scale) 
const {  // reads column pos at row_id_ and rebuilds it as a paimon Decimal
+    using ArrayType = typename 
arrow::TypeTraits<arrow::Decimal128Type>::ArrayType;
+    auto array = arrow::internal::checked_cast<const 
ArrayType*>(array_vec_[pos]);  // assumes column pos is a Decimal128 array
+    assert(array);  // debug-only null check (no-op under NDEBUG)
+    arrow::Decimal128 decimal(array->GetValue(row_id_));
+    return Decimal(
+        precision, scale,  // taken from the caller as-is, not read from the Arrow type
+        static_cast<Decimal::int128_t>(
+            
static_cast<Decimal::uint128_t>(static_cast<uint64_t>(decimal.high_bits())) << 
64 |  // high half goes through uint64_t so the shift is done on an unsigned 128-bit value
+            decimal.low_bits()));  // low half is OR-ed into the bottom 64 bits
+}
+
+Timestamp ColumnarRow::GetTimestamp(int32_t pos, int32_t precision) const {  // precision is never read below
+    using ArrayType = typename 
arrow::TypeTraits<arrow::TimestampType>::ArrayType;
+    auto array = arrow::internal::checked_cast<const 
ArrayType*>(array_vec_[pos]);  // assumes column pos is a timestamp array
+    assert(array);  // debug-only null check (no-op under NDEBUG)
+    int64_t data = array->Value(row_id_);  // raw stored integer for this row
+    auto timestamp_type =
+        
arrow::internal::checked_pointer_cast<arrow::TimestampType>(array->type());
+    // for orc format, data is saved as nano, therefore, Timestamp convert 
should consider precision
+    // in arrow array rather than input precision
+    DateTimeUtils::TimeType time_type = 
DateTimeUtils::GetTimeTypeFromArrowType(timestamp_type);
+    auto [milli, nano] = DateTimeUtils::TimestampConverter(
+        data, time_type, DateTimeUtils::TimeType::MILLISECOND, 
DateTimeUtils::TimeType::NANOSECOND);
+    return Timestamp(milli, nano);  // NOTE(review): presumably (epoch millis, sub-milli nanos) - confirm
+}
+
+std::shared_ptr<InternalRow> ColumnarRow::GetRow(int32_t pos, int32_t 
num_fields) const {  // num_fields is not used in this body
+    auto struct_array = arrow::internal::checked_cast<const 
arrow::StructArray*>(array_vec_[pos]);  // assumes column pos is a struct array
+    assert(struct_array);  // debug-only null check (no-op under NDEBUG)
+    // NOTE: For performance, the returned nested row does NOT hold shared 
ownership of the parent
+    // StructArray. Callers must ensure the parent ColumnarRow (or its 
underlying RecordBatch)
+    // outlives the returned row to avoid dangling pointers.
+    return std::make_shared<ColumnarRow>(struct_array->fields(), pool_, 
row_id_);  // the nested row reuses this row's row_id_ over the struct's child arrays
+}
+
+std::shared_ptr<InternalArray> ColumnarRow::GetArray(int32_t pos) const {
+    auto list_array = arrow::internal::checked_cast<const 
arrow::ListArray*>(array_vec_[pos]);  // assumes column pos is a list array
+    assert(list_array);  // debug-only null check (no-op under NDEBUG)
+    int32_t offset = list_array->value_offset(row_id_);  // where this row's elements start in values()
+    int32_t length = list_array->value_length(row_id_);  // how many elements this row has
+    return std::make_shared<ColumnarArray>(list_array->values().get(), pool_, 
offset, length);  // values() is handed over as a raw pointer
+}
+
+std::shared_ptr<InternalMap> ColumnarRow::GetMap(int32_t pos) const {
+    auto map_array = arrow::internal::checked_cast<const 
arrow::MapArray*>(array_vec_[pos]);  // assumes column pos is a map array
+    assert(map_array);  // debug-only null check (no-op under NDEBUG)
+    int32_t offset = map_array->value_offset(row_id_);  // where this row's entries start
+    int32_t length = map_array->value_length(row_id_);  // how many entries this row has
+    return std::make_shared<ColumnarMap>(map_array->keys(), 
map_array->items(), pool_, offset,
+                                         length);  // keys()/items() go in as shared_ptr, so the map co-owns them
+}
+
+}  // namespace paimon
diff --git a/src/paimon/common/data/columnar/columnar_row.h 
b/src/paimon/common/data/columnar/columnar_row.h
new file mode 100644
index 0000000..2156c81
--- /dev/null
+++ b/src/paimon/common/data/columnar/columnar_row.h
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <string_view>
+#include <vector>
+
+#include "arrow/api.h"
+#include "arrow/array/array_base.h"
+#include "fmt/format.h"
+#include "paimon/common/data/binary_string.h"
+#include "paimon/common/data/columnar/columnar_utils.h"
+#include "paimon/common/data/internal_array.h"
+#include "paimon/common/data/internal_map.h"
+#include "paimon/common/data/internal_row.h"
+#include "paimon/common/types/row_kind.h"
+#include "paimon/data/decimal.h"
+#include "paimon/data/timestamp.h"
+#include "paimon/result.h"
+
+namespace arrow {
+class StructArray;
+}  // namespace arrow
+
+namespace paimon {
+class Bytes;
+class MemoryPool;
+
+/// Columnar row to support access to vector column data. It is a row view in 
arrow::Array.
+class ColumnarRow : public InternalRow {
+ public:
+    /// @brief Construct a ColumnarRow without holding ownership of the 
underlying arrays.
+    /// @warning The caller MUST ensure the data source (e.g., RecordBatch or 
parent StructArray)
+    /// outlives this ColumnarRow. The internal array_vec_ stores raw pointers 
only; if the
+    /// source is freed first, these pointers will dangle. This design is 
intentional for
+    /// performance—avoiding per-row shared_ptr ref-count overhead on the hot 
read path.
+    ColumnarRow(const arrow::ArrayVector& array_vec, const 
std::shared_ptr<MemoryPool>& pool,
+                int64_t row_id)  // row_id: index of the viewed row in every array
+        : ColumnarRow(/*struct_array holder*/ nullptr, array_vec, pool, 
row_id) {}  // delegates to the owning overload with a null holder
+
+    /// @brief Construct a ColumnarRow that holds shared ownership of a 
StructArray.
+    /// @note When struct_array is non-null it keeps the underlying buffers 
alive, making it safe
+    /// to outlive the original batch. Prefer this overload when the row may 
escape the scope of
+    /// its parent container.
+    ColumnarRow(const std::shared_ptr<arrow::StructArray>& struct_array,
+                const arrow::ArrayVector& array_vec, const 
std::shared_ptr<MemoryPool>& pool,
+                int64_t row_id)
+        : struct_array_(struct_array), pool_(pool), row_id_(row_id) {
+        array_vec_.reserve(array_vec.size());  // one raw pointer per column
+        for (const auto& array : array_vec) {
+            array_vec_.push_back(array.get());  // non-owning pointer into array_vec
+        }
+    }
+
+    Result<const RowKind*> GetRowKind() const override {
+        return row_kind_;  // RowKind::Insert() unless SetRowKind() replaced it
+    }
+
+    void SetRowKind(const RowKind* kind) override {
+        row_kind_ = kind;  // pointer is stored as-is
+    }
+
+    int32_t GetFieldCount() const override {
+        return static_cast<int32_t>(array_vec_.size());  // explicit size_t -> int32_t narrowing
+    }
+
+    bool IsNullAt(int32_t pos) const override {
+        return array_vec_[pos]->IsNull(row_id_);  // pos is not range-checked
+    }
+    bool GetBoolean(int32_t pos) const override {
+        return ColumnarUtils::GetGenericValue<arrow::BooleanType, 
bool>(array_vec_[pos], row_id_);
+    }
+
+    char GetByte(int32_t pos) const override {
+        return ColumnarUtils::GetGenericValue<arrow::Int8Type, 
char>(array_vec_[pos], row_id_);  // int8 column surfaced as char
+    }
+
+    int16_t GetShort(int32_t pos) const override {
+        return ColumnarUtils::GetGenericValue<arrow::Int16Type, 
int16_t>(array_vec_[pos], row_id_);
+    }
+
+    int32_t GetInt(int32_t pos) const override {
+        return ColumnarUtils::GetGenericValue<arrow::Int32Type, 
int32_t>(array_vec_[pos], row_id_);
+    }
+
+    int32_t GetDate(int32_t pos) const override {
+        return ColumnarUtils::GetGenericValue<arrow::Date32Type, 
int32_t>(array_vec_[pos], row_id_);  // Date32 value returned as a plain int32
+    }
+
+    int64_t GetLong(int32_t pos) const override {
+        return ColumnarUtils::GetGenericValue<arrow::Int64Type, 
int64_t>(array_vec_[pos], row_id_);
+    }
+
+    float GetFloat(int32_t pos) const override {
+        return ColumnarUtils::GetGenericValue<arrow::FloatType, 
float>(array_vec_[pos], row_id_);
+    }
+
+    double GetDouble(int32_t pos) const override {
+        return ColumnarUtils::GetGenericValue<arrow::DoubleType, 
double>(array_vec_[pos], row_id_);
+    }
+
+    /// @note `GetString()` and `GetBinary()` will deep copy string data to 
pool, use
+    /// `GetStringView()` to avoid copy
+    BinaryString GetString(int32_t pos) const override {
+        auto bytes =
+            ColumnarUtils::GetBytes<arrow::StringType>(array_vec_[pos], 
row_id_, pool_.get());
+        return BinaryString::FromBytes(bytes);  // wraps the bytes obtained above
+    }
+
+    std::string_view GetStringView(int32_t pos) const override {
+        return ColumnarUtils::GetView(array_vec_[pos], row_id_);
+    }
+
+    Decimal GetDecimal(int32_t pos, int32_t precision, int32_t scale) const 
override;
+
+    Timestamp GetTimestamp(int32_t pos, int32_t precision) const override;  // unit comes from the Arrow type
+
+    std::shared_ptr<Bytes> GetBinary(int32_t pos) const override {
+        return ColumnarUtils::GetBytes<arrow::BinaryType>(array_vec_[pos], 
row_id_, pool_.get());
+    }
+
+    std::shared_ptr<InternalRow> GetRow(int32_t pos, int32_t num_fields) const 
override;
+
+    std::shared_ptr<InternalArray> GetArray(int32_t pos) const override;
+
+    std::shared_ptr<InternalMap> GetMap(int32_t pos) const override;
+
+    std::string ToString() const override {
+        return fmt::format("ColumnarRow, row_id {}", row_id_);  // identifies the row only, no field values
+    }
+
+ private:
+    /// @note `struct_array_` is the data holder for columnar row, ensure that 
the data life
+    /// cycle is consistent with the columnar row, `array_vec_` maybe a subset 
of
+    /// `struct_array_`, so `struct_array_` cannot be used for `GetXXX()`
+    std::shared_ptr<arrow::StructArray> struct_array_;  // may be null (non-owning constructor)
+    std::shared_ptr<MemoryPool> pool_;  // used by GetString()/GetBinary() and nested views
+    std::vector<const arrow::Array*> array_vec_;  // non-owning column pointers, indexed by pos
+    const RowKind* row_kind_ = RowKind::Insert();  // default kind until SetRowKind() is called
+    int64_t row_id_;  // index of this row inside each column array
+};
+}  // namespace paimon
diff --git a/src/paimon/common/data/columnar/columnar_row_ref.cpp 
b/src/paimon/common/data/columnar/columnar_row_ref.cpp
new file mode 100644
index 0000000..9aaba46
--- /dev/null
+++ b/src/paimon/common/data/columnar/columnar_row_ref.cpp
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "paimon/common/data/columnar/columnar_row_ref.h"
+
+#include <cassert>
+
+#include "arrow/array/array_decimal.h"
+#include "arrow/array/array_nested.h"
+#include "arrow/array/array_primitive.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/decimal.h"
+#include "paimon/common/data/columnar/columnar_array.h"
+#include "paimon/common/data/columnar/columnar_map.h"
+#include "paimon/common/utils/date_time_utils.h"
+
+namespace paimon {
+Decimal ColumnarRowRef::GetDecimal(int32_t pos, int32_t precision, int32_t 
scale) const {  // reads column pos at row_id_ and rebuilds it as a paimon Decimal
+    using ArrayType = typename 
arrow::TypeTraits<arrow::Decimal128Type>::ArrayType;
+    auto array = arrow::internal::checked_cast<const 
ArrayType*>(ctx_->array_vec[pos].get());  // assumes column pos is a Decimal128 array
+    assert(array);  // debug-only null check (no-op under NDEBUG)
+    arrow::Decimal128 decimal(array->GetValue(row_id_));
+    return Decimal(
+        precision, scale,  // taken from the caller as-is, not read from the Arrow type
+        static_cast<Decimal::int128_t>(
+            
static_cast<Decimal::uint128_t>(static_cast<uint64_t>(decimal.high_bits())) << 
64 |  // high half goes through uint64_t so the shift is done on an unsigned 128-bit value
+            decimal.low_bits()));  // low half is OR-ed into the bottom 64 bits
+}
+
+Timestamp ColumnarRowRef::GetTimestamp(int32_t pos, int32_t precision) const {  // precision is never read below
+    using ArrayType = typename 
arrow::TypeTraits<arrow::TimestampType>::ArrayType;
+    auto array = arrow::internal::checked_cast<const 
ArrayType*>(ctx_->array_vec[pos].get());  // assumes column pos is a timestamp array
+    assert(array);  // debug-only null check (no-op under NDEBUG)
+    int64_t data = array->Value(row_id_);  // raw stored integer for this row
+    auto timestamp_type =
+        
arrow::internal::checked_pointer_cast<arrow::TimestampType>(array->type());
+    // for orc format, data is saved as nano, therefore, Timestamp convert 
should consider precision
+    // in arrow array rather than input precision
+    DateTimeUtils::TimeType time_type = 
DateTimeUtils::GetTimeTypeFromArrowType(timestamp_type);
+    auto [milli, nano] = DateTimeUtils::TimestampConverter(
+        data, time_type, DateTimeUtils::TimeType::MILLISECOND, 
DateTimeUtils::TimeType::NANOSECOND);
+    return Timestamp(milli, nano);  // NOTE(review): presumably (epoch millis, sub-milli nanos) - confirm
+}
+
+std::shared_ptr<InternalRow> ColumnarRowRef::GetRow(int32_t pos, int32_t 
num_fields) const {  // num_fields is not used in this body
+    auto struct_array =
+        arrow::internal::checked_cast<const 
arrow::StructArray*>(ctx_->array_vec[pos].get());  // assumes column pos is a struct array
+    assert(struct_array);  // debug-only null check (no-op under NDEBUG)
+    auto nested_ctx = 
std::make_shared<ColumnarBatchContext>(struct_array->fields(), ctx_->pool);  // the new context copies the child array handles and the pool
+    return std::make_shared<ColumnarRowRef>(std::move(nested_ctx), row_id_);  // nested row keeps the same row_id_
+}
+
+std::shared_ptr<InternalArray> ColumnarRowRef::GetArray(int32_t pos) const {
+    auto list_array =
+        arrow::internal::checked_cast<const 
arrow::ListArray*>(ctx_->array_vec[pos].get());  // assumes column pos is a list array
+    assert(list_array);  // debug-only null check (no-op under NDEBUG)
+    int32_t offset = list_array->value_offset(row_id_);  // where this row's elements start in values()
+    int32_t length = list_array->value_length(row_id_);  // how many elements this row has
+    return std::make_shared<ColumnarArray>(list_array->values().get(), 
ctx_->pool, offset, length);  // values() is handed over as a raw pointer
+}
+
+std::shared_ptr<InternalMap> ColumnarRowRef::GetMap(int32_t pos) const {
+    auto map_array =
+        arrow::internal::checked_cast<const 
arrow::MapArray*>(ctx_->array_vec[pos].get());  // assumes column pos is a map array
+    assert(map_array);  // debug-only null check (no-op under NDEBUG)
+    int32_t offset = map_array->value_offset(row_id_);  // where this row's entries start
+    int32_t length = map_array->value_length(row_id_);  // how many entries this row has
+    return std::make_shared<ColumnarMap>(map_array->keys(), 
map_array->items(), ctx_->pool, offset,
+                                         length);  // keys()/items() go in as shared_ptr, so the map co-owns them
+}
+
+}  // namespace paimon
diff --git a/src/paimon/common/data/columnar/columnar_row_ref.h 
b/src/paimon/common/data/columnar/columnar_row_ref.h
new file mode 100644
index 0000000..b08fdf6
--- /dev/null
+++ b/src/paimon/common/data/columnar/columnar_row_ref.h
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <string_view>
+
+#include "fmt/format.h"
+#include "paimon/common/data/binary_string.h"
+#include "paimon/common/data/columnar/columnar_batch_context.h"
+#include "paimon/common/data/columnar/columnar_utils.h"
+#include "paimon/common/data/internal_array.h"
+#include "paimon/common/data/internal_map.h"
+#include "paimon/common/data/internal_row.h"
+#include "paimon/common/types/row_kind.h"
+#include "paimon/data/decimal.h"
+#include "paimon/data/timestamp.h"
+#include "paimon/result.h"
+
+namespace paimon {
+class Bytes;
+
+/// Columnar row view which shares batch-level context to reduce per-row 
overhead.
+class ColumnarRowRef : public InternalRow {
+ public:
+    ColumnarRowRef(std::shared_ptr<ColumnarBatchContext> ctx, int64_t row_id)
+        : ctx_(std::move(ctx)), row_id_(row_id) {}  // ctx is taken by value and moved into ctx_
+
+    Result<const RowKind*> GetRowKind() const override {
+        return row_kind_;  // RowKind::Insert() unless SetRowKind() replaced it
+    }
+
+    void SetRowKind(const RowKind* kind) override {
+        row_kind_ = kind;  // pointer is stored as-is
+    }
+
+    int32_t GetFieldCount() const override {
+        return static_cast<int32_t>(ctx_->array_vec.size());  // one field per array in the context
+    }
+
+    bool IsNullAt(int32_t pos) const override {
+        return ctx_->array_vec[pos]->IsNull(row_id_);  // pos is not range-checked
+    }
+
+    bool GetBoolean(int32_t pos) const override {
+        return ColumnarUtils::GetGenericValue<arrow::BooleanType, 
bool>(ctx_->array_vec[pos].get(),
+                                                                        
row_id_);
+    }
+
+    char GetByte(int32_t pos) const override {
+        return ColumnarUtils::GetGenericValue<arrow::Int8Type, 
char>(ctx_->array_vec[pos].get(),
+                                                                     row_id_);  // int8 column surfaced as char
+    }
+
+    int16_t GetShort(int32_t pos) const override {
+        return ColumnarUtils::GetGenericValue<arrow::Int16Type, 
int16_t>(ctx_->array_vec[pos].get(),
+                                                                         
row_id_);
+    }
+
+    int32_t GetInt(int32_t pos) const override {
+        return ColumnarUtils::GetGenericValue<arrow::Int32Type, 
int32_t>(ctx_->array_vec[pos].get(),
+                                                                         
row_id_);
+    }
+
+    int32_t GetDate(int32_t pos) const override {
+        return ColumnarUtils::GetGenericValue<arrow::Date32Type, int32_t>(
+            ctx_->array_vec[pos].get(), row_id_);  // Date32 value returned as a plain int32
+    }
+
+    int64_t GetLong(int32_t pos) const override {
+        return ColumnarUtils::GetGenericValue<arrow::Int64Type, 
int64_t>(ctx_->array_vec[pos].get(),
+                                                                         
row_id_);
+    }
+
+    float GetFloat(int32_t pos) const override {
+        return ColumnarUtils::GetGenericValue<arrow::FloatType, 
float>(ctx_->array_vec[pos].get(),
+                                                                       
row_id_);
+    }
+
+    double GetDouble(int32_t pos) const override {
+        return ColumnarUtils::GetGenericValue<arrow::DoubleType, 
double>(ctx_->array_vec[pos].get(),
+                                                                         
row_id_);
+    }
+
+    BinaryString GetString(int32_t pos) const override {
+        auto bytes = 
ColumnarUtils::GetBytes<arrow::StringType>(ctx_->array_vec[pos].get(), row_id_,
+                                                                
ctx_->pool.get());
+        return BinaryString::FromBytes(bytes);  // wraps the bytes obtained above
+    }
+
+    std::string_view GetStringView(int32_t pos) const override {
+        return ColumnarUtils::GetView(ctx_->array_vec[pos].get(), row_id_);
+    }
+
+    Decimal GetDecimal(int32_t pos, int32_t precision, int32_t scale) const 
override;
+
+    Timestamp GetTimestamp(int32_t pos, int32_t precision) const override;  // unit comes from the Arrow type
+
+    std::shared_ptr<Bytes> GetBinary(int32_t pos) const override {
+        return 
ColumnarUtils::GetBytes<arrow::BinaryType>(ctx_->array_vec[pos].get(), row_id_,
+                                                          ctx_->pool.get());
+    }
+
+    std::shared_ptr<InternalRow> GetRow(int32_t pos, int32_t num_fields) const 
override;
+
+    std::shared_ptr<InternalArray> GetArray(int32_t pos) const override;
+
+    std::shared_ptr<InternalMap> GetMap(int32_t pos) const override;
+
+    std::string ToString() const override {
+        return fmt::format("ColumnarRowRef, row_id {}", row_id_);  // identifies the row only, no field values
+    }
+
+ private:
+    std::shared_ptr<ColumnarBatchContext> ctx_;  // owning handle to the column arrays and pool
+    const RowKind* row_kind_ = RowKind::Insert();  // default kind until SetRowKind() is called
+    int64_t row_id_;  // index of this row inside each column array
+};
+}  // namespace paimon
diff --git a/src/paimon/common/data/columnar/columnar_row_test.cpp 
b/src/paimon/common/data/columnar/columnar_row_test.cpp
new file mode 100644
index 0000000..6301cd2
--- /dev/null
+++ b/src/paimon/common/data/columnar/columnar_row_test.cpp
@@ -0,0 +1,455 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "paimon/common/data/columnar/columnar_row.h"
+
+#include <utility>
+
+#include "arrow/api.h"
+#include "arrow/array/array_base.h"
+#include "arrow/array/array_dict.h"
+#include "arrow/array/array_nested.h"
+#include "arrow/ipc/json_simple.h"
+#include "gtest/gtest.h"
+#include "paimon/common/data/columnar/columnar_row_ref.h"
+#include "paimon/common/utils/date_time_utils.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/memory/memory_pool.h"
+
+namespace paimon::test {
+TEST(ColumnarRowTest, TestSimple) {  // Builds a 4-row struct batch with 8 primitive/string columns and checks each getter on row 0.
+    auto pool = GetDefaultPool();
+    std::shared_ptr<arrow::DataType> target_type =
+        arrow::struct_({arrow::field("f1", arrow::boolean()), 
arrow::field("f2", arrow::int8()),
+                        arrow::field("f3", arrow::int16()), arrow::field("f4", 
arrow::int32()),
+                        arrow::field("f5", arrow::int64()), arrow::field("f6", 
arrow::float32()),
+                        arrow::field("f7", arrow::float64()), 
arrow::field("f8", arrow::utf8())});
+    auto f1 =
+        arrow::ipc::internal::json::ArrayFromJSON(arrow::boolean(), R"([true, 
false, false, true])")
+            .ValueOrDie();
+    auto f2 =
+        arrow::ipc::internal::json::ArrayFromJSON(arrow::int8(), R"([0, 1, 2, 
3])").ValueOrDie();
+    auto f3 =
+        arrow::ipc::internal::json::ArrayFromJSON(arrow::int16(), R"([4, 5, 6, 
7])").ValueOrDie();
+    auto f4 = arrow::ipc::internal::json::ArrayFromJSON(arrow::int32(), 
R"([10, 11, 12, 13])")
+                  .ValueOrDie();
+    auto f5 = arrow::ipc::internal::json::ArrayFromJSON(arrow::int64(), 
R"([15, 16, 17, 18])")
+                  .ValueOrDie();
+    auto f6 = arrow::ipc::internal::json::ArrayFromJSON(arrow::float32(), 
R"([0.0, 1.1, 2.2, 3.3])")
+                  .ValueOrDie();
+    auto f7 = arrow::ipc::internal::json::ArrayFromJSON(arrow::float64(), 
R"([5.5, 6.6, 7.7, 8.8])")
+                  .ValueOrDie();
+    auto f8 = arrow::ipc::internal::json::ArrayFromJSON(arrow::utf8(),
+                                                        R"(["Hello", "World", 
"HELLO", "WORLD"])")
+                  .ValueOrDie();
+    auto data = arrow::StructArray::Make({f1, f2, f3, f4, f5, f6, f7, f8}, 
target_type->fields())
+                    .ValueOrDie();
+
+    auto row = ColumnarRow(data->fields(), pool, 0);  // last argument selects the row to read (row 0 here)
+    ASSERT_EQ(row.GetFieldCount(), 8);
+    ASSERT_EQ(row.GetBoolean(0), true);
+    ASSERT_EQ(row.GetByte(1), 0);
+    ASSERT_EQ(row.GetShort(2), 4);
+    ASSERT_EQ(row.GetInt(3), 10);
+    ASSERT_EQ(row.GetLong(4), 15);
+    ASSERT_EQ(row.GetFloat(5), 0.0);
+    ASSERT_EQ(row.GetDouble(6), 5.5);
+    ASSERT_EQ(row.GetString(7).ToString(), "Hello");  // copying accessor
+    ASSERT_EQ(std::string(row.GetStringView(7)), "Hello");  // view accessor must agree with the copy
+}
+
+TEST(ColumnarRowRefTest, TestSimple) {  // Reads row 1 through a ColumnarRowRef over a shared batch context and round-trips its RowKind.
+    auto pool = GetDefaultPool();
+    std::shared_ptr<arrow::DataType> target_type =
+        arrow::struct_({arrow::field("f1", arrow::int32()), arrow::field("f2", 
arrow::utf8())});
+    auto f1 =
+        arrow::ipc::internal::json::ArrayFromJSON(arrow::int32(), R"([1, 2, 
3])").ValueOrDie();
+    auto f2 =
+        arrow::ipc::internal::json::ArrayFromJSON(arrow::utf8(), R"(["alpha", 
"beta", "gamma"])")
+            .ValueOrDie();
+    auto data = arrow::StructArray::Make({f1, f2}, 
target_type->fields()).ValueOrDie();
+
+    auto ctx = std::make_shared<ColumnarBatchContext>(data->fields(), pool);
+    ColumnarRowRef row(ctx, 1);  // second argument is the row index within the batch
+    ASSERT_EQ(row.GetFieldCount(), 2);
+    ASSERT_EQ(row.GetInt(0), 2);
+    ASSERT_EQ(std::string(row.GetStringView(1)), "beta");
+
+    auto row_kind = row.GetRowKind();
+    ASSERT_TRUE(row_kind.ok());
+    ASSERT_EQ(row_kind.value(), RowKind::Insert());  // kind reported before any SetRowKind call
+    row.SetRowKind(RowKind::Delete());
+    auto updated_kind = row.GetRowKind();
+    ASSERT_TRUE(updated_kind.ok());
+    ASSERT_EQ(updated_kind.value(), RowKind::Delete());
+}
+
+TEST(ColumnarRowTest, TestComplexAndNestedType) {  // Checks date, decimal, timestamp (all four units), binary, struct, list and map getters on row 0.
+    auto pool = GetDefaultPool();
+    std::shared_ptr<arrow::DataType> target_type = arrow::struct_({
+        arrow::field("f0", arrow::date32()),
+        arrow::field("f1", arrow::decimal128(10, 3)),
+        arrow::field("f2", arrow::timestamp(arrow::TimeUnit::NANO)),
+        arrow::field("f3", arrow::binary()),
+        arrow::field(
+            "f4", arrow::struct_({field("sub1", arrow::int64()), field("sub2", 
arrow::int64()),
+                                  field("sub3", arrow::int64()), field("sub4", 
arrow::int64())})),
+        arrow::field("f5", arrow::list(arrow::int64())),
+        arrow::field("f6", arrow::map(arrow::int32(), arrow::int64())),
+        arrow::field("f7", arrow::map(arrow::int32(), 
arrow::list(arrow::int64()))),
+        arrow::field("f8", arrow::timestamp(arrow::TimeUnit::SECOND)),
+        arrow::field("f9", arrow::timestamp(arrow::TimeUnit::MILLI)),
+        arrow::field("f10", arrow::timestamp(arrow::TimeUnit::MICRO)),
+    });
+    auto f0 =
+        arrow::ipc::internal::json::ArrayFromJSON(arrow::date32(), R"([109, 
1000, -1000, 555])")
+            .ValueOrDie();
+    auto f1 = arrow::ipc::internal::json::ArrayFromJSON(
+                  arrow::decimal128(10, 3), R"(["1.234", "1234.000", 
"-9876.543", "666.888"])")
+                  .ValueOrDie();
+    auto f2 =
+        
arrow::ipc::internal::json::ArrayFromJSON(arrow::timestamp(arrow::TimeUnit::NANO),
+                                                  
R"(["1970-01-01T00:00:59","2000-02-29T23:23:23",
+          "1899-01-01T00:59:20","2033-05-18T03:33:20"])")
+            .ValueOrDie();
+    auto f3 =
+        arrow::ipc::internal::json::ArrayFromJSON(arrow::binary(), R"(["aaa", 
"bb", "ccc", "bbb"])")
+            .ValueOrDie();
+    auto f4 = arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_({
+                                                            field("sub1", 
arrow::int64()),
+                                                            field("sub2", 
arrow::int64()),
+                                                            field("sub3", 
arrow::int64()),
+                                                            field("sub4", 
arrow::int64()),
+                                                        }),
+                                                        R"([
+      [1, 3, 2, 5],
+      [2, 2, 1, 3],
+      [3, 2, 1, 3],
+      [4, 1, 0, 2]
+    ])")
+                  .ValueOrDie();
+    auto f5 = 
arrow::ipc::internal::json::ArrayFromJSON(arrow::list(arrow::int64()),
+                                                        "[[1, 1, 2], [3], [2], 
[-4]]")
+                  .ValueOrDie();
+    auto f6 = 
arrow::ipc::internal::json::ArrayFromJSON(arrow::map(arrow::int32(), 
arrow::int64()),
+                                                        R"([[[1, 3], [4, 4]],
+                                                            [[1, 5], [7, 6], 
[100, 7]],
+                                                            [[0, 9]],
+                                                            null])")
+                  .ValueOrDie();
+    auto f7 = arrow::ipc::internal::json::ArrayFromJSON(
+                  arrow::map(arrow::int32(), arrow::list(arrow::int64())),
+                  R"([[[1, [10, 20]], [4, [40, 50, 100]]],
+                      [[1, [1, 2]], [7, [6]], [100, [8]]],
+                      [[0, [9]]],
+                      null])")
+                  .ValueOrDie();
+    auto f8 =
+        
arrow::ipc::internal::json::ArrayFromJSON(arrow::timestamp(arrow::TimeUnit::SECOND),
+                                                  
R"(["1970-01-01T00:00:59","2000-02-29T23:23:23",
+                                                      
"1899-01-01T00:59:20","2033-05-18T03:33:20"])")
+            .ValueOrDie();
+    auto f9 = arrow::ipc::internal::json::ArrayFromJSON(
+                  arrow::timestamp(arrow::TimeUnit::MILLI),
+                  R"(["1970-01-01T00:00:59.001","2000-02-29T23:23:23",
+                                                      
"1899-01-01T00:59:20","2033-05-18T03:33:20"])")
+                  .ValueOrDie();
+    auto f10 = arrow::ipc::internal::json::ArrayFromJSON(
+                   arrow::timestamp(arrow::TimeUnit::MICRO),
+                   R"(["1970-01-01T00:00:59.000001","2000-02-29T23:23:23",
+                                                      
"1899-01-01T00:59:20","2033-05-18T03:33:20"])")
+                   .ValueOrDie();
+
+    auto data = arrow::StructArray::Make({f0, f1, f2, f3, f4, f5, f6, f7, f8, 
f9, f10},
+                                         target_type->fields())
+                    .ValueOrDie();
+
+    auto row = ColumnarRow(data->fields(), pool, 0);
+    ASSERT_EQ(row.GetFieldCount(), 11);
+    ASSERT_EQ(row.GetDate(0), 109);
+    ASSERT_EQ(row.GetDecimal(1, 10, 3), Decimal(10, 3, 1234));  // "1.234" at scale 3 compares equal to unscaled 1234
+
+    auto ts = row.GetTimestamp(2, /*precision=*/9);
+    ASSERT_EQ(ts, Timestamp(59000, 0));  // NOTE(review): arguments look like (milliseconds, nanos within the millisecond) - confirm against Timestamp
+
+    ASSERT_EQ(*row.GetBinary(3), Bytes("aaa", pool.get()));
+    ASSERT_EQ(std::string(row.GetStringView(3)), "aaa");  // the view accessor is also exercised on a binary column
+
+    auto result_row = row.GetRow(4, 4);  // nested struct column, 4 sub-fields
+    ASSERT_EQ(result_row->GetLong(0), 1);
+    ASSERT_EQ(result_row->GetLong(1), 3);
+    ASSERT_EQ(result_row->GetLong(2), 2);
+    ASSERT_EQ(result_row->GetLong(3), 5);
+
+    std::vector<int64_t> values = {1, 1, 2};
+    auto result_array = row.GetArray(5);
+    ASSERT_EQ(result_array->ToLongArray().value(), values);
+
+    auto result_key = row.GetMap(6)->KeyArray();
+    auto result_value = row.GetMap(6)->ValueArray();
+    ASSERT_EQ(result_key->ToIntArray().value(), std::vector<int32_t>({1, 4}));
+    ASSERT_EQ(result_value->ToLongArray().value(), std::vector<int64_t>({3, 
4}));
+
+    result_key = row.GetMap(7)->KeyArray();  // map whose values are themselves lists
+    result_value = row.GetMap(7)->ValueArray();
+
+    ASSERT_EQ(result_key->ToIntArray().value(), std::vector<int32_t>({1, 4}));
+    ASSERT_EQ(2, result_value->Size());
+    ASSERT_EQ(result_value->GetArray(0)->ToLongArray().value(), 
std::vector<int64_t>({10, 20}));
+    ASSERT_EQ(result_value->GetArray(1)->ToLongArray().value(),
+              std::vector<int64_t>({40, 50, 100}));
+
+    auto ts_second = row.GetTimestamp(8, /*precision=*/0);
+    ASSERT_EQ(ts_second, Timestamp(59000, 0));
+
+    auto ts_milli = row.GetTimestamp(9, /*precision=*/3);
+    ASSERT_EQ(ts_milli, Timestamp(59001, 0));
+
+    auto ts_micro = row.GetTimestamp(10, /*precision=*/6);
+    ASSERT_EQ(ts_micro, Timestamp(59000, 1000));  // the extra microsecond shows up as 1000 in the second argument
+}
+
+TEST(ColumnarRowTest, TestTimestampType) {  // Reads timestamps of every unit, with and without a timezone, from rows 0 and 2 of a 3-row batch.
+    auto pool = GetDefaultPool();
+    auto timezone = DateTimeUtils::GetLocalTimezoneName();
+    arrow::FieldVector fields = {
+        arrow::field("ts_sec", arrow::timestamp(arrow::TimeUnit::SECOND)),
+        arrow::field("ts_milli", arrow::timestamp(arrow::TimeUnit::MILLI)),
+        arrow::field("ts_micro", arrow::timestamp(arrow::TimeUnit::MICRO)),
+        arrow::field("ts_nano", arrow::timestamp(arrow::TimeUnit::NANO)),
+        arrow::field("ts_tz_sec", arrow::timestamp(arrow::TimeUnit::SECOND, 
timezone)),
+        arrow::field("ts_tz_milli", arrow::timestamp(arrow::TimeUnit::MILLI, 
timezone)),
+        arrow::field("ts_tz_micro", arrow::timestamp(arrow::TimeUnit::MICRO, 
timezone)),
+        arrow::field("ts_tz_nano", arrow::timestamp(arrow::TimeUnit::NANO, 
timezone)),
+    };
+    auto array = std::dynamic_pointer_cast<arrow::StructArray>(
+        arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(fields), R"([
+["1970-01-01 00:00:01", "1970-01-01 00:00:00.001", "1970-01-01 
00:00:00.000001", "1970-01-01 00:00:00.000000001", "1970-01-01 00:00:02", 
"1970-01-01 00:00:00.002", "1970-01-01 00:00:00.000002", "1970-01-01 
00:00:00.000000002"],
+["1970-01-01 00:00:03", "1970-01-01 00:00:00.003", null, "1970-01-01 
00:00:00.000000003", "1970-01-01 00:00:04", "1970-01-01 00:00:00.004", 
"1970-01-01 00:00:00.000004", "1970-01-01 00:00:00.000000004"],
+["1970-01-01 00:00:05", "1970-01-01 00:00:00.005", null, null, "1970-01-01 
00:00:06", null, "1970-01-01 00:00:00.000006", null]
+    ])")
+            .ValueOrDie());
+    {  // row 0: every column is populated
+        auto row = ColumnarRow(array->fields(), pool, 0);
+        ASSERT_EQ(row.GetFieldCount(), 8);
+        ASSERT_EQ(row.GetTimestamp(0, /*precision=*/0), Timestamp(1000, 0));
+        ASSERT_EQ(row.GetTimestamp(1, /*precision=*/3), Timestamp(1, 0));
+        ASSERT_EQ(row.GetTimestamp(2, /*precision=*/6), Timestamp(0, 1000));
+        ASSERT_EQ(row.GetTimestamp(3, /*precision=*/9), Timestamp(0, 1));
+        ASSERT_EQ(row.GetTimestamp(4, /*precision=*/0), Timestamp(2000, 0));
+        ASSERT_EQ(row.GetTimestamp(5, /*precision=*/3), Timestamp(2, 0));
+        ASSERT_EQ(row.GetTimestamp(6, /*precision=*/6), Timestamp(0, 2000));
+        ASSERT_EQ(row.GetTimestamp(7, /*precision=*/9), Timestamp(0, 2));
+    }
+    {  // row 2: columns 2, 3, 5 and 7 are null in the JSON; NOTE(review): column 7 is not asserted here
+        auto row = ColumnarRow(array->fields(), pool, 2);
+        ASSERT_EQ(row.GetFieldCount(), 8);
+        ASSERT_EQ(row.GetTimestamp(0, /*precision=*/0), Timestamp(5000, 0));
+        ASSERT_EQ(row.GetTimestamp(1, /*precision=*/3), Timestamp(5, 0));
+        ASSERT_TRUE(row.IsNullAt(2));
+        ASSERT_TRUE(row.IsNullAt(3));
+        ASSERT_EQ(row.GetTimestamp(4, /*precision=*/0), Timestamp(6000, 0));
+        ASSERT_TRUE(row.IsNullAt(5));
+        ASSERT_EQ(row.GetTimestamp(6, /*precision=*/6), Timestamp(0, 6000));
+    }
+}
+
+TEST(ColumnarRowTest, TestNull) {  // Row 0 holds null in both columns; checks IsNullAt and that an explicitly set RowKind is returned.
+    auto pool = GetDefaultPool();
+    std::shared_ptr<arrow::DataType> target_type =
+        arrow::struct_({arrow::field("f1", arrow::boolean()), 
arrow::field("f2", arrow::int8())});
+    auto f1 =
+        arrow::ipc::internal::json::ArrayFromJSON(arrow::boolean(), R"([null, 
false, false, true])")
+            .ValueOrDie();
+    auto f2 =
+        arrow::ipc::internal::json::ArrayFromJSON(arrow::int8(), R"([null, 1, 
2, 3])").ValueOrDie();
+    auto data = arrow::StructArray::Make({f1, f2}, 
target_type->fields()).ValueOrDie();
+
+    auto row = ColumnarRow(data->fields(), pool, 0);
+    row.SetRowKind(RowKind::Insert());
+    ASSERT_EQ(row.GetFieldCount(), 2);
+    ASSERT_EQ(row.GetRowKind().value(), RowKind::Insert());
+    ASSERT_TRUE(row.IsNullAt(0));
+    ASSERT_TRUE(row.IsNullAt(1));
+}
+
+TEST(ColumnarRowTest, TestDictionary) {  // Reads dictionary-encoded strings nested inside a list column and a struct column.
+    auto pool = GetDefaultPool();
+    auto dict = arrow::ipc::internal::json::ArrayFromJSON(arrow::utf8(), 
R"(["foo", "bar", "baz"])")
+                    .ValueOrDie();
+    auto dict_type = arrow::dictionary(arrow::int32(), arrow::utf8());
+    auto indices =
+        arrow::ipc::internal::json::ArrayFromJSON(arrow::int32(), "[1, 2, 0, 
2, 0]").ValueOrDie();
+    std::shared_ptr<arrow::DictionaryArray> dict_array =
+        std::make_shared<arrow::DictionaryArray>(dict_type, indices, dict);
+
+    auto f0 = arrow::field("f0", arrow::list(dict_type));
+    auto f1 = arrow::field("f1", arrow::struct_({arrow::field("sub1", 
arrow::int64()),
+                                                 arrow::field("sub2", 
arrow::binary()),
+                                                 arrow::field("sub3", 
dict_type)}));
+
+    auto list_offsets =
+        arrow::ipc::internal::json::ArrayFromJSON(arrow::int32(), R"([0, 
5])").ValueOrDie();
+    auto f0_array = arrow::ListArray::FromArrays(*list_offsets, 
*dict_array).ValueOrDie();
+
+    auto sub1_array =
+        arrow::ipc::internal::json::ArrayFromJSON(arrow::int64(), 
R"([1])").ValueOrDie();
+    auto sub2_array =
+        arrow::ipc::internal::json::ArrayFromJSON(arrow::binary(), 
R"(["apple"])").ValueOrDie();
+    auto sub3_array =
+        std::make_shared<arrow::DictionaryArray>(dict_type, indices->Slice(0, 
1), dict);
+    auto f1_array = std::make_shared<arrow::StructArray>(
+        f1->type(), /*length=*/1, arrow::ArrayVector({sub1_array, sub2_array, 
sub3_array}));
+
+    auto struct_type = arrow::struct_({f0, f1});
+    // data: [["bar", "baz", "foo", "baz", "foo"], [1, "apple", "bar"]]
+    auto data = std::make_shared<arrow::StructArray>(struct_type, /*length=*/1,
+                                                     
arrow::ArrayVector({f0_array, f1_array}));
+
+    auto row = ColumnarRow(data->fields(), pool, 0);
+    ASSERT_FALSE(row.IsNullAt(0));
+    auto internal_array = row.GetArray(0);  // list of dictionary-encoded strings
+    ASSERT_TRUE(internal_array);
+    ASSERT_EQ(5, internal_array->Size());
+    ASSERT_EQ("bar", internal_array->GetString(0).ToString());
+    ASSERT_EQ("baz", internal_array->GetString(1).ToString());
+    ASSERT_EQ("foo", internal_array->GetString(2).ToString());
+    ASSERT_EQ("baz", internal_array->GetString(3).ToString());
+    ASSERT_EQ("foo", internal_array->GetString(4).ToString());
+
+    ASSERT_EQ("bar", std::string(internal_array->GetStringView(0)));
+    ASSERT_EQ("baz", std::string(internal_array->GetStringView(1)));
+    ASSERT_EQ("foo", std::string(internal_array->GetStringView(2)));
+    ASSERT_EQ("baz", std::string(internal_array->GetStringView(3)));
+    ASSERT_EQ("foo", std::string(internal_array->GetStringView(4)));
+
+    ASSERT_FALSE(row.IsNullAt(1));
+    auto internal_row = row.GetRow(1, 3);  // struct whose third sub-field is dictionary-encoded
+    ASSERT_TRUE(internal_row);
+    ASSERT_EQ(1, internal_row->GetLong(0));
+    auto bytes = internal_row->GetBinary(1);
+    ASSERT_EQ("apple", std::string(bytes->data(), bytes->size()));
+    ASSERT_EQ("apple", std::string(internal_row->GetStringView(1)));
+    ASSERT_EQ("bar", internal_row->GetString(2).ToString());
+    ASSERT_EQ("bar", std::string(internal_row->GetStringView(2)));
+}
+
+TEST(ColumnarRowTest, TestDataLifeCycle) {  // Checks a ColumnarRow built with the owning struct array stays readable after the test drops its own references.
+    auto pool = GetDefaultPool();
+    std::shared_ptr<arrow::DataType> target_type = 
arrow::struct_({arrow::field(
+        "f0", arrow::struct_({field("sub1", arrow::int64()), field("sub2", 
arrow::int64()),
+                              field("sub3", arrow::int64()), field("sub4", 
arrow::int64())}))});
+    auto f0 = arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_({
+                                                            field("sub1", 
arrow::int64()),
+                                                            field("sub2", 
arrow::int64()),
+                                                            field("sub3", 
arrow::int64()),
+                                                            field("sub4", 
arrow::int64()),
+                                                        }),
+                                                        R"([
+      [1, 3, 2, 5],
+      [2, 2, 1, 3],
+      [3, 2, 1, 3],
+      [4, 1, 0, 2]
+    ])")
+                  .ValueOrDie();
+    auto data = arrow::StructArray::Make({f0}, 
target_type->fields()).ValueOrDie();
+    auto row = std::make_unique<ColumnarRow>(data, data->fields(), pool, 0);  // overload that also receives the struct array itself
+    data.reset();
+    f0.reset();
+    // array data is only held by columnar row
+    ASSERT_EQ(1, row->struct_array_.use_count());  // NOTE(review): reads a member directly; presumably the test build is granted access - confirm
+
+    ASSERT_EQ(row->GetFieldCount(), 1);
+    ASSERT_FALSE(row->IsNullAt(0));
+    auto result_row = row->GetRow(0, 4);
+
+    ASSERT_FALSE(result_row->IsNullAt(0));
+    ASSERT_EQ(result_row->GetLong(0), 1);
+    ASSERT_FALSE(result_row->IsNullAt(1));
+    ASSERT_EQ(result_row->GetLong(1), 3);
+    ASSERT_FALSE(result_row->IsNullAt(2));
+    ASSERT_EQ(result_row->GetLong(2), 2);
+    ASSERT_FALSE(result_row->IsNullAt(3));
+    ASSERT_EQ(result_row->GetLong(3), 5);
+}
+
+TEST(ColumnarRowTest, TestColumnarRowRefGetBinary) {  // Checks ColumnarRowRef::GetBinary for regular, empty and null-adjacent values across three rows.
+    auto pool = GetDefaultPool();
+    std::shared_ptr<arrow::DataType> target_type = arrow::struct_({
+        arrow::field("f0", arrow::binary()),
+        arrow::field("f1", arrow::binary()),
+    });
+    auto f0 =
+        arrow::ipc::internal::json::ArrayFromJSON(arrow::binary(), 
R"(["hello", "world", null])")
+            .ValueOrDie();
+    auto f1 = arrow::ipc::internal::json::ArrayFromJSON(arrow::binary(), 
R"(["abc", "", "xyz"])")
+                  .ValueOrDie();
+    auto data = arrow::StructArray::Make({f0, f1}, 
target_type->fields()).ValueOrDie();
+
+    auto ctx = std::make_shared<ColumnarBatchContext>(data->fields(), pool);  // one context shared by all row refs below
+
+    {
+        ColumnarRowRef row(ctx, 0);
+        auto binary = row.GetBinary(0);
+        ASSERT_TRUE(binary);
+        ASSERT_EQ(std::string(binary->data(), binary->size()), "hello");
+
+        auto binary1 = row.GetBinary(1);
+        ASSERT_TRUE(binary1);
+        ASSERT_EQ(std::string(binary1->data(), binary1->size()), "abc");
+    }
+    {
+        ColumnarRowRef row(ctx, 1);
+        auto binary = row.GetBinary(0);
+        ASSERT_TRUE(binary);
+        ASSERT_EQ(std::string(binary->data(), binary->size()), "world");
+
+        auto binary1 = row.GetBinary(1);
+        ASSERT_TRUE(binary1);  // an empty value still yields a non-null Bytes object
+        ASSERT_EQ(binary1->size(), 0);
+    }
+    {
+        ColumnarRowRef row(ctx, 2);
+        ASSERT_TRUE(row.IsNullAt(0));  // null slot: only the null flag is checked, GetBinary is not called on it
+
+        auto binary1 = row.GetBinary(1);
+        ASSERT_TRUE(binary1);
+        ASSERT_EQ(std::string(binary1->data(), binary1->size()), "xyz");
+    }
+}
+
+TEST(ColumnarRowTest, TestColumnarRowRefToString) {  // Checks that ColumnarRowRef::ToString reports the row id the reference was built with.
+    auto pool = GetDefaultPool();
+    std::shared_ptr<arrow::DataType> target_type =
+        arrow::struct_({arrow::field("f0", arrow::int32())});
+    auto f0 =
+        arrow::ipc::internal::json::ArrayFromJSON(arrow::int32(), R"([1, 2, 
3])").ValueOrDie();
+    auto data = arrow::StructArray::Make({f0}, 
target_type->fields()).ValueOrDie();
+
+    auto ctx = std::make_shared<ColumnarBatchContext>(data->fields(), pool);
+
+    {
+        ColumnarRowRef row(ctx, 0);
+        ASSERT_EQ(row.ToString(), "ColumnarRowRef, row_id 0");
+    }
+    {
+        ColumnarRowRef row(ctx, 2);
+        ASSERT_EQ(row.ToString(), "ColumnarRowRef, row_id 2");
+    }
+}
+
+}  // namespace paimon::test
diff --git a/src/paimon/common/data/columnar/columnar_utils.h 
b/src/paimon/common/data/columnar/columnar_utils.h
new file mode 100644
index 0000000..ca9f58e
--- /dev/null
+++ b/src/paimon/common/data/columnar/columnar_utils.h
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <cassert>
+#include <cstdint>
+#include <cstring>
+#include <memory>
+#include <string_view>
+
+#include "arrow/api.h"
+#include "arrow/array/array_base.h"
+#include "arrow/array/array_binary.h"
+#include "arrow/array/array_dict.h"
+#include "arrow/array/array_primitive.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/checked_cast.h"
+#include "paimon/memory/bytes.h"
+
+namespace paimon {
+class MemoryPool;
+
+// Static helpers for reading single values out of Arrow arrays.
+//
+// The accessors do not consult the validity bitmap and do not bounds-check `pos`;
+// callers are expected to test for null first. Positions are 64-bit so that callers
+// holding an int64_t row id can pass it without narrowing.
+class ColumnarUtils {
+ public:
+    ColumnarUtils() = delete;
+    ~ColumnarUtils() = delete;
+
+    // Returns the element at `pos` of `array`, which must be the array class that
+    // arrow::TypeTraits associates with `DataType`. The result is converted to `ValueType`.
+    template <typename DataType, typename ValueType>
+    static ValueType GetGenericValue(const arrow::Array* array, int64_t pos) {
+        using ArrayType = typename arrow::TypeTraits<DataType>::ArrayType;
+        const auto* typed_array = arrow::internal::checked_cast<const ArrayType*>(array);
+        assert(typed_array);
+        return typed_array->Value(pos);
+    }
+
+    // Returns a non-owning view of the bytes at `pos`.
+    //
+    // `array` is either a BinaryArray (StringArray included) or a DictionaryArray whose
+    // dictionary holds string/binary values (32- or 64-bit offsets). The view points into
+    // the array's buffers, so it is only valid while `array` is alive. An unsupported
+    // dictionary value type trips an assert and yields an empty view when asserts are off.
+    static std::string_view GetView(const arrow::Array* array, int64_t pos) {
+        if (array->type_id() != arrow::Type::type::DICTIONARY) {
+            const auto* typed_array =
+                arrow::internal::checked_cast<const arrow::BinaryArray*>(array);
+            assert(typed_array);
+            return typed_array->GetView(pos);
+        }
+
+        const auto* dict_array =
+            arrow::internal::checked_cast<const arrow::DictionaryArray*>(array);
+        assert(dict_array);
+        // GetValueIndex decodes every integer index type Arrow allows (signed and unsigned),
+        // so no index type can fall through with a bogus sentinel index.
+        const int64_t dict_index = dict_array->GetValueIndex(pos);
+        assert(dict_index >= 0);
+
+        const arrow::Array* dictionary = dict_array->dictionary().get();
+        assert(dictionary);
+        switch (dictionary->type_id()) {
+            case arrow::Type::type::STRING:
+            case arrow::Type::type::BINARY: {
+                // StringArray derives from BinaryArray, so one cast serves both.
+                const auto* values =
+                    arrow::internal::checked_cast<const arrow::BinaryArray*>(dictionary);
+                return values->GetView(dict_index);
+            }
+            case arrow::Type::type::LARGE_STRING:
+            case arrow::Type::type::LARGE_BINARY: {
+                const auto* values =
+                    arrow::internal::checked_cast<const arrow::LargeBinaryArray*>(dictionary);
+                return values->GetView(dict_index);
+            }
+            default:
+                assert(false);
+                return std::string_view();
+        }
+    }
+
+    // Copies the bytes at `pos` (see GetView for the accepted array kinds) into a Bytes
+    // buffer allocated from `pool`. `DataType` is not used by the implementation; it is
+    // kept so existing call sites continue to compile.
+    template <typename DataType>
+    static std::shared_ptr<Bytes> GetBytes(const arrow::Array* array, int64_t pos,
+                                           MemoryPool* pool) {
+        const std::string_view view = GetView(array, pos);
+        std::shared_ptr<Bytes> bytes = Bytes::AllocateBytes(view.size(), pool);
+        // memcpy requires valid pointers even for a zero-length copy, and an empty value
+        // may come with a null data pointer, so skip the call entirely in that case.
+        if (!view.empty()) {
+            memcpy(bytes->data(), view.data(), view.size());
+        }
+        return bytes;
+    }
+};
+}  // namespace paimon
diff --git a/src/paimon/common/data/columnar/columnar_utils_test.cpp 
b/src/paimon/common/data/columnar/columnar_utils_test.cpp
new file mode 100644
index 0000000..7eb8187
--- /dev/null
+++ b/src/paimon/common/data/columnar/columnar_utils_test.cpp
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "paimon/common/data/columnar/columnar_utils.h"
+
+#include <string>
+
+#include "arrow/api.h"
+#include "arrow/array/array_dict.h"
+#include "arrow/ipc/json_simple.h"
+#include "gtest/gtest.h"
+#include "paimon/memory/memory_pool.h"
+
+namespace paimon::test {
+TEST(ColumnarUtilsTest, TestGetViewAndBytes) {  // Checks GetView and GetBytes on a plain (non-dictionary) utf8 array.
+    auto pool = GetDefaultPool();
+    auto array = arrow::ipc::internal::json::ArrayFromJSON(arrow::utf8(), 
R"(["abc", "def", "hi"])")
+                     .ValueOrDie();
+    std::string_view view = ColumnarUtils::GetView(array.get(), 2);
+    ASSERT_EQ(std::string(view), "hi");
+    auto bytes = ColumnarUtils::GetBytes<arrow::BinaryType>(array.get(), 1, 
pool.get());
+    ASSERT_EQ(*std::make_shared<Bytes>("def", pool.get()), *bytes);  // the copy must equal the source value
+}
+
+TEST(ColumnarUtilsTest, TestGetViewAndBytesOfDict) {  // Checks GetView resolves int32 dictionary indices to the matching utf8 dictionary entries.
+    auto pool = GetDefaultPool();
+    auto dict = arrow::ipc::internal::json::ArrayFromJSON(arrow::utf8(), 
R"(["foo", "bar", "baz"])")
+                    .ValueOrDie();
+    auto dict_type = arrow::dictionary(arrow::int32(), arrow::utf8());
+    auto indices =
+        arrow::ipc::internal::json::ArrayFromJSON(arrow::int32(), "[1, 2, 0, 
2, 0]").ValueOrDie();
+    std::shared_ptr<arrow::DictionaryArray> dict_array =
+        std::make_shared<arrow::DictionaryArray>(dict_type, indices, dict);
+
+    ASSERT_EQ("bar", std::string(ColumnarUtils::GetView(dict_array.get(), 0)));  // index 1 -> "bar"
+    ASSERT_EQ("baz", std::string(ColumnarUtils::GetView(dict_array.get(), 1)));
+    ASSERT_EQ("foo", std::string(ColumnarUtils::GetView(dict_array.get(), 2)));
+    ASSERT_EQ("baz", std::string(ColumnarUtils::GetView(dict_array.get(), 3)));
+    ASSERT_EQ("foo", std::string(ColumnarUtils::GetView(dict_array.get(), 4)));
+}
+
+}  // namespace paimon::test

Reply via email to