This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 4d22988  feat: add row-based immutable data structure (#181)
4d22988 is described below

commit 4d229888e3bbdb071b69fb907ef04a4740afa11c
Author: Gang Wu <[email protected]>
AuthorDate: Wed Oct 8 00:49:05 2025 +0800

    feat: add row-based immutable data structure (#181)
    
    - Add StructLike, MapLike, and ArrayLike interfaces
    - Add wrapper for ManifestFile and ArrowArray
---
 src/iceberg/CMakeLists.txt              |   3 +
 src/iceberg/manifest_reader_internal.cc |  55 +++--
 src/iceberg/manifest_reader_internal.h  |  24 ++
 src/iceberg/row/CMakeLists.txt          |  18 ++
 src/iceberg/row/arrow_array_wrapper.cc  | 409 ++++++++++++++++++++++++++++++++
 src/iceberg/row/arrow_array_wrapper.h   | 114 +++++++++
 src/iceberg/row/manifest_wrapper.cc     | 137 +++++++++++
 src/iceberg/row/manifest_wrapper.h      | 100 ++++++++
 src/iceberg/row/struct_like.h           |  99 ++++++++
 src/iceberg/test/CMakeLists.txt         |   5 +-
 src/iceberg/test/struct_like_test.cc    | 389 ++++++++++++++++++++++++++++++
 src/iceberg/type_fwd.h                  |   7 +
 12 files changed, 1335 insertions(+), 25 deletions(-)

diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt
index 1fd8e91..37259e2 100644
--- a/src/iceberg/CMakeLists.txt
+++ b/src/iceberg/CMakeLists.txt
@@ -31,6 +31,8 @@ set(ICEBERG_SOURCES
     name_mapping.cc
     partition_field.cc
     partition_spec.cc
+    row/arrow_array_wrapper.cc
+    row/manifest_wrapper.cc
     schema.cc
     schema_field.cc
     schema_internal.cc
@@ -106,6 +108,7 @@ iceberg_install_all_headers(iceberg)
 
 add_subdirectory(catalog)
 add_subdirectory(expression)
+add_subdirectory(row)
 add_subdirectory(util)
 
 install(FILES ${CMAKE_CURRENT_BINARY_DIR}/iceberg_export.h
diff --git a/src/iceberg/manifest_reader_internal.cc b/src/iceberg/manifest_reader_internal.cc
index 126bd75..feff82f 100644
--- a/src/iceberg/manifest_reader_internal.cc
+++ b/src/iceberg/manifest_reader_internal.cc
@@ -27,6 +27,7 @@
 #include "iceberg/manifest_list.h"
 #include "iceberg/schema.h"
 #include "iceberg/type.h"
+#include "iceberg/util/checked_cast.h"
 #include "iceberg/util/macros.h"
 
 namespace iceberg {
@@ -37,7 +38,7 @@ namespace iceberg {
   }
 
 #define PARSE_PRIMITIVE_FIELD(item, array_view, type)                                  \
-  for (size_t row_idx = 0; row_idx < array_view->length; row_idx++) {                  \
+  for (int64_t row_idx = 0; row_idx < array_view->length; row_idx++) {                 \
     if (!ArrowArrayViewIsNull(array_view, row_idx)) {                                  \
       auto value = ArrowArrayViewGetIntUnsafe(array_view, row_idx);                    \
       item = static_cast<type>(value);                                                 \
@@ -48,7 +49,7 @@ namespace iceberg {
   }
 
 #define PARSE_STRING_FIELD(item, array_view)                                           \
-  for (size_t row_idx = 0; row_idx < array_view->length; row_idx++) {                  \
+  for (int64_t row_idx = 0; row_idx < array_view->length; row_idx++) {                 \
     if (!ArrowArrayViewIsNull(array_view, row_idx)) {                                  \
       auto value = ArrowArrayViewGetStringUnsafe(array_view, row_idx);                 \
       item = std::string(value.data, value.size_bytes);                                \
@@ -59,7 +60,7 @@ namespace iceberg {
   }
 
 #define PARSE_BINARY_FIELD(item, array_view)                                           \
-  for (size_t row_idx = 0; row_idx < array_view->length; row_idx++) {                  \
+  for (int64_t row_idx = 0; row_idx < array_view->length; row_idx++) {                 \
     if (!ArrowArrayViewIsNull(view_of_column, row_idx)) {                              \
       item = ArrowArrayViewGetInt8Vector(array_view, row_idx);                         \
     } else if (required) {                                                             \
@@ -225,66 +226,67 @@ Result<std::vector<ManifestFile>> ParseManifestList(ArrowSchema* schema,
     auto field_name = field.value()->get().name();
     bool required = !field.value()->get().optional();
     auto view_of_column = array_view.children[idx];
-    switch (idx) {
-      case 0:
+    ICEBERG_ASSIGN_OR_RAISE(auto manifest_file_field, ManifestFileFieldFromIndex(idx));
+    switch (manifest_file_field) {
+      case ManifestFileField::kManifestPath:
         PARSE_STRING_FIELD(manifest_files[row_idx].manifest_path, view_of_column);
         break;
-      case 1:
+      case ManifestFileField::kManifestLength:
         PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].manifest_length, view_of_column,
                               int64_t);
         break;
-      case 2:
+      case ManifestFileField::kPartitionSpecId:
         PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].partition_spec_id, view_of_column,
                               int32_t);
         break;
-      case 3:
+      case ManifestFileField::kContent:
         PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].content, view_of_column,
                               ManifestFile::Content);
         break;
-      case 4:
+      case ManifestFileField::kSequenceNumber:
         PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].sequence_number, view_of_column,
                               int64_t);
         break;
-      case 5:
+      case ManifestFileField::kMinSequenceNumber:
         PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].min_sequence_number, view_of_column,
                               int64_t);
         break;
-      case 6:
+      case ManifestFileField::kAddedSnapshotId:
         PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].added_snapshot_id, view_of_column,
                               int64_t);
         break;
-      case 7:
+      case ManifestFileField::kAddedFilesCount:
         PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].added_files_count, view_of_column,
                               int32_t);
         break;
-      case 8:
+      case ManifestFileField::kExistingFilesCount:
         PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].existing_files_count,
                               view_of_column, int32_t);
         break;
-      case 9:
+      case ManifestFileField::kDeletedFilesCount:
         PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].deleted_files_count, view_of_column,
                               int32_t);
         break;
-      case 10:
+      case ManifestFileField::kAddedRowsCount:
         PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].added_rows_count, view_of_column,
                               int64_t);
         break;
-      case 11:
+      case ManifestFileField::kExistingRowsCount:
         PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].existing_rows_count, view_of_column,
                               int64_t);
         break;
-      case 12:
+      case ManifestFileField::kDeletedRowsCount:
         PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].deleted_rows_count, view_of_column,
                               int64_t);
         break;
-      case 13:
+      case ManifestFileField::kPartitionFieldSummary:
         ICEBERG_RETURN_UNEXPECTED(
             ParsePartitionFieldSummaryList(view_of_column, manifest_files));
         break;
-      case 14:
+      case ManifestFileField::kKeyMetadata:
         PARSE_BINARY_FIELD(manifest_files[row_idx].key_metadata, view_of_column);
         break;
-      case 15:
+      case ManifestFileField::kFirstRowId:
         PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].first_row_id, view_of_column,
                               int64_t);
         break;
@@ -295,7 +297,7 @@ Result<std::vector<ManifestFile>> ParseManifestList(ArrowSchema* schema,
   return manifest_files;
 }
 
-Status ParseLiteral(ArrowArrayView* view_of_partition, size_t row_idx,
+Status ParseLiteral(ArrowArrayView* view_of_partition, int64_t row_idx,
                     std::vector<ManifestEntry>& manifest_entries) {
   if (view_of_partition->storage_type == ArrowType::NANOARROW_TYPE_BOOL) {
     auto value = ArrowArrayViewGetUIntUnsafe(view_of_partition, row_idx);
@@ -355,7 +357,7 @@ Status ParseDataFile(const std::shared_ptr<StructType>& data_file_schema,
                            view_of_file_field);
         break;
       case 2:
-        for (size_t row_idx = 0; row_idx < view_of_file_field->length; row_idx++) {
+        for (int64_t row_idx = 0; row_idx < view_of_file_field->length; row_idx++) {
           if (!ArrowArrayViewIsNull(view_of_file_field, row_idx)) {
             auto value = ArrowArrayViewGetStringUnsafe(view_of_file_field, row_idx);
             std::string_view path_str(value.data, value.size_bytes);
@@ -510,7 +512,7 @@ Result<std::vector<ManifestEntry>> ParseManifestEntry(ArrowSchema* schema,
         break;
       case 4: {
         auto data_file_schema =
-            dynamic_pointer_cast<StructType>(field.value()->get().type());
+            internal::checked_pointer_cast<StructType>(field.value()->get().type());
         ICEBERG_RETURN_UNEXPECTED(
             ParseDataFile(data_file_schema, view_of_column, manifest_entries));
         break;
@@ -571,4 +573,11 @@ Result<std::vector<ManifestFile>> ManifestListReaderImpl::Files() const {
   return manifest_files;
 }
 
+Result<ManifestFileField> ManifestFileFieldFromIndex(int32_t index) {
+  if (index >= 0 && index < static_cast<int32_t>(ManifestFileField::kNextUnusedId)) {
+    return static_cast<ManifestFileField>(index);
+  }
+  return InvalidArgument("Invalid manifest file field index: {}", index);
+}
+
 }  // namespace iceberg
diff --git a/src/iceberg/manifest_reader_internal.h b/src/iceberg/manifest_reader_internal.h
index 3144d07..13e3d2a 100644
--- a/src/iceberg/manifest_reader_internal.h
+++ b/src/iceberg/manifest_reader_internal.h
@@ -60,4 +60,28 @@ class ManifestListReaderImpl : public ManifestListReader {
   std::unique_ptr<Reader> reader_;
 };
 
+enum class ManifestFileField : int32_t {
+  kManifestPath = 0,
+  kManifestLength = 1,
+  kPartitionSpecId = 2,
+  kContent = 3,
+  kSequenceNumber = 4,
+  kMinSequenceNumber = 5,
+  kAddedSnapshotId = 6,
+  kAddedFilesCount = 7,
+  kExistingFilesCount = 8,
+  kDeletedFilesCount = 9,
+  kAddedRowsCount = 10,
+  kExistingRowsCount = 11,
+  kDeletedRowsCount = 12,
+  kPartitionFieldSummary = 13,
+  kKeyMetadata = 14,
+  kFirstRowId = 15,
+  // kNextUnusedId is the placeholder for the next unused index.
+  // Always keep this as the last index when adding new fields.
+  kNextUnusedId = 16,
+};
+
+Result<ManifestFileField> ManifestFileFieldFromIndex(int32_t index);
+
 }  // namespace iceberg
diff --git a/src/iceberg/row/CMakeLists.txt b/src/iceberg/row/CMakeLists.txt
new file mode 100644
index 0000000..8deb6d2
--- /dev/null
+++ b/src/iceberg/row/CMakeLists.txt
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+iceberg_install_all_headers(iceberg/row)
diff --git a/src/iceberg/row/arrow_array_wrapper.cc b/src/iceberg/row/arrow_array_wrapper.cc
new file mode 100644
index 0000000..e97293b
--- /dev/null
+++ b/src/iceberg/row/arrow_array_wrapper.cc
@@ -0,0 +1,409 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/row/arrow_array_wrapper.h"
+
+#include <cstring>
+
+#include <nanoarrow/nanoarrow.h>
+
+#include "iceberg/arrow_c_data_guard_internal.h"
+#include "iceberg/result.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+#define NANOARROW_RETURN_IF_NOT_OK(status)                         \
+  if (status != NANOARROW_OK) [[unlikely]] {                       \
+    return InvalidArrowData("Nanoarrow error: {}", error.message); \
+  }
+
+namespace {
+
+// TODO(gangwu): Reuse created ArrowArrayStructLike and others with cache.
+Result<Scalar> ExtractValue(const ArrowSchema* schema, const ArrowArray* array,
+                            const ArrowArrayView* array_view, int64_t index) {
+  if (ArrowArrayViewIsNull(array_view, index)) {
+    return std::monostate{};
+  }
+
+  switch (array_view->storage_type) {
+    case NANOARROW_TYPE_BOOL:
+      return static_cast<bool>(ArrowArrayViewGetIntUnsafe(array_view, index));
+    case NANOARROW_TYPE_INT32:
+    case NANOARROW_TYPE_DATE32:
+      return static_cast<int32_t>(ArrowArrayViewGetIntUnsafe(array_view, index));
+    case NANOARROW_TYPE_INT64:
+    case NANOARROW_TYPE_TIME64:
+    case NANOARROW_TYPE_TIMESTAMP:
+      return ArrowArrayViewGetIntUnsafe(array_view, index);
+    case NANOARROW_TYPE_FLOAT:
+      return static_cast<float>(ArrowArrayViewGetDoubleUnsafe(array_view, index));
+    case NANOARROW_TYPE_DOUBLE:
+      return ArrowArrayViewGetDoubleUnsafe(array_view, index);
+    case NANOARROW_TYPE_STRING:
+    case NANOARROW_TYPE_BINARY:
+    case NANOARROW_TYPE_FIXED_SIZE_BINARY:
+    case NANOARROW_TYPE_STRING_VIEW:
+    case NANOARROW_TYPE_BINARY_VIEW:
+    case NANOARROW_TYPE_LARGE_STRING:
+    case NANOARROW_TYPE_LARGE_BINARY: {
+      ArrowStringView value = ArrowArrayViewGetStringUnsafe(array_view, index);
+      return std::string_view(value.data, value.size_bytes);
+    }
+    case NANOARROW_TYPE_DECIMAL128: {
+      ArrowError error;
+      ArrowSchemaView schema_view;
+      NANOARROW_RETURN_IF_NOT_OK(ArrowSchemaViewInit(&schema_view, schema, &error));
+      ArrowDecimal value;
+      ArrowDecimalInit(&value, schema_view.decimal_bitwidth,
+                       schema_view.decimal_precision, schema_view.decimal_scale);
+      ArrowArrayViewGetDecimalUnsafe(array_view, index, &value);
+      if (value.n_words != 2) {
+        return InvalidArrowData("Unsupported Arrow decimal words: {}", value.n_words);
+      }
+      int128_t int_value{0};
+      std::memcpy(&int_value, value.words, sizeof(int128_t));
+      return Decimal(int_value);
+    }
+    case NANOARROW_TYPE_STRUCT: {
+      ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr<StructLike> struct_like,
+                              ArrowArrayStructLike::Make(*schema, *array, index));
+      return struct_like;
+    }
+    case NANOARROW_TYPE_LIST: {
+      ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr<ArrayLike> array_like,
+                              ArrowArrayArrayLike::Make(*schema, *array, index));
+      return array_like;
+    }
+    case NANOARROW_TYPE_MAP: {
+      ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr<MapLike> map_like,
+                              ArrowArrayMapLike::Make(*schema, *array, index));
+      return map_like;
+    }
+    case NANOARROW_TYPE_EXTENSION:
+      // TODO(gangwu): Handle these types properly
+    default:
+      return NotImplemented("Unsupported Arrow type: {}",
+                            static_cast<int>(array_view->storage_type));
+  }
+}
+
+}  // namespace
+
+// ArrowArrayStructLike Implementation
+
+class ArrowArrayStructLike::Impl {
+ public:
+  Impl(const ArrowSchema& schema, const ArrowArray& array, int64_t row_index)
+      : schema_(schema), array_(std::cref(array)), row_index_(row_index) {}
+
+  ~Impl() = default;
+
+  Result<Scalar> GetField(size_t pos) const {
+    // NOLINTNEXTLINE(modernize-use-integer-sign-comparison)
+    if (pos >= static_cast<size_t>(schema_.n_children)) {
+      return InvalidArgument("Field index {} out of range (size: {})", pos,
+                             schema_.n_children);
+    }
+
+    if (row_index_ < 0 || row_index_ >= array_.get().length) {
+      return InvalidArgument("Row index {} out of range (length: {})", row_index_,
+                             array_.get().length);
+    }
+
+    const ArrowSchema* child_schema = schema_.children[pos];
+    const ArrowArray* child_array = array_.get().children[pos];
+    const ArrowArrayView* child_view = array_view_.children[pos];
+
+    return ExtractValue(child_schema, child_array, child_view, row_index_);
+  }
+
+  size_t num_fields() const { return static_cast<size_t>(schema_.n_children); }
+
+  Status Reset(const ArrowArray& array, int64_t row_index) {
+    array_ = std::cref(array);
+    row_index_ = row_index;
+
+    ArrowError error;
+    NANOARROW_RETURN_IF_NOT_OK(
+        ArrowArrayViewSetArray(&array_view_, &array_.get(), &error));
+    return {};
+  }
+
+  Status Reset(int64_t row_index) {
+    row_index_ = row_index;
+    return {};
+  }
+
+  Status Init() {
+    ArrowError error;
+    NANOARROW_RETURN_IF_NOT_OK(
+        ArrowArrayViewInitFromSchema(&array_view_, &schema_, &error));
+    NANOARROW_RETURN_IF_NOT_OK(
+        ArrowArrayViewSetArray(&array_view_, &array_.get(), &error));
+    return {};
+  }
+
+ private:
+  ArrowArrayView array_view_;
+  internal::ArrowArrayViewGuard array_view_guard_{&array_view_};
+
+  const ArrowSchema& schema_;
+  std::reference_wrapper<const ArrowArray> array_;
+  int64_t row_index_;
+};
+
+Result<std::unique_ptr<ArrowArrayStructLike>> ArrowArrayStructLike::Make(
+    const ArrowSchema& schema, const ArrowArray& array, int64_t row_index) {
+  auto impl = std::make_unique<Impl>(schema, array, row_index);
+  ICEBERG_RETURN_UNEXPECTED(impl->Init());
+  return std::unique_ptr<ArrowArrayStructLike>(new ArrowArrayStructLike(std::move(impl)));
+}
+
+ArrowArrayStructLike::ArrowArrayStructLike(std::unique_ptr<Impl> impl)
+    : impl_(std::move(impl)) {}
+
+ArrowArrayStructLike::~ArrowArrayStructLike() = default;
+
+Result<Scalar> ArrowArrayStructLike::GetField(size_t pos) const {
+  return impl_->GetField(pos);
+}
+
+size_t ArrowArrayStructLike::num_fields() const { return impl_->num_fields(); }
+
+Status ArrowArrayStructLike::Reset(int64_t row_index) { return impl_->Reset(row_index); }
+
+Status ArrowArrayStructLike::Reset(const ArrowArray& array, int64_t row_index) {
+  return impl_->Reset(array, row_index);
+}
+
+// ArrowArrayArrayLike Implementation
+
+class ArrowArrayArrayLike::Impl {
+ public:
+  Impl(const ArrowSchema& schema, const ArrowArray& array, int64_t row_index)
+      : schema_(schema), array_(std::cref(array)), row_index_(row_index) {}
+
+  ~Impl() = default;
+
+  Result<Scalar> GetElement(size_t pos) const {
+    // NOLINTNEXTLINE(modernize-use-integer-sign-comparison)
+    if (pos >= static_cast<size_t>(length_)) {
+      return InvalidArgument("Element index {} out of range (length: {})", pos, length_);
+    }
+
+    const ArrowSchema* child_schema = schema_.children[0];
+    const ArrowArray* child_array = array_.get().children[0];
+    const ArrowArrayView* child_view = array_view_.children[0];
+
+    return ExtractValue(child_schema, child_array, child_view,
+                        offset_ + static_cast<int64_t>(pos));
+  }
+
+  size_t size() const { return static_cast<size_t>(length_); }
+
+  Status Reset(const ArrowArray& array, int64_t row_index) {
+    array_ = std::cref(array);
+    row_index_ = row_index;
+
+    ArrowError error;
+    NANOARROW_RETURN_IF_NOT_OK(
+        ArrowArrayViewSetArray(&array_view_, &array_.get(), &error));
+    return UpdateOffsets();
+  }
+
+  Status Reset(int64_t row_index) {
+    row_index_ = row_index;
+    return UpdateOffsets();
+  }
+
+  Status Init() {
+    ArrowError error;
+    NANOARROW_RETURN_IF_NOT_OK(
+        ArrowArrayViewInitFromSchema(&array_view_, &schema_, &error));
+    NANOARROW_RETURN_IF_NOT_OK(
+        ArrowArrayViewSetArray(&array_view_, &array_.get(), &error));
+    return UpdateOffsets();
+  }
+
+ private:
+  Status UpdateOffsets() {
+    if (row_index_ < 0 || row_index_ >= array_.get().length) {
+      return InvalidArgument("Row index {} out of range (length: {})", row_index_,
+                             array_.get().length);
+    }
+
+    offset_ = ArrowArrayViewListChildOffset(&array_view_, row_index_);
+    length_ = ArrowArrayViewListChildOffset(&array_view_, row_index_ + 1) - offset_;
+    return {};
+  }
+
+  ArrowArrayView array_view_;
+  internal::ArrowArrayViewGuard array_view_guard_{&array_view_};
+
+  const ArrowSchema& schema_;
+  std::reference_wrapper<const ArrowArray> array_;
+  int64_t row_index_;
+
+  int64_t offset_ = 0;
+  int64_t length_ = 0;
+};
+
+Result<std::unique_ptr<ArrowArrayArrayLike>> ArrowArrayArrayLike::Make(
+    const ArrowSchema& schema, const ArrowArray& array, int64_t row_index) {
+  auto impl = std::make_unique<Impl>(schema, array, row_index);
+  ICEBERG_RETURN_UNEXPECTED(impl->Init());
+  return std::unique_ptr<ArrowArrayArrayLike>(new ArrowArrayArrayLike(std::move(impl)));
+}
+
+ArrowArrayArrayLike::ArrowArrayArrayLike(std::unique_ptr<Impl> impl)
+    : impl_(std::move(impl)) {}
+
+ArrowArrayArrayLike::~ArrowArrayArrayLike() = default;
+
+Result<Scalar> ArrowArrayArrayLike::GetElement(size_t pos) const {
+  return impl_->GetElement(pos);
+}
+
+size_t ArrowArrayArrayLike::size() const { return impl_->size(); }
+
+Status ArrowArrayArrayLike::Reset(int64_t row_index) { return impl_->Reset(row_index); }
+
+Status ArrowArrayArrayLike::Reset(const ArrowArray& array, int64_t row_index) {
+  return impl_->Reset(array, row_index);
+}
+
+// ArrowArrayMapLike Implementation
+
+class ArrowArrayMapLike::Impl {
+ public:
+  Impl(const ArrowSchema& schema, const ArrowArray& array, int64_t row_index)
+      : schema_(schema), array_(std::cref(array)), row_index_(row_index) {}
+
+  ~Impl() = default;
+
+  Result<Scalar> GetKey(size_t pos) const {
+    // NOLINTNEXTLINE(modernize-use-integer-sign-comparison)
+    if (pos >= static_cast<size_t>(length_)) {
+      return InvalidArgument("Key index {} out of range (length: {})", pos, length_);
+    }
+
+    const ArrowSchema* keys_schema = schema_.children[0]->children[0];
+    const ArrowArray* keys_array = array_.get().children[0]->children[0];
+    const ArrowArrayView* keys_view = array_view_.children[0]->children[0];
+
+    return ExtractValue(keys_schema, keys_array, keys_view,
+                        offset_ + static_cast<int64_t>(pos));
+  }
+
+  Result<Scalar> GetValue(size_t pos) const {
+    // NOLINTNEXTLINE(modernize-use-integer-sign-comparison)
+    if (pos >= static_cast<size_t>(length_)) {
+      return InvalidArgument("Value index {} out of range (length: {})", pos, length_);
+    }
+
+    const ArrowSchema* values_schema = schema_.children[0]->children[1];
+    const ArrowArray* values_array = array_.get().children[0]->children[1];
+    const ArrowArrayView* values_view = array_view_.children[0]->children[1];
+
+    return ExtractValue(values_schema, values_array, values_view,
+                        offset_ + static_cast<int64_t>(pos));
+  }
+
+  size_t size() const { return static_cast<size_t>(length_); }
+
+  Status Reset(const ArrowArray& array, int64_t row_index) {
+    array_ = std::cref(array);
+    row_index_ = row_index;
+
+    ArrowError error;
+    NANOARROW_RETURN_IF_NOT_OK(
+        ArrowArrayViewSetArray(&array_view_, &array_.get(), &error));
+    return UpdateOffsets();
+  }
+
+  Status Reset(int64_t row_index) {
+    row_index_ = row_index;
+    return UpdateOffsets();
+  }
+
+  Status Init() {
+    ArrowError error;
+    NANOARROW_RETURN_IF_NOT_OK(
+        ArrowArrayViewInitFromSchema(&array_view_, &schema_, &error));
+    NANOARROW_RETURN_IF_NOT_OK(
+        ArrowArrayViewSetArray(&array_view_, &array_.get(), &error));
+    return UpdateOffsets();
+  }
+
+ private:
+  Status UpdateOffsets() {
+    if (row_index_ < 0 || row_index_ >= array_.get().length) {
+      return InvalidArgument("Row index {} out of range (length: {})", row_index_,
+                             array_.get().length);
+    }
+
+    // XXX: ArrowArrayViewListChildOffset does not work for map types.
+    // We need to directly access the offsets buffer instead.
+    auto* offsets_buffer = array_view_.buffer_views[1].data.as_int32;
+    offset_ = offsets_buffer[row_index_];
+    length_ = offsets_buffer[row_index_ + 1] - offset_;
+
+    return {};
+  }
+
+  ArrowArrayView array_view_;
+  internal::ArrowArrayViewGuard array_view_guard_{&array_view_};
+
+  const ArrowSchema& schema_;
+  std::reference_wrapper<const ArrowArray> array_;
+  int64_t row_index_;
+
+  int64_t offset_ = 0;
+  int64_t length_ = 0;
+};
+
+Result<std::unique_ptr<ArrowArrayMapLike>> ArrowArrayMapLike::Make(
+    const ArrowSchema& schema, const ArrowArray& array, int64_t row_index) {
+  auto impl = std::make_unique<Impl>(schema, array, row_index);
+  ICEBERG_RETURN_UNEXPECTED(impl->Init());
+  return std::unique_ptr<ArrowArrayMapLike>(new ArrowArrayMapLike(std::move(impl)));
+}
+
+ArrowArrayMapLike::ArrowArrayMapLike(std::unique_ptr<Impl> impl)
+    : impl_(std::move(impl)) {}
+
+ArrowArrayMapLike::~ArrowArrayMapLike() = default;
+
+Result<Scalar> ArrowArrayMapLike::GetKey(size_t pos) const { return impl_->GetKey(pos); }
+
+Result<Scalar> ArrowArrayMapLike::GetValue(size_t pos) const {
+  return impl_->GetValue(pos);
+}
+
+size_t ArrowArrayMapLike::size() const { return impl_->size(); }
+
+Status ArrowArrayMapLike::Reset(int64_t row_index) { return impl_->Reset(row_index); }
+
+Status ArrowArrayMapLike::Reset(const ArrowArray& array, int64_t row_index) {
+  return impl_->Reset(array, row_index);
+}
+
+}  // namespace iceberg
diff --git a/src/iceberg/row/arrow_array_wrapper.h b/src/iceberg/row/arrow_array_wrapper.h
new file mode 100644
index 0000000..7316bad
--- /dev/null
+++ b/src/iceberg/row/arrow_array_wrapper.h
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+/// \file iceberg/row/arrow_array_wrapper.h
+/// Wrapper classes for ArrowArray that implement StructLike, ArrayLike, and MapLike
+/// interfaces for unified row-oriented data access from columnar ArrowArray data.
+
+#include "iceberg/arrow_c_data.h"
+#include "iceberg/row/struct_like.h"
+
+namespace iceberg {
+
+/// \brief Wrapper for one row of a struct-typed ArrowArray.
+class ICEBERG_EXPORT ArrowArrayStructLike : public StructLike {
+ public:
+  ~ArrowArrayStructLike() override;
+
+  Result<Scalar> GetField(size_t pos) const override;
+
+  size_t num_fields() const override;
+
+  Status Reset(int64_t row_index);
+
+  Status Reset(const ArrowArray& array, int64_t row_index = 0);
+
+  static Result<std::unique_ptr<ArrowArrayStructLike>> Make(const ArrowSchema& schema,
+                                                            const ArrowArray& array,
+                                                            int64_t row_index = 0);
+
+  ArrowArrayStructLike(const ArrowArrayStructLike&) = delete;
+  ArrowArrayStructLike& operator=(const ArrowArrayStructLike&) = delete;
+
+ private:
+  class Impl;
+  explicit ArrowArrayStructLike(std::unique_ptr<Impl> impl);
+
+  std::unique_ptr<Impl> impl_;
+};
+
+/// \brief Wrapper for one row of a list-typed ArrowArray.
+class ICEBERG_EXPORT ArrowArrayArrayLike : public ArrayLike {
+ public:
+  ~ArrowArrayArrayLike() override;
+
+  Result<Scalar> GetElement(size_t pos) const override;
+
+  size_t size() const override;
+
+  Status Reset(int64_t row_index);
+
+  Status Reset(const ArrowArray& array, int64_t row_index = 0);
+
+  static Result<std::unique_ptr<ArrowArrayArrayLike>> Make(const ArrowSchema& schema,
+                                                           const ArrowArray& array,
+                                                           int64_t row_index = 0);
+
+  ArrowArrayArrayLike(const ArrowArrayArrayLike& other) = delete;
+  ArrowArrayArrayLike& operator=(const ArrowArrayArrayLike& other) = delete;
+
+ private:
+  class Impl;
+  explicit ArrowArrayArrayLike(std::unique_ptr<Impl> impl);
+
+  std::unique_ptr<Impl> impl_;
+};
+
+/// \brief Wrapper for one row of a map-typed ArrowArray.
+class ICEBERG_EXPORT ArrowArrayMapLike : public MapLike {
+ public:
+  ~ArrowArrayMapLike() override;
+
+  Result<Scalar> GetKey(size_t pos) const override;
+
+  Result<Scalar> GetValue(size_t pos) const override;
+
+  size_t size() const override;
+
+  Status Reset(int64_t row_index);
+
+  Status Reset(const ArrowArray& array, int64_t row_index = 0);
+
+  static Result<std::unique_ptr<ArrowArrayMapLike>> Make(const ArrowSchema& schema,
+                                                         const ArrowArray& array,
+                                                         int64_t row_index = 0);
+
+  ArrowArrayMapLike(const ArrowArrayMapLike& other) = delete;
+  ArrowArrayMapLike& operator=(const ArrowArrayMapLike& other) = delete;
+
+ private:
+  class Impl;
+  explicit ArrowArrayMapLike(std::unique_ptr<Impl> impl);
+
+  std::unique_ptr<Impl> impl_;
+};
+
+}  // namespace iceberg
diff --git a/src/iceberg/row/manifest_wrapper.cc b/src/iceberg/row/manifest_wrapper.cc
new file mode 100644
index 0000000..4c8708e
--- /dev/null
+++ b/src/iceberg/row/manifest_wrapper.cc
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/row/manifest_wrapper.h"
+
+#include "iceberg/manifest_reader_internal.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+namespace {
+/// \brief View the bytes of a string or byte vector as a non-owning string_view.
+///
+/// The returned view aliases the storage of `value`; it is only usable while
+/// `value` is alive and unmodified.
+template <typename T>
+  requires std::is_same_v<T, std::string> || std::is_same_v<T, std::vector<uint8_t>>
+std::string_view ToView(const T& value) {
+  const auto* bytes = reinterpret_cast<const char*>(value.data());  // NOLINT
+  return std::string_view(bytes, value.size());
+}
+
+/// \brief Convert an optional into a Scalar result.
+///
+/// An empty optional becomes null (std::monostate); otherwise the contained value
+/// is stored in the Scalar.
+template <typename T>
+Result<Scalar> FromOptional(const std::optional<T>& value) {
+  if (!value) {
+    return std::monostate{};
+  }
+  return *value;
+}
+
+}  // namespace
+
+/// \brief Return the summary field at position `pos`.
+///
+/// Positions: 0 = contains_null, 1 = contains_nan, 2 = lower_bound,
+/// 3 = upper_bound. Unset optionals are returned as null (std::monostate) and
+/// bounds as non-owning views into the wrapped summary.
+///
+/// \return InvalidArgument if `pos` does not name a field.
+Result<Scalar> PartitionFieldSummaryStructLike::GetField(size_t pos) const {
+  if (pos >= num_fields()) {
+    return InvalidArgument("Invalid partition field summary index: {}", pos);
+  }
+  const PartitionFieldSummary& summary = summary_.get();
+  if (pos == 0) {
+    return summary.contains_null;
+  }
+  if (pos == 1) {
+    return FromOptional(summary.contains_nan);
+  }
+  if (pos == 2 || pos == 3) {
+    // Both bounds share the same optional byte-vector representation.
+    const auto& bound = (pos == 2) ? summary.lower_bound : summary.upper_bound;
+    return FromOptional(bound.transform(ToView<std::vector<uint8_t>>));
+  }
+  return InvalidArgument("Invalid partition field summary index: {}", pos);
+}
+
+/// \brief Return the summary at position `pos` as a StructLike handle.
+///
+/// The returned struct is a non-owning view over the element of the wrapped
+/// vector. To avoid an allocation per call, a cached wrapper is re-pointed and
+/// handed out again, but only while no previously returned handle is still
+/// alive. Otherwise a fresh wrapper is allocated, so a handle the caller kept
+/// continues to refer to the element it was obtained for.
+///
+/// \return InvalidArgument if `pos` is out of range.
+Result<Scalar> PartitionFieldSummaryArrayLike::GetElement(size_t pos) const {
+  if (pos >= size()) {
+    return InvalidArgument("Invalid partition field summary index: {}", pos);
+  }
+  const PartitionFieldSummary& element = summaries_.get()[pos];
+  // use_count() == 1 means the cache is the sole owner, so re-pointing it cannot
+  // be observed through a handle returned earlier.
+  if (summary_ != nullptr && summary_.use_count() == 1) {
+    summary_->Reset(element);
+  } else {
+    summary_ = std::make_shared<PartitionFieldSummaryStructLike>(element);
+  }
+  return summary_;
+}
+
+/// \brief Return the manifest file field at position `pos`.
+///
+/// `pos` is interpreted as a ManifestFileField. Unset optional members are
+/// returned as null (std::monostate), the manifest path and key metadata as
+/// non-owning views into the wrapped ManifestFile, and the partition summaries
+/// as an ArrayLike handle.
+///
+/// The cached summaries wrapper is re-pointed and reused only while no handle
+/// returned earlier is still alive; otherwise a fresh one is allocated, so a
+/// handle the caller kept is never redirected to another manifest file.
+///
+/// \return InvalidArgument if `pos` does not name a field.
+Result<Scalar> ManifestFileStructLike::GetField(size_t pos) const {
+  if (pos >= num_fields()) {
+    return InvalidArgument("Invalid manifest file field index: {}", pos);
+  }
+  ICEBERG_ASSIGN_OR_RAISE(auto field,
+                          ManifestFileFieldFromIndex(static_cast<int32_t>(pos)));
+  const auto& manifest_file = manifest_file_.get();
+  switch (field) {
+    case ManifestFileField::kManifestPath:
+      return ToView(manifest_file.manifest_path);
+    case ManifestFileField::kManifestLength:
+      return manifest_file.manifest_length;
+    case ManifestFileField::kPartitionSpecId:
+      return manifest_file.partition_spec_id;
+    case ManifestFileField::kContent:
+      return static_cast<int32_t>(manifest_file.content);
+    case ManifestFileField::kSequenceNumber:
+      return manifest_file.sequence_number;
+    case ManifestFileField::kMinSequenceNumber:
+      return manifest_file.min_sequence_number;
+    case ManifestFileField::kAddedSnapshotId:
+      return manifest_file.added_snapshot_id;
+    case ManifestFileField::kAddedFilesCount:
+      return FromOptional(manifest_file.added_files_count);
+    case ManifestFileField::kExistingFilesCount:
+      return FromOptional(manifest_file.existing_files_count);
+    case ManifestFileField::kDeletedFilesCount:
+      return FromOptional(manifest_file.deleted_files_count);
+    case ManifestFileField::kAddedRowsCount:
+      return FromOptional(manifest_file.added_rows_count);
+    case ManifestFileField::kExistingRowsCount:
+      return FromOptional(manifest_file.existing_rows_count);
+    case ManifestFileField::kDeletedRowsCount:
+      return FromOptional(manifest_file.deleted_rows_count);
+    case ManifestFileField::kPartitionFieldSummary: {
+      // use_count() == 1 means the cache is the sole owner, so re-pointing it
+      // cannot be observed through a handle returned earlier.
+      if (summaries_ != nullptr && summaries_.use_count() == 1) {
+        summaries_->Reset(manifest_file.partitions);
+      } else {
+        summaries_ =
+            std::make_shared<PartitionFieldSummaryArrayLike>(manifest_file.partitions);
+      }
+      return summaries_;
+    }
+    case ManifestFileField::kKeyMetadata:
+      return ToView(manifest_file.key_metadata);
+    case ManifestFileField::kFirstRowId:
+      return FromOptional(manifest_file.first_row_id);
+    case ManifestFileField::kNextUnusedId:
+      // Sentinel value, not a real field.
+      return InvalidArgument("Invalid manifest file field index: {}", pos);
+  }
+  // Reached only if `field` holds a value outside the enumerators above.
+  return InvalidArgument("Invalid manifest file field index: {}", pos);
+}
+
+/// \brief Number of addressable fields, i.e. the value of the kNextUnusedId
+/// sentinel that follows the last real ManifestFileField.
+size_t ManifestFileStructLike::num_fields() const {
+  constexpr auto kFieldCount = static_cast<size_t>(ManifestFileField::kNextUnusedId);
+  return kFieldCount;
+}
+
+/// \brief Wrap `file` in a StructLike view.
+///
+/// The wrapper only references `file`, so `file` must outlive the returned
+/// object.
+std::unique_ptr<StructLike> FromManifestFile(const ManifestFile& file) {
+  auto wrapper = std::make_unique<ManifestFileStructLike>(file);
+  return wrapper;
+}
+
+}  // namespace iceberg
diff --git a/src/iceberg/row/manifest_wrapper.h 
b/src/iceberg/row/manifest_wrapper.h
new file mode 100644
index 0000000..3271eac
--- /dev/null
+++ b/src/iceberg/row/manifest_wrapper.h
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+/// \file iceberg/row/manifest_wrapper.h
+/// Wrapper classes for manifest-related data structures that implement the
+/// StructLike and ArrayLike interfaces for unified data access.
+
+#include <functional>
+
+#include "iceberg/iceberg_export.h"
+#include "iceberg/manifest_list.h"
+#include "iceberg/row/struct_like.h"
+
+namespace iceberg {
+
+/// \brief StructLike wrapper for PartitionFieldSummary.
+///
+/// The wrapper is a non-owning view: it only stores a reference to the summary,
+/// which must stay alive while fields are read. Field positions are
+/// 0 = contains_null, 1 = contains_nan, 2 = lower_bound, 3 = upper_bound.
+class ICEBERG_EXPORT PartitionFieldSummaryStructLike : public StructLike {
+ public:
+  /// \brief Create a view over `summary` (not copied).
+  explicit PartitionFieldSummaryStructLike(const PartitionFieldSummary& 
summary)
+      : summary_(summary) {}
+  ~PartitionFieldSummaryStructLike() override = default;
+
+  PartitionFieldSummaryStructLike(const PartitionFieldSummaryStructLike&) = 
delete;
+  PartitionFieldSummaryStructLike& operator=(const 
PartitionFieldSummaryStructLike&) =
+      delete;
+
+  /// \brief Get the summary field at `pos`; see the class comment for positions.
+  Result<Scalar> GetField(size_t pos) const override;
+
+  /// \brief A partition field summary always exposes four fields.
+  size_t num_fields() const override { return 4; }
+
+  /// \brief Re-point this view at `summary` (not copied).
+  void Reset(const PartitionFieldSummary& summary) { summary_ = 
std::cref(summary); }
+
+ private:
+  // reference_wrapper (rather than a plain reference) so Reset() can rebind it.
+  std::reference_wrapper<const PartitionFieldSummary> summary_;
+};
+
+/// \brief ArrayLike wrapper for a vector of PartitionFieldSummary.
+///
+/// The wrapper is a non-owning view: it only stores a reference to the vector,
+/// which must stay alive while elements are read. Elements are handed out as
+/// StructLike handles; callers should not rely on a previously returned handle
+/// staying on its element across later GetElement() calls (see the
+/// implementation for the exact reuse policy).
+class ICEBERG_EXPORT PartitionFieldSummaryArrayLike : public ArrayLike {
+ public:
+  /// \brief Create a view over `summaries` (not copied).
+  explicit PartitionFieldSummaryArrayLike(
+      const std::vector<PartitionFieldSummary>& summaries)
+      : summaries_(summaries) {}
+  ~PartitionFieldSummaryArrayLike() override = default;
+
+  PartitionFieldSummaryArrayLike(const PartitionFieldSummaryArrayLike&) = 
delete;
+  PartitionFieldSummaryArrayLike& operator=(const 
PartitionFieldSummaryArrayLike&) =
+      delete;
+
+  /// \brief Get the summary at `pos` as a StructLike handle.
+  Result<Scalar> GetElement(size_t pos) const override;
+
+  /// \brief Number of summaries in the wrapped vector.
+  size_t size() const override { return summaries_.get().size(); }
+
+  /// \brief Re-point this view at `summaries` (not copied).
+  void Reset(const std::vector<PartitionFieldSummary>& summaries) {
+    summaries_ = std::cref(summaries);
+  }
+
+ private:
+  // reference_wrapper (rather than a plain reference) so Reset() can rebind it.
+  std::reference_wrapper<const std::vector<PartitionFieldSummary>> summaries_;
+  // Element wrapper cached across GetElement() calls; mutable because
+  // GetElement() is const but creates or re-points it.
+  mutable std::shared_ptr<PartitionFieldSummaryStructLike> summary_;
+};
+
+/// \brief StructLike wrapper for ManifestFile.
+///
+/// The wrapper is a non-owning view: it only stores a reference to the manifest
+/// file, which must stay alive while fields are read. Field positions follow the
+/// ManifestFileField enumeration.
+class ICEBERG_EXPORT ManifestFileStructLike : public StructLike {
+ public:
+  /// \brief Create a view over `file` (not copied).
+  explicit ManifestFileStructLike(const ManifestFile& file) : 
manifest_file_(file) {}
+  ~ManifestFileStructLike() override = default;
+
+  ManifestFileStructLike(const ManifestFileStructLike&) = delete;
+  ManifestFileStructLike& operator=(const ManifestFileStructLike&) = delete;
+
+  /// \brief Get the field at `pos`, interpreted as a ManifestFileField.
+  Result<Scalar> GetField(size_t pos) const override;
+
+  /// \brief Number of fields exposed by this wrapper.
+  size_t num_fields() const override;
+
+  /// \brief Re-point this view at `file` (not copied).
+  void Reset(const ManifestFile& file) { manifest_file_ = std::cref(file); }
+
+ private:
+  // reference_wrapper (rather than a plain reference) so Reset() can rebind it.
+  std::reference_wrapper<const ManifestFile> manifest_file_;
+  // Partition summaries wrapper cached across GetField() calls; mutable because
+  // GetField() is const but creates or re-points it.
+  mutable std::shared_ptr<PartitionFieldSummaryArrayLike> summaries_;
+};
+
+}  // namespace iceberg
diff --git a/src/iceberg/row/struct_like.h b/src/iceberg/row/struct_like.h
new file mode 100644
index 0000000..3093f75
--- /dev/null
+++ b/src/iceberg/row/struct_like.h
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+/// \file iceberg/row/struct_like.h
+/// Structures for viewing data in a row-based format.  This header contains the
+/// definitions of StructLike, ArrayLike, and MapLike, which provide a unified
+/// interface for viewing data from ArrowArray or structs like ManifestFile and
+/// ManifestEntry.  Note that they do not carry type information and should be
+/// used in conjunction with the schema to get the type information.
+
+#include <memory>
+#include <string_view>
+#include <variant>
+#include <vector>
+
+#include "iceberg/result.h"
+#include "iceberg/type_fwd.h"
+#include "iceberg/util/decimal.h"
+
+namespace iceberg {
+
+/// \brief A scalar value depending on its data type.
+///
+/// std::monostate represents null. Nested struct, list and map values are handed
+/// out as shared_ptr handles to further wrapper objects.
+///
+/// Note that all string and binary values are stored as non-owned string_view
+/// and their lifetime is managed by the wrapped object.
+using Scalar = std::variant<std::monostate,  // for null
+                            bool,            // for boolean
+                            int32_t,         // for int, date
+                            int64_t,  // for long, timestamp, timestamp_tz, 
and time
+                            float,    // for float
+                            double,   // for double
+                            std::string_view,  // for non-owned string, binary 
and fixed
+                            Decimal,           // for decimal
+                            std::shared_ptr<StructLike>,  // for struct
+                            std::shared_ptr<ArrayLike>,   // for list
+                            std::shared_ptr<MapLike>>;    // for map
+
+/// \brief An immutable struct-like wrapper.
+///
+/// Fields are addressed by position only; the interface carries no names or
+/// types, so callers pair it with a schema.
+class ICEBERG_EXPORT StructLike {
+ public:
+  virtual ~StructLike() = default;
+
+  /// \brief Get the field value at the given position.
+  /// \param pos The position of the field in the struct.
+  /// \return The field as a Scalar (std::monostate for null), or an error. How
+  /// an invalid position is reported is up to the implementation.
+  virtual Result<Scalar> GetField(size_t pos) const = 0;
+
+  /// \brief Get the number of fields in the struct.
+  virtual size_t num_fields() const = 0;
+};
+
+/// \brief An immutable array-like wrapper.
+///
+/// Elements are addressed by position only; the interface carries no element
+/// type, so callers pair it with a schema.
+class ICEBERG_EXPORT ArrayLike {
+ public:
+  virtual ~ArrayLike() = default;
+
+  /// \brief Get the array element at the given position.
+  /// \param pos The position of the element in the array.
+  /// \return The element as a Scalar (std::monostate for null), or an error. How
+  /// an invalid position is reported is up to the implementation.
+  virtual Result<Scalar> GetElement(size_t pos) const = 0;
+
+  /// \brief Get the number of elements in the array.
+  virtual size_t size() const = 0;
+};
+
+/// \brief An immutable map-like wrapper.
+///
+/// Entries are addressed by position: GetKey(pos) and GetValue(pos) refer to the
+/// same entry. The interface offers no lookup by key.
+class ICEBERG_EXPORT MapLike {
+ public:
+  virtual ~MapLike() = default;
+
+  /// \brief Get the key at the given position.
+  /// \param pos The position of the key in the map.
+  /// \return The key as a Scalar, or an error. How an invalid position is
+  /// reported is up to the implementation.
+  virtual Result<Scalar> GetKey(size_t pos) const = 0;
+
+  /// \brief Get the value at the given position.
+  /// \param pos The position of the value in the map.
+  /// \return The value as a Scalar (std::monostate for null), or an error. How
+  /// an invalid position is reported is up to the implementation.
+  virtual Result<Scalar> GetValue(size_t pos) const = 0;
+
+  /// \brief Get the number of entries in the map.
+  virtual size_t size() const = 0;
+};
+
+}  // namespace iceberg
diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt
index cb3b608..db9caab 100644
--- a/src/iceberg/test/CMakeLists.txt
+++ b/src/iceberg/test/CMakeLists.txt
@@ -110,10 +110,11 @@ if(ICEBERG_BUILD_BUNDLE)
   add_iceberg_test(arrow_test
                    USE_BUNDLE
                    SOURCES
-                   arrow_test.cc
                    arrow_fs_file_io_test.cc
+                   arrow_test.cc
+                   gzip_decompress_test.cc
                    metadata_io_test.cc
-                   gzip_decompress_test.cc)
+                   struct_like_test.cc)
 
   add_iceberg_test(catalog_test
                    USE_BUNDLE
diff --git a/src/iceberg/test/struct_like_test.cc 
b/src/iceberg/test/struct_like_test.cc
new file mode 100644
index 0000000..484b254
--- /dev/null
+++ b/src/iceberg/test/struct_like_test.cc
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <arrow/c/bridge.h>
+#include <arrow/json/from_string.h>
+#include <arrow/type.h>
+#include <arrow/util/decimal.h>
+
+#include "iceberg/arrow_c_data_guard_internal.h"
+#include "iceberg/manifest_list.h"
+#include "iceberg/manifest_reader_internal.h"
+#include "iceberg/row/arrow_array_wrapper.h"
+#include "iceberg/row/manifest_wrapper.h"
+#include "iceberg/schema_internal.h"
+#include "matchers.h"
+
+namespace iceberg {
+
+// Asserts that `result` is OK and holds the `expected_type` alternative, then
+// expects the held value to equal `expected_value`.
+//
+// `result` is evaluated exactly once, so the expression under test (typically a
+// GetField()/GetElement() call) is not executed a second time. The first two
+// checks are fatal (ASSERT_*), so the macro must be used in a function returning
+// void, such as a TEST body.
+#define EXPECT_SCALAR_EQ(result, expected_type, expected_value)          \
+  do {                                                                   \
+    auto&& scalar_result_ = (result);                                    \
+    ASSERT_THAT(scalar_result_, IsOk());                                 \
+    const auto& scalar_value_ = scalar_result_.value();                  \
+    ASSERT_TRUE(std::holds_alternative<expected_type>(scalar_value_));   \
+    EXPECT_EQ(std::get<expected_type>(scalar_value_), expected_value);   \
+  } while (0)
+
+// Asserts that `result` is OK and holds a Decimal, then expects its string form
+// at the given `scale` to equal `expected_value`.
+//
+// `result` is evaluated exactly once. The first two checks are fatal (ASSERT_*),
+// so the macro must be used in a function returning void, such as a TEST body.
+#define EXPECT_DECIMAL_EQ(result, scale, expected_value)             \
+  do {                                                               \
+    auto&& scalar_result_ = (result);                                \
+    ASSERT_THAT(scalar_result_, IsOk());                             \
+    const auto& scalar_value_ = scalar_result_.value();              \
+    ASSERT_TRUE(std::holds_alternative<Decimal>(scalar_value_));     \
+    const auto& decimal_ = std::get<Decimal>(scalar_value_);         \
+    EXPECT_EQ(decimal_.ToString(scale), expected_value);             \
+  } while (0)
+
+// Asserts that `result` is OK and holds null (std::monostate).
+//
+// `result` is evaluated exactly once. Both checks are fatal (ASSERT_*), so the
+// macro must be used in a function returning void, such as a TEST body.
+#define EXPECT_SCALAR_NULL(result)                                              \
+  do {                                                                          \
+    auto&& scalar_result_ = (result);                                           \
+    ASSERT_THAT(scalar_result_, IsOk());                                        \
+    ASSERT_TRUE(std::holds_alternative<std::monostate>(scalar_result_.value())); \
+  } while (0)
+
+// Checks that populated ManifestFile members are exposed at their
+// ManifestFileField positions with the expected scalar alternative, and that an
+// out-of-range position is rejected.
+TEST(ManifestFileStructLike, BasicFields) {
+  ManifestFile file{
+      .manifest_path = "/path/to/manifest.avro",
+      .manifest_length = 12345,
+      .partition_spec_id = 1,
+      .content = ManifestFile::Content::kData,
+      .sequence_number = 100,
+      .min_sequence_number = 90,
+      .added_snapshot_id = 1001,
+      .added_files_count = 10,
+      .existing_files_count = 5,
+      .deleted_files_count = 2,
+      .added_rows_count = 1000,
+      .existing_rows_count = 500,
+      .deleted_rows_count = 20,
+  };
+
+  ManifestFileStructLike wrapper(file);
+  EXPECT_EQ(wrapper.num_fields(), 16);
+
+  // Reads a field by its enum position.
+  auto field = [&wrapper](ManifestFileField id) {
+    return wrapper.GetField(static_cast<size_t>(id));
+  };
+
+  EXPECT_SCALAR_EQ(field(ManifestFileField::kManifestPath), std::string_view,
+                   "/path/to/manifest.avro");
+  EXPECT_SCALAR_EQ(field(ManifestFileField::kManifestLength), int64_t, 12345);
+  EXPECT_SCALAR_EQ(field(ManifestFileField::kPartitionSpecId), int32_t, 1);
+  EXPECT_SCALAR_EQ(field(ManifestFileField::kContent), int32_t,
+                   static_cast<int32_t>(ManifestFile::Content::kData));
+  EXPECT_SCALAR_EQ(field(ManifestFileField::kSequenceNumber), int64_t, 100);
+  EXPECT_SCALAR_EQ(field(ManifestFileField::kAddedFilesCount), int32_t, 10);
+  EXPECT_THAT(wrapper.GetField(100), IsError(ErrorKind::kInvalidArgument));
+}
+
+// Checks that unset optional members come back as null while set ones (including
+// first_row_id) come back with their value.
+TEST(ManifestFileStructLike, OptionalFields) {
+  ManifestFile file{.manifest_path = "/path/to/manifest2.avro",
+                    .manifest_length = 54321,
+                    .partition_spec_id = 2,
+                    .content = ManifestFile::Content::kDeletes,
+                    .sequence_number = 200,
+                    .min_sequence_number = 180,
+                    .added_snapshot_id = 2001,
+                    .added_files_count = std::nullopt,  // null optional field
+                    .existing_files_count = 15,
+                    .deleted_files_count = std::nullopt,  // null optional field
+                    .added_rows_count = std::nullopt,     // null optional field
+                    .existing_rows_count = 1500,
+                    .deleted_rows_count = 200,
+                    .partitions = {},
+                    .key_metadata = {},
+                    .first_row_id = 12345};
+  ManifestFileStructLike wrapper(file);
+
+  // Reads a field by its enum position.
+  auto field = [&wrapper](ManifestFileField id) {
+    return wrapper.GetField(static_cast<size_t>(id));
+  };
+
+  EXPECT_SCALAR_NULL(field(ManifestFileField::kAddedFilesCount));
+  EXPECT_SCALAR_EQ(field(ManifestFileField::kExistingFilesCount), int32_t, 15);
+  EXPECT_SCALAR_EQ(field(ManifestFileField::kFirstRowId), int64_t, 12345);
+  EXPECT_SCALAR_EQ(field(ManifestFileField::kContent), int32_t,
+                   static_cast<int32_t>(ManifestFile::Content::kDeletes));
+}
+
+// Checks that the partition summaries are exposed as an ArrayLike of StructLike
+// elements, including byte-exact bounds and null handling of unset optionals.
+TEST(ManifestFileStructLike, WithPartitions) {
+  ManifestFile file{
+      .manifest_path = "/path/to/manifest3.avro",
+      .manifest_length = 98765,
+      .partition_spec_id = 3,
+      .content = ManifestFile::Content::kData,
+      .sequence_number = 300,
+      .min_sequence_number = 290,
+      .added_snapshot_id = 3001,
+      .added_files_count = 20,
+      .existing_files_count = 10,
+      .deleted_files_count = 1,
+      .added_rows_count = 2000,
+      .existing_rows_count = 1000,
+      .deleted_rows_count = 10,
+      .partitions = {{.contains_null = true,
+                      .contains_nan = false,
+                      .lower_bound = std::vector<uint8_t>{0x01, 0x02, 0x03},
+                      .upper_bound = std::vector<uint8_t>{0x04, 0x05, 0x06}},
+                     {.contains_null = false,
+                      .contains_nan = std::nullopt,
+                      .lower_bound = std::vector<uint8_t>{0x10, 0x20},
+                      .upper_bound = std::nullopt}}};
+
+  ManifestFileStructLike wrapper(file);
+
+  auto summaries_result =
+      wrapper.GetField(static_cast<size_t>(ManifestFileField::kPartitionFieldSummary));
+  ASSERT_THAT(summaries_result, IsOk());
+  auto summaries_scalar = summaries_result.value();
+  ASSERT_TRUE(std::holds_alternative<std::shared_ptr<ArrayLike>>(summaries_scalar));
+  auto summaries = std::get<std::shared_ptr<ArrayLike>>(summaries_scalar);
+  EXPECT_EQ(summaries->size(), 2);
+
+  // Test 1st partition summary. It is fully inspected before the 2nd element is
+  // requested.
+  auto first_result = summaries->GetElement(0);
+  ASSERT_THAT(first_result, IsOk());
+  auto first_scalar = first_result.value();
+  ASSERT_TRUE(std::holds_alternative<std::shared_ptr<StructLike>>(first_scalar));
+  auto first = std::get<std::shared_ptr<StructLike>>(first_scalar);
+  EXPECT_EQ(first->num_fields(), 4);
+  EXPECT_SCALAR_EQ(first->GetField(0), bool, true);
+  EXPECT_SCALAR_EQ(first->GetField(1), bool, false);
+
+  auto lower_result = first->GetField(2);
+  ASSERT_THAT(lower_result, IsOk());
+  auto lower_scalar = lower_result.value();
+  ASSERT_TRUE(std::holds_alternative<std::string_view>(lower_scalar));
+  auto lower_view = std::get<std::string_view>(lower_scalar);
+  const std::array<uint8_t, 3> expected_lower = {0x01, 0x02, 0x03};
+  EXPECT_EQ(lower_view.size(), expected_lower.size());
+  for (size_t i = 0; i < expected_lower.size(); ++i) {
+    EXPECT_EQ(static_cast<uint8_t>(lower_view[i]), expected_lower[i]);
+  }
+
+  // Test 2nd partition summary with null fields
+  auto second_result = summaries->GetElement(1);
+  ASSERT_THAT(second_result, IsOk());
+  auto second_scalar = second_result.value();
+  ASSERT_TRUE(std::holds_alternative<std::shared_ptr<StructLike>>(second_scalar));
+  auto second = std::get<std::shared_ptr<StructLike>>(second_scalar);
+  EXPECT_SCALAR_NULL(second->GetField(1));
+  EXPECT_SCALAR_NULL(second->GetField(3));
+}
+
+// Builds a three-row struct array covering the primitive Arrow types used here
+// (int64, utf8, float32, boolean, date32, time64, timestamp, fixed-size binary,
+// decimal128), exports it through the Arrow C data interface, and checks every
+// field of every row through ArrowArrayStructLike.
+TEST(ArrowArrayStructLike, PrimitiveFields) {
+  auto struct_type = ::arrow::struct_(
+      {::arrow::field("id", ::arrow::int64(), /*nullable=*/false),
+       ::arrow::field("name", ::arrow::utf8(), /*nullable=*/true),
+       ::arrow::field("score", ::arrow::float32(), /*nullable=*/true),
+       ::arrow::field("active", ::arrow::boolean(), /*nullable=*/false),
+       ::arrow::field("date", ::arrow::date32(), /*nullable=*/false),
+       ::arrow::field("time", ::arrow::time64(::arrow::TimeUnit::MICRO),
+                      /*nullable=*/false),
+       ::arrow::field("timestamp", 
::arrow::timestamp(::arrow::TimeUnit::MICRO),
+                      /*nullable=*/false),
+       ::arrow::field("fixed", ::arrow::fixed_size_binary(4), 
/*nullable=*/false),
+       ::arrow::field("decimal", ::arrow::decimal128(10, 2), 
/*nullable=*/false)});
+
+  // Row 2 has a null "score" and row 3 a null "name"; these are the only
+  // nullable columns.
+  auto arrow_array = ::arrow::json::ArrayFromJSONString(struct_type, R"([
+    {"id": 1, "name": "Alice", "score": 95.5, "active": true, "date": 
1714396800,
+     "time": 123456, "timestamp": 1714396800000000, "fixed": "aaaa", 
"decimal": "1234.56"},
+    {"id": 2, "name": "Bob", "score": null, "active": false, "date": 
1714396801,
+     "time": 123457, "timestamp": 1714396800000001, "fixed": "bbbb", 
"decimal": "-1234.56"},
+    {"id": 3, "name": null, "score": 87.2, "active": true, "date": 1714396802,
+     "time": 123458, "timestamp": 1714396800000002, "fixed": "cccc", 
"decimal": "1234.00"}])")
+                         .ValueOrDie();
+
+  // Export to the C data interface; the guards are declared before the export
+  // calls that fill the structs they point to.
+  ArrowSchema c_schema;
+  ArrowArray c_array;
+  internal::ArrowSchemaGuard schema_guard(&c_schema);
+  internal::ArrowArrayGuard array_guard(&c_array);
+  ASSERT_TRUE(::arrow::ExportType(*struct_type, &c_schema).ok());
+  ASSERT_TRUE(::arrow::ExportArray(*arrow_array, &c_array).ok());
+
+  auto struct_like_result = ArrowArrayStructLike::Make(c_schema, c_array);
+  ASSERT_THAT(struct_like_result, IsOk());
+  auto struct_like = std::move(struct_like_result.value());
+
+  // Expected values per column, indexed by row; they mirror the JSON above.
+  constexpr int64_t kNumRows = 3;
+  std::array<int64_t, kNumRows> ids = {1, 2, 3};
+  std::array<std::optional<std::string>, kNumRows> names = {"Alice", "Bob", 
std::nullopt};
+  std::array<std::optional<float>, kNumRows> scores = {95.5f, std::nullopt, 
87.2f};
+  std::array<bool, kNumRows> actives = {true, false, true};
+  std::array<int32_t, kNumRows> dates = {1714396800, 1714396801, 1714396802};
+  std::array<int64_t, kNumRows> times = {123456, 123457, 123458};
+  std::array<int64_t, kNumRows> timestamps = {1714396800000000, 
1714396800000001,
+                                              1714396800000002};
+  std::array<std::string, kNumRows> fixeds = {"aaaa", "bbbb", "cccc"};
+  std::array<std::string, kNumRows> decimals = {"1234.56", "-1234.56", 
"1234.00"};
+
+  // The same wrapper is re-positioned on each row instead of being recreated.
+  for (int64_t i = 0; i < kNumRows; ++i) {
+    ASSERT_THAT(struct_like->Reset(i), IsOk());
+    EXPECT_SCALAR_EQ(struct_like->GetField(0), int64_t, ids[i]);
+    if (names[i].has_value()) {
+      // names[i] is an engaged optional here; the comparison is against the
+      // string it contains.
+      EXPECT_SCALAR_EQ(struct_like->GetField(1), std::string_view, names[i]);
+    } else {
+      EXPECT_SCALAR_NULL(struct_like->GetField(1));
+    }
+    if (scores[i].has_value()) {
+      EXPECT_SCALAR_EQ(struct_like->GetField(2), float, scores[i].value());
+    } else {
+      EXPECT_SCALAR_NULL(struct_like->GetField(2));
+    }
+    EXPECT_SCALAR_EQ(struct_like->GetField(3), bool, actives[i]);
+    EXPECT_SCALAR_EQ(struct_like->GetField(4), int32_t, dates[i]);
+    EXPECT_SCALAR_EQ(struct_like->GetField(5), int64_t, times[i]);
+    EXPECT_SCALAR_EQ(struct_like->GetField(6), int64_t, timestamps[i]);
+    EXPECT_SCALAR_EQ(struct_like->GetField(7), std::string_view, fixeds[i]);
+    EXPECT_DECIMAL_EQ(struct_like->GetField(8), /*scale=*/2, decimals[i]);
+  }
+}
+
+// Checks that a struct-typed child column is surfaced as a nested StructLike
+// whose fields follow the row selected on the parent wrapper.
+TEST(ArrowArrayStructLike, NestedStruct) {
+  auto person_type = ::arrow::struct_(
+      {::arrow::field("name", ::arrow::utf8(), /*nullable=*/false),
+       ::arrow::field("age", ::arrow::int32(), /*nullable=*/false)});
+  auto root_type = ::arrow::struct_(
+      {::arrow::field("id", ::arrow::int64(), /*nullable=*/false),
+       ::arrow::field("person", person_type, /*nullable=*/false)});
+
+  auto arrow_array = ::arrow::json::ArrayFromJSONString(root_type, R"([
+    {"id": 1, "person": {"name": "Alice", "age": 30}},
+    {"id": 2, "person": {"name": "Bob", "age": 25}}])")
+                         .ValueOrDie();
+
+  ArrowSchema c_schema;
+  ArrowArray c_array;
+  internal::ArrowSchemaGuard schema_guard(&c_schema);
+  internal::ArrowArrayGuard array_guard(&c_array);
+  ASSERT_TRUE(::arrow::ExportType(*root_type, &c_schema).ok());
+  ASSERT_TRUE(::arrow::ExportArray(*arrow_array, &c_array).ok());
+
+  auto make_result = ArrowArrayStructLike::Make(c_schema, c_array);
+  ASSERT_THAT(make_result, IsOk());
+  auto root = std::move(make_result.value());
+
+  // Expected values per row; they mirror the JSON above.
+  struct ExpectedRow {
+    int64_t id;
+    std::string name;
+    int32_t age;
+  };
+  const std::vector<ExpectedRow> expected_rows = {{1, "Alice", 30}, {2, "Bob", 25}};
+
+  for (size_t row = 0; row < expected_rows.size(); ++row) {
+    const ExpectedRow& expected = expected_rows[row];
+    ASSERT_THAT(root->Reset(static_cast<int64_t>(row)), IsOk());
+    EXPECT_EQ(root->num_fields(), 2);
+    EXPECT_SCALAR_EQ(root->GetField(0), int64_t, expected.id);
+
+    auto person_result = root->GetField(1);
+    ASSERT_THAT(person_result, IsOk());
+    auto person_scalar = person_result.value();
+    ASSERT_TRUE(std::holds_alternative<std::shared_ptr<StructLike>>(person_scalar));
+
+    auto person = std::get<std::shared_ptr<StructLike>>(person_scalar);
+    EXPECT_EQ(person->num_fields(), 2);
+    EXPECT_SCALAR_EQ(person->GetField(0), std::string_view, expected.name);
+    EXPECT_SCALAR_EQ(person->GetField(1), int32_t, expected.age);
+  }
+}
+
+// Checks that each row of a list<int32> array is exposed through
+// ArrowArrayArrayLike with the right length and elements, including an empty
+// list.
+TEST(ArrowArrayStructLike, PrimitiveList) {
+  auto list_type =
+      ::arrow::list(::arrow::field("item", ::arrow::int32(), /*nullable=*/false));
+
+  auto arrow_array = ::arrow::json::ArrayFromJSONString(list_type, R"([
+    [1, 2, 3, 4, 5],
+    [10, 20],
+    []])")
+                         .ValueOrDie();
+
+  ArrowSchema c_schema;
+  ArrowArray c_array;
+  internal::ArrowSchemaGuard schema_guard(&c_schema);
+  internal::ArrowArrayGuard array_guard(&c_array);
+  ASSERT_TRUE(::arrow::ExportType(*list_type, &c_schema).ok());
+  ASSERT_TRUE(::arrow::ExportArray(*arrow_array, &c_array).ok());
+
+  auto make_result = ArrowArrayArrayLike::Make(c_schema, c_array);
+  ASSERT_THAT(make_result, IsOk());
+  auto list_view = std::move(make_result.value());
+
+  // Expected elements per row; they mirror the JSON above.
+  const std::vector<std::vector<int32_t>> expected_rows = {
+      {1, 2, 3, 4, 5},
+      {10, 20},
+      {},
+  };
+
+  for (size_t row = 0; row < expected_rows.size(); ++row) {
+    ASSERT_THAT(list_view->Reset(static_cast<int64_t>(row)), IsOk());
+    const std::vector<int32_t>& expected = expected_rows[row];
+    ASSERT_EQ(list_view->size(), expected.size());
+    for (size_t pos = 0; pos < expected.size(); ++pos) {
+      EXPECT_SCALAR_EQ(list_view->GetElement(pos), int32_t, expected[pos]);
+    }
+  }
+}
+
+// Checks that each row of a map<utf8, int32> array is exposed through
+// ArrowArrayMapLike with the right number of entries, keys and values, including
+// an empty map.
+TEST(ArrowArrayStructLike, PrimitiveMap) {
+  auto map_type = std::make_shared<::arrow::MapType>(
+      ::arrow::field("key", ::arrow::utf8(), /*nullable=*/false),
+      ::arrow::field("value", ::arrow::int32(), /*nullable=*/false));
+
+  auto arrow_array = ::arrow::json::ArrayFromJSONString(map_type, R"([
+    [["Foo", 1], ["Bar", 2]],
+    [["Baz", 1]],
+    []])")
+                         .ValueOrDie();
+
+  ArrowSchema c_schema;
+  ArrowArray c_array;
+  internal::ArrowSchemaGuard schema_guard(&c_schema);
+  internal::ArrowArrayGuard array_guard(&c_array);
+  ASSERT_TRUE(::arrow::ExportType(*map_type, &c_schema).ok());
+  ASSERT_TRUE(::arrow::ExportArray(*arrow_array, &c_array).ok());
+
+  auto make_result = ArrowArrayMapLike::Make(c_schema, c_array);
+  ASSERT_THAT(make_result, IsOk());
+  auto map_view = std::move(make_result.value());
+
+  // Expected entries per row, in order; they mirror the JSON above.
+  using Entries = std::vector<std::pair<std::string, int32_t>>;
+  const std::vector<Entries> expected_rows = {
+      Entries{{"Foo", 1}, {"Bar", 2}},
+      Entries{{"Baz", 1}},
+      Entries{},
+  };
+
+  for (size_t row = 0; row < expected_rows.size(); ++row) {
+    ASSERT_THAT(map_view->Reset(static_cast<int64_t>(row)), IsOk());
+    const Entries& expected = expected_rows[row];
+    ASSERT_EQ(map_view->size(), expected.size());
+    for (size_t pos = 0; pos < expected.size(); ++pos) {
+      EXPECT_SCALAR_EQ(map_view->GetKey(pos), std::string_view, expected[pos].first);
+      EXPECT_SCALAR_EQ(map_view->GetValue(pos), int32_t, expected[pos].second);
+    }
+  }
+}
+
+}  // namespace iceberg
diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h
index 01d1a26..41061d3 100644
--- a/src/iceberg/type_fwd.h
+++ b/src/iceberg/type_fwd.h
@@ -138,6 +138,13 @@ class Reader;
 class Writer;
 
 class StructLike;
+class ArrayLike;
+class MapLike;
+
+/// 
----------------------------------------------------------------------------
+/// TODO: Forward declarations below are not added yet.
+/// 
----------------------------------------------------------------------------
+
 class MetadataUpdate;
 class UpdateRequirement;
 class AppendFiles;

Reply via email to