wgtmac commented on code in PR #657:
URL: https://github.com/apache/iceberg-cpp/pull/657#discussion_r3275149711


##########
src/iceberg/arrow_c_data_util.cc:
##########
@@ -0,0 +1,395 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <cstdint>
+#include <limits>
+#include <memory>
+#include <mutex>
+#include <span>
+#include <utility>
+#include <vector>
+
+#include <nanoarrow/nanoarrow.h>
+
+#include "iceberg/arrow/nanoarrow_status_internal.h"
+#include "iceberg/arrow_c_data_guard_internal.h"
+#include "iceberg/arrow_c_data_util_internal.h"
+#include "iceberg/file_reader.h"
+#include "iceberg/result.h"
+#include "iceberg/schema.h"
+#include "iceberg/schema_field.h"
+#include "iceberg/schema_internal.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+/// Wraps an owned Reader in an ArrowArrayStream by forwarding it to the
+/// templated MakeArrowArrayStream<Reader> overload.
+Result<ArrowArrayStream> MakeArrowArrayStream(std::unique_ptr<Reader> reader) {
+  auto stream_source = std::move(reader);
+  return MakeArrowArrayStream<Reader>(std::move(stream_source));
+}
+
+namespace {
+
+/// Returns the position in `fields` of the field whose id is `field_id`, or an
+/// InvalidArgument error when no field carries that id.
+Result<size_t> FindFieldIndexById(std::span<const SchemaField> fields, int32_t field_id) {
+  for (size_t index = 0; index < fields.size(); ++index) {
+    if (fields[index].field_id() == field_id) {
+      return index;
+    }
+  }
+  return InvalidArgument("Required schema does not contain projected field id {}",
+                         field_id);
+}
+
+// Process-wide ProjectBatch implementation set through
+// ProjectionContext::RegisterProjectBatchFunction; guarded by
+// g_project_batch_function_mutex.
+std::mutex g_project_batch_function_mutex;
+ProjectionContext::ProjectBatchFunction g_project_batch_function = nullptr;
+
+/// Returns the registered ProjectBatch implementation, or nullptr if none.
+ProjectionContext::ProjectBatchFunction GetProjectBatchFunction() {
+  std::lock_guard lock(g_project_batch_function_mutex);
+  return g_project_batch_function;
+}
+
+/// Maps every output field to the index of the input field with the same id.
+///
+/// Only complete top-level fields can be projected, so input and output types
+/// must match exactly. Indices are returned as `int` because they are used to
+/// index ArrowSchema/ArrowArray children.
+Result<std::vector<int>> BuildSelectedFieldIndices(
+    std::span<const SchemaField> input_fields,
+    std::span<const SchemaField> output_fields) {
+  std::vector<int> selected_field_indices;
+  selected_field_indices.reserve(output_fields.size());
+
+  for (const auto& output_field : output_fields) {
+    ICEBERG_ASSIGN_OR_RAISE(auto input_index,
+                            FindFieldIndexById(input_fields, output_field.field_id()));
+    const auto& input_field = input_fields[input_index];
+    if (*input_field.type() != *output_field.type()) {
+      return InvalidArgument(
+          "ProjectBatch only supports complete top-level fields, but field id "
+          "{} changes type from {} to {}",
+          output_field.field_id(), input_field.type()->ToString(),
+          output_field.type()->ToString());
+    }
+    ICEBERG_PRECHECK(input_index <= static_cast<size_t>(std::numeric_limits<int>::max()),
+                     "Input field index {} exceeds int range", input_index);
+    selected_field_indices.push_back(static_cast<int>(input_index));
+  }
+
+  return selected_field_indices;
+}
+
+Status AppendValue(const ArrowSchema& input_schema, const ArrowArray& input_array,
+                   const ArrowArrayView& input_view, int64_t row_index,
+                   ArrowArray* output_array);
+
+/// Reads entry `physical_index` of the offsets buffer of a LIST, MAP or
+/// LARGE_LIST view. The index is physical: the view's offset must already
+/// be included.
+int64_t ListOffsetAt(const ArrowArrayView& view, int64_t physical_index) {
+  if (view.storage_type == NANOARROW_TYPE_LARGE_LIST) {
+    return view.buffer_views[1].data.as_int64[physical_index];
+  }
+  // LIST and MAP use 32-bit offsets.
+  return view.buffer_views[1].data.as_int32[physical_index];
+}
+
+/// Appends list element `row_index` of a LIST, LARGE_LIST, FIXED_SIZE_LIST or
+/// MAP view. It copies the element's child values into
+/// `output_array->children[0]` and then finishes the element on
+/// `output_array`.
+///
+/// `row_index` is logical, i.e. relative to `input_view.offset`.
+Status AppendListValues(const ArrowSchema& input_schema, const ArrowArray& input_array,
+                        const ArrowArrayView& input_view, int64_t row_index,
+                        ArrowArray* output_array) {
+  // Offsets (and fixed-size strides) are addressed physically, so the view's
+  // own offset must be added. The resulting child indices are logical for the
+  // child view, whose own offset its accessors apply.
+  const int64_t physical_index = input_view.offset + row_index;
+  int64_t begin = 0;
+  int64_t end = 0;
+  if (input_view.storage_type == NANOARROW_TYPE_FIXED_SIZE_LIST) {
+    // Fixed-size lists have no offsets buffer: element i spans
+    // [i * list_size, (i + 1) * list_size) of the child array.
+    const int64_t list_size = input_view.layout.child_size_elements;
+    begin = physical_index * list_size;
+    end = begin + list_size;
+  } else {
+    begin = ListOffsetAt(input_view, physical_index);
+    end = ListOffsetAt(input_view, physical_index + 1);
+  }
+  for (int64_t element_index = begin; element_index < end; ++element_index) {
+    ICEBERG_RETURN_UNEXPECTED(
+        AppendValue(*input_schema.children[0], *input_array.children[0],
+                    *input_view.children[0], element_index, output_array->children[0]));
+  }
+  ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayFinishElement(output_array));
+  return {};
+}
+
+/// Appends entry list `row_index` of a MAP view. A map is physically a list of
+/// entry structs with 32-bit offsets, so this shares AppendListValues.
+Status AppendMapValues(const ArrowSchema& input_schema, const ArrowArray& input_array,
+                       const ArrowArrayView& input_view, int64_t row_index,
+                       ArrowArray* output_array) {
+  return AppendListValues(input_schema, input_array, input_view, row_index,
+                          output_array);
+}
+
+/// Appends the decimal at `row_index` of `input_view` to `output_array`.
+/// It uses the bit width, precision and scale parsed from `input_schema`.
+Status AppendDecimal(const ArrowSchema& input_schema, const ArrowArrayView& input_view,
+                     int64_t row_index, ArrowArray* output_array) {
+  ArrowError error;
+  ArrowSchemaView schema_view;
+  ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR(
+      ArrowSchemaViewInit(&schema_view, &input_schema, &error), error);
+
+  ArrowDecimal value;
+  ArrowDecimalInit(&value, schema_view.decimal_bitwidth, schema_view.decimal_precision,
+                   schema_view.decimal_scale);
+  ArrowArrayViewGetDecimalUnsafe(&input_view, row_index, &value);
+  ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendDecimal(output_array, &value));
+  return {};
+}
+
+/// Appends the value at logical position `row_index` of `input_view` to
+/// `output_array`, recursing into struct, list and map children.
+///
+/// `input_schema`, `input_array` and `input_view` describe the same column.
+/// `output_array` is expected to be appending with a schema of the same shape.
+/// Returns NotImplemented for storage types not handled below.
+Status AppendValue(const ArrowSchema& input_schema, const ArrowArray& input_array,
+                   const ArrowArrayView& input_view, int64_t row_index,
+                   ArrowArray* output_array) {
+  if (ArrowArrayViewIsNull(&input_view, row_index)) {
+    ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendNull(output_array, 1));
+    return {};
+  }
+
+  switch (input_view.storage_type) {
+    case NANOARROW_TYPE_NA:
+      ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendNull(output_array, 1));
+      return {};
+    case NANOARROW_TYPE_BOOL:
+    case NANOARROW_TYPE_INT8:
+    case NANOARROW_TYPE_INT16:
+    case NANOARROW_TYPE_INT32:
+    case NANOARROW_TYPE_INT64:
+    case NANOARROW_TYPE_DATE32:
+    case NANOARROW_TYPE_DATE64:
+    case NANOARROW_TYPE_TIME32:
+    case NANOARROW_TYPE_TIME64:
+    case NANOARROW_TYPE_TIMESTAMP:
+    case NANOARROW_TYPE_DURATION:
+      ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendInt(
+          output_array, ArrowArrayViewGetIntUnsafe(&input_view, row_index)));
+      return {};
+    case NANOARROW_TYPE_UINT8:
+    case NANOARROW_TYPE_UINT16:
+    case NANOARROW_TYPE_UINT32:
+    case NANOARROW_TYPE_UINT64:
+      ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendUInt(
+          output_array, ArrowArrayViewGetUIntUnsafe(&input_view, row_index)));
+      return {};
+    case NANOARROW_TYPE_HALF_FLOAT:
+    case NANOARROW_TYPE_FLOAT:
+    case NANOARROW_TYPE_DOUBLE:
+      ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendDouble(
+          output_array, ArrowArrayViewGetDoubleUnsafe(&input_view, row_index)));
+      return {};
+    case NANOARROW_TYPE_STRING:
+    case NANOARROW_TYPE_LARGE_STRING:
+    case NANOARROW_TYPE_STRING_VIEW: {
+      auto value = ArrowArrayViewGetStringUnsafe(&input_view, row_index);
+      ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendString(output_array, value));
+      return {};
+    }
+    case NANOARROW_TYPE_BINARY:
+    case NANOARROW_TYPE_LARGE_BINARY:
+    case NANOARROW_TYPE_FIXED_SIZE_BINARY:
+    case NANOARROW_TYPE_BINARY_VIEW: {
+      auto value = ArrowArrayViewGetBytesUnsafe(&input_view, row_index);
+      ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendBytes(output_array, value));
+      return {};
+    }
+    case NANOARROW_TYPE_DECIMAL128:
+    case NANOARROW_TYPE_DECIMAL256:
+      return AppendDecimal(input_schema, input_view, row_index, output_array);
+    case NANOARROW_TYPE_STRUCT: {
+      // A struct's offset applies to its children in addition to their own
+      // offsets. Child views do not inherit it, so add it here.
+      const int64_t child_row_index = input_view.offset + row_index;
+      for (int64_t child_index = 0; child_index < input_schema.n_children;
+           ++child_index) {
+        ICEBERG_RETURN_UNEXPECTED(AppendValue(
+            *input_schema.children[child_index], *input_array.children[child_index],
+            *input_view.children[child_index], child_row_index,
+            output_array->children[child_index]));
+      }
+      ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayFinishElement(output_array));
+      return {};
+    }
+    case NANOARROW_TYPE_LIST:
+    case NANOARROW_TYPE_LARGE_LIST:
+    case NANOARROW_TYPE_FIXED_SIZE_LIST:
+      return AppendListValues(input_schema, input_array, input_view, row_index,
+                              output_array);
+    case NANOARROW_TYPE_MAP:
+      return AppendMapValues(input_schema, input_array, input_view, row_index,
+                             output_array);
+    default:
+      return NotImplemented("Unsupported Arrow type for merge-on-read projection: {}",
+                            static_cast<int>(input_view.storage_type));
+  }
+}
+
+}  // namespace
+
+namespace {
+
+// Invokes `schema.release` when the schema still owns its resources.
+void ReleaseSchemaIfOwned(ArrowSchema& schema) {
+  if (schema.release != nullptr) {
+    schema.release(&schema);
+  }
+}
+
+}  // namespace
+
+// Takes over `other`'s state. The two ArrowSchema structs are copied by value,
+// and the source's release callbacks are cleared so that only this object
+// releases them.
+ProjectionContext::ProjectionContext(ProjectionContext&& other) noexcept
+    : input_schema_(std::exchange(other.input_schema_, nullptr)),
+      output_schema_(std::exchange(other.output_schema_, nullptr)),
+      selected_field_indices_(std::move(other.selected_field_indices_)),
+      input_arrow_schema_(other.input_arrow_schema_),
+      output_arrow_schema_(other.output_arrow_schema_),
+      project_batch_function_(std::exchange(other.project_batch_function_, nullptr)),
+      project_batch_state_(std::move(other.project_batch_state_)) {
+  other.input_arrow_schema_.release = nullptr;
+  other.output_arrow_schema_.release = nullptr;
+}
+
+// Releases the schemas currently owned, then takes over `other`'s state.
+// Self-assignment is a no-op.
+ProjectionContext& ProjectionContext::operator=(ProjectionContext&& other) noexcept {
+  if (this != &other) {
+    ReleaseSchemaIfOwned(input_arrow_schema_);
+    ReleaseSchemaIfOwned(output_arrow_schema_);
+
+    input_schema_ = std::exchange(other.input_schema_, nullptr);
+    output_schema_ = std::exchange(other.output_schema_, nullptr);
+    selected_field_indices_ = std::move(other.selected_field_indices_);
+    input_arrow_schema_ = other.input_arrow_schema_;
+    other.input_arrow_schema_.release = nullptr;
+    output_arrow_schema_ = other.output_arrow_schema_;
+    other.output_arrow_schema_.release = nullptr;
+    project_batch_function_ = std::exchange(other.project_batch_function_, nullptr);
+    project_batch_state_ = std::move(other.project_batch_state_);
+  }
+  return *this;
+}
+
+ProjectionContext::~ProjectionContext() {
+  ReleaseSchemaIfOwned(input_arrow_schema_);
+  ReleaseSchemaIfOwned(output_arrow_schema_);
+}
+
+// Validates that `output_schema` selects complete top-level fields of
+// `input_schema`, then converts both schemas to Arrow C schemas that the
+// context owns. The Schema objects are held by pointer, not copied.
+Result<ProjectionContext> ProjectionContext::Make(
+    const Schema& input_schema, const Schema& output_schema,
+    ProjectionContext::ProjectBatchFunction project_batch_function) {
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto field_indices,
+      BuildSelectedFieldIndices(input_schema.fields(), output_schema.fields()));
+
+  ProjectionContext context;
+  context.input_schema_ = &input_schema;
+  context.output_schema_ = &output_schema;
+  context.selected_field_indices_ = std::move(field_indices);
+  ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(input_schema, &context.input_arrow_schema_));
+  ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(output_schema, &context.output_arrow_schema_));
+  context.project_batch_function_ = project_batch_function;
+  return context;
+}
+
+auto ProjectionContext::input_schema() const -> const Schema& { return *input_schema_; }
+
+auto ProjectionContext::output_schema() const -> const Schema& { return *output_schema_; }
+
+auto ProjectionContext::input_arrow_schema() const -> const ArrowSchema& {
+  return input_arrow_schema_;
+}
+
+auto ProjectionContext::output_arrow_schema() const -> const ArrowSchema& {
+  return output_arrow_schema_;
+}
+
+auto ProjectionContext::selected_field_indices() const -> std::span<const int> {
+  return selected_field_indices_;
+}
+
+auto ProjectionContext::project_batch_function() const -> ProjectBatchFunction {
+  return project_batch_function_;
+}
+
+auto ProjectionContext::project_batch_state() -> ProjectBatchState& {
+  return project_batch_state_;
+}
+
+// Installs the process-wide ProjectBatch implementation. When the DCHECK does
+// not abort, a null argument is ignored and the current registration is kept.
+void ProjectionContext::RegisterProjectBatchFunction(
+    ProjectionContext::ProjectBatchFunction project_batch_function) {
+  ICEBERG_DCHECK(project_batch_function != nullptr,
+                 "ProjectBatch implementation must not be null");
+  if (project_batch_function != nullptr) {
+    std::lock_guard lock(g_project_batch_function_mutex);
+    g_project_batch_function = project_batch_function;
+  }
+}
+
+bool ProjectionContext::HasProjectBatchFunction() {
+  const auto registered = GetProjectBatchFunction();
+  return registered != nullptr;
+}
+
+ProjectionContext::ProjectBatchFunction ProjectionContext::ResolveProjectBatchFunction() {
+  return GetProjectBatchFunction();
+}
+
+namespace {
+
+/// Builds a new struct array containing the rows of `input_batch` listed in
+/// `row_indices` (in that order). Each output column `i` is copied from input
+/// column `selected_field_indices[i]`.
+///
+/// `row_indices` are logical row numbers of `input_batch` and must lie in
+/// [0, input_batch.length). The input is fully validated before any value is
+/// read, because the per-value accessors used below are unchecked.
+Result<ArrowArray> ProjectBatchNanoarrow(const ArrowSchema& input_arrow_schema,
+                                         const ArrowArray& input_batch,
+                                         std::span<const int32_t> row_indices,
+                                         const ArrowSchema& output_arrow_schema,
+                                         std::span<const int> selected_field_indices) {
+  ArrowArrayView input_view;
+  ArrowError error;
+  ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR(
+      ArrowArrayViewInitFromSchema(&input_view, &input_arrow_schema, &error), error);
+  internal::ArrowArrayViewGuard input_view_guard(&input_view);
+  ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR(
+      ArrowArrayViewSetArray(&input_view, &input_batch, &error), error);
+  ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR(
+      ArrowArrayViewValidate(&input_view, NANOARROW_VALIDATION_LEVEL_FULL, &error),
+      error);
+
+  ArrowArray output_array;
+  ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR(
+      ArrowArrayInitFromSchema(&output_array, &output_arrow_schema, &error), error);
+  internal::ArrowArrayGuard output_array_guard(&output_array);
+  ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayStartAppending(&output_array));
+  ICEBERG_NANOARROW_RETURN_UNEXPECTED(
+      ArrowArrayReserve(&output_array, static_cast<int64_t>(row_indices.size())));
+
+  // The batch is a struct. Its offset also applies to every column array, and
+  // the column views do not inherit it, so it is added explicitly.
+  const int64_t batch_offset = input_view.offset;
+  for (int64_t row_index : row_indices) {
+    ICEBERG_PRECHECK(row_index >= 0 && row_index < input_batch.length,
+                     "Row index {} out of range for batch length {}", row_index,
+                     input_batch.length);
+    const int64_t column_row_index = batch_offset + row_index;
+    for (size_t output_index = 0; output_index < selected_field_indices.size();
+         ++output_index) {
+      const int input_index = selected_field_indices[output_index];
+      ICEBERG_RETURN_UNEXPECTED(AppendValue(*input_arrow_schema.children[input_index],
+                                            *input_batch.children[input_index],
+                                            *input_view.children[input_index],
+                                            column_row_index,
+                                            output_array.children[output_index]));
+    }
+    ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayFinishElement(&output_array));
+  }
+
+  ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR(
+      ArrowArrayFinishBuildingDefault(&output_array, &error), error);
+
+  // Hand ownership to the caller. The moved-from slot has a null release, so
+  // the guard becomes a no-op.
+  return std::exchange(output_array, ArrowArray{});
+}
+
+}  // namespace
+
+Result<ArrowArray> ProjectBatch(ArrowArray* input_batch,
+                                std::span<const int32_t> row_indices,
+                                ProjectionContext& projection) {
+  ICEBERG_PRECHECK(input_batch != nullptr, "input_batch must not be null");
+  internal::ArrowArrayGuard input_batch_guard(input_batch);
+

Review Comment:
   Added a comment explaining the Arrow import ownership transfer.



##########
src/iceberg/arrow_c_data_util.cc:
##########
@@ -0,0 +1,395 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <cstdint>
+#include <limits>
+#include <memory>
+#include <mutex>
+#include <span>
+#include <utility>
+#include <vector>
+
+#include <nanoarrow/nanoarrow.h>
+
+#include "iceberg/arrow/nanoarrow_status_internal.h"
+#include "iceberg/arrow_c_data_guard_internal.h"
+#include "iceberg/arrow_c_data_util_internal.h"
+#include "iceberg/file_reader.h"
+#include "iceberg/result.h"
+#include "iceberg/schema.h"
+#include "iceberg/schema_field.h"
+#include "iceberg/schema_internal.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+/// Wraps an owned Reader in an ArrowArrayStream by forwarding it to the
+/// templated MakeArrowArrayStream<Reader> overload.
+Result<ArrowArrayStream> MakeArrowArrayStream(std::unique_ptr<Reader> reader) {
+  auto stream_source = std::move(reader);
+  return MakeArrowArrayStream<Reader>(std::move(stream_source));
+}
+
+namespace {
+
+/// Returns the position in `fields` of the field whose id is `field_id`, or an
+/// InvalidArgument error when no field carries that id.
+Result<size_t> FindFieldIndexById(std::span<const SchemaField> fields, int32_t field_id) {
+  for (size_t index = 0; index < fields.size(); ++index) {
+    if (fields[index].field_id() == field_id) {
+      return index;
+    }
+  }
+  return InvalidArgument("Required schema does not contain projected field id {}",
+                         field_id);
+}
+
+// Process-wide ProjectBatch implementation set through
+// ProjectionContext::RegisterProjectBatchFunction; guarded by
+// g_project_batch_function_mutex.
+std::mutex g_project_batch_function_mutex;
+ProjectionContext::ProjectBatchFunction g_project_batch_function = nullptr;
+
+/// Returns the registered ProjectBatch implementation, or nullptr if none.
+ProjectionContext::ProjectBatchFunction GetProjectBatchFunction() {
+  std::lock_guard lock(g_project_batch_function_mutex);
+  return g_project_batch_function;
+}
+
+/// Maps every output field to the index of the input field with the same id.
+///
+/// Only complete top-level fields can be projected, so input and output types
+/// must match exactly. Indices are returned as `int` because they are used to
+/// index ArrowSchema/ArrowArray children.
+Result<std::vector<int>> BuildSelectedFieldIndices(
+    std::span<const SchemaField> input_fields,
+    std::span<const SchemaField> output_fields) {
+  std::vector<int> selected_field_indices;
+  selected_field_indices.reserve(output_fields.size());
+
+  for (const auto& output_field : output_fields) {
+    ICEBERG_ASSIGN_OR_RAISE(auto input_index,
+                            FindFieldIndexById(input_fields, output_field.field_id()));
+    const auto& input_field = input_fields[input_index];
+    if (*input_field.type() != *output_field.type()) {
+      return InvalidArgument(
+          "ProjectBatch only supports complete top-level fields, but field id "
+          "{} changes type from {} to {}",
+          output_field.field_id(), input_field.type()->ToString(),
+          output_field.type()->ToString());
+    }
+    ICEBERG_PRECHECK(input_index <= static_cast<size_t>(std::numeric_limits<int>::max()),
+                     "Input field index {} exceeds int range", input_index);
+    selected_field_indices.push_back(static_cast<int>(input_index));
+  }
+
+  return selected_field_indices;
+}
+
+Status AppendValue(const ArrowSchema& input_schema, const ArrowArray& input_array,
+                   const ArrowArrayView& input_view, int64_t row_index,
+                   ArrowArray* output_array);
+
+/// Reads entry `physical_index` of the offsets buffer of a LIST, MAP or
+/// LARGE_LIST view. The index is physical: the view's offset must already
+/// be included.
+int64_t ListOffsetAt(const ArrowArrayView& view, int64_t physical_index) {
+  if (view.storage_type == NANOARROW_TYPE_LARGE_LIST) {
+    return view.buffer_views[1].data.as_int64[physical_index];
+  }
+  // LIST and MAP use 32-bit offsets.
+  return view.buffer_views[1].data.as_int32[physical_index];
+}
+
+/// Appends list element `row_index` of a LIST, LARGE_LIST, FIXED_SIZE_LIST or
+/// MAP view. It copies the element's child values into
+/// `output_array->children[0]` and then finishes the element on
+/// `output_array`.
+///
+/// `row_index` is logical, i.e. relative to `input_view.offset`.
+Status AppendListValues(const ArrowSchema& input_schema, const ArrowArray& input_array,
+                        const ArrowArrayView& input_view, int64_t row_index,
+                        ArrowArray* output_array) {
+  // Offsets (and fixed-size strides) are addressed physically, so the view's
+  // own offset must be added. The resulting child indices are logical for the
+  // child view, whose own offset its accessors apply.
+  const int64_t physical_index = input_view.offset + row_index;
+  int64_t begin = 0;
+  int64_t end = 0;
+  if (input_view.storage_type == NANOARROW_TYPE_FIXED_SIZE_LIST) {
+    // Fixed-size lists have no offsets buffer: element i spans
+    // [i * list_size, (i + 1) * list_size) of the child array.
+    const int64_t list_size = input_view.layout.child_size_elements;
+    begin = physical_index * list_size;
+    end = begin + list_size;
+  } else {
+    begin = ListOffsetAt(input_view, physical_index);
+    end = ListOffsetAt(input_view, physical_index + 1);
+  }
+  for (int64_t element_index = begin; element_index < end; ++element_index) {
+    ICEBERG_RETURN_UNEXPECTED(
+        AppendValue(*input_schema.children[0], *input_array.children[0],
+                    *input_view.children[0], element_index, output_array->children[0]));
+  }
+  ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayFinishElement(output_array));
+  return {};
+}
+
+/// Appends entry list `row_index` of a MAP view. A map is physically a list of
+/// entry structs with 32-bit offsets, so this shares AppendListValues.
+Status AppendMapValues(const ArrowSchema& input_schema, const ArrowArray& input_array,
+                       const ArrowArrayView& input_view, int64_t row_index,
+                       ArrowArray* output_array) {
+  return AppendListValues(input_schema, input_array, input_view, row_index,
+                          output_array);
+}
+
+/// Appends the decimal at `row_index` of `input_view` to `output_array`.
+/// It uses the bit width, precision and scale parsed from `input_schema`.
+Status AppendDecimal(const ArrowSchema& input_schema, const ArrowArrayView& input_view,
+                     int64_t row_index, ArrowArray* output_array) {
+  ArrowError error;
+  ArrowSchemaView schema_view;
+  ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR(
+      ArrowSchemaViewInit(&schema_view, &input_schema, &error), error);
+
+  ArrowDecimal value;
+  ArrowDecimalInit(&value, schema_view.decimal_bitwidth, schema_view.decimal_precision,
+                   schema_view.decimal_scale);
+  ArrowArrayViewGetDecimalUnsafe(&input_view, row_index, &value);
+  ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendDecimal(output_array, &value));
+  return {};
+}
+
+/// Appends the value at logical position `row_index` of `input_view` to
+/// `output_array`, recursing into struct, list and map children.
+///
+/// `input_schema`, `input_array` and `input_view` describe the same column.
+/// `output_array` is expected to be appending with a schema of the same shape.
+/// Returns NotImplemented for storage types not handled below.
+Status AppendValue(const ArrowSchema& input_schema, const ArrowArray& input_array,
+                   const ArrowArrayView& input_view, int64_t row_index,
+                   ArrowArray* output_array) {
+  if (ArrowArrayViewIsNull(&input_view, row_index)) {
+    ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendNull(output_array, 1));
+    return {};
+  }
+
+  switch (input_view.storage_type) {
+    case NANOARROW_TYPE_NA:
+      ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendNull(output_array, 1));
+      return {};
+    case NANOARROW_TYPE_BOOL:
+    case NANOARROW_TYPE_INT8:
+    case NANOARROW_TYPE_INT16:
+    case NANOARROW_TYPE_INT32:
+    case NANOARROW_TYPE_INT64:
+    case NANOARROW_TYPE_DATE32:
+    case NANOARROW_TYPE_DATE64:
+    case NANOARROW_TYPE_TIME32:
+    case NANOARROW_TYPE_TIME64:
+    case NANOARROW_TYPE_TIMESTAMP:
+    case NANOARROW_TYPE_DURATION:
+      ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendInt(
+          output_array, ArrowArrayViewGetIntUnsafe(&input_view, row_index)));
+      return {};
+    case NANOARROW_TYPE_UINT8:
+    case NANOARROW_TYPE_UINT16:
+    case NANOARROW_TYPE_UINT32:
+    case NANOARROW_TYPE_UINT64:
+      ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendUInt(
+          output_array, ArrowArrayViewGetUIntUnsafe(&input_view, row_index)));
+      return {};
+    case NANOARROW_TYPE_HALF_FLOAT:
+    case NANOARROW_TYPE_FLOAT:
+    case NANOARROW_TYPE_DOUBLE:
+      ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendDouble(
+          output_array, ArrowArrayViewGetDoubleUnsafe(&input_view, row_index)));
+      return {};
+    case NANOARROW_TYPE_STRING:
+    case NANOARROW_TYPE_LARGE_STRING:
+    case NANOARROW_TYPE_STRING_VIEW: {
+      auto value = ArrowArrayViewGetStringUnsafe(&input_view, row_index);
+      ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendString(output_array, value));
+      return {};
+    }
+    case NANOARROW_TYPE_BINARY:
+    case NANOARROW_TYPE_LARGE_BINARY:
+    case NANOARROW_TYPE_FIXED_SIZE_BINARY:
+    case NANOARROW_TYPE_BINARY_VIEW: {
+      auto value = ArrowArrayViewGetBytesUnsafe(&input_view, row_index);
+      ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendBytes(output_array, value));
+      return {};
+    }
+    case NANOARROW_TYPE_DECIMAL128:
+    case NANOARROW_TYPE_DECIMAL256:
+      return AppendDecimal(input_schema, input_view, row_index, output_array);
+    case NANOARROW_TYPE_STRUCT: {
+      // A struct's offset applies to its children in addition to their own
+      // offsets. Child views do not inherit it, so add it here.
+      const int64_t child_row_index = input_view.offset + row_index;
+      for (int64_t child_index = 0; child_index < input_schema.n_children;
+           ++child_index) {
+        ICEBERG_RETURN_UNEXPECTED(AppendValue(
+            *input_schema.children[child_index], *input_array.children[child_index],
+            *input_view.children[child_index], child_row_index,
+            output_array->children[child_index]));
+      }
+      ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayFinishElement(output_array));
+      return {};
+    }
+    case NANOARROW_TYPE_LIST:
+    case NANOARROW_TYPE_LARGE_LIST:
+    case NANOARROW_TYPE_FIXED_SIZE_LIST:
+      return AppendListValues(input_schema, input_array, input_view, row_index,
+                              output_array);
+    case NANOARROW_TYPE_MAP:
+      return AppendMapValues(input_schema, input_array, input_view, row_index,
+                             output_array);
+    default:
+      return NotImplemented("Unsupported Arrow type for merge-on-read projection: {}",
+                            static_cast<int>(input_view.storage_type));
+  }
+}
+
+}  // namespace
+
+namespace {
+
+// Invokes `schema.release` when the schema still owns its resources.
+void ReleaseSchemaIfOwned(ArrowSchema& schema) {
+  if (schema.release != nullptr) {
+    schema.release(&schema);
+  }
+}
+
+}  // namespace
+
+// Takes over `other`'s state. The two ArrowSchema structs are copied by value,
+// and the source's release callbacks are cleared so that only this object
+// releases them.
+ProjectionContext::ProjectionContext(ProjectionContext&& other) noexcept
+    : input_schema_(std::exchange(other.input_schema_, nullptr)),
+      output_schema_(std::exchange(other.output_schema_, nullptr)),
+      selected_field_indices_(std::move(other.selected_field_indices_)),
+      input_arrow_schema_(other.input_arrow_schema_),
+      output_arrow_schema_(other.output_arrow_schema_),
+      project_batch_function_(std::exchange(other.project_batch_function_, nullptr)),
+      project_batch_state_(std::move(other.project_batch_state_)) {
+  other.input_arrow_schema_.release = nullptr;
+  other.output_arrow_schema_.release = nullptr;
+}
+
+// Releases the schemas currently owned, then takes over `other`'s state.
+// Self-assignment is a no-op.
+ProjectionContext& ProjectionContext::operator=(ProjectionContext&& other) noexcept {
+  if (this != &other) {
+    ReleaseSchemaIfOwned(input_arrow_schema_);
+    ReleaseSchemaIfOwned(output_arrow_schema_);
+
+    input_schema_ = std::exchange(other.input_schema_, nullptr);
+    output_schema_ = std::exchange(other.output_schema_, nullptr);
+    selected_field_indices_ = std::move(other.selected_field_indices_);
+    input_arrow_schema_ = other.input_arrow_schema_;
+    other.input_arrow_schema_.release = nullptr;
+    output_arrow_schema_ = other.output_arrow_schema_;
+    other.output_arrow_schema_.release = nullptr;
+    project_batch_function_ = std::exchange(other.project_batch_function_, nullptr);
+    project_batch_state_ = std::move(other.project_batch_state_);
+  }
+  return *this;
+}
+
+ProjectionContext::~ProjectionContext() {
+  ReleaseSchemaIfOwned(input_arrow_schema_);
+  ReleaseSchemaIfOwned(output_arrow_schema_);
+}
+
+// Validates that `output_schema` selects complete top-level fields of
+// `input_schema`, then converts both schemas to Arrow C schemas that the
+// context owns. The Schema objects are held by pointer, not copied.
+Result<ProjectionContext> ProjectionContext::Make(
+    const Schema& input_schema, const Schema& output_schema,
+    ProjectionContext::ProjectBatchFunction project_batch_function) {
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto field_indices,
+      BuildSelectedFieldIndices(input_schema.fields(), output_schema.fields()));
+
+  ProjectionContext context;
+  context.input_schema_ = &input_schema;
+  context.output_schema_ = &output_schema;
+  context.selected_field_indices_ = std::move(field_indices);
+  ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(input_schema, &context.input_arrow_schema_));
+  ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(output_schema, &context.output_arrow_schema_));
+  context.project_batch_function_ = project_batch_function;
+  return context;
+}
+
+auto ProjectionContext::input_schema() const -> const Schema& { return *input_schema_; }
+
+auto ProjectionContext::output_schema() const -> const Schema& { return *output_schema_; }
+
+auto ProjectionContext::input_arrow_schema() const -> const ArrowSchema& {
+  return input_arrow_schema_;
+}
+
+auto ProjectionContext::output_arrow_schema() const -> const ArrowSchema& {
+  return output_arrow_schema_;
+}
+
+auto ProjectionContext::selected_field_indices() const -> std::span<const int> {
+  return selected_field_indices_;
+}
+
+auto ProjectionContext::project_batch_function() const -> ProjectBatchFunction {
+  return project_batch_function_;
+}
+
+auto ProjectionContext::project_batch_state() -> ProjectBatchState& {
+  return project_batch_state_;
+}
+
+// Installs the process-wide ProjectBatch implementation. When the DCHECK does
+// not abort, a null argument is ignored and the current registration is kept.
+void ProjectionContext::RegisterProjectBatchFunction(
+    ProjectionContext::ProjectBatchFunction project_batch_function) {
+  ICEBERG_DCHECK(project_batch_function != nullptr,
+                 "ProjectBatch implementation must not be null");
+  if (project_batch_function != nullptr) {
+    std::lock_guard lock(g_project_batch_function_mutex);
+    g_project_batch_function = project_batch_function;
+  }
+}
+
+bool ProjectionContext::HasProjectBatchFunction() {
+  const auto registered = GetProjectBatchFunction();
+  return registered != nullptr;
+}
+
+ProjectionContext::ProjectBatchFunction ProjectionContext::ResolveProjectBatchFunction() {
+  return GetProjectBatchFunction();
+}
+
+namespace {
+
+Result<ArrowArray> ProjectBatchNanoarrow(const ArrowSchema& input_arrow_schema,
+                                         const ArrowArray& input_batch,
+                                         std::span<const int32_t> row_indices,
+                                         const ArrowSchema& 
output_arrow_schema,
+                                         std::span<const int> 
selected_field_indices) {
+  ArrowArrayView input_view;
+  ArrowError error;
+  ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR(
+      ArrowArrayViewInitFromSchema(&input_view, &input_arrow_schema, &error), 
error);
+  internal::ArrowArrayViewGuard input_view_guard(&input_view);
+  ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR(
+      ArrowArrayViewSetArray(&input_view, &input_batch, &error), error);
+  ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR(
+      ArrowArrayViewValidate(&input_view, NANOARROW_VALIDATION_LEVEL_FULL, 
&error),
+      error);

Review Comment:
   Switched the nanoarrow path to DEFAULT validation.



##########
src/iceberg/arrow/arrow_c_data_util.cc:
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <cstdint>
+#include <memory>
+#include <mutex>
+#include <span>
+#include <utility>
+#include <vector>
+
+#include <arrow/array/array_primitive.h>
+#include <arrow/buffer.h>
+#include <arrow/c/bridge.h>
+#include <arrow/compute/api_vector.h>
+#include <arrow/record_batch.h>
+#include <nanoarrow/nanoarrow.h>
+
+#include "iceberg/arrow/arrow_status_internal.h"
+#include "iceberg/arrow/nanoarrow_status_internal.h"
+#include "iceberg/arrow_c_data_guard_internal.h"
+#include "iceberg/arrow_c_data_util_internal.h"
+#include "iceberg/result.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+namespace {
+
+struct ArrowProjectBatchState {
+  std::shared_ptr<::arrow::Schema> input_schema;
+  std::shared_ptr<::arrow::Schema> output_schema;
+};
+
+Result<std::shared_ptr<::arrow::Schema>> ImportArrowSchema(
+    const ArrowSchema& arrow_schema) {
+  ArrowSchema schema_copy;
+  ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowSchemaDeepCopy(&arrow_schema, &schema_copy));
+  internal::ArrowSchemaGuard schema_copy_guard(&schema_copy);
+
+  ICEBERG_ARROW_ASSIGN_OR_RETURN(auto schema, ::arrow::ImportSchema(&schema_copy));
+  return schema;
+}
+
+Result<std::shared_ptr<ArrowProjectBatchState>> GetArrowProjectBatchState(
+    ProjectionContext& projection) {
+  auto state =
+      std::static_pointer_cast<ArrowProjectBatchState>(projection.project_batch_state());
+  if (state != nullptr) {
+    return state;
+  }
+
+  ICEBERG_ASSIGN_OR_RAISE(auto input_schema,
+                          ImportArrowSchema(projection.input_arrow_schema()));
+  ICEBERG_ASSIGN_OR_RAISE(auto output_schema,
+                          ImportArrowSchema(projection.output_arrow_schema()));
+
+  state = std::make_shared<ArrowProjectBatchState>(
+      ArrowProjectBatchState{.input_schema = std::move(input_schema),
+                             .output_schema = std::move(output_schema)});
+  projection.project_batch_state() = state;
+  return state;
+}
+
+Result<ArrowArray> ProjectBatchArrowCompute(ArrowArray* input_batch,
+                                            std::span<const int32_t> row_indices,
+                                            ProjectionContext& projection) {
+  ICEBERG_PRECHECK(input_batch != nullptr, "input_batch must not be null");
+  ICEBERG_ASSIGN_OR_RAISE(auto state, GetArrowProjectBatchState(projection));
+
+  ICEBERG_ARROW_ASSIGN_OR_RETURN(
+      auto input_record_batch,
+      ::arrow::ImportRecordBatch(input_batch, state->input_schema));
+
+  const int32_t empty_index = 0;
+  const int32_t* row_indices_data =

Review Comment:
   Added the Buffer::Wrap empty-input comment.



##########
src/iceberg/data/file_scan_task_reader.cc:
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/data/file_scan_task_reader.h"
+
+#include <algorithm>
+#include <memory>
+#include <optional>
+#include <utility>
+#include <vector>
+
+#include "iceberg/arrow_c_data_guard_internal.h"
+#include "iceberg/arrow_c_data_util_internal.h"
+#include "iceberg/data/delete_filter.h"
+#include "iceberg/file_reader.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/schema.h"
+#include "iceberg/schema_internal.h"
+#include "iceberg/table_scan.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+namespace {
+
+ReaderOptions MakeReaderOptions(const DataFile& data_file, std::shared_ptr<FileIO> io,
+                                std::shared_ptr<Schema> projection,
+                                std::shared_ptr<Expression> filter,
+                                std::shared_ptr<NameMapping> name_mapping,
+                                ReaderProperties properties) {
+  return ReaderOptions{
+      .path = data_file.file_path,
+      .length = static_cast<size_t>(data_file.file_size_in_bytes),
+      .io = std::move(io),
+      .projection = std::move(projection),
+      .filter = std::move(filter),
+      .name_mapping = std::move(name_mapping),
+      .properties = std::move(properties),
+  };
+}
+
+class MergeOnReadStreamSource {
+ public:
+  MergeOnReadStreamSource(std::unique_ptr<Reader> reader,
+                          std::unique_ptr<DeleteFilter> delete_filter,
+                          std::shared_ptr<::iceberg::Schema> required_schema,
+                          std::shared_ptr<::iceberg::Schema> projected_schema,
+                          ProjectionContext projection_context)
+      : reader_(std::move(reader)),
+        delete_filter_(std::move(delete_filter)),
+        required_schema_(std::move(required_schema)),
+        projected_schema_(std::move(projected_schema)),
+        project_all_rows_(required_schema_->SameSchema(*projected_schema_)),
+        projection_context_(std::move(projection_context)) {}
+
+  ~MergeOnReadStreamSource() {
+    if (cached_schema_.has_value() && cached_schema_->release != nullptr) {
+      cached_schema_->release(&cached_schema_.value());
+    }
+  }
+
+  Status Close() {
+    if (reader_ == nullptr) {
+      return {};
+    }
+    return reader_->Close();
+  }
+
+  Result<std::optional<ArrowArray>> Next() {
+    if (!cached_schema_.has_value()) {
+      ICEBERG_ASSIGN_OR_RAISE(cached_schema_, reader_->Schema());
+    }
+    ArrowSchema& input_arrow_schema = cached_schema_.value();

Review Comment:
   Added a comment for the stable stream schema assumption.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to