This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 4d22988 feat: add row-based immutable data structure (#181)
4d22988 is described below
commit 4d229888e3bbdb071b69fb907ef04a4740afa11c
Author: Gang Wu <[email protected]>
AuthorDate: Wed Oct 8 00:49:05 2025 +0800
feat: add row-based immutable data structure (#181)
- Add StructLike, MapLike, and ArrayLike interfaces
- Add wrapper for ManifestFile and ArrowArray
---
src/iceberg/CMakeLists.txt | 3 +
src/iceberg/manifest_reader_internal.cc | 55 +++--
src/iceberg/manifest_reader_internal.h | 24 ++
src/iceberg/row/CMakeLists.txt | 18 ++
src/iceberg/row/arrow_array_wrapper.cc | 409 ++++++++++++++++++++++++++++++++
src/iceberg/row/arrow_array_wrapper.h | 114 +++++++++
src/iceberg/row/manifest_wrapper.cc | 137 +++++++++++
src/iceberg/row/manifest_wrapper.h | 100 ++++++++
src/iceberg/row/struct_like.h | 99 ++++++++
src/iceberg/test/CMakeLists.txt | 5 +-
src/iceberg/test/struct_like_test.cc | 389 ++++++++++++++++++++++++++++++
src/iceberg/type_fwd.h | 7 +
12 files changed, 1335 insertions(+), 25 deletions(-)
diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt
index 1fd8e91..37259e2 100644
--- a/src/iceberg/CMakeLists.txt
+++ b/src/iceberg/CMakeLists.txt
@@ -31,6 +31,8 @@ set(ICEBERG_SOURCES
name_mapping.cc
partition_field.cc
partition_spec.cc
+ row/arrow_array_wrapper.cc
+ row/manifest_wrapper.cc
schema.cc
schema_field.cc
schema_internal.cc
@@ -106,6 +108,7 @@ iceberg_install_all_headers(iceberg)
add_subdirectory(catalog)
add_subdirectory(expression)
+add_subdirectory(row)
add_subdirectory(util)
install(FILES ${CMAKE_CURRENT_BINARY_DIR}/iceberg_export.h
diff --git a/src/iceberg/manifest_reader_internal.cc
b/src/iceberg/manifest_reader_internal.cc
index 126bd75..feff82f 100644
--- a/src/iceberg/manifest_reader_internal.cc
+++ b/src/iceberg/manifest_reader_internal.cc
@@ -27,6 +27,7 @@
#include "iceberg/manifest_list.h"
#include "iceberg/schema.h"
#include "iceberg/type.h"
+#include "iceberg/util/checked_cast.h"
#include "iceberg/util/macros.h"
namespace iceberg {
@@ -37,7 +38,7 @@ namespace iceberg {
}
#define PARSE_PRIMITIVE_FIELD(item, array_view, type)
\
- for (size_t row_idx = 0; row_idx < array_view->length; row_idx++) {
\
+ for (int64_t row_idx = 0; row_idx < array_view->length; row_idx++) {
\
if (!ArrowArrayViewIsNull(array_view, row_idx)) {
\
auto value = ArrowArrayViewGetIntUnsafe(array_view, row_idx);
\
item = static_cast<type>(value);
\
@@ -48,7 +49,7 @@ namespace iceberg {
}
#define PARSE_STRING_FIELD(item, array_view)
\
- for (size_t row_idx = 0; row_idx < array_view->length; row_idx++) {
\
+ for (int64_t row_idx = 0; row_idx < array_view->length; row_idx++) {
\
if (!ArrowArrayViewIsNull(array_view, row_idx)) {
\
auto value = ArrowArrayViewGetStringUnsafe(array_view, row_idx);
\
item = std::string(value.data, value.size_bytes);
\
@@ -59,7 +60,7 @@ namespace iceberg {
}
#define PARSE_BINARY_FIELD(item, array_view)
\
- for (size_t row_idx = 0; row_idx < array_view->length; row_idx++) {
\
+ for (int64_t row_idx = 0; row_idx < array_view->length; row_idx++) {
\
if (!ArrowArrayViewIsNull(view_of_column, row_idx)) {
\
item = ArrowArrayViewGetInt8Vector(array_view, row_idx);
\
} else if (required) {
\
@@ -225,66 +226,67 @@ Result<std::vector<ManifestFile>>
ParseManifestList(ArrowSchema* schema,
auto field_name = field.value()->get().name();
bool required = !field.value()->get().optional();
auto view_of_column = array_view.children[idx];
- switch (idx) {
- case 0:
+ ICEBERG_ASSIGN_OR_RAISE(auto manifest_file_field,
ManifestFileFieldFromIndex(idx));
+ switch (manifest_file_field) {
+ case ManifestFileField::kManifestPath:
PARSE_STRING_FIELD(manifest_files[row_idx].manifest_path,
view_of_column);
break;
- case 1:
+ case ManifestFileField::kManifestLength:
PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].manifest_length,
view_of_column,
int64_t);
break;
- case 2:
+ case ManifestFileField::kPartitionSpecId:
PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].partition_spec_id,
view_of_column,
int32_t);
break;
- case 3:
+ case ManifestFileField::kContent:
PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].content, view_of_column,
ManifestFile::Content);
break;
- case 4:
+ case ManifestFileField::kSequenceNumber:
PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].sequence_number,
view_of_column,
int64_t);
break;
- case 5:
+ case ManifestFileField::kMinSequenceNumber:
PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].min_sequence_number,
view_of_column,
int64_t);
break;
- case 6:
+ case ManifestFileField::kAddedSnapshotId:
PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].added_snapshot_id,
view_of_column,
int64_t);
break;
- case 7:
+ case ManifestFileField::kAddedFilesCount:
PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].added_files_count,
view_of_column,
int32_t);
break;
- case 8:
+ case ManifestFileField::kExistingFilesCount:
PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].existing_files_count,
view_of_column, int32_t);
break;
- case 9:
+ case ManifestFileField::kDeletedFilesCount:
PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].deleted_files_count,
view_of_column,
int32_t);
break;
- case 10:
+ case ManifestFileField::kAddedRowsCount:
PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].added_rows_count,
view_of_column,
int64_t);
break;
- case 11:
+ case ManifestFileField::kExistingRowsCount:
PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].existing_rows_count,
view_of_column,
int64_t);
break;
- case 12:
+ case ManifestFileField::kDeletedRowsCount:
PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].deleted_rows_count,
view_of_column,
int64_t);
break;
- case 13:
+ case ManifestFileField::kPartitionFieldSummary:
ICEBERG_RETURN_UNEXPECTED(
ParsePartitionFieldSummaryList(view_of_column, manifest_files));
break;
- case 14:
+ case ManifestFileField::kKeyMetadata:
PARSE_BINARY_FIELD(manifest_files[row_idx].key_metadata,
view_of_column);
break;
- case 15:
+ case ManifestFileField::kFirstRowId:
PARSE_PRIMITIVE_FIELD(manifest_files[row_idx].first_row_id,
view_of_column,
int64_t);
break;
@@ -295,7 +297,7 @@ Result<std::vector<ManifestFile>>
ParseManifestList(ArrowSchema* schema,
return manifest_files;
}
-Status ParseLiteral(ArrowArrayView* view_of_partition, size_t row_idx,
+Status ParseLiteral(ArrowArrayView* view_of_partition, int64_t row_idx,
std::vector<ManifestEntry>& manifest_entries) {
if (view_of_partition->storage_type == ArrowType::NANOARROW_TYPE_BOOL) {
auto value = ArrowArrayViewGetUIntUnsafe(view_of_partition, row_idx);
@@ -355,7 +357,7 @@ Status ParseDataFile(const std::shared_ptr<StructType>&
data_file_schema,
view_of_file_field);
break;
case 2:
- for (size_t row_idx = 0; row_idx < view_of_file_field->length;
row_idx++) {
+ for (int64_t row_idx = 0; row_idx < view_of_file_field->length;
row_idx++) {
if (!ArrowArrayViewIsNull(view_of_file_field, row_idx)) {
auto value = ArrowArrayViewGetStringUnsafe(view_of_file_field,
row_idx);
std::string_view path_str(value.data, value.size_bytes);
@@ -510,7 +512,7 @@ Result<std::vector<ManifestEntry>>
ParseManifestEntry(ArrowSchema* schema,
break;
case 4: {
auto data_file_schema =
- dynamic_pointer_cast<StructType>(field.value()->get().type());
+
internal::checked_pointer_cast<StructType>(field.value()->get().type());
ICEBERG_RETURN_UNEXPECTED(
ParseDataFile(data_file_schema, view_of_column, manifest_entries));
break;
@@ -571,4 +573,11 @@ Result<std::vector<ManifestFile>>
ManifestListReaderImpl::Files() const {
return manifest_files;
}
+Result<ManifestFileField> ManifestFileFieldFromIndex(int32_t index) {
+ if (index >= 0 && index <
static_cast<int32_t>(ManifestFileField::kNextUnusedId)) {
+ return static_cast<ManifestFileField>(index);
+ }
+ return InvalidArgument("Invalid manifest file field index: {}", index);
+}
+
} // namespace iceberg
diff --git a/src/iceberg/manifest_reader_internal.h
b/src/iceberg/manifest_reader_internal.h
index 3144d07..13e3d2a 100644
--- a/src/iceberg/manifest_reader_internal.h
+++ b/src/iceberg/manifest_reader_internal.h
@@ -60,4 +60,28 @@ class ManifestListReaderImpl : public ManifestListReader {
std::unique_ptr<Reader> reader_;
};
+enum class ManifestFileField : int32_t {
+ kManifestPath = 0,
+ kManifestLength = 1,
+ kPartitionSpecId = 2,
+ kContent = 3,
+ kSequenceNumber = 4,
+ kMinSequenceNumber = 5,
+ kAddedSnapshotId = 6,
+ kAddedFilesCount = 7,
+ kExistingFilesCount = 8,
+ kDeletedFilesCount = 9,
+ kAddedRowsCount = 10,
+ kExistingRowsCount = 11,
+ kDeletedRowsCount = 12,
+ kPartitionFieldSummary = 13,
+ kKeyMetadata = 14,
+ kFirstRowId = 15,
+ // kNextUnusedId is the placeholder for the next unused index.
+ // Always keep this as the last index when adding new fields.
+ kNextUnusedId = 16,
+};
+
+Result<ManifestFileField> ManifestFileFieldFromIndex(int32_t index);
+
} // namespace iceberg
diff --git a/src/iceberg/row/CMakeLists.txt b/src/iceberg/row/CMakeLists.txt
new file mode 100644
index 0000000..8deb6d2
--- /dev/null
+++ b/src/iceberg/row/CMakeLists.txt
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+iceberg_install_all_headers(iceberg/row)
diff --git a/src/iceberg/row/arrow_array_wrapper.cc
b/src/iceberg/row/arrow_array_wrapper.cc
new file mode 100644
index 0000000..e97293b
--- /dev/null
+++ b/src/iceberg/row/arrow_array_wrapper.cc
@@ -0,0 +1,409 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/row/arrow_array_wrapper.h"
+
+#include <cstring>
+
+#include <nanoarrow/nanoarrow.h>
+
+#include "iceberg/arrow_c_data_guard_internal.h"
+#include "iceberg/result.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+#define NANOARROW_RETURN_IF_NOT_OK(status) \
+ if (status != NANOARROW_OK) [[unlikely]] { \
+ return InvalidArrowData("Nanoarrow error: {}", error.message); \
+ }
+
+namespace {
+
+// TODO(gangwu): Reuse created ArrowArrayStructLike and others with cache.
+Result<Scalar> ExtractValue(const ArrowSchema* schema, const ArrowArray* array,
+ const ArrowArrayView* array_view, int64_t index) {
+ if (ArrowArrayViewIsNull(array_view, index)) {
+ return std::monostate{};
+ }
+
+ switch (array_view->storage_type) {
+ case NANOARROW_TYPE_BOOL:
+ return static_cast<bool>(ArrowArrayViewGetIntUnsafe(array_view, index));
+ case NANOARROW_TYPE_INT32:
+ case NANOARROW_TYPE_DATE32:
+ return static_cast<int32_t>(ArrowArrayViewGetIntUnsafe(array_view,
index));
+ case NANOARROW_TYPE_INT64:
+ case NANOARROW_TYPE_TIME64:
+ case NANOARROW_TYPE_TIMESTAMP:
+ return ArrowArrayViewGetIntUnsafe(array_view, index);
+ case NANOARROW_TYPE_FLOAT:
+ return static_cast<float>(ArrowArrayViewGetDoubleUnsafe(array_view,
index));
+ case NANOARROW_TYPE_DOUBLE:
+ return ArrowArrayViewGetDoubleUnsafe(array_view, index);
+ case NANOARROW_TYPE_STRING:
+ case NANOARROW_TYPE_BINARY:
+ case NANOARROW_TYPE_FIXED_SIZE_BINARY:
+ case NANOARROW_TYPE_STRING_VIEW:
+ case NANOARROW_TYPE_BINARY_VIEW:
+ case NANOARROW_TYPE_LARGE_STRING:
+ case NANOARROW_TYPE_LARGE_BINARY: {
+ ArrowStringView value = ArrowArrayViewGetStringUnsafe(array_view, index);
+ return std::string_view(value.data, value.size_bytes);
+ }
+ case NANOARROW_TYPE_DECIMAL128: {
+ ArrowError error;
+ ArrowSchemaView schema_view;
+ NANOARROW_RETURN_IF_NOT_OK(ArrowSchemaViewInit(&schema_view, schema,
&error));
+ ArrowDecimal value;
+ ArrowDecimalInit(&value, schema_view.decimal_bitwidth,
+ schema_view.decimal_precision,
schema_view.decimal_scale);
+ ArrowArrayViewGetDecimalUnsafe(array_view, index, &value);
+ if (value.n_words != 2) {
+ return InvalidArrowData("Unsupported Arrow decimal words: {}",
value.n_words);
+ }
+ int128_t int_value{0};
+ std::memcpy(&int_value, value.words, sizeof(int128_t));
+ return Decimal(int_value);
+ }
+ case NANOARROW_TYPE_STRUCT: {
+ ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr<StructLike> struct_like,
+ ArrowArrayStructLike::Make(*schema, *array,
index));
+ return struct_like;
+ }
+ case NANOARROW_TYPE_LIST: {
+ ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr<ArrayLike> array_like,
+ ArrowArrayArrayLike::Make(*schema, *array,
index));
+ return array_like;
+ }
+ case NANOARROW_TYPE_MAP: {
+ ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr<MapLike> map_like,
+ ArrowArrayMapLike::Make(*schema, *array, index));
+ return map_like;
+ }
+ case NANOARROW_TYPE_EXTENSION:
+ // TODO(gangwu): Handle these types properly
+ default:
+ return NotImplemented("Unsupported Arrow type: {}",
+ static_cast<int>(array_view->storage_type));
+ }
+}
+
+} // namespace
+
+// ArrowArrayStructLike Implementation
+
+class ArrowArrayStructLike::Impl {
+ public:
+ Impl(const ArrowSchema& schema, const ArrowArray& array, int64_t row_index)
+ : schema_(schema), array_(std::cref(array)), row_index_(row_index) {}
+
+ ~Impl() = default;
+
+ Result<Scalar> GetField(size_t pos) const {
+ // NOLINTNEXTLINE(modernize-use-integer-sign-comparison)
+ if (pos >= static_cast<size_t>(schema_.n_children)) {
+ return InvalidArgument("Field index {} out of range (size: {})", pos,
+ schema_.n_children);
+ }
+
+ if (row_index_ < 0 || row_index_ >= array_.get().length) {
+ return InvalidArgument("Row index {} out of range (length: {})",
row_index_,
+ array_.get().length);
+ }
+
+ const ArrowSchema* child_schema = schema_.children[pos];
+ const ArrowArray* child_array = array_.get().children[pos];
+ const ArrowArrayView* child_view = array_view_.children[pos];
+
+ return ExtractValue(child_schema, child_array, child_view, row_index_);
+ }
+
+ size_t num_fields() const { return static_cast<size_t>(schema_.n_children); }
+
+ Status Reset(const ArrowArray& array, int64_t row_index) {
+ array_ = std::cref(array);
+ row_index_ = row_index;
+
+ ArrowError error;
+ NANOARROW_RETURN_IF_NOT_OK(
+ ArrowArrayViewSetArray(&array_view_, &array_.get(), &error));
+ return {};
+ }
+
+ Status Reset(int64_t row_index) {
+ row_index_ = row_index;
+ return {};
+ }
+
+ Status Init() {
+ ArrowError error;
+ NANOARROW_RETURN_IF_NOT_OK(
+ ArrowArrayViewInitFromSchema(&array_view_, &schema_, &error));
+ NANOARROW_RETURN_IF_NOT_OK(
+ ArrowArrayViewSetArray(&array_view_, &array_.get(), &error));
+ return {};
+ }
+
+ private:
+ ArrowArrayView array_view_;
+ internal::ArrowArrayViewGuard array_view_guard_{&array_view_};
+
+ const ArrowSchema& schema_;
+ std::reference_wrapper<const ArrowArray> array_;
+ int64_t row_index_;
+};
+
+Result<std::unique_ptr<ArrowArrayStructLike>> ArrowArrayStructLike::Make(
+ const ArrowSchema& schema, const ArrowArray& array, int64_t row_index) {
+ auto impl = std::make_unique<Impl>(schema, array, row_index);
+ ICEBERG_RETURN_UNEXPECTED(impl->Init());
+ return std::unique_ptr<ArrowArrayStructLike>(new
ArrowArrayStructLike(std::move(impl)));
+}
+
+ArrowArrayStructLike::ArrowArrayStructLike(std::unique_ptr<Impl> impl)
+ : impl_(std::move(impl)) {}
+
+ArrowArrayStructLike::~ArrowArrayStructLike() = default;
+
+Result<Scalar> ArrowArrayStructLike::GetField(size_t pos) const {
+ return impl_->GetField(pos);
+}
+
+size_t ArrowArrayStructLike::num_fields() const { return impl_->num_fields(); }
+
+Status ArrowArrayStructLike::Reset(int64_t row_index) { return
impl_->Reset(row_index); }
+
+Status ArrowArrayStructLike::Reset(const ArrowArray& array, int64_t row_index)
{
+ return impl_->Reset(array, row_index);
+}
+
+// ArrowArrayArrayLike Implementation
+
+class ArrowArrayArrayLike::Impl {
+ public:
+ Impl(const ArrowSchema& schema, const ArrowArray& array, int64_t row_index)
+ : schema_(schema), array_(std::cref(array)), row_index_(row_index) {}
+
+ ~Impl() = default;
+
+ Result<Scalar> GetElement(size_t pos) const {
+ // NOLINTNEXTLINE(modernize-use-integer-sign-comparison)
+ if (pos >= static_cast<size_t>(length_)) {
+ return InvalidArgument("Element index {} out of range (length: {})",
pos, length_);
+ }
+
+ const ArrowSchema* child_schema = schema_.children[0];
+ const ArrowArray* child_array = array_.get().children[0];
+ const ArrowArrayView* child_view = array_view_.children[0];
+
+ return ExtractValue(child_schema, child_array, child_view,
+ offset_ + static_cast<int64_t>(pos));
+ }
+
+ size_t size() const { return static_cast<size_t>(length_); }
+
+ Status Reset(const ArrowArray& array, int64_t row_index) {
+ array_ = std::cref(array);
+ row_index_ = row_index;
+
+ ArrowError error;
+ NANOARROW_RETURN_IF_NOT_OK(
+ ArrowArrayViewSetArray(&array_view_, &array_.get(), &error));
+ return UpdateOffsets();
+ }
+
+ Status Reset(int64_t row_index) {
+ row_index_ = row_index;
+ return UpdateOffsets();
+ }
+
+ Status Init() {
+ ArrowError error;
+ NANOARROW_RETURN_IF_NOT_OK(
+ ArrowArrayViewInitFromSchema(&array_view_, &schema_, &error));
+ NANOARROW_RETURN_IF_NOT_OK(
+ ArrowArrayViewSetArray(&array_view_, &array_.get(), &error));
+ return UpdateOffsets();
+ }
+
+ private:
+ Status UpdateOffsets() {
+ if (row_index_ < 0 || row_index_ >= array_.get().length) {
+ return InvalidArgument("Row index {} out of range (length: {})",
row_index_,
+ array_.get().length);
+ }
+
+ offset_ = ArrowArrayViewListChildOffset(&array_view_, row_index_);
+ length_ = ArrowArrayViewListChildOffset(&array_view_, row_index_ + 1) -
offset_;
+ return {};
+ }
+
+ ArrowArrayView array_view_;
+ internal::ArrowArrayViewGuard array_view_guard_{&array_view_};
+
+ const ArrowSchema& schema_;
+ std::reference_wrapper<const ArrowArray> array_;
+ int64_t row_index_;
+
+ int64_t offset_ = 0;
+ int64_t length_ = 0;
+};
+
+Result<std::unique_ptr<ArrowArrayArrayLike>> ArrowArrayArrayLike::Make(
+ const ArrowSchema& schema, const ArrowArray& array, int64_t row_index) {
+ auto impl = std::make_unique<Impl>(schema, array, row_index);
+ ICEBERG_RETURN_UNEXPECTED(impl->Init());
+ return std::unique_ptr<ArrowArrayArrayLike>(new
ArrowArrayArrayLike(std::move(impl)));
+}
+
+ArrowArrayArrayLike::ArrowArrayArrayLike(std::unique_ptr<Impl> impl)
+ : impl_(std::move(impl)) {}
+
+ArrowArrayArrayLike::~ArrowArrayArrayLike() = default;
+
+Result<Scalar> ArrowArrayArrayLike::GetElement(size_t pos) const {
+ return impl_->GetElement(pos);
+}
+
+size_t ArrowArrayArrayLike::size() const { return impl_->size(); }
+
+Status ArrowArrayArrayLike::Reset(int64_t row_index) { return
impl_->Reset(row_index); }
+
+Status ArrowArrayArrayLike::Reset(const ArrowArray& array, int64_t row_index) {
+ return impl_->Reset(array, row_index);
+}
+
+// ArrowArrayMapLike Implementation
+
+class ArrowArrayMapLike::Impl {
+ public:
+ Impl(const ArrowSchema& schema, const ArrowArray& array, int64_t row_index)
+ : schema_(schema), array_(std::cref(array)), row_index_(row_index) {}
+
+ ~Impl() = default;
+
+ Result<Scalar> GetKey(size_t pos) const {
+ // NOLINTNEXTLINE(modernize-use-integer-sign-comparison)
+ if (pos >= static_cast<size_t>(length_)) {
+ return InvalidArgument("Key index {} out of range (length: {})", pos,
length_);
+ }
+
+ const ArrowSchema* keys_schema = schema_.children[0]->children[0];
+ const ArrowArray* keys_array = array_.get().children[0]->children[0];
+ const ArrowArrayView* keys_view = array_view_.children[0]->children[0];
+
+ return ExtractValue(keys_schema, keys_array, keys_view,
+ offset_ + static_cast<int64_t>(pos));
+ }
+
+ Result<Scalar> GetValue(size_t pos) const {
+ // NOLINTNEXTLINE(modernize-use-integer-sign-comparison)
+ if (pos >= static_cast<size_t>(length_)) {
+ return InvalidArgument("Value index {} out of range (length: {})", pos,
length_);
+ }
+
+ const ArrowSchema* values_schema = schema_.children[0]->children[1];
+ const ArrowArray* values_array = array_.get().children[0]->children[1];
+ const ArrowArrayView* values_view = array_view_.children[0]->children[1];
+
+ return ExtractValue(values_schema, values_array, values_view,
+ offset_ + static_cast<int64_t>(pos));
+ }
+
+ size_t size() const { return static_cast<size_t>(length_); }
+
+ Status Reset(const ArrowArray& array, int64_t row_index) {
+ array_ = std::cref(array);
+ row_index_ = row_index;
+
+ ArrowError error;
+ NANOARROW_RETURN_IF_NOT_OK(
+ ArrowArrayViewSetArray(&array_view_, &array_.get(), &error));
+ return UpdateOffsets();
+ }
+
+ Status Reset(int64_t row_index) {
+ row_index_ = row_index;
+ return UpdateOffsets();
+ }
+
+ Status Init() {
+ ArrowError error;
+ NANOARROW_RETURN_IF_NOT_OK(
+ ArrowArrayViewInitFromSchema(&array_view_, &schema_, &error));
+ NANOARROW_RETURN_IF_NOT_OK(
+ ArrowArrayViewSetArray(&array_view_, &array_.get(), &error));
+ return UpdateOffsets();
+ }
+
+ private:
+ Status UpdateOffsets() {
+ if (row_index_ < 0 || row_index_ >= array_.get().length) {
+ return InvalidArgument("Row index {} out of range (length: {})",
row_index_,
+ array_.get().length);
+ }
+
+ // XXX: ArrowArrayViewListChildOffset does not work for map types.
+ // We need to directly access the offsets buffer instead.
+ auto* offsets_buffer = array_view_.buffer_views[1].data.as_int32;
+ offset_ = offsets_buffer[row_index_];
+ length_ = offsets_buffer[row_index_ + 1] - offset_;
+
+ return {};
+ }
+
+ ArrowArrayView array_view_;
+ internal::ArrowArrayViewGuard array_view_guard_{&array_view_};
+
+ const ArrowSchema& schema_;
+ std::reference_wrapper<const ArrowArray> array_;
+ int64_t row_index_;
+
+ int64_t offset_ = 0;
+ int64_t length_ = 0;
+};
+
+Result<std::unique_ptr<ArrowArrayMapLike>> ArrowArrayMapLike::Make(
+ const ArrowSchema& schema, const ArrowArray& array, int64_t row_index) {
+ auto impl = std::make_unique<Impl>(schema, array, row_index);
+ ICEBERG_RETURN_UNEXPECTED(impl->Init());
+ return std::unique_ptr<ArrowArrayMapLike>(new
ArrowArrayMapLike(std::move(impl)));
+}
+
+ArrowArrayMapLike::ArrowArrayMapLike(std::unique_ptr<Impl> impl)
+ : impl_(std::move(impl)) {}
+
+ArrowArrayMapLike::~ArrowArrayMapLike() = default;
+
+Result<Scalar> ArrowArrayMapLike::GetKey(size_t pos) const { return
impl_->GetKey(pos); }
+
+Result<Scalar> ArrowArrayMapLike::GetValue(size_t pos) const {
+ return impl_->GetValue(pos);
+}
+
+size_t ArrowArrayMapLike::size() const { return impl_->size(); }
+
+Status ArrowArrayMapLike::Reset(int64_t row_index) { return
impl_->Reset(row_index); }
+
+Status ArrowArrayMapLike::Reset(const ArrowArray& array, int64_t row_index) {
+ return impl_->Reset(array, row_index);
+}
+
+} // namespace iceberg
diff --git a/src/iceberg/row/arrow_array_wrapper.h
b/src/iceberg/row/arrow_array_wrapper.h
new file mode 100644
index 0000000..7316bad
--- /dev/null
+++ b/src/iceberg/row/arrow_array_wrapper.h
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+/// \file iceberg/row/arrow_array_wrapper.h
+/// Wrapper classes for ArrowArray that implement StructLike, ArrayLike, and MapLike
+/// interfaces for unified row-oriented data access from columnar ArrowArray data.
+
+#include "iceberg/arrow_c_data.h"
+#include "iceberg/row/struct_like.h"
+
+namespace iceberg {
+
+/// \brief Wrapper for one row of a struct-typed ArrowArray.
+class ICEBERG_EXPORT ArrowArrayStructLike : public StructLike {
+ public:
+ ~ArrowArrayStructLike() override;
+
+ Result<Scalar> GetField(size_t pos) const override;
+
+ size_t num_fields() const override;
+
+ Status Reset(int64_t row_index);
+
+ Status Reset(const ArrowArray& array, int64_t row_index = 0);
+
+ static Result<std::unique_ptr<ArrowArrayStructLike>> Make(const ArrowSchema&
schema,
+ const ArrowArray&
array,
+ int64_t row_index
= 0);
+
+ ArrowArrayStructLike(const ArrowArrayStructLike&) = delete;
+ ArrowArrayStructLike& operator=(const ArrowArrayStructLike&) = delete;
+
+ private:
+ class Impl;
+ explicit ArrowArrayStructLike(std::unique_ptr<Impl> impl);
+
+ std::unique_ptr<Impl> impl_;
+};
+
+/// \brief Wrapper for one row of a list-typed ArrowArray.
+class ICEBERG_EXPORT ArrowArrayArrayLike : public ArrayLike {
+ public:
+ ~ArrowArrayArrayLike() override;
+
+ Result<Scalar> GetElement(size_t pos) const override;
+
+ size_t size() const override;
+
+ Status Reset(int64_t row_index);
+
+ Status Reset(const ArrowArray& array, int64_t row_index = 0);
+
+ static Result<std::unique_ptr<ArrowArrayArrayLike>> Make(const ArrowSchema&
schema,
+ const ArrowArray&
array,
+ int64_t row_index =
0);
+
+ ArrowArrayArrayLike(const ArrowArrayArrayLike& other) = delete;
+ ArrowArrayArrayLike& operator=(const ArrowArrayArrayLike& other) = delete;
+
+ private:
+ class Impl;
+ explicit ArrowArrayArrayLike(std::unique_ptr<Impl> impl);
+
+ std::unique_ptr<Impl> impl_;
+};
+
+/// \brief Wrapper for one row of a map-typed ArrowArray.
+class ICEBERG_EXPORT ArrowArrayMapLike : public MapLike {
+ public:
+ ~ArrowArrayMapLike() override;
+
+ Result<Scalar> GetKey(size_t pos) const override;
+
+ Result<Scalar> GetValue(size_t pos) const override;
+
+ size_t size() const override;
+
+ Status Reset(int64_t row_index);
+
+ Status Reset(const ArrowArray& array, int64_t row_index = 0);
+
+ static Result<std::unique_ptr<ArrowArrayMapLike>> Make(const ArrowSchema&
schema,
+ const ArrowArray&
array,
+ int64_t row_index =
0);
+
+ ArrowArrayMapLike(const ArrowArrayMapLike& other) = delete;
+ ArrowArrayMapLike& operator=(const ArrowArrayMapLike& other) = delete;
+
+ private:
+ class Impl;
+ explicit ArrowArrayMapLike(std::unique_ptr<Impl> impl);
+
+ std::unique_ptr<Impl> impl_;
+};
+
+} // namespace iceberg
diff --git a/src/iceberg/row/manifest_wrapper.cc
b/src/iceberg/row/manifest_wrapper.cc
new file mode 100644
index 0000000..4c8708e
--- /dev/null
+++ b/src/iceberg/row/manifest_wrapper.cc
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/row/manifest_wrapper.h"
+
+#include "iceberg/manifest_reader_internal.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+namespace {
+template <typename T>
+ requires std::is_same_v<T, std::vector<uint8_t>> || std::is_same_v<T,
std::string>
+std::string_view ToView(const T& value) {
+ return {reinterpret_cast<const char*>(value.data()), value.size()}; //
NOLINT
+}
+
+template <typename T>
+Result<Scalar> FromOptional(const std::optional<T>& value) {
+ if (value.has_value()) {
+ return value.value();
+ }
+ return std::monostate{};
+}
+
+} // namespace
+
+Result<Scalar> PartitionFieldSummaryStructLike::GetField(size_t pos) const {
+ if (pos >= num_fields()) {
+ return InvalidArgument("Invalid partition field summary index: {}", pos);
+ }
+ switch (pos) {
+ case 0:
+ return summary_.get().contains_null;
+ case 1:
+ return FromOptional(summary_.get().contains_nan);
+ case 2:
+ return FromOptional(
+ summary_.get().lower_bound.transform(ToView<std::vector<uint8_t>>));
+ case 3:
+ return FromOptional(
+ summary_.get().upper_bound.transform(ToView<std::vector<uint8_t>>));
+ default:
+ return InvalidArgument("Invalid partition field summary index: {}", pos);
+ }
+}
+
+Result<Scalar> PartitionFieldSummaryArrayLike::GetElement(size_t pos) const {
+ if (pos >= size()) {
+ return InvalidArgument("Invalid partition field summary index: {}", pos);
+ }
+ if (summary_ == nullptr) {
+ summary_ =
std::make_shared<PartitionFieldSummaryStructLike>(summaries_.get()[pos]);
+ } else {
+ summary_->Reset(summaries_.get()[pos]);
+ }
+ return summary_;
+}
+
+Result<Scalar> ManifestFileStructLike::GetField(size_t pos) const {
+ if (pos >= num_fields()) {
+ return InvalidArgument("Invalid manifest file field index: {}", pos);
+ }
+ ICEBERG_ASSIGN_OR_RAISE(auto field,
+
ManifestFileFieldFromIndex(static_cast<int32_t>(pos)));
+ const auto& manifest_file = manifest_file_.get();
+ switch (field) {
+ case ManifestFileField::kManifestPath:
+ return ToView(manifest_file.manifest_path);
+ case ManifestFileField::kManifestLength:
+ return manifest_file.manifest_length;
+ case ManifestFileField::kPartitionSpecId:
+ return manifest_file.partition_spec_id;
+ case ManifestFileField::kContent:
+ return static_cast<int32_t>(manifest_file.content);
+ case ManifestFileField::kSequenceNumber:
+ return manifest_file.sequence_number;
+ case ManifestFileField::kMinSequenceNumber:
+ return manifest_file.min_sequence_number;
+ case ManifestFileField::kAddedSnapshotId:
+ return manifest_file.added_snapshot_id;
+ case ManifestFileField::kAddedFilesCount:
+ return FromOptional(manifest_file.added_files_count);
+ case ManifestFileField::kExistingFilesCount:
+ return FromOptional(manifest_file.existing_files_count);
+ case ManifestFileField::kDeletedFilesCount:
+ return FromOptional(manifest_file.deleted_files_count);
+ case ManifestFileField::kAddedRowsCount:
+ return FromOptional(manifest_file.added_rows_count);
+ case ManifestFileField::kExistingRowsCount:
+ return FromOptional(manifest_file.existing_rows_count);
+ case ManifestFileField::kDeletedRowsCount:
+ return FromOptional(manifest_file.deleted_rows_count);
+ case ManifestFileField::kPartitionFieldSummary: {
+ if (summaries_ == nullptr) {
+ summaries_ =
+
std::make_shared<PartitionFieldSummaryArrayLike>(manifest_file.partitions);
+ } else {
+ summaries_->Reset(manifest_file.partitions);
+ }
+ return summaries_;
+ }
+ case ManifestFileField::kKeyMetadata:
+ return ToView(manifest_file.key_metadata);
+ case ManifestFileField::kFirstRowId:
+ return FromOptional(manifest_file.first_row_id);
+ case ManifestFileField::kNextUnusedId:
+ return InvalidArgument("Invalid manifest file field index: {}", pos);
+ }
+ return InvalidArgument("Invalid manifest file field index: {}", pos);
+}
+
+size_t ManifestFileStructLike::num_fields() const {
+ return static_cast<size_t>(ManifestFileField::kNextUnusedId);
+}
+
+std::unique_ptr<StructLike> FromManifestFile(const ManifestFile& file) {
+ return std::make_unique<ManifestFileStructLike>(file);
+}
+
+} // namespace iceberg
diff --git a/src/iceberg/row/manifest_wrapper.h
b/src/iceberg/row/manifest_wrapper.h
new file mode 100644
index 0000000..3271eac
--- /dev/null
+++ b/src/iceberg/row/manifest_wrapper.h
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+/// \file iceberg/row/manifest_wrapper.h
+/// Wrapper classes for manifest-related data structures that implement the
+/// StructLike and ArrayLike interfaces for unified data access.
+
+#include <functional>
+
+#include "iceberg/iceberg_export.h"
+#include "iceberg/manifest_list.h"
+#include "iceberg/row/struct_like.h"
+
+namespace iceberg {
+
+/// \brief StructLike wrapper for PartitionFieldSummary.
+class ICEBERG_EXPORT PartitionFieldSummaryStructLike : public StructLike {
+ public:
+ explicit PartitionFieldSummaryStructLike(const PartitionFieldSummary&
summary)
+ : summary_(summary) {}
+ ~PartitionFieldSummaryStructLike() override = default;
+
+ PartitionFieldSummaryStructLike(const PartitionFieldSummaryStructLike&) =
delete;
+ PartitionFieldSummaryStructLike& operator=(const
PartitionFieldSummaryStructLike&) =
+ delete;
+
+ Result<Scalar> GetField(size_t pos) const override;
+
+ size_t num_fields() const override { return 4; }
+
+ void Reset(const PartitionFieldSummary& summary) { summary_ =
std::cref(summary); }
+
+ private:
+ std::reference_wrapper<const PartitionFieldSummary> summary_;
+};
+
+/// \brief ArrayLike wrapper for a vector of PartitionFieldSummary.
+class ICEBERG_EXPORT PartitionFieldSummaryArrayLike : public ArrayLike {
+ public:
+ explicit PartitionFieldSummaryArrayLike(
+ const std::vector<PartitionFieldSummary>& summaries)
+ : summaries_(summaries) {}
+ ~PartitionFieldSummaryArrayLike() override = default;
+
+ PartitionFieldSummaryArrayLike(const PartitionFieldSummaryArrayLike&) =
delete;
+ PartitionFieldSummaryArrayLike& operator=(const
PartitionFieldSummaryArrayLike&) =
+ delete;
+
+ Result<Scalar> GetElement(size_t pos) const override;
+
+ size_t size() const override { return summaries_.get().size(); }
+
+ void Reset(const std::vector<PartitionFieldSummary>& summaries) {
+ summaries_ = std::cref(summaries);
+ }
+
+ private:
+ std::reference_wrapper<const std::vector<PartitionFieldSummary>> summaries_;
+ mutable std::shared_ptr<PartitionFieldSummaryStructLike> summary_;
+};
+
+/// \brief StructLike wrapper for ManifestFile.
+class ICEBERG_EXPORT ManifestFileStructLike : public StructLike {
+ public:
+ explicit ManifestFileStructLike(const ManifestFile& file) :
manifest_file_(file) {}
+ ~ManifestFileStructLike() override = default;
+
+ ManifestFileStructLike(const ManifestFileStructLike&) = delete;
+ ManifestFileStructLike& operator=(const ManifestFileStructLike&) = delete;
+
+ Result<Scalar> GetField(size_t pos) const override;
+
+ size_t num_fields() const override;
+
+ void Reset(const ManifestFile& file) { manifest_file_ = std::cref(file); }
+
+ private:
+ std::reference_wrapper<const ManifestFile> manifest_file_;
+ mutable std::shared_ptr<PartitionFieldSummaryArrayLike> summaries_;
+};
+
+} // namespace iceberg
diff --git a/src/iceberg/row/struct_like.h b/src/iceberg/row/struct_like.h
new file mode 100644
index 0000000..3093f75
--- /dev/null
+++ b/src/iceberg/row/struct_like.h
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+/// \file iceberg/row/struct_like.h
+/// Structures for viewing data in a row-based format. This header contains the
+/// definitions of StructLike, ArrayLike, and MapLike, which provide a unified
+/// interface for viewing data from ArrowArray or structs like ManifestFile and
+/// ManifestEntry. Note that they do not carry type information and should be
+/// used in conjunction with the schema to get the type information.
+
+#include <memory>
+#include <string_view>
+#include <variant>
+#include <vector>
+
+#include "iceberg/result.h"
+#include "iceberg/type_fwd.h"
+#include "iceberg/util/decimal.h"
+
+namespace iceberg {
+
+/// \brief A scalar value depending on its data type.
+///
+/// Note that all string and binary values are stored as non-owning string_view
+/// and their lifetime is managed by the wrapped object.
+using Scalar = std::variant<std::monostate, // for null
+ bool, // for boolean
+ int32_t, // for int, date
+ int64_t, // for long, timestamp, timestamp_tz,
and time
+ float, // for float
+ double, // for double
+ std::string_view, // for non-owned string, binary
and fixed
+ Decimal, // for decimal
+ std::shared_ptr<StructLike>, // for struct
+ std::shared_ptr<ArrayLike>, // for list
+ std::shared_ptr<MapLike>>; // for map
+
+/// \brief An immutable struct-like wrapper.
+class ICEBERG_EXPORT StructLike {
+ public:
+ virtual ~StructLike() = default;
+
+ /// \brief Get the field value at the given position.
+ /// \param pos The position of the field in the struct.
+ virtual Result<Scalar> GetField(size_t pos) const = 0;
+
+ /// \brief Get the number of fields in the struct.
+ virtual size_t num_fields() const = 0;
+};
+
+/// \brief An immutable array-like wrapper.
+class ICEBERG_EXPORT ArrayLike {
+ public:
+ virtual ~ArrayLike() = default;
+
+ /// \brief Get the array element at the given position.
+ /// \param pos The position of the element in the array.
+ virtual Result<Scalar> GetElement(size_t pos) const = 0;
+
+ /// \brief Get the number of elements in the array.
+ virtual size_t size() const = 0;
+};
+
+/// \brief An immutable map-like wrapper.
+class ICEBERG_EXPORT MapLike {
+ public:
+ virtual ~MapLike() = default;
+
+ /// \brief Get the key at the given position.
+ /// \param pos The position of the key in the map.
+ virtual Result<Scalar> GetKey(size_t pos) const = 0;
+
+ /// \brief Get the value at the given position.
+ /// \param pos The position of the value in the map.
+ virtual Result<Scalar> GetValue(size_t pos) const = 0;
+
+ /// \brief Get the number of entries in the map.
+ virtual size_t size() const = 0;
+};
+
+} // namespace iceberg
diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt
index cb3b608..db9caab 100644
--- a/src/iceberg/test/CMakeLists.txt
+++ b/src/iceberg/test/CMakeLists.txt
@@ -110,10 +110,11 @@ if(ICEBERG_BUILD_BUNDLE)
add_iceberg_test(arrow_test
USE_BUNDLE
SOURCES
- arrow_test.cc
arrow_fs_file_io_test.cc
+ arrow_test.cc
+ gzip_decompress_test.cc
metadata_io_test.cc
- gzip_decompress_test.cc)
+ struct_like_test.cc)
add_iceberg_test(catalog_test
USE_BUNDLE
diff --git a/src/iceberg/test/struct_like_test.cc b/src/iceberg/test/struct_like_test.cc
new file mode 100644
index 0000000..484b254
--- /dev/null
+++ b/src/iceberg/test/struct_like_test.cc
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <arrow/c/bridge.h>
+#include <arrow/json/from_string.h>
+#include <arrow/type.h>
+#include <arrow/util/decimal.h>
+
+#include "iceberg/arrow_c_data_guard_internal.h"
+#include "iceberg/manifest_list.h"
+#include "iceberg/manifest_reader_internal.h"
+#include "iceberg/row/arrow_array_wrapper.h"
+#include "iceberg/row/manifest_wrapper.h"
+#include "iceberg/schema_internal.h"
+#include "matchers.h"
+
+namespace iceberg {
+
+#define EXPECT_SCALAR_EQ(result, expected_type, expected_value) \
+ do { \
+ ASSERT_THAT(result, IsOk()); \
+ auto scalar = result.value(); \
+ ASSERT_TRUE(std::holds_alternative<expected_type>(scalar)); \
+ EXPECT_EQ(std::get<expected_type>(scalar), expected_value); \
+ } while (0)
+
+#define EXPECT_DECIMAL_EQ(result, scale, expected_value) \
+ do { \
+ ASSERT_THAT(result, IsOk()); \
+ auto scalar = result.value(); \
+ ASSERT_TRUE(std::holds_alternative<Decimal>(scalar)); \
+ auto decimal = std::get<Decimal>(scalar); \
+ EXPECT_EQ(decimal.ToString(scale), expected_value); \
+ } while (0)
+
+#define EXPECT_SCALAR_NULL(result) \
+ do { \
+ ASSERT_THAT(result, IsOk()); \
+ auto scalar = result.value(); \
+ ASSERT_TRUE(std::holds_alternative<std::monostate>(scalar)); \
+ } while (0)
+
+TEST(ManifestFileStructLike, BasicFields) {
+ ManifestFile manifest_file{
+ .manifest_path = "/path/to/manifest.avro",
+ .manifest_length = 12345,
+ .partition_spec_id = 1,
+ .content = ManifestFile::Content::kData,
+ .sequence_number = 100,
+ .min_sequence_number = 90,
+ .added_snapshot_id = 1001,
+ .added_files_count = 10,
+ .existing_files_count = 5,
+ .deleted_files_count = 2,
+ .added_rows_count = 1000,
+ .existing_rows_count = 500,
+ .deleted_rows_count = 20,
+ };
+
+ ManifestFileStructLike struct_like(manifest_file);
+ EXPECT_EQ(struct_like.num_fields(), 16);
+
+ EXPECT_SCALAR_EQ(
+
struct_like.GetField(static_cast<size_t>(ManifestFileField::kManifestPath)),
+ std::string_view, "/path/to/manifest.avro");
+ EXPECT_SCALAR_EQ(
+
struct_like.GetField(static_cast<size_t>(ManifestFileField::kManifestLength)),
+ int64_t, 12345);
+ EXPECT_SCALAR_EQ(
+
struct_like.GetField(static_cast<size_t>(ManifestFileField::kPartitionSpecId)),
+ int32_t, 1);
+
EXPECT_SCALAR_EQ(struct_like.GetField(static_cast<size_t>(ManifestFileField::kContent)),
+ int32_t,
static_cast<int32_t>(ManifestFile::Content::kData));
+ EXPECT_SCALAR_EQ(
+
struct_like.GetField(static_cast<size_t>(ManifestFileField::kSequenceNumber)),
+ int64_t, 100);
+ EXPECT_SCALAR_EQ(
+
struct_like.GetField(static_cast<size_t>(ManifestFileField::kAddedFilesCount)),
+ int32_t, 10);
+ EXPECT_THAT(struct_like.GetField(100), IsError(ErrorKind::kInvalidArgument));
+}
+
+TEST(ManifestFileStructLike, OptionalFields) {
+ ManifestFile manifest_file{.manifest_path = "/path/to/manifest2.avro",
+ .manifest_length = 54321,
+ .partition_spec_id = 2,
+ .content = ManifestFile::Content::kDeletes,
+ .sequence_number = 200,
+ .min_sequence_number = 180,
+ .added_snapshot_id = 2001,
+ .added_files_count = std::nullopt, // null
optional field
+ .existing_files_count = 15,
+ .deleted_files_count = std::nullopt, // null
optional field
+ .added_rows_count = std::nullopt, // null
optional field
+ .existing_rows_count = 1500,
+ .deleted_rows_count = 200,
+ .partitions = {},
+ .key_metadata = {},
+ .first_row_id = 12345};
+ ManifestFileStructLike struct_like(manifest_file);
+
+ EXPECT_SCALAR_NULL(
+
struct_like.GetField(static_cast<size_t>(ManifestFileField::kAddedFilesCount)));
+ EXPECT_SCALAR_EQ(
+
struct_like.GetField(static_cast<size_t>(ManifestFileField::kExistingFilesCount)),
+ int32_t, 15);
+ EXPECT_SCALAR_EQ(
+
struct_like.GetField(static_cast<size_t>(ManifestFileField::kFirstRowId)),
int64_t,
+ 12345);
+
EXPECT_SCALAR_EQ(struct_like.GetField(static_cast<size_t>(ManifestFileField::kContent)),
+ int32_t,
static_cast<int32_t>(ManifestFile::Content::kDeletes));
+}
+
+TEST(ManifestFileStructLike, WithPartitions) {
+ ManifestFile manifest_file{
+ .manifest_path = "/path/to/manifest3.avro",
+ .manifest_length = 98765,
+ .partition_spec_id = 3,
+ .content = ManifestFile::Content::kData,
+ .sequence_number = 300,
+ .min_sequence_number = 290,
+ .added_snapshot_id = 3001,
+ .added_files_count = 20,
+ .existing_files_count = 10,
+ .deleted_files_count = 1,
+ .added_rows_count = 2000,
+ .existing_rows_count = 1000,
+ .deleted_rows_count = 10,
+ .partitions = {{.contains_null = true,
+ .contains_nan = false,
+ .lower_bound = std::vector<uint8_t>{0x01, 0x02, 0x03},
+ .upper_bound = std::vector<uint8_t>{0x04, 0x05, 0x06}},
+ {.contains_null = false,
+ .contains_nan = std::nullopt,
+ .lower_bound = std::vector<uint8_t>{0x10, 0x20},
+ .upper_bound = std::nullopt}}};
+
+ ManifestFileStructLike struct_like(manifest_file);
+
+ auto partitions_result = struct_like.GetField(
+ static_cast<size_t>(ManifestFileField::kPartitionFieldSummary));
+ ASSERT_THAT(partitions_result, IsOk());
+ auto partitions_scalar = partitions_result.value();
+
ASSERT_TRUE(std::holds_alternative<std::shared_ptr<ArrayLike>>(partitions_scalar));
+ auto partitions_array =
std::get<std::shared_ptr<ArrayLike>>(partitions_scalar);
+ EXPECT_EQ(partitions_array->size(), 2);
+
+ // Test 1st partition summary
+ auto first_partition_result = partitions_array->GetElement(0);
+ ASSERT_THAT(first_partition_result, IsOk());
+ auto first_partition_scalar = first_partition_result.value();
+ ASSERT_TRUE(
+
std::holds_alternative<std::shared_ptr<StructLike>>(first_partition_scalar));
+ auto first_partition_struct =
+ std::get<std::shared_ptr<StructLike>>(first_partition_scalar);
+ EXPECT_EQ(first_partition_struct->num_fields(), 4);
+ EXPECT_SCALAR_EQ(first_partition_struct->GetField(0), bool, true);
+ EXPECT_SCALAR_EQ(first_partition_struct->GetField(1), bool, false);
+ auto lower_bound_result = first_partition_struct->GetField(2);
+ ASSERT_THAT(lower_bound_result, IsOk());
+ auto lower_bound_scalar = lower_bound_result.value();
+ ASSERT_TRUE(std::holds_alternative<std::string_view>(lower_bound_scalar));
+ auto lower_bound_view = std::get<std::string_view>(lower_bound_scalar);
+ EXPECT_EQ(lower_bound_view.size(), 3);
+ EXPECT_EQ(static_cast<uint8_t>(lower_bound_view[0]), 0x01);
+ EXPECT_EQ(static_cast<uint8_t>(lower_bound_view[1]), 0x02);
+ EXPECT_EQ(static_cast<uint8_t>(lower_bound_view[2]), 0x03);
+
+ // Test 2nd partition summary with null fields
+ auto second_partition_result = partitions_array->GetElement(1);
+ ASSERT_THAT(second_partition_result, IsOk());
+ auto second_partition_scalar = second_partition_result.value();
+ ASSERT_TRUE(
+
std::holds_alternative<std::shared_ptr<StructLike>>(second_partition_scalar));
+ auto second_partition_struct =
+ std::get<std::shared_ptr<StructLike>>(second_partition_scalar);
+ EXPECT_SCALAR_NULL(second_partition_struct->GetField(1));
+ EXPECT_SCALAR_NULL(second_partition_struct->GetField(3));
+}
+
+TEST(ArrowArrayStructLike, PrimitiveFields) {
+ auto struct_type = ::arrow::struct_(
+ {::arrow::field("id", ::arrow::int64(), /*nullable=*/false),
+ ::arrow::field("name", ::arrow::utf8(), /*nullable=*/true),
+ ::arrow::field("score", ::arrow::float32(), /*nullable=*/true),
+ ::arrow::field("active", ::arrow::boolean(), /*nullable=*/false),
+ ::arrow::field("date", ::arrow::date32(), /*nullable=*/false),
+ ::arrow::field("time", ::arrow::time64(::arrow::TimeUnit::MICRO),
+ /*nullable=*/false),
+ ::arrow::field("timestamp",
::arrow::timestamp(::arrow::TimeUnit::MICRO),
+ /*nullable=*/false),
+ ::arrow::field("fixed", ::arrow::fixed_size_binary(4),
/*nullable=*/false),
+ ::arrow::field("decimal", ::arrow::decimal128(10, 2),
/*nullable=*/false)});
+
+ auto arrow_array = ::arrow::json::ArrayFromJSONString(struct_type, R"([
+ {"id": 1, "name": "Alice", "score": 95.5, "active": true, "date":
1714396800,
+ "time": 123456, "timestamp": 1714396800000000, "fixed": "aaaa",
"decimal": "1234.56"},
+ {"id": 2, "name": "Bob", "score": null, "active": false, "date":
1714396801,
+ "time": 123457, "timestamp": 1714396800000001, "fixed": "bbbb",
"decimal": "-1234.56"},
+ {"id": 3, "name": null, "score": 87.2, "active": true, "date": 1714396802,
+ "time": 123458, "timestamp": 1714396800000002, "fixed": "cccc",
"decimal": "1234.00"}])")
+ .ValueOrDie();
+
+ ArrowSchema c_schema;
+ ArrowArray c_array;
+ internal::ArrowSchemaGuard schema_guard(&c_schema);
+ internal::ArrowArrayGuard array_guard(&c_array);
+ ASSERT_TRUE(::arrow::ExportType(*struct_type, &c_schema).ok());
+ ASSERT_TRUE(::arrow::ExportArray(*arrow_array, &c_array).ok());
+
+ auto struct_like_result = ArrowArrayStructLike::Make(c_schema, c_array);
+ ASSERT_THAT(struct_like_result, IsOk());
+ auto struct_like = std::move(struct_like_result.value());
+
+ constexpr int64_t kNumRows = 3;
+ std::array<int64_t, kNumRows> ids = {1, 2, 3};
+ std::array<std::optional<std::string>, kNumRows> names = {"Alice", "Bob",
std::nullopt};
+ std::array<std::optional<float>, kNumRows> scores = {95.5f, std::nullopt,
87.2f};
+ std::array<bool, kNumRows> actives = {true, false, true};
+ std::array<int32_t, kNumRows> dates = {1714396800, 1714396801, 1714396802};
+ std::array<int64_t, kNumRows> times = {123456, 123457, 123458};
+ std::array<int64_t, kNumRows> timestamps = {1714396800000000,
1714396800000001,
+ 1714396800000002};
+ std::array<std::string, kNumRows> fixeds = {"aaaa", "bbbb", "cccc"};
+ std::array<std::string, kNumRows> decimals = {"1234.56", "-1234.56",
"1234.00"};
+
+ for (int64_t i = 0; i < kNumRows; ++i) {
+ ASSERT_THAT(struct_like->Reset(i), IsOk());
+ EXPECT_SCALAR_EQ(struct_like->GetField(0), int64_t, ids[i]);
+ if (names[i].has_value()) {
+ EXPECT_SCALAR_EQ(struct_like->GetField(1), std::string_view, names[i]);
+ } else {
+ EXPECT_SCALAR_NULL(struct_like->GetField(1));
+ }
+ if (scores[i].has_value()) {
+ EXPECT_SCALAR_EQ(struct_like->GetField(2), float, scores[i].value());
+ } else {
+ EXPECT_SCALAR_NULL(struct_like->GetField(2));
+ }
+ EXPECT_SCALAR_EQ(struct_like->GetField(3), bool, actives[i]);
+ EXPECT_SCALAR_EQ(struct_like->GetField(4), int32_t, dates[i]);
+ EXPECT_SCALAR_EQ(struct_like->GetField(5), int64_t, times[i]);
+ EXPECT_SCALAR_EQ(struct_like->GetField(6), int64_t, timestamps[i]);
+ EXPECT_SCALAR_EQ(struct_like->GetField(7), std::string_view, fixeds[i]);
+ EXPECT_DECIMAL_EQ(struct_like->GetField(8), /*scale=*/2, decimals[i]);
+ }
+}
+
+TEST(ArrowArrayStructLike, NestedStruct) {
+ auto person_type =
+ ::arrow::struct_({::arrow::field("name", ::arrow::utf8(),
/*nullable=*/false),
+ ::arrow::field("age", ::arrow::int32(),
/*nullable=*/false)});
+ auto root_type =
+ ::arrow::struct_({::arrow::field("id", ::arrow::int64(),
/*nullable=*/false),
+ ::arrow::field("person", person_type,
/*nullable=*/false)});
+
+ auto arrow_array = ::arrow::json::ArrayFromJSONString(root_type, R"([
+ {"id": 1, "person": {"name": "Alice", "age": 30}},
+ {"id": 2, "person": {"name": "Bob", "age": 25}}])")
+ .ValueOrDie();
+
+ ArrowSchema c_schema;
+ ArrowArray c_array;
+ internal::ArrowSchemaGuard schema_guard(&c_schema);
+ internal::ArrowArrayGuard array_guard(&c_array);
+ ASSERT_TRUE(::arrow::ExportType(*root_type, &c_schema).ok());
+ ASSERT_TRUE(::arrow::ExportArray(*arrow_array, &c_array).ok());
+
+ auto struct_like_result = ArrowArrayStructLike::Make(c_schema, c_array);
+ ASSERT_THAT(struct_like_result, IsOk());
+ auto struct_like = std::move(struct_like_result.value());
+
+ constexpr int64_t kNumRows = 2;
+ std::array<int64_t, kNumRows> ids = {1, 2};
+ std::array<std::string, kNumRows> names = {"Alice", "Bob"};
+ std::array<int32_t, kNumRows> ages = {30, 25};
+
+ for (int64_t i = 0; i < kNumRows; ++i) {
+ ASSERT_THAT(struct_like->Reset(i), IsOk());
+ EXPECT_EQ(struct_like->num_fields(), 2);
+ EXPECT_SCALAR_EQ(struct_like->GetField(0), int64_t, ids[i]);
+
+ auto person_result = struct_like->GetField(1);
+ ASSERT_THAT(person_result, IsOk());
+ auto person_scalar = person_result.value();
+
ASSERT_TRUE(std::holds_alternative<std::shared_ptr<StructLike>>(person_scalar));
+
+ auto person_struct = std::get<std::shared_ptr<StructLike>>(person_scalar);
+ EXPECT_EQ(person_struct->num_fields(), 2);
+ EXPECT_SCALAR_EQ(person_struct->GetField(0), std::string_view, names[i]);
+ EXPECT_SCALAR_EQ(person_struct->GetField(1), int32_t, ages[i]);
+ }
+}
+
+TEST(ArrowArrayStructLike, PrimitiveList) {
+ auto list_type =
+ ::arrow::list(::arrow::field("item", ::arrow::int32(),
/*nullable=*/false));
+
+ auto arrow_array = ::arrow::json::ArrayFromJSONString(list_type, R"([
+ [1, 2, 3, 4, 5],
+ [10, 20],
+ []])")
+ .ValueOrDie();
+
+ ArrowSchema c_schema;
+ ArrowArray c_array;
+ internal::ArrowSchemaGuard schema_guard(&c_schema);
+ internal::ArrowArrayGuard array_guard(&c_array);
+ ASSERT_TRUE(::arrow::ExportType(*list_type, &c_schema).ok());
+ ASSERT_TRUE(::arrow::ExportArray(*arrow_array, &c_array).ok());
+
+ auto array_like_result = ArrowArrayArrayLike::Make(c_schema, c_array);
+ ASSERT_THAT(array_like_result, IsOk());
+ auto array_like = std::move(array_like_result.value());
+
+ constexpr int64_t kNumRows = 3;
+ std::array<std::vector<int32_t>, kNumRows> expected_lists = {
+ std::vector<int32_t>{1, 2, 3, 4, 5},
+ std::vector<int32_t>{10, 20},
+ std::vector<int32_t>{},
+ };
+
+ for (int64_t i = 0; i < kNumRows; ++i) {
+ ASSERT_THAT(array_like->Reset(i), IsOk());
+ const auto& expected_list = expected_lists[i];
+ ASSERT_EQ(array_like->size(), expected_list.size());
+ for (size_t j = 0; j < expected_list.size(); ++j) {
+ EXPECT_SCALAR_EQ(array_like->GetElement(j), int32_t, expected_list[j]);
+ }
+ }
+}
+
+TEST(ArrowArrayStructLike, PrimitiveMap) {
+ auto map_type = std::make_shared<::arrow::MapType>(
+ ::arrow::field("key", ::arrow::utf8(), /*nullable=*/false),
+ ::arrow::field("value", ::arrow::int32(), /*nullable=*/false));
+
+ auto arrow_array = ::arrow::json::ArrayFromJSONString(map_type, R"([
+ [["Foo", 1], ["Bar", 2]],
+ [["Baz", 1]],
+ []])")
+ .ValueOrDie();
+
+ ArrowSchema c_schema;
+ ArrowArray c_array;
+ internal::ArrowSchemaGuard schema_guard(&c_schema);
+ internal::ArrowArrayGuard array_guard(&c_array);
+ ASSERT_TRUE(::arrow::ExportType(*map_type, &c_schema).ok());
+ ASSERT_TRUE(::arrow::ExportArray(*arrow_array, &c_array).ok());
+
+ auto map_like_result = ArrowArrayMapLike::Make(c_schema, c_array);
+ ASSERT_THAT(map_like_result, IsOk());
+ auto map_like = std::move(map_like_result.value());
+
+ constexpr int64_t kNumRows = 3;
+ std::array<std::vector<std::pair<std::string, int32_t>>, kNumRows>
expected_maps = {
+ std::vector<std::pair<std::string, int32_t>>{{"Foo", 1}, {"Bar", 2}},
+ std::vector<std::pair<std::string, int32_t>>{{"Baz", 1}},
+ std::vector<std::pair<std::string, int32_t>>{},
+ };
+
+ for (int64_t i = 0; i < kNumRows; ++i) {
+ ASSERT_THAT(map_like->Reset(i), IsOk());
+ const auto& expected_map = expected_maps[i];
+ ASSERT_EQ(map_like->size(), expected_map.size());
+ for (size_t j = 0; j < expected_map.size(); ++j) {
+ EXPECT_SCALAR_EQ(map_like->GetKey(j), std::string_view,
expected_map[j].first);
+ EXPECT_SCALAR_EQ(map_like->GetValue(j), int32_t, expected_map[j].second);
+ }
+ }
+}
+
+} // namespace iceberg
diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h
index 01d1a26..41061d3 100644
--- a/src/iceberg/type_fwd.h
+++ b/src/iceberg/type_fwd.h
@@ -138,6 +138,13 @@ class Reader;
class Writer;
class StructLike;
+class ArrayLike;
+class MapLike;
+
+/// ----------------------------------------------------------------------------
+/// TODO: Forward declarations below are not added yet.
+/// ----------------------------------------------------------------------------
+
class MetadataUpdate;
class UpdateRequirement;
class AppendFiles;