wgtmac commented on code in PR #657: URL: https://github.com/apache/iceberg-cpp/pull/657#discussion_r3275148199
########## src/iceberg/arrow_c_data_util_internal.h: ########## @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include <cerrno> +#include <concepts> +#include <cstdint> +#include <cstring> +#include <memory> +#include <optional> +#include <span> +#include <string> +#include <tuple> +#include <utility> +#include <vector> + +#include "iceberg/arrow_c_data.h" +#include "iceberg/iceberg_export.h" +#include "iceberg/result.h" +#include "iceberg/type_fwd.h" + +namespace iceberg { + +/// \brief Cached state for ProjectBatch over one input/output schema pair. +class ICEBERG_EXPORT ProjectionContext { + public: + using ProjectBatchState = std::shared_ptr<void>; + + using ProjectBatchFunction = auto (*)(ArrowArray* input_batch, + std::span<const int32_t> row_indices, + ProjectionContext& projection) + -> Result<ArrowArray>; + + /// \brief Register a custom implementation for ProjectBatch. + static void RegisterProjectBatchFunction(ProjectBatchFunction project_batch_function); + + /// \brief Returns true when a custom implementation has been registered. + static bool HasProjectBatchFunction(); + + /// \brief Resolve the registered ProjectBatch implementation. + static auto ResolveProjectBatchFunction() -> ProjectBatchFunction; + + /// \brief Build reusable projection state for a validated schema pair. + /// + /// \param input_schema Schema that describes every input batch. + /// \param output_schema Final schema and column order requested by the caller. + /// \param project_batch_function Optional implementation returned by + /// ProjectionContext::ResolveProjectBatchFunction, or nullptr to use the nanoarrow + /// path. + /// \note It validates that output_schema selects or reorders complete top-level fields + /// by field id. Nested pruning and type changes are rejected. The input_schema and + /// output_schema passed to Make must outlive the context. ProjectBatch may lazily + /// initialize backend cache; do not share one context across concurrent calls. + static Result<ProjectionContext> Make(const Schema& input_schema, + const Schema& output_schema, + ProjectBatchFunction project_batch_function); + + ProjectionContext(ProjectionContext&&) noexcept; + ProjectionContext& operator=(ProjectionContext&&) noexcept; + ~ProjectionContext(); + + ProjectionContext(const ProjectionContext&) = delete; + ProjectionContext& operator=(const ProjectionContext&) = delete; + + const Schema& input_schema() const; + + const Schema& output_schema() const; + + const ArrowSchema& input_arrow_schema() const; + + const ArrowSchema& output_arrow_schema() const; + + std::span<const int> selected_field_indices() const; + + ProjectBatchFunction project_batch_function() const; + + ProjectBatchState& project_batch_state(); + + private: + ProjectionContext() = default; + + const Schema* input_schema_ = nullptr; + const Schema* output_schema_ = nullptr; Review Comment: Kept borrowed pointers and added lifetime comments in the header and FileScanTaskReader call site. ########## src/iceberg/arrow_c_data_util_internal.h: ########## @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include <cerrno> +#include <concepts> +#include <cstdint> +#include <cstring> +#include <memory> +#include <optional> +#include <span> +#include <string> +#include <tuple> +#include <utility> Review Comment: Removed. ########## src/iceberg/arrow_c_data_util_internal.h: ########## @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include <cerrno> +#include <concepts> +#include <cstdint> +#include <cstring> +#include <memory> +#include <optional> +#include <span> +#include <string> +#include <tuple> +#include <utility> +#include <vector> + +#include "iceberg/arrow_c_data.h" +#include "iceberg/iceberg_export.h" +#include "iceberg/result.h" +#include "iceberg/type_fwd.h" + +namespace iceberg { + +/// \brief Cached state for ProjectBatch over one input/output schema pair. +class ICEBERG_EXPORT ProjectionContext { + public: + using ProjectBatchState = std::shared_ptr<void>; + + using ProjectBatchFunction = auto (*)(ArrowArray* input_batch, + std::span<const int32_t> row_indices, + ProjectionContext& projection) + -> Result<ArrowArray>; + + /// \brief Register a custom implementation for ProjectBatch. + static void RegisterProjectBatchFunction(ProjectBatchFunction project_batch_function); + + /// \brief Returns true when a custom implementation has been registered. + static bool HasProjectBatchFunction(); + + /// \brief Resolve the registered ProjectBatch implementation. + static auto ResolveProjectBatchFunction() -> ProjectBatchFunction; + + /// \brief Build reusable projection state for a validated schema pair. + /// + /// \param input_schema Schema that describes every input batch. + /// \param output_schema Final schema and column order requested by the caller. + /// \param project_batch_function Optional implementation returned by + /// ProjectionContext::ResolveProjectBatchFunction, or nullptr to use the nanoarrow + /// path. + /// \note It validates that output_schema selects or reorders complete top-level fields + /// by field id. Nested pruning and type changes are rejected. The input_schema and + /// output_schema passed to Make must outlive the context. ProjectBatch may lazily + /// initialize backend cache; do not share one context across concurrent calls. + static Result<ProjectionContext> Make(const Schema& input_schema, + const Schema& output_schema, + ProjectBatchFunction project_batch_function); + + ProjectionContext(ProjectionContext&&) noexcept; + ProjectionContext& operator=(ProjectionContext&&) noexcept; + ~ProjectionContext(); + + ProjectionContext(const ProjectionContext&) = delete; + ProjectionContext& operator=(const ProjectionContext&) = delete; + + const Schema& input_schema() const; + + const Schema& output_schema() const; + + const ArrowSchema& input_arrow_schema() const; + + const ArrowSchema& output_arrow_schema() const; + + std::span<const int> selected_field_indices() const; + + ProjectBatchFunction project_batch_function() const; + + ProjectBatchState& project_batch_state(); + + private: + ProjectionContext() = default; + + const Schema* input_schema_ = nullptr; + const Schema* output_schema_ = nullptr; + std::vector<int> selected_field_indices_; + ArrowSchema input_arrow_schema_{}; Review Comment: Changed selected field indices to int32_t consistently. ########## src/iceberg/arrow_c_data_util_internal.h: ########## @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include <cerrno> +#include <concepts> +#include <cstdint> +#include <cstring> +#include <memory> +#include <optional> +#include <span> +#include <string> +#include <tuple> +#include <utility> +#include <vector> + +#include "iceberg/arrow_c_data.h" +#include "iceberg/iceberg_export.h" +#include "iceberg/result.h" +#include "iceberg/type_fwd.h" + +namespace iceberg { + +/// \brief Cached state for ProjectBatch over one input/output schema pair. +class ICEBERG_EXPORT ProjectionContext { + public: Review Comment: Added a short export comment. ########## src/iceberg/arrow_c_data_util.cc: ########## @@ -0,0 +1,395 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <cstdint> +#include <limits> +#include <memory> +#include <mutex> +#include <span> +#include <utility> +#include <vector> + +#include <nanoarrow/nanoarrow.h> + +#include "iceberg/arrow/nanoarrow_status_internal.h" +#include "iceberg/arrow_c_data_guard_internal.h" +#include "iceberg/arrow_c_data_util_internal.h" +#include "iceberg/file_reader.h" +#include "iceberg/result.h" +#include "iceberg/schema.h" +#include "iceberg/schema_field.h" +#include "iceberg/schema_internal.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +Result<ArrowArrayStream> MakeArrowArrayStream(std::unique_ptr<Reader> reader) { + return MakeArrowArrayStream<Reader>(std::move(reader)); +} + +namespace { + +Result<size_t> FindFieldIndexById(std::span<const SchemaField> fields, int32_t field_id) { + for (size_t index = 0; index < fields.size(); ++index) { + if (fields[index].field_id() == field_id) { + return index; + } + } + return InvalidArgument("Required schema does not contain projected field id {}", + field_id); +} + +std::mutex g_project_batch_function_mutex; +ProjectionContext::ProjectBatchFunction g_project_batch_function = nullptr; + +ProjectionContext::ProjectBatchFunction GetProjectBatchFunction() { + std::lock_guard lock(g_project_batch_function_mutex); + return g_project_batch_function; +} + +Result<std::vector<int>> BuildSelectedFieldIndices( + std::span<const SchemaField> input_fields, + std::span<const SchemaField> output_fields) { + std::vector<int> selected_field_indices; + selected_field_indices.reserve(output_fields.size()); + + for (const auto& output_field : output_fields) { + ICEBERG_ASSIGN_OR_RAISE(auto input_index, + FindFieldIndexById(input_fields, output_field.field_id())); + const auto& input_field = input_fields[input_index]; + if (*input_field.type() != *output_field.type()) { + return InvalidArgument( + "ProjectBatch only supports complete top-level fields, but field id " + "{} changes type from {} to {}", + output_field.field_id(), input_field.type()->ToString(), + output_field.type()->ToString()); + } + ICEBERG_PRECHECK(input_index <= static_cast<size_t>(std::numeric_limits<int>::max()), + "Input field index {} exceeds int range", input_index); + selected_field_indices.push_back(static_cast<int>(input_index)); + } + + return selected_field_indices; +} + +Status AppendValue(const ArrowSchema& input_schema, const ArrowArray& input_array, + const ArrowArrayView& input_view, int64_t row_index, + ArrowArray* output_array); + +Status AppendListValues(const ArrowSchema& input_schema, const ArrowArray& input_array, + const ArrowArrayView& input_view, int64_t row_index, + ArrowArray* output_array) { + const int64_t begin = ArrowArrayViewListChildOffset(&input_view, row_index); + const int64_t end = ArrowArrayViewListChildOffset(&input_view, row_index + 1); + for (int64_t element_index = begin; element_index < end; ++element_index) { + ICEBERG_RETURN_UNEXPECTED( + AppendValue(*input_schema.children[0], *input_array.children[0], + *input_view.children[0], element_index, output_array->children[0])); + } + ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayFinishElement(output_array)); + return {}; +} + +Status AppendMapValues(const ArrowSchema& input_schema, const ArrowArray& input_array, + const ArrowArrayView& input_view, int64_t row_index, + ArrowArray* output_array) { + const int64_t begin = ArrowArrayViewListChildOffset(&input_view, row_index); + const int64_t end = ArrowArrayViewListChildOffset(&input_view, row_index + 1); + for (int64_t entry_index = begin; entry_index < end; ++entry_index) { + ICEBERG_RETURN_UNEXPECTED( + AppendValue(*input_schema.children[0], *input_array.children[0], + *input_view.children[0], entry_index, output_array->children[0])); + } + ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayFinishElement(output_array)); + return {}; +} + +Status AppendDecimal(const ArrowSchema& input_schema, const ArrowArrayView& input_view, + int64_t row_index, ArrowArray* output_array) { + ArrowError error; + ArrowSchemaView schema_view; + ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR( + ArrowSchemaViewInit(&schema_view, &input_schema, &error), error); + + ArrowDecimal value; + ArrowDecimalInit(&value, schema_view.decimal_bitwidth, schema_view.decimal_precision, + schema_view.decimal_scale); + ArrowArrayViewGetDecimalUnsafe(&input_view, row_index, &value); + ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendDecimal(output_array, &value)); + return {}; +} + +Status AppendValue(const ArrowSchema& input_schema, const ArrowArray& input_array, + const ArrowArrayView& input_view, int64_t row_index, + ArrowArray* output_array) { + if (ArrowArrayViewIsNull(&input_view, row_index)) { + ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendNull(output_array, 1)); + return {}; + } + + switch (input_view.storage_type) { + case NANOARROW_TYPE_NA: + ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendNull(output_array, 1)); + return {}; + case NANOARROW_TYPE_BOOL: + case NANOARROW_TYPE_INT8: + case NANOARROW_TYPE_INT16: + case NANOARROW_TYPE_INT32: + case NANOARROW_TYPE_INT64: + case NANOARROW_TYPE_DATE32: + case NANOARROW_TYPE_DATE64: + case NANOARROW_TYPE_TIME32: + case NANOARROW_TYPE_TIME64: + case NANOARROW_TYPE_TIMESTAMP: + case NANOARROW_TYPE_DURATION: + ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendInt( + output_array, ArrowArrayViewGetIntUnsafe(&input_view, row_index))); + return {}; + case NANOARROW_TYPE_UINT8: + case NANOARROW_TYPE_UINT16: + case NANOARROW_TYPE_UINT32: + case NANOARROW_TYPE_UINT64: + ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendUInt( + output_array, ArrowArrayViewGetUIntUnsafe(&input_view, row_index))); + return {}; + case NANOARROW_TYPE_HALF_FLOAT: + case NANOARROW_TYPE_FLOAT: + case NANOARROW_TYPE_DOUBLE: + ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendDouble( + output_array, ArrowArrayViewGetDoubleUnsafe(&input_view, row_index))); + return {}; + case NANOARROW_TYPE_STRING: + case NANOARROW_TYPE_LARGE_STRING: + case NANOARROW_TYPE_STRING_VIEW: { + auto value = ArrowArrayViewGetStringUnsafe(&input_view, row_index); + ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendString(output_array, value)); + return {}; + } + case NANOARROW_TYPE_BINARY: + case NANOARROW_TYPE_LARGE_BINARY: + case NANOARROW_TYPE_FIXED_SIZE_BINARY: + case NANOARROW_TYPE_BINARY_VIEW: { + auto value = ArrowArrayViewGetBytesUnsafe(&input_view, row_index); + ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendBytes(output_array, value)); + return {}; + } + case NANOARROW_TYPE_DECIMAL128: + case NANOARROW_TYPE_DECIMAL256: + return AppendDecimal(input_schema, input_view, row_index, output_array); + case NANOARROW_TYPE_STRUCT: { + for (int64_t child_index = 0; child_index < input_schema.n_children; + ++child_index) { + ICEBERG_RETURN_UNEXPECTED(AppendValue( + *input_schema.children[child_index], *input_array.children[child_index], + *input_view.children[child_index], row_index, + output_array->children[child_index])); + } + ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayFinishElement(output_array)); + return {}; + } + case NANOARROW_TYPE_LIST: + case NANOARROW_TYPE_LARGE_LIST: + case NANOARROW_TYPE_FIXED_SIZE_LIST: + return AppendListValues(input_schema, input_array, input_view, row_index, + output_array); + case NANOARROW_TYPE_MAP: + return AppendMapValues(input_schema, input_array, input_view, row_index, + output_array); + default: + return NotImplemented("Unsupported Arrow type for merge-on-read projection: {}", + static_cast<int>(input_view.storage_type)); + } +} + +} // namespace + +ProjectionContext::ProjectionContext(ProjectionContext&& other) noexcept + : input_schema_(std::exchange(other.input_schema_, nullptr)), + output_schema_(std::exchange(other.output_schema_, nullptr)), + selected_field_indices_(std::move(other.selected_field_indices_)), + input_arrow_schema_(other.input_arrow_schema_), + output_arrow_schema_(other.output_arrow_schema_), + project_batch_function_(std::exchange(other.project_batch_function_, nullptr)), + project_batch_state_(std::move(other.project_batch_state_)) { + other.input_arrow_schema_.release = nullptr; + other.output_arrow_schema_.release = nullptr; +} + +ProjectionContext& ProjectionContext::operator=(ProjectionContext&& other) noexcept { + if (this == &other) { + return *this; + } + + if (input_arrow_schema_.release != nullptr) { + input_arrow_schema_.release(&input_arrow_schema_); + } + if (output_arrow_schema_.release != nullptr) { + output_arrow_schema_.release(&output_arrow_schema_); + } + + input_schema_ = std::exchange(other.input_schema_, nullptr); + output_schema_ = std::exchange(other.output_schema_, nullptr); + selected_field_indices_ = std::move(other.selected_field_indices_); + input_arrow_schema_ = other.input_arrow_schema_; + other.input_arrow_schema_.release = nullptr; + output_arrow_schema_ = other.output_arrow_schema_; + other.output_arrow_schema_.release = nullptr; + project_batch_function_ = std::exchange(other.project_batch_function_, nullptr); + project_batch_state_ = std::move(other.project_batch_state_); + return *this; +} + +ProjectionContext::~ProjectionContext() { + if (input_arrow_schema_.release != nullptr) { + input_arrow_schema_.release(&input_arrow_schema_); + } + if (output_arrow_schema_.release != nullptr) { + output_arrow_schema_.release(&output_arrow_schema_); + } +} + +Result<ProjectionContext> ProjectionContext::Make( + const Schema& input_schema, const Schema& output_schema, + ProjectionContext::ProjectBatchFunction project_batch_function) { + ICEBERG_ASSIGN_OR_RAISE( + auto selected_field_indices, + BuildSelectedFieldIndices(input_schema.fields(), output_schema.fields())); + + ProjectionContext context; + context.input_schema_ = &input_schema; + context.output_schema_ = &output_schema; + context.selected_field_indices_ = std::move(selected_field_indices); + ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(input_schema, &context.input_arrow_schema_)); + ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(output_schema, &context.output_arrow_schema_)); + context.project_batch_function_ = project_batch_function; + + return context; +} + +const Schema& ProjectionContext::input_schema() const { return *input_schema_; } + +const Schema& ProjectionContext::output_schema() const { return *output_schema_; } + +const ArrowSchema& ProjectionContext::input_arrow_schema() const { + return input_arrow_schema_; +} + +const ArrowSchema& ProjectionContext::output_arrow_schema() const { + return output_arrow_schema_; +} + +std::span<const int> ProjectionContext::selected_field_indices() const { + return selected_field_indices_; +} + +ProjectionContext::ProjectBatchFunction ProjectionContext::project_batch_function() + const { + return project_batch_function_; +} + +ProjectionContext::ProjectBatchState& ProjectionContext::project_batch_state() { + return project_batch_state_; +} + +void ProjectionContext::RegisterProjectBatchFunction( + ProjectionContext::ProjectBatchFunction project_batch_function) { Review Comment: Documented process-wide last-writer-wins behavior in the header. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
