WZhuo commented on code in PR #657: URL: https://github.com/apache/iceberg-cpp/pull/657#discussion_r3265468327
########## src/iceberg/arrow_c_data_util_internal.h: ########## @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include <cerrno> +#include <concepts> +#include <cstdint> +#include <cstring> +#include <memory> +#include <optional> +#include <span> +#include <string> +#include <tuple> +#include <utility> Review Comment: nit: `<tuple>` does not appear to be used in this header. Can be removed. ########## src/iceberg/arrow_c_data_util_internal.h: ########## @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include <cerrno> +#include <concepts> +#include <cstdint> +#include <cstring> +#include <memory> +#include <optional> +#include <span> +#include <string> +#include <tuple> +#include <utility> +#include <vector> + +#include "iceberg/arrow_c_data.h" +#include "iceberg/iceberg_export.h" +#include "iceberg/result.h" +#include "iceberg/type_fwd.h" + +namespace iceberg { + +/// \brief Cached state for ProjectBatch over one input/output schema pair. +class ICEBERG_EXPORT ProjectionContext { + public: + using ProjectBatchState = std::shared_ptr<void>; + + using ProjectBatchFunction = auto (*)(ArrowArray* input_batch, + std::span<const int32_t> row_indices, + ProjectionContext& projection) + -> Result<ArrowArray>; + + /// \brief Register a custom implementation for ProjectBatch. + static void RegisterProjectBatchFunction(ProjectBatchFunction project_batch_function); + + /// \brief Returns true when a custom implementation has been registered. + static bool HasProjectBatchFunction(); + + /// \brief Resolve the registered ProjectBatch implementation. + static auto ResolveProjectBatchFunction() -> ProjectBatchFunction; + + /// \brief Build reusable projection state for a validated schema pair. + /// + /// \param input_schema Schema that describes every input batch. + /// \param output_schema Final schema and column order requested by the caller. + /// \param project_batch_function Optional implementation returned by + /// ProjectionContext::ResolveProjectBatchFunction, or nullptr to use the nanoarrow + /// path. + /// \note It validates that output_schema selects or reorders complete top-level fields + /// by field id. Nested pruning and type changes are rejected. The input_schema and + /// output_schema passed to Make must outlive the context. 
ProjectBatch may lazily + /// initialize backend cache; do not share one context across concurrent calls. + static Result<ProjectionContext> Make(const Schema& input_schema, + const Schema& output_schema, + ProjectBatchFunction project_batch_function); + + ProjectionContext(ProjectionContext&&) noexcept; + ProjectionContext& operator=(ProjectionContext&&) noexcept; + ~ProjectionContext(); + + ProjectionContext(const ProjectionContext&) = delete; + ProjectionContext& operator=(const ProjectionContext&) = delete; + + const Schema& input_schema() const; + + const Schema& output_schema() const; + + const ArrowSchema& input_arrow_schema() const; + + const ArrowSchema& output_arrow_schema() const; + + std::span<const int> selected_field_indices() const; + + ProjectBatchFunction project_batch_function() const; + + ProjectBatchState& project_batch_state(); + + private: + ProjectionContext() = default; + + const Schema* input_schema_ = nullptr; + const Schema* output_schema_ = nullptr; + std::vector<int> selected_field_indices_; + ArrowSchema input_arrow_schema_{}; Review Comment: nit: `selected_field_indices_` is `vector<int>` but `row_indices` in `ProjectBatch` is `span<const int32_t>`. Using `int32_t` consistently for both would avoid implicit conversions in the Arrow compute path. ########## src/iceberg/arrow_c_data_util_internal.h: ########## @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include <cerrno> +#include <concepts> +#include <cstdint> +#include <cstring> +#include <memory> +#include <optional> +#include <span> +#include <string> +#include <tuple> +#include <utility> +#include <vector> + +#include "iceberg/arrow_c_data.h" +#include "iceberg/iceberg_export.h" +#include "iceberg/result.h" +#include "iceberg/type_fwd.h" + +namespace iceberg { + +/// \brief Cached state for ProjectBatch over one input/output schema pair. +class ICEBERG_EXPORT ProjectionContext { + public: + using ProjectBatchState = std::shared_ptr<void>; + + using ProjectBatchFunction = auto (*)(ArrowArray* input_batch, + std::span<const int32_t> row_indices, + ProjectionContext& projection) + -> Result<ArrowArray>; + + /// \brief Register a custom implementation for ProjectBatch. + static void RegisterProjectBatchFunction(ProjectBatchFunction project_batch_function); + + /// \brief Returns true when a custom implementation has been registered. + static bool HasProjectBatchFunction(); + + /// \brief Resolve the registered ProjectBatch implementation. + static auto ResolveProjectBatchFunction() -> ProjectBatchFunction; + + /// \brief Build reusable projection state for a validated schema pair. + /// + /// \param input_schema Schema that describes every input batch. + /// \param output_schema Final schema and column order requested by the caller. + /// \param project_batch_function Optional implementation returned by + /// ProjectionContext::ResolveProjectBatchFunction, or nullptr to use the nanoarrow + /// path. 
+ /// \note It validates that output_schema selects or reorders complete top-level fields + /// by field id. Nested pruning and type changes are rejected. The input_schema and + /// output_schema passed to Make must outlive the context. ProjectBatch may lazily + /// initialize backend cache; do not share one context across concurrent calls. + static Result<ProjectionContext> Make(const Schema& input_schema, + const Schema& output_schema, + ProjectBatchFunction project_batch_function); + + ProjectionContext(ProjectionContext&&) noexcept; + ProjectionContext& operator=(ProjectionContext&&) noexcept; + ~ProjectionContext(); + + ProjectionContext(const ProjectionContext&) = delete; + ProjectionContext& operator=(const ProjectionContext&) = delete; + + const Schema& input_schema() const; + + const Schema& output_schema() const; + + const ArrowSchema& input_arrow_schema() const; + + const ArrowSchema& output_arrow_schema() const; + + std::span<const int> selected_field_indices() const; + + ProjectBatchFunction project_batch_function() const; + + ProjectBatchState& project_batch_state(); + + private: + ProjectionContext() = default; + + const Schema* input_schema_ = nullptr; + const Schema* output_schema_ = nullptr; Review Comment: nit: Storing raw pointers here, and relying on callers to keep both schemas alive for as long as the context exists, is fragile. A future refactor could easily break this invariant. Consider either storing `shared_ptr<const Schema>` or, at minimum, adding a prominent comment at each call site (e.g. in `FileScanTaskReader::Impl::Open`) explaining why the lifetime is guaranteed. ########## src/iceberg/arrow/arrow_c_data_util.cc: ########## @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <cstdint> +#include <memory> +#include <mutex> +#include <span> +#include <utility> +#include <vector> + +#include <arrow/array/array_primitive.h> +#include <arrow/buffer.h> +#include <arrow/c/bridge.h> +#include <arrow/compute/api_vector.h> +#include <arrow/record_batch.h> +#include <nanoarrow/nanoarrow.h> + +#include "iceberg/arrow/arrow_status_internal.h" +#include "iceberg/arrow/nanoarrow_status_internal.h" +#include "iceberg/arrow_c_data_guard_internal.h" +#include "iceberg/arrow_c_data_util_internal.h" +#include "iceberg/result.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +namespace { + +struct ArrowProjectBatchState { + std::shared_ptr<::arrow::Schema> input_schema; + std::shared_ptr<::arrow::Schema> output_schema; +}; + +Result<std::shared_ptr<::arrow::Schema>> ImportArrowSchema( + const ArrowSchema& arrow_schema) { + ArrowSchema schema_copy; + ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowSchemaDeepCopy(&arrow_schema, &schema_copy)); + internal::ArrowSchemaGuard schema_copy_guard(&schema_copy); + + ICEBERG_ARROW_ASSIGN_OR_RETURN(auto schema, ::arrow::ImportSchema(&schema_copy)); + return schema; +} + +Result<std::shared_ptr<ArrowProjectBatchState>> GetArrowProjectBatchState( + ProjectionContext& projection) { + auto state = + std::static_pointer_cast<ArrowProjectBatchState>(projection.project_batch_state()); + if (state != nullptr) { + return 
+ state; + } + + ICEBERG_ASSIGN_OR_RAISE(auto input_schema, + ImportArrowSchema(projection.input_arrow_schema())); + ICEBERG_ASSIGN_OR_RAISE(auto output_schema, + ImportArrowSchema(projection.output_arrow_schema())); + + state = std::make_shared<ArrowProjectBatchState>( + ArrowProjectBatchState{.input_schema = std::move(input_schema), + .output_schema = std::move(output_schema)}); + projection.project_batch_state() = state; + return state; +} + +Result<ArrowArray> ProjectBatchArrowCompute(ArrowArray* input_batch, + std::span<const int32_t> row_indices, + ProjectionContext& projection) { + ICEBERG_PRECHECK(input_batch != nullptr, "input_batch must not be null"); + ICEBERG_ASSIGN_OR_RAISE(auto state, GetArrowProjectBatchState(projection)); + + ICEBERG_ARROW_ASSIGN_OR_RETURN( + auto input_record_batch, + ::arrow::ImportRecordBatch(input_batch, state->input_schema)); + + const int32_t empty_index = 0; + const int32_t* row_indices_data = Review Comment: nit: the name `empty_index` is a bit misleading — the variable exists only so that its address can serve as a non-null pointer for `Buffer::Wrap` when `row_indices` is empty (the value `0` is never actually read). A brief comment would clarify the intent, e.g.: ```cpp // Provide a valid pointer for Buffer::Wrap; the array length is 0 so this is never read. ``` ########## src/iceberg/arrow_c_data_util.cc: ########## @@ -0,0 +1,395 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <cstdint> +#include <limits> +#include <memory> +#include <mutex> +#include <span> +#include <utility> +#include <vector> + +#include <nanoarrow/nanoarrow.h> + +#include "iceberg/arrow/nanoarrow_status_internal.h" +#include "iceberg/arrow_c_data_guard_internal.h" +#include "iceberg/arrow_c_data_util_internal.h" +#include "iceberg/file_reader.h" +#include "iceberg/result.h" +#include "iceberg/schema.h" +#include "iceberg/schema_field.h" +#include "iceberg/schema_internal.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +Result<ArrowArrayStream> MakeArrowArrayStream(std::unique_ptr<Reader> reader) { + return MakeArrowArrayStream<Reader>(std::move(reader)); +} + +namespace { + +Result<size_t> FindFieldIndexById(std::span<const SchemaField> fields, int32_t field_id) { + for (size_t index = 0; index < fields.size(); ++index) { + if (fields[index].field_id() == field_id) { + return index; + } + } + return InvalidArgument("Required schema does not contain projected field id {}", + field_id); +} + +std::mutex g_project_batch_function_mutex; +ProjectionContext::ProjectBatchFunction g_project_batch_function = nullptr; + +ProjectionContext::ProjectBatchFunction GetProjectBatchFunction() { + std::lock_guard lock(g_project_batch_function_mutex); + return g_project_batch_function; +} + +Result<std::vector<int>> BuildSelectedFieldIndices( + std::span<const SchemaField> input_fields, + std::span<const SchemaField> output_fields) { + std::vector<int> selected_field_indices; + selected_field_indices.reserve(output_fields.size()); + + 
for (const auto& output_field : output_fields) { + ICEBERG_ASSIGN_OR_RAISE(auto input_index, + FindFieldIndexById(input_fields, output_field.field_id())); + const auto& input_field = input_fields[input_index]; + if (*input_field.type() != *output_field.type()) { + return InvalidArgument( + "ProjectBatch only supports complete top-level fields, but field id " + "{} changes type from {} to {}", + output_field.field_id(), input_field.type()->ToString(), + output_field.type()->ToString()); + } + ICEBERG_PRECHECK(input_index <= static_cast<size_t>(std::numeric_limits<int>::max()), + "Input field index {} exceeds int range", input_index); + selected_field_indices.push_back(static_cast<int>(input_index)); + } + + return selected_field_indices; +} + +Status AppendValue(const ArrowSchema& input_schema, const ArrowArray& input_array, + const ArrowArrayView& input_view, int64_t row_index, + ArrowArray* output_array); + +Status AppendListValues(const ArrowSchema& input_schema, const ArrowArray& input_array, + const ArrowArrayView& input_view, int64_t row_index, + ArrowArray* output_array) { + const int64_t begin = ArrowArrayViewListChildOffset(&input_view, row_index); + const int64_t end = ArrowArrayViewListChildOffset(&input_view, row_index + 1); + for (int64_t element_index = begin; element_index < end; ++element_index) { + ICEBERG_RETURN_UNEXPECTED( + AppendValue(*input_schema.children[0], *input_array.children[0], + *input_view.children[0], element_index, output_array->children[0])); + } + ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayFinishElement(output_array)); + return {}; +} + +Status AppendMapValues(const ArrowSchema& input_schema, const ArrowArray& input_array, + const ArrowArrayView& input_view, int64_t row_index, + ArrowArray* output_array) { + const int64_t begin = ArrowArrayViewListChildOffset(&input_view, row_index); + const int64_t end = ArrowArrayViewListChildOffset(&input_view, row_index + 1); + for (int64_t entry_index = begin; entry_index < end; 
++entry_index) { + ICEBERG_RETURN_UNEXPECTED( + AppendValue(*input_schema.children[0], *input_array.children[0], + *input_view.children[0], entry_index, output_array->children[0])); + } + ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayFinishElement(output_array)); + return {}; +} + +Status AppendDecimal(const ArrowSchema& input_schema, const ArrowArrayView& input_view, + int64_t row_index, ArrowArray* output_array) { + ArrowError error; + ArrowSchemaView schema_view; + ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR( + ArrowSchemaViewInit(&schema_view, &input_schema, &error), error); + + ArrowDecimal value; + ArrowDecimalInit(&value, schema_view.decimal_bitwidth, schema_view.decimal_precision, + schema_view.decimal_scale); + ArrowArrayViewGetDecimalUnsafe(&input_view, row_index, &value); + ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendDecimal(output_array, &value)); + return {}; +} + +Status AppendValue(const ArrowSchema& input_schema, const ArrowArray& input_array, + const ArrowArrayView& input_view, int64_t row_index, + ArrowArray* output_array) { + if (ArrowArrayViewIsNull(&input_view, row_index)) { + ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendNull(output_array, 1)); + return {}; + } + + switch (input_view.storage_type) { + case NANOARROW_TYPE_NA: + ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendNull(output_array, 1)); + return {}; + case NANOARROW_TYPE_BOOL: + case NANOARROW_TYPE_INT8: + case NANOARROW_TYPE_INT16: + case NANOARROW_TYPE_INT32: + case NANOARROW_TYPE_INT64: + case NANOARROW_TYPE_DATE32: + case NANOARROW_TYPE_DATE64: + case NANOARROW_TYPE_TIME32: + case NANOARROW_TYPE_TIME64: + case NANOARROW_TYPE_TIMESTAMP: + case NANOARROW_TYPE_DURATION: + ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendInt( + output_array, ArrowArrayViewGetIntUnsafe(&input_view, row_index))); + return {}; + case NANOARROW_TYPE_UINT8: + case NANOARROW_TYPE_UINT16: + case NANOARROW_TYPE_UINT32: + case NANOARROW_TYPE_UINT64: + 
ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendUInt( + output_array, ArrowArrayViewGetUIntUnsafe(&input_view, row_index))); + return {}; + case NANOARROW_TYPE_HALF_FLOAT: + case NANOARROW_TYPE_FLOAT: + case NANOARROW_TYPE_DOUBLE: + ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendDouble( + output_array, ArrowArrayViewGetDoubleUnsafe(&input_view, row_index))); + return {}; + case NANOARROW_TYPE_STRING: + case NANOARROW_TYPE_LARGE_STRING: + case NANOARROW_TYPE_STRING_VIEW: { + auto value = ArrowArrayViewGetStringUnsafe(&input_view, row_index); + ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendString(output_array, value)); + return {}; + } + case NANOARROW_TYPE_BINARY: + case NANOARROW_TYPE_LARGE_BINARY: + case NANOARROW_TYPE_FIXED_SIZE_BINARY: + case NANOARROW_TYPE_BINARY_VIEW: { + auto value = ArrowArrayViewGetBytesUnsafe(&input_view, row_index); + ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendBytes(output_array, value)); + return {}; + } + case NANOARROW_TYPE_DECIMAL128: + case NANOARROW_TYPE_DECIMAL256: + return AppendDecimal(input_schema, input_view, row_index, output_array); + case NANOARROW_TYPE_STRUCT: { + for (int64_t child_index = 0; child_index < input_schema.n_children; + ++child_index) { + ICEBERG_RETURN_UNEXPECTED(AppendValue( + *input_schema.children[child_index], *input_array.children[child_index], + *input_view.children[child_index], row_index, + output_array->children[child_index])); + } + ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayFinishElement(output_array)); + return {}; + } + case NANOARROW_TYPE_LIST: + case NANOARROW_TYPE_LARGE_LIST: + case NANOARROW_TYPE_FIXED_SIZE_LIST: + return AppendListValues(input_schema, input_array, input_view, row_index, + output_array); + case NANOARROW_TYPE_MAP: + return AppendMapValues(input_schema, input_array, input_view, row_index, + output_array); + default: + return NotImplemented("Unsupported Arrow type for merge-on-read projection: {}", + static_cast<int>(input_view.storage_type)); + } +} 
+ +} // namespace + +ProjectionContext::ProjectionContext(ProjectionContext&& other) noexcept + : input_schema_(std::exchange(other.input_schema_, nullptr)), + output_schema_(std::exchange(other.output_schema_, nullptr)), + selected_field_indices_(std::move(other.selected_field_indices_)), + input_arrow_schema_(other.input_arrow_schema_), + output_arrow_schema_(other.output_arrow_schema_), + project_batch_function_(std::exchange(other.project_batch_function_, nullptr)), + project_batch_state_(std::move(other.project_batch_state_)) { + other.input_arrow_schema_.release = nullptr; + other.output_arrow_schema_.release = nullptr; +} + +ProjectionContext& ProjectionContext::operator=(ProjectionContext&& other) noexcept { + if (this == &other) { + return *this; + } + + if (input_arrow_schema_.release != nullptr) { + input_arrow_schema_.release(&input_arrow_schema_); + } + if (output_arrow_schema_.release != nullptr) { + output_arrow_schema_.release(&output_arrow_schema_); + } + + input_schema_ = std::exchange(other.input_schema_, nullptr); + output_schema_ = std::exchange(other.output_schema_, nullptr); + selected_field_indices_ = std::move(other.selected_field_indices_); + input_arrow_schema_ = other.input_arrow_schema_; + other.input_arrow_schema_.release = nullptr; + output_arrow_schema_ = other.output_arrow_schema_; + other.output_arrow_schema_.release = nullptr; + project_batch_function_ = std::exchange(other.project_batch_function_, nullptr); + project_batch_state_ = std::move(other.project_batch_state_); + return *this; +} + +ProjectionContext::~ProjectionContext() { + if (input_arrow_schema_.release != nullptr) { + input_arrow_schema_.release(&input_arrow_schema_); + } + if (output_arrow_schema_.release != nullptr) { + output_arrow_schema_.release(&output_arrow_schema_); + } +} + +Result<ProjectionContext> ProjectionContext::Make( + const Schema& input_schema, const Schema& output_schema, + ProjectionContext::ProjectBatchFunction project_batch_function) { + 
ICEBERG_ASSIGN_OR_RAISE( + auto selected_field_indices, + BuildSelectedFieldIndices(input_schema.fields(), output_schema.fields())); + + ProjectionContext context; + context.input_schema_ = &input_schema; + context.output_schema_ = &output_schema; + context.selected_field_indices_ = std::move(selected_field_indices); + ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(input_schema, &context.input_arrow_schema_)); + ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(output_schema, &context.output_arrow_schema_)); + context.project_batch_function_ = project_batch_function; + + return context; +} + +const Schema& ProjectionContext::input_schema() const { return *input_schema_; } + +const Schema& ProjectionContext::output_schema() const { return *output_schema_; } + +const ArrowSchema& ProjectionContext::input_arrow_schema() const { + return input_arrow_schema_; +} + +const ArrowSchema& ProjectionContext::output_arrow_schema() const { + return output_arrow_schema_; +} + +std::span<const int> ProjectionContext::selected_field_indices() const { + return selected_field_indices_; +} + +ProjectionContext::ProjectBatchFunction ProjectionContext::project_batch_function() + const { + return project_batch_function_; +} + +ProjectionContext::ProjectBatchState& ProjectionContext::project_batch_state() { + return project_batch_state_; +} + +void ProjectionContext::RegisterProjectBatchFunction( + ProjectionContext::ProjectBatchFunction project_batch_function) { + ICEBERG_DCHECK(project_batch_function != nullptr, + "ProjectBatch implementation must not be null"); + if (project_batch_function == nullptr) { + return; + } + std::lock_guard lock(g_project_batch_function_mutex); + g_project_batch_function = project_batch_function; +} + +bool ProjectionContext::HasProjectBatchFunction() { + return GetProjectBatchFunction() != nullptr; +} + +auto ProjectionContext::ResolveProjectBatchFunction() + -> ProjectionContext::ProjectBatchFunction { + return GetProjectBatchFunction(); +} + +namespace { + 
+Result<ArrowArray> ProjectBatchNanoarrow(const ArrowSchema& input_arrow_schema, + const ArrowArray& input_batch, + std::span<const int32_t> row_indices, + const ArrowSchema& output_arrow_schema, + std::span<const int> selected_field_indices) { + ArrowArrayView input_view; + ArrowError error; + ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR( + ArrowArrayViewInitFromSchema(&input_view, &input_arrow_schema, &error), error); + internal::ArrowArrayViewGuard input_view_guard(&input_view); + ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR( + ArrowArrayViewSetArray(&input_view, &input_batch, &error), error); + ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR( + ArrowArrayViewValidate(&input_view, NANOARROW_VALIDATION_LEVEL_FULL, &error), + error); Review Comment: `NANOARROW_VALIDATION_LEVEL_FULL` on every batch is expensive for production workloads. Consider making this configurable (e.g. via a debug flag or build option), or using `NANOARROW_VALIDATION_LEVEL_DEFAULT` by default and only enabling full validation in debug/test builds. ########## src/iceberg/data/file_scan_task_reader.cc: ########## @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#include "iceberg/data/file_scan_task_reader.h" + +#include <algorithm> +#include <memory> +#include <optional> +#include <utility> +#include <vector> + +#include "iceberg/arrow_c_data_guard_internal.h" +#include "iceberg/arrow_c_data_util_internal.h" +#include "iceberg/data/delete_filter.h" +#include "iceberg/file_reader.h" +#include "iceberg/manifest/manifest_entry.h" +#include "iceberg/schema.h" +#include "iceberg/schema_internal.h" +#include "iceberg/table_scan.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +namespace { + +ReaderOptions MakeReaderOptions(const DataFile& data_file, std::shared_ptr<FileIO> io, + std::shared_ptr<Schema> projection, + std::shared_ptr<Expression> filter, + std::shared_ptr<NameMapping> name_mapping, + ReaderProperties properties) { + return ReaderOptions{ + .path = data_file.file_path, + .length = static_cast<size_t>(data_file.file_size_in_bytes), + .io = std::move(io), + .projection = std::move(projection), + .filter = std::move(filter), + .name_mapping = std::move(name_mapping), + .properties = std::move(properties), + }; +} + +class MergeOnReadStreamSource { + public: + MergeOnReadStreamSource(std::unique_ptr<Reader> reader, + std::unique_ptr<DeleteFilter> delete_filter, + std::shared_ptr<::iceberg::Schema> required_schema, + std::shared_ptr<::iceberg::Schema> projected_schema, + ProjectionContext projection_context) + : reader_(std::move(reader)), + delete_filter_(std::move(delete_filter)), + required_schema_(std::move(required_schema)), + projected_schema_(std::move(projected_schema)), + project_all_rows_(required_schema_->SameSchema(*projected_schema_)), + projection_context_(std::move(projection_context)) {} + + ~MergeOnReadStreamSource() { + if (cached_schema_.has_value() && cached_schema_->release != nullptr) { + cached_schema_->release(&cached_schema_.value()); + } + } + + Status Close() { + if (reader_ == nullptr) { + return {}; + } + return reader_->Close(); + } + + Result<std::optional<ArrowArray>> 
+ Next() { + if (!cached_schema_.has_value()) { + ICEBERG_ASSIGN_OR_RAISE(cached_schema_, reader_->Schema()); + } + ArrowSchema& input_arrow_schema = cached_schema_.value(); Review Comment: The schema returned by `reader_->Schema()` is cached on the first call and reused for all subsequent batches. This is correct for Parquet (the file schema is fixed), but it would break if a future format reader returned different schemas across batches. A brief comment noting this assumption would be helpful. ########## src/iceberg/arrow_c_data_util_internal.h: ########## @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include <cerrno> +#include <concepts> +#include <cstdint> +#include <cstring> +#include <memory> +#include <optional> +#include <span> +#include <string> +#include <tuple> +#include <utility> +#include <vector> + +#include "iceberg/arrow_c_data.h" +#include "iceberg/iceberg_export.h" +#include "iceberg/result.h" +#include "iceberg/type_fwd.h" + +namespace iceberg { + +/// \brief Cached state for ProjectBatch over one input/output schema pair.
+class ICEBERG_EXPORT ProjectionContext { + public: Review Comment: nit: This is an internal header (suffix `_internal.h`), yet it uses `ICEBERG_EXPORT`. Presumably this is needed because the symbols are used outside the shared library that defines them (e.g. by another iceberg library target or by tests); an export macro is not required merely to share symbols between TUs of the same library. Either way it is a bit surprising, and a brief comment explaining why would help future readers. ########## src/iceberg/arrow_c_data_util.cc: ########## @@ -0,0 +1,395 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +#include <cstdint> +#include <limits> +#include <memory> +#include <mutex> +#include <span> +#include <utility> +#include <vector> + +#include <nanoarrow/nanoarrow.h> + +#include "iceberg/arrow/nanoarrow_status_internal.h" +#include "iceberg/arrow_c_data_guard_internal.h" +#include "iceberg/arrow_c_data_util_internal.h" +#include "iceberg/file_reader.h" +#include "iceberg/result.h" +#include "iceberg/schema.h" +#include "iceberg/schema_field.h" +#include "iceberg/schema_internal.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +Result<ArrowArrayStream> MakeArrowArrayStream(std::unique_ptr<Reader> reader) { + return MakeArrowArrayStream<Reader>(std::move(reader)); +} + +namespace { + +Result<size_t> FindFieldIndexById(std::span<const SchemaField> fields, int32_t field_id) { + for (size_t index = 0; index < fields.size(); ++index) { + if (fields[index].field_id() == field_id) { + return index; + } + } + return InvalidArgument("Required schema does not contain projected field id {}", + field_id); +} + +std::mutex g_project_batch_function_mutex; +ProjectionContext::ProjectBatchFunction g_project_batch_function = nullptr; + +ProjectionContext::ProjectBatchFunction GetProjectBatchFunction() { + std::lock_guard lock(g_project_batch_function_mutex); + return g_project_batch_function; +} + +Result<std::vector<int>> BuildSelectedFieldIndices( + std::span<const SchemaField> input_fields, + std::span<const SchemaField> output_fields) { + std::vector<int> selected_field_indices; + selected_field_indices.reserve(output_fields.size()); + + for (const auto& output_field : output_fields) { + ICEBERG_ASSIGN_OR_RAISE(auto input_index, + FindFieldIndexById(input_fields, output_field.field_id())); + const auto& input_field = input_fields[input_index]; + if (*input_field.type() != *output_field.type()) { + return InvalidArgument( + "ProjectBatch only supports complete top-level fields, but field id " + "{} changes type from {} to {}", + output_field.field_id(), 
input_field.type()->ToString(), + output_field.type()->ToString()); + } + ICEBERG_PRECHECK(input_index <= static_cast<size_t>(std::numeric_limits<int>::max()), + "Input field index {} exceeds int range", input_index); + selected_field_indices.push_back(static_cast<int>(input_index)); + } + + return selected_field_indices; +} + +Status AppendValue(const ArrowSchema& input_schema, const ArrowArray& input_array, + const ArrowArrayView& input_view, int64_t row_index, + ArrowArray* output_array); + +Status AppendListValues(const ArrowSchema& input_schema, const ArrowArray& input_array, + const ArrowArrayView& input_view, int64_t row_index, + ArrowArray* output_array) { + const int64_t begin = ArrowArrayViewListChildOffset(&input_view, row_index); + const int64_t end = ArrowArrayViewListChildOffset(&input_view, row_index + 1); + for (int64_t element_index = begin; element_index < end; ++element_index) { + ICEBERG_RETURN_UNEXPECTED( + AppendValue(*input_schema.children[0], *input_array.children[0], + *input_view.children[0], element_index, output_array->children[0])); + } + ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayFinishElement(output_array)); + return {}; +} + +Status AppendMapValues(const ArrowSchema& input_schema, const ArrowArray& input_array, + const ArrowArrayView& input_view, int64_t row_index, + ArrowArray* output_array) { + const int64_t begin = ArrowArrayViewListChildOffset(&input_view, row_index); + const int64_t end = ArrowArrayViewListChildOffset(&input_view, row_index + 1); + for (int64_t entry_index = begin; entry_index < end; ++entry_index) { + ICEBERG_RETURN_UNEXPECTED( + AppendValue(*input_schema.children[0], *input_array.children[0], + *input_view.children[0], entry_index, output_array->children[0])); + } + ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayFinishElement(output_array)); + return {}; +} + +Status AppendDecimal(const ArrowSchema& input_schema, const ArrowArrayView& input_view, + int64_t row_index, ArrowArray* output_array) { + ArrowError 
error; + ArrowSchemaView schema_view; + ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR( + ArrowSchemaViewInit(&schema_view, &input_schema, &error), error); + + ArrowDecimal value; + ArrowDecimalInit(&value, schema_view.decimal_bitwidth, schema_view.decimal_precision, + schema_view.decimal_scale); + ArrowArrayViewGetDecimalUnsafe(&input_view, row_index, &value); + ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendDecimal(output_array, &value)); + return {}; +} + +Status AppendValue(const ArrowSchema& input_schema, const ArrowArray& input_array, + const ArrowArrayView& input_view, int64_t row_index, + ArrowArray* output_array) { + if (ArrowArrayViewIsNull(&input_view, row_index)) { + ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendNull(output_array, 1)); + return {}; + } + + switch (input_view.storage_type) { + case NANOARROW_TYPE_NA: + ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendNull(output_array, 1)); + return {}; + case NANOARROW_TYPE_BOOL: + case NANOARROW_TYPE_INT8: + case NANOARROW_TYPE_INT16: + case NANOARROW_TYPE_INT32: + case NANOARROW_TYPE_INT64: + case NANOARROW_TYPE_DATE32: + case NANOARROW_TYPE_DATE64: + case NANOARROW_TYPE_TIME32: + case NANOARROW_TYPE_TIME64: + case NANOARROW_TYPE_TIMESTAMP: + case NANOARROW_TYPE_DURATION: + ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendInt( + output_array, ArrowArrayViewGetIntUnsafe(&input_view, row_index))); + return {}; + case NANOARROW_TYPE_UINT8: + case NANOARROW_TYPE_UINT16: + case NANOARROW_TYPE_UINT32: + case NANOARROW_TYPE_UINT64: + ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendUInt( + output_array, ArrowArrayViewGetUIntUnsafe(&input_view, row_index))); + return {}; + case NANOARROW_TYPE_HALF_FLOAT: + case NANOARROW_TYPE_FLOAT: + case NANOARROW_TYPE_DOUBLE: + ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendDouble( + output_array, ArrowArrayViewGetDoubleUnsafe(&input_view, row_index))); + return {}; + case NANOARROW_TYPE_STRING: + case NANOARROW_TYPE_LARGE_STRING: + case 
NANOARROW_TYPE_STRING_VIEW: { + auto value = ArrowArrayViewGetStringUnsafe(&input_view, row_index); + ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendString(output_array, value)); + return {}; + } + case NANOARROW_TYPE_BINARY: + case NANOARROW_TYPE_LARGE_BINARY: + case NANOARROW_TYPE_FIXED_SIZE_BINARY: + case NANOARROW_TYPE_BINARY_VIEW: { + auto value = ArrowArrayViewGetBytesUnsafe(&input_view, row_index); + ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendBytes(output_array, value)); + return {}; + } + case NANOARROW_TYPE_DECIMAL128: + case NANOARROW_TYPE_DECIMAL256: + return AppendDecimal(input_schema, input_view, row_index, output_array); + case NANOARROW_TYPE_STRUCT: { + for (int64_t child_index = 0; child_index < input_schema.n_children; + ++child_index) { + ICEBERG_RETURN_UNEXPECTED(AppendValue( + *input_schema.children[child_index], *input_array.children[child_index], + *input_view.children[child_index], row_index, + output_array->children[child_index])); + } + ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayFinishElement(output_array)); + return {}; + } + case NANOARROW_TYPE_LIST: + case NANOARROW_TYPE_LARGE_LIST: + case NANOARROW_TYPE_FIXED_SIZE_LIST: + return AppendListValues(input_schema, input_array, input_view, row_index, + output_array); + case NANOARROW_TYPE_MAP: + return AppendMapValues(input_schema, input_array, input_view, row_index, + output_array); + default: + return NotImplemented("Unsupported Arrow type for merge-on-read projection: {}", + static_cast<int>(input_view.storage_type)); + } +} + +} // namespace + +ProjectionContext::ProjectionContext(ProjectionContext&& other) noexcept + : input_schema_(std::exchange(other.input_schema_, nullptr)), + output_schema_(std::exchange(other.output_schema_, nullptr)), + selected_field_indices_(std::move(other.selected_field_indices_)), + input_arrow_schema_(other.input_arrow_schema_), + output_arrow_schema_(other.output_arrow_schema_), + 
project_batch_function_(std::exchange(other.project_batch_function_, nullptr)), + project_batch_state_(std::move(other.project_batch_state_)) { + other.input_arrow_schema_.release = nullptr; + other.output_arrow_schema_.release = nullptr; +} + +ProjectionContext& ProjectionContext::operator=(ProjectionContext&& other) noexcept { + if (this == &other) { + return *this; + } + + if (input_arrow_schema_.release != nullptr) { + input_arrow_schema_.release(&input_arrow_schema_); + } + if (output_arrow_schema_.release != nullptr) { + output_arrow_schema_.release(&output_arrow_schema_); + } + + input_schema_ = std::exchange(other.input_schema_, nullptr); + output_schema_ = std::exchange(other.output_schema_, nullptr); + selected_field_indices_ = std::move(other.selected_field_indices_); + input_arrow_schema_ = other.input_arrow_schema_; + other.input_arrow_schema_.release = nullptr; + output_arrow_schema_ = other.output_arrow_schema_; + other.output_arrow_schema_.release = nullptr; + project_batch_function_ = std::exchange(other.project_batch_function_, nullptr); + project_batch_state_ = std::move(other.project_batch_state_); + return *this; +} + +ProjectionContext::~ProjectionContext() { + if (input_arrow_schema_.release != nullptr) { + input_arrow_schema_.release(&input_arrow_schema_); + } + if (output_arrow_schema_.release != nullptr) { + output_arrow_schema_.release(&output_arrow_schema_); + } +} + +Result<ProjectionContext> ProjectionContext::Make( + const Schema& input_schema, const Schema& output_schema, + ProjectionContext::ProjectBatchFunction project_batch_function) { + ICEBERG_ASSIGN_OR_RAISE( + auto selected_field_indices, + BuildSelectedFieldIndices(input_schema.fields(), output_schema.fields())); + + ProjectionContext context; + context.input_schema_ = &input_schema; + context.output_schema_ = &output_schema; + context.selected_field_indices_ = std::move(selected_field_indices); + ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(input_schema, 
&context.input_arrow_schema_)); + ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(output_schema, &context.output_arrow_schema_)); + context.project_batch_function_ = project_batch_function; + + return context; +} + +const Schema& ProjectionContext::input_schema() const { return *input_schema_; } + +const Schema& ProjectionContext::output_schema() const { return *output_schema_; } + +const ArrowSchema& ProjectionContext::input_arrow_schema() const { + return input_arrow_schema_; +} + +const ArrowSchema& ProjectionContext::output_arrow_schema() const { + return output_arrow_schema_; +} + +std::span<const int> ProjectionContext::selected_field_indices() const { + return selected_field_indices_; +} + +ProjectionContext::ProjectBatchFunction ProjectionContext::project_batch_function() + const { + return project_batch_function_; +} + +ProjectionContext::ProjectBatchState& ProjectionContext::project_batch_state() { + return project_batch_state_; +} + +void ProjectionContext::RegisterProjectBatchFunction( + ProjectionContext::ProjectBatchFunction project_batch_function) { Review Comment: The global function pointer with mutex is a process-wide singleton — if two libraries in the same process register different implementations, the last one wins silently. Worth documenting this "last writer wins" behavior, either here or in the header doc for `RegisterProjectBatchFunction`. ########## src/iceberg/arrow_c_data_util.cc: ########## @@ -0,0 +1,395 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <cstdint> +#include <limits> +#include <memory> +#include <mutex> +#include <span> +#include <utility> +#include <vector> + +#include <nanoarrow/nanoarrow.h> + +#include "iceberg/arrow/nanoarrow_status_internal.h" +#include "iceberg/arrow_c_data_guard_internal.h" +#include "iceberg/arrow_c_data_util_internal.h" +#include "iceberg/file_reader.h" +#include "iceberg/result.h" +#include "iceberg/schema.h" +#include "iceberg/schema_field.h" +#include "iceberg/schema_internal.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +Result<ArrowArrayStream> MakeArrowArrayStream(std::unique_ptr<Reader> reader) { + return MakeArrowArrayStream<Reader>(std::move(reader)); +} + +namespace { + +Result<size_t> FindFieldIndexById(std::span<const SchemaField> fields, int32_t field_id) { + for (size_t index = 0; index < fields.size(); ++index) { + if (fields[index].field_id() == field_id) { + return index; + } + } + return InvalidArgument("Required schema does not contain projected field id {}", + field_id); +} + +std::mutex g_project_batch_function_mutex; +ProjectionContext::ProjectBatchFunction g_project_batch_function = nullptr; + +ProjectionContext::ProjectBatchFunction GetProjectBatchFunction() { + std::lock_guard lock(g_project_batch_function_mutex); + return g_project_batch_function; +} + +Result<std::vector<int>> BuildSelectedFieldIndices( + std::span<const SchemaField> input_fields, + std::span<const SchemaField> output_fields) { + std::vector<int> selected_field_indices; + selected_field_indices.reserve(output_fields.size()); + + 
for (const auto& output_field : output_fields) { + ICEBERG_ASSIGN_OR_RAISE(auto input_index, + FindFieldIndexById(input_fields, output_field.field_id())); + const auto& input_field = input_fields[input_index]; + if (*input_field.type() != *output_field.type()) { + return InvalidArgument( + "ProjectBatch only supports complete top-level fields, but field id " + "{} changes type from {} to {}", + output_field.field_id(), input_field.type()->ToString(), + output_field.type()->ToString()); + } + ICEBERG_PRECHECK(input_index <= static_cast<size_t>(std::numeric_limits<int>::max()), + "Input field index {} exceeds int range", input_index); + selected_field_indices.push_back(static_cast<int>(input_index)); + } + + return selected_field_indices; +} + +Status AppendValue(const ArrowSchema& input_schema, const ArrowArray& input_array, + const ArrowArrayView& input_view, int64_t row_index, + ArrowArray* output_array); + +Status AppendListValues(const ArrowSchema& input_schema, const ArrowArray& input_array, + const ArrowArrayView& input_view, int64_t row_index, + ArrowArray* output_array) { + const int64_t begin = ArrowArrayViewListChildOffset(&input_view, row_index); + const int64_t end = ArrowArrayViewListChildOffset(&input_view, row_index + 1); + for (int64_t element_index = begin; element_index < end; ++element_index) { + ICEBERG_RETURN_UNEXPECTED( + AppendValue(*input_schema.children[0], *input_array.children[0], + *input_view.children[0], element_index, output_array->children[0])); + } + ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayFinishElement(output_array)); + return {}; +} + +Status AppendMapValues(const ArrowSchema& input_schema, const ArrowArray& input_array, + const ArrowArrayView& input_view, int64_t row_index, + ArrowArray* output_array) { + const int64_t begin = ArrowArrayViewListChildOffset(&input_view, row_index); + const int64_t end = ArrowArrayViewListChildOffset(&input_view, row_index + 1); + for (int64_t entry_index = begin; entry_index < end; 
++entry_index) { + ICEBERG_RETURN_UNEXPECTED( + AppendValue(*input_schema.children[0], *input_array.children[0], + *input_view.children[0], entry_index, output_array->children[0])); + } + ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayFinishElement(output_array)); + return {}; +} + +Status AppendDecimal(const ArrowSchema& input_schema, const ArrowArrayView& input_view, + int64_t row_index, ArrowArray* output_array) { + ArrowError error; + ArrowSchemaView schema_view; + ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR( + ArrowSchemaViewInit(&schema_view, &input_schema, &error), error); + + ArrowDecimal value; + ArrowDecimalInit(&value, schema_view.decimal_bitwidth, schema_view.decimal_precision, + schema_view.decimal_scale); + ArrowArrayViewGetDecimalUnsafe(&input_view, row_index, &value); + ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendDecimal(output_array, &value)); + return {}; +} + +Status AppendValue(const ArrowSchema& input_schema, const ArrowArray& input_array, + const ArrowArrayView& input_view, int64_t row_index, + ArrowArray* output_array) { + if (ArrowArrayViewIsNull(&input_view, row_index)) { + ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendNull(output_array, 1)); + return {}; + } + + switch (input_view.storage_type) { + case NANOARROW_TYPE_NA: + ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendNull(output_array, 1)); + return {}; + case NANOARROW_TYPE_BOOL: + case NANOARROW_TYPE_INT8: + case NANOARROW_TYPE_INT16: + case NANOARROW_TYPE_INT32: + case NANOARROW_TYPE_INT64: + case NANOARROW_TYPE_DATE32: + case NANOARROW_TYPE_DATE64: + case NANOARROW_TYPE_TIME32: + case NANOARROW_TYPE_TIME64: + case NANOARROW_TYPE_TIMESTAMP: + case NANOARROW_TYPE_DURATION: + ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendInt( + output_array, ArrowArrayViewGetIntUnsafe(&input_view, row_index))); + return {}; + case NANOARROW_TYPE_UINT8: + case NANOARROW_TYPE_UINT16: + case NANOARROW_TYPE_UINT32: + case NANOARROW_TYPE_UINT64: + 
ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendUInt( + output_array, ArrowArrayViewGetUIntUnsafe(&input_view, row_index))); + return {}; + case NANOARROW_TYPE_HALF_FLOAT: + case NANOARROW_TYPE_FLOAT: + case NANOARROW_TYPE_DOUBLE: + ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendDouble( + output_array, ArrowArrayViewGetDoubleUnsafe(&input_view, row_index))); + return {}; + case NANOARROW_TYPE_STRING: + case NANOARROW_TYPE_LARGE_STRING: + case NANOARROW_TYPE_STRING_VIEW: { + auto value = ArrowArrayViewGetStringUnsafe(&input_view, row_index); + ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendString(output_array, value)); + return {}; + } + case NANOARROW_TYPE_BINARY: + case NANOARROW_TYPE_LARGE_BINARY: + case NANOARROW_TYPE_FIXED_SIZE_BINARY: + case NANOARROW_TYPE_BINARY_VIEW: { + auto value = ArrowArrayViewGetBytesUnsafe(&input_view, row_index); + ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendBytes(output_array, value)); + return {}; + } + case NANOARROW_TYPE_DECIMAL128: + case NANOARROW_TYPE_DECIMAL256: + return AppendDecimal(input_schema, input_view, row_index, output_array); + case NANOARROW_TYPE_STRUCT: { + for (int64_t child_index = 0; child_index < input_schema.n_children; + ++child_index) { + ICEBERG_RETURN_UNEXPECTED(AppendValue( + *input_schema.children[child_index], *input_array.children[child_index], + *input_view.children[child_index], row_index, + output_array->children[child_index])); + } + ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayFinishElement(output_array)); + return {}; + } + case NANOARROW_TYPE_LIST: + case NANOARROW_TYPE_LARGE_LIST: + case NANOARROW_TYPE_FIXED_SIZE_LIST: + return AppendListValues(input_schema, input_array, input_view, row_index, + output_array); + case NANOARROW_TYPE_MAP: + return AppendMapValues(input_schema, input_array, input_view, row_index, + output_array); + default: + return NotImplemented("Unsupported Arrow type for merge-on-read projection: {}", + static_cast<int>(input_view.storage_type)); + } +} 
+ +} // namespace + +ProjectionContext::ProjectionContext(ProjectionContext&& other) noexcept + : input_schema_(std::exchange(other.input_schema_, nullptr)), + output_schema_(std::exchange(other.output_schema_, nullptr)), + selected_field_indices_(std::move(other.selected_field_indices_)), + input_arrow_schema_(other.input_arrow_schema_), + output_arrow_schema_(other.output_arrow_schema_), + project_batch_function_(std::exchange(other.project_batch_function_, nullptr)), + project_batch_state_(std::move(other.project_batch_state_)) { + other.input_arrow_schema_.release = nullptr; + other.output_arrow_schema_.release = nullptr; +} + +ProjectionContext& ProjectionContext::operator=(ProjectionContext&& other) noexcept { + if (this == &other) { + return *this; + } + + if (input_arrow_schema_.release != nullptr) { + input_arrow_schema_.release(&input_arrow_schema_); + } + if (output_arrow_schema_.release != nullptr) { + output_arrow_schema_.release(&output_arrow_schema_); + } + + input_schema_ = std::exchange(other.input_schema_, nullptr); + output_schema_ = std::exchange(other.output_schema_, nullptr); + selected_field_indices_ = std::move(other.selected_field_indices_); + input_arrow_schema_ = other.input_arrow_schema_; + other.input_arrow_schema_.release = nullptr; + output_arrow_schema_ = other.output_arrow_schema_; + other.output_arrow_schema_.release = nullptr; + project_batch_function_ = std::exchange(other.project_batch_function_, nullptr); + project_batch_state_ = std::move(other.project_batch_state_); + return *this; +} + +ProjectionContext::~ProjectionContext() { + if (input_arrow_schema_.release != nullptr) { + input_arrow_schema_.release(&input_arrow_schema_); + } + if (output_arrow_schema_.release != nullptr) { + output_arrow_schema_.release(&output_arrow_schema_); + } +} + +Result<ProjectionContext> ProjectionContext::Make( + const Schema& input_schema, const Schema& output_schema, + ProjectionContext::ProjectBatchFunction project_batch_function) { + 
ICEBERG_ASSIGN_OR_RAISE( + auto selected_field_indices, + BuildSelectedFieldIndices(input_schema.fields(), output_schema.fields())); + + ProjectionContext context; + context.input_schema_ = &input_schema; + context.output_schema_ = &output_schema; + context.selected_field_indices_ = std::move(selected_field_indices); + ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(input_schema, &context.input_arrow_schema_)); + ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(output_schema, &context.output_arrow_schema_)); + context.project_batch_function_ = project_batch_function; + + return context; +} + +const Schema& ProjectionContext::input_schema() const { return *input_schema_; } + +const Schema& ProjectionContext::output_schema() const { return *output_schema_; } + +const ArrowSchema& ProjectionContext::input_arrow_schema() const { + return input_arrow_schema_; +} + +const ArrowSchema& ProjectionContext::output_arrow_schema() const { + return output_arrow_schema_; +} + +std::span<const int> ProjectionContext::selected_field_indices() const { + return selected_field_indices_; +} + +ProjectionContext::ProjectBatchFunction ProjectionContext::project_batch_function() + const { + return project_batch_function_; +} + +ProjectionContext::ProjectBatchState& ProjectionContext::project_batch_state() { + return project_batch_state_; +} + +void ProjectionContext::RegisterProjectBatchFunction( + ProjectionContext::ProjectBatchFunction project_batch_function) { + ICEBERG_DCHECK(project_batch_function != nullptr, + "ProjectBatch implementation must not be null"); + if (project_batch_function == nullptr) { + return; + } + std::lock_guard lock(g_project_batch_function_mutex); + g_project_batch_function = project_batch_function; +} + +bool ProjectionContext::HasProjectBatchFunction() { + return GetProjectBatchFunction() != nullptr; +} + +auto ProjectionContext::ResolveProjectBatchFunction() + -> ProjectionContext::ProjectBatchFunction { + return GetProjectBatchFunction(); +} + +namespace { + 
+Result<ArrowArray> ProjectBatchNanoarrow(const ArrowSchema& input_arrow_schema, + const ArrowArray& input_batch, + std::span<const int32_t> row_indices, + const ArrowSchema& output_arrow_schema, + std::span<const int> selected_field_indices) { + ArrowArrayView input_view; + ArrowError error; + ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR( + ArrowArrayViewInitFromSchema(&input_view, &input_arrow_schema, &error), error); + internal::ArrowArrayViewGuard input_view_guard(&input_view); + ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR( + ArrowArrayViewSetArray(&input_view, &input_batch, &error), error); + ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR( + ArrowArrayViewValidate(&input_view, NANOARROW_VALIDATION_LEVEL_FULL, &error), + error); + + ArrowArray output_array; + ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR( + ArrowArrayInitFromSchema(&output_array, &output_arrow_schema, &error), error); + internal::ArrowArrayGuard output_array_guard(&output_array); + ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayStartAppending(&output_array)); + ICEBERG_NANOARROW_RETURN_UNEXPECTED( + ArrowArrayReserve(&output_array, static_cast<int64_t>(row_indices.size()))); + + for (int64_t row_index : row_indices) { + ICEBERG_PRECHECK(row_index >= 0 && row_index < input_batch.length, + "Row index {} out of range for batch length {}", row_index, + input_batch.length); + for (size_t output_index = 0; output_index < selected_field_indices.size(); + ++output_index) { + const int input_index = selected_field_indices[output_index]; + ICEBERG_RETURN_UNEXPECTED(AppendValue(*input_arrow_schema.children[input_index], + *input_batch.children[input_index], + *input_view.children[input_index], row_index, + output_array.children[output_index])); + } + ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayFinishElement(&output_array)); + } + + ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR( + ArrowArrayFinishBuildingDefault(&output_array, &error), error); + + return std::exchange(output_array, ArrowArray{}); 
+} + +} // namespace + +Result<ArrowArray> ProjectBatch(ArrowArray* input_batch, + std::span<const int32_t> row_indices, + ProjectionContext& projection) { + ICEBERG_PRECHECK(input_batch != nullptr, "input_batch must not be null"); + internal::ArrowArrayGuard input_batch_guard(input_batch); + Review Comment: Suggestion: add a comment explaining why the `ArrowArrayGuard` here does not double-release when the Arrow compute path calls `ImportRecordBatch(input_batch, ...)`. The reason is that `ImportRecordBatch` zeroes the `release` pointer after import, so the guard becomes a no-op. Without this note, a reader might worry about double-free. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
