This is an automated email from the ASF dual-hosted git repository. zeroshade pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git
The following commit(s) were added to refs/heads/main by this push: new 4fb89f8 feat: add scaffolding work for parquet reader (#154) 4fb89f8 is described below commit 4fb89f8b5a005b7f17368cb0298af4771b1defe2 Author: Gang Wu <ust...@gmail.com> AuthorDate: Thu Jul 31 23:10:44 2025 +0800 feat: add scaffolding work for parquet reader (#154) --- cmake_modules/IcebergThirdpartyToolchain.cmake | 3 +- src/iceberg/CMakeLists.txt | 6 +- src/iceberg/arrow/arrow_error_transform_internal.h | 23 +- src/iceberg/parquet/CMakeLists.txt | 18 ++ src/iceberg/parquet/parquet_data_util.cc | 31 +++ src/iceberg/parquet/parquet_data_util_internal.h | 43 ++++ src/iceberg/parquet/parquet_reader.cc | 264 +++++++++++++++++++++ src/iceberg/parquet/parquet_reader.h | 49 ++++ src/iceberg/parquet/parquet_schema_util.cc | 37 +++ src/iceberg/parquet/parquet_schema_util_internal.h | 54 +++++ 10 files changed, 515 insertions(+), 13 deletions(-) diff --git a/cmake_modules/IcebergThirdpartyToolchain.cmake b/cmake_modules/IcebergThirdpartyToolchain.cmake index ed3d1d7..826d5d4 100644 --- a/cmake_modules/IcebergThirdpartyToolchain.cmake +++ b/cmake_modules/IcebergThirdpartyToolchain.cmake @@ -65,8 +65,9 @@ function(resolve_arrow_dependency) set(ARROW_BUILD_STATIC ON CACHE BOOL "" FORCE) + # Work around undefined symbol: arrow::ipc::ReadSchema(arrow::io::InputStream*, arrow::ipc::DictionaryMemo*) set(ARROW_IPC - OFF + ON CACHE BOOL "" FORCE) set(ARROW_FILESYSTEM ON diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index 4751e9f..d90c054 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -109,7 +109,10 @@ if(ICEBERG_BUILD_BUNDLE) avro/avro_reader.cc avro/avro_schema_util.cc avro/avro_register.cc - avro/avro_stream_internal.cc) + avro/avro_stream_internal.cc + parquet/parquet_data_util.cc + parquet/parquet_reader.cc + parquet/parquet_schema_util.cc) # Libraries to link with exported libiceberg_bundle.{so,a}. set(ICEBERG_BUNDLE_STATIC_BUILD_INTERFACE_LIBS) @@ -161,6 +164,7 @@ if(ICEBERG_BUILD_BUNDLE) add_subdirectory(arrow) add_subdirectory(avro) + add_subdirectory(parquet) install(FILES ${CMAKE_CURRENT_BINARY_DIR}/iceberg_bundle_export.h DESTINATION ${ICEBERG_INSTALL_INCLUDEDIR}/iceberg) diff --git a/src/iceberg/arrow/arrow_error_transform_internal.h b/src/iceberg/arrow/arrow_error_transform_internal.h index e33df41..cf64892 100644 --- a/src/iceberg/arrow/arrow_error_transform_internal.h +++ b/src/iceberg/arrow/arrow_error_transform_internal.h @@ -43,17 +43,18 @@ inline ErrorKind ToErrorKind(const ::arrow::Status& status) { } \ lhs = std::move(result_name).ValueOrDie(); -#define ICEBERG_ARROW_ASSIGN_OR_RETURN(lhs, rexpr) \ - ICEBERG_ARROW_ASSIGN_OR_RETURN_IMPL( \ - ARROW_ASSIGN_OR_RAISE_NAME(_error_or_value, __COUNTER__), lhs, rexpr, ToErrorKind) - -#define ICEBERG_ARROW_RETURN_NOT_OK(expr) \ - do { \ - auto&& _status = (expr); \ - if (!_status.ok()) { \ - return std::unexpected<Error>{ \ - {.kind = ToErrorKind(_status), .message = _status.ToString()}}; \ - } \ +#define ICEBERG_ARROW_ASSIGN_OR_RETURN(lhs, rexpr) \ + ICEBERG_ARROW_ASSIGN_OR_RETURN_IMPL( \ + ARROW_ASSIGN_OR_RAISE_NAME(_error_or_value, __COUNTER__), lhs, rexpr, \ + ::iceberg::arrow::ToErrorKind) + +#define ICEBERG_ARROW_RETURN_NOT_OK(expr) \ + do { \ + auto&& _status = (expr); \ + if (!_status.ok()) { \ + return std::unexpected<Error>{{.kind = ::iceberg::arrow::ToErrorKind(_status), \ + .message = _status.ToString()}}; \ + } \ } while (0) } // namespace iceberg::arrow diff --git a/src/iceberg/parquet/CMakeLists.txt b/src/iceberg/parquet/CMakeLists.txt new file mode 100644 index 0000000..49a389d --- /dev/null +++ b/src/iceberg/parquet/CMakeLists.txt @@ -0,0 +1,18 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +iceberg_install_all_headers(iceberg/parquet) diff --git a/src/iceberg/parquet/parquet_data_util.cc b/src/iceberg/parquet/parquet_data_util.cc new file mode 100644 index 0000000..6237b00 --- /dev/null +++ b/src/iceberg/parquet/parquet_data_util.cc @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/parquet/parquet_data_util_internal.h" + +namespace iceberg::parquet { + +Result<std::shared_ptr<::arrow::RecordBatch>> ProjectRecordBatch( + std::shared_ptr<::arrow::RecordBatch> record_batch, + const std::shared_ptr<::arrow::Schema>& output_arrow_schema, + const Schema& projected_schema, const SchemaProjection& projection) { + return NotImplemented("NYI"); +} + +} // namespace iceberg::parquet diff --git a/src/iceberg/parquet/parquet_data_util_internal.h b/src/iceberg/parquet/parquet_data_util_internal.h new file mode 100644 index 0000000..c222d74 --- /dev/null +++ b/src/iceberg/parquet/parquet_data_util_internal.h @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include "iceberg/schema_util.h" + +namespace arrow { +class RecordBatch; +class Schema; +} // namespace arrow + +namespace iceberg::parquet { + +/// \brief Convert record batch read from a Parquet file to projected Iceberg Schema. +/// +/// \param record_batch The record batch to convert. +/// \param output_arrow_schema The Arrow schema to convert to. +/// \param projected_schema The projected Iceberg schema. +/// \param projection The projection from projected Iceberg schema to the record batch. +/// \return The converted record batch. +Result<std::shared_ptr<::arrow::RecordBatch>> ProjectRecordBatch( + std::shared_ptr<::arrow::RecordBatch> record_batch, + const std::shared_ptr<::arrow::Schema>& output_arrow_schema, + const Schema& projected_schema, const SchemaProjection& projection); + +} // namespace iceberg::parquet diff --git a/src/iceberg/parquet/parquet_reader.cc b/src/iceberg/parquet/parquet_reader.cc new file mode 100644 index 0000000..3ee260a --- /dev/null +++ b/src/iceberg/parquet/parquet_reader.cc @@ -0,0 +1,264 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/parquet/parquet_reader.h" + +#include <memory> + +#include <arrow/c/bridge.h> +#include <arrow/memory_pool.h> +#include <arrow/record_batch.h> +#include <arrow/result.h> +#include <arrow/type.h> +#include <parquet/arrow/reader.h> +#include <parquet/arrow/schema.h> +#include <parquet/file_reader.h> +#include <parquet/properties.h> + +#include "iceberg/arrow/arrow_error_transform_internal.h" +#include "iceberg/arrow/arrow_fs_file_io_internal.h" +#include "iceberg/parquet/parquet_data_util_internal.h" +#include "iceberg/parquet/parquet_schema_util_internal.h" +#include "iceberg/result.h" +#include "iceberg/schema_internal.h" +#include "iceberg/schema_util.h" +#include "iceberg/util/checked_cast.h" +#include "iceberg/util/macros.h" + +namespace iceberg::parquet { + +namespace { + +Result<std::shared_ptr<::arrow::io::RandomAccessFile>> OpenInputStream( + const ReaderOptions& options) { + ::arrow::fs::FileInfo file_info(options.path, ::arrow::fs::FileType::File); + if (options.length) { + file_info.set_size(options.length.value()); + } + + auto io = internal::checked_pointer_cast<arrow::ArrowFileSystemFileIO>(options.io); + ICEBERG_ARROW_ASSIGN_OR_RETURN(auto input, io->fs()->OpenInputFile(file_info)); + return input; +} + +Result<SchemaProjection> BuildProjection(::parquet::arrow::FileReader* reader, + const Schema& read_schema) { + auto metadata = reader->parquet_reader()->metadata(); + + ICEBERG_ASSIGN_OR_RAISE(auto has_field_ids, + HasFieldIds(metadata->schema()->schema_root())); + if (!has_field_ids) { + // TODO(gangwu): apply name mapping to Parquet schema + return NotImplemented("Applying name mapping to Parquet schema is not implemented"); + } + + ::parquet::arrow::SchemaManifest schema_manifest; + ICEBERG_ARROW_RETURN_NOT_OK(::parquet::arrow::SchemaManifest::Make( + metadata->schema(), metadata->key_value_metadata(), reader->properties(), + &schema_manifest)); + + // Leverage SchemaManifest to project the schema + ICEBERG_ASSIGN_OR_RAISE(auto projection, Project(read_schema, schema_manifest)); + return projection; +} + +class EmptyRecordBatchReader : public ::arrow::RecordBatchReader { + public: + EmptyRecordBatchReader() = default; + ~EmptyRecordBatchReader() override = default; + + std::shared_ptr<::arrow::Schema> schema() const override { return nullptr; } + + ::arrow::Status ReadNext(std::shared_ptr<::arrow::RecordBatch>* batch) override { + batch = nullptr; + return ::arrow::Status::OK(); + } +}; + +} // namespace + +// A stateful context to keep track of the reading progress. +struct ReadContext { + // The arrow schema to output record batches. It may be different with + // the schema of record batches returned by `record_batch_reader_` + // when there is any schema evolution. + std::shared_ptr<::arrow::Schema> output_arrow_schema_; + // The reader to read record batches from the Parquet file. + std::unique_ptr<::arrow::RecordBatchReader> record_batch_reader_; +}; + +// TODO(gangwu): list of work items +// 1. Make the memory pool configurable +// 2. Catch ParquetException and convert to Status/Result +// 3. Add utility to convert Arrow Status/Result to Iceberg Status/Result +// 4. Check field ids and apply name mapping if needed +class ParquetReader::Impl { + public: + // Open the Parquet reader with the given options + Status Open(const ReaderOptions& options) { + if (options.projection == nullptr) { + return InvalidArgument("Projected schema is required by Parquet reader"); + } + + split_ = options.split; + read_schema_ = options.projection; + + // TODO(gangwu): make memory pool configurable + ::arrow::MemoryPool* pool = ::arrow::default_memory_pool(); + + // Prepare reader properties + ::parquet::ReaderProperties reader_properties(pool); + ::parquet::ArrowReaderProperties arrow_reader_properties; + arrow_reader_properties.set_batch_size(options.batch_size); + arrow_reader_properties.set_arrow_extensions_enabled(true); + + // Open the Parquet file reader + ICEBERG_ASSIGN_OR_RAISE(auto input_stream, OpenInputStream(options)); + auto file_reader = + ::parquet::ParquetFileReader::Open(std::move(input_stream), reader_properties); + ICEBERG_ARROW_RETURN_NOT_OK(::parquet::arrow::FileReader::Make( + pool, std::move(file_reader), arrow_reader_properties, &reader_)); + + // Project read schema onto the Parquet file schema + ICEBERG_ASSIGN_OR_RAISE(projection_, BuildProjection(reader_.get(), *read_schema_)); + + return {}; + } + + // Read the next batch of data + Result<std::optional<ArrowArray>> Next() { + if (!context_) { + ICEBERG_RETURN_UNEXPECTED(InitReadContext()); + } + + ICEBERG_ARROW_ASSIGN_OR_RETURN(auto batch, context_->record_batch_reader_->Next()); + if (!batch) { + return std::nullopt; + } + + ICEBERG_ASSIGN_OR_RAISE( + batch, ProjectRecordBatch(std::move(batch), context_->output_arrow_schema_, + *read_schema_, projection_)); + + ArrowArray arrow_array; + ICEBERG_ARROW_RETURN_NOT_OK(::arrow::ExportRecordBatch(*batch, &arrow_array)); + return arrow_array; + } + + // Close the reader and release resources + Status Close() { + if (reader_ == nullptr) { + return {}; // Already closed + } + + if (context_ != nullptr) { + ICEBERG_ARROW_RETURN_NOT_OK(context_->record_batch_reader_->Close()); + context_.reset(); + } + + reader_.reset(); + return {}; + } + + // Get the schema of the data + Result<ArrowSchema> Schema() { + if (!context_) { + ICEBERG_RETURN_UNEXPECTED(InitReadContext()); + } + + ArrowSchema arrow_schema; + ICEBERG_ARROW_RETURN_NOT_OK( + ::arrow::ExportSchema(*context_->output_arrow_schema_, &arrow_schema)); + return arrow_schema; + } + + private: + Status InitReadContext() { + context_ = std::make_unique<ReadContext>(); + + // Build the output Arrow schema + ArrowSchema arrow_schema; + ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*read_schema_, &arrow_schema)); + ICEBERG_ARROW_ASSIGN_OR_RETURN(context_->output_arrow_schema_, + ::arrow::ImportSchema(&arrow_schema)); + + // Row group pruning based on the split + // TODO(gangwu): add row group filtering based on zone map, bloom filter, etc. + std::vector<int> row_group_indices; + if (split_.has_value()) { + auto metadata = reader_->parquet_reader()->metadata(); + for (int i = 0; i < metadata->num_row_groups(); ++i) { + auto row_group_offset = metadata->RowGroup(i)->file_offset(); + if (row_group_offset >= split_->offset && + row_group_offset < split_->offset + split_->length) { + row_group_indices.push_back(i); + } else if (row_group_offset >= split_->offset + split_->length) { + break; + } + } + if (row_group_indices.empty()) { + // None of the row groups are selected, return an empty record batch reader + context_->record_batch_reader_ = std::make_unique<EmptyRecordBatchReader>(); + return {}; + } + } + + // Create the record batch reader + ICEBERG_ASSIGN_OR_RAISE(auto column_indices, SelectedColumnIndices(projection_)); + ICEBERG_ARROW_ASSIGN_OR_RETURN( + context_->record_batch_reader_, + reader_->GetRecordBatchReader(row_group_indices, column_indices)); + + return {}; + } + + private: + // The split to read from the Parquet file. + std::optional<Split> split_; + // Schema to read from the Parquet file. + std::shared_ptr<::iceberg::Schema> read_schema_; + // The projection result to apply to the read schema. + SchemaProjection projection_; + // Parquet file reader to create RecordBatchReader. + std::unique_ptr<::parquet::arrow::FileReader> reader_; + // The context to keep track of the reading progress. + std::unique_ptr<ReadContext> context_; +}; + +ParquetReader::~ParquetReader() = default; + +Result<std::optional<ArrowArray>> ParquetReader::Next() { return impl_->Next(); } + +Result<ArrowSchema> ParquetReader::Schema() { return impl_->Schema(); } + +Status ParquetReader::Open(const ReaderOptions& options) { + impl_ = std::make_unique<Impl>(); + return impl_->Open(options); +} + +Status ParquetReader::Close() { return impl_->Close(); } + +void ParquetReader::Register() { + static ReaderFactoryRegistry parquet_reader_register( + FileFormatType::kParquet, []() -> Result<std::unique_ptr<Reader>> { + return std::make_unique<ParquetReader>(); + }); +} + +} // namespace iceberg::parquet diff --git a/src/iceberg/parquet/parquet_reader.h b/src/iceberg/parquet/parquet_reader.h new file mode 100644 index 0000000..d29daca --- /dev/null +++ b/src/iceberg/parquet/parquet_reader.h @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include "iceberg/file_reader.h" +#include "iceberg/iceberg_bundle_export.h" + +namespace iceberg::parquet { + +/// \brief A reader that reads ArrowArray from Parquet files. +class ICEBERG_BUNDLE_EXPORT ParquetReader : public Reader { + public: + ParquetReader() = default; + + ~ParquetReader() override; + + Status Open(const ReaderOptions& options) final; + + Status Close() final; + + Result<std::optional<ArrowArray>> Next() final; + + Result<ArrowSchema> Schema() final; + + static void Register(); + + private: + class Impl; + std::unique_ptr<Impl> impl_; +}; + +} // namespace iceberg::parquet diff --git a/src/iceberg/parquet/parquet_schema_util.cc b/src/iceberg/parquet/parquet_schema_util.cc new file mode 100644 index 0000000..8648fa9 --- /dev/null +++ b/src/iceberg/parquet/parquet_schema_util.cc @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/parquet/parquet_schema_util_internal.h" + +namespace iceberg::parquet { + +Result<SchemaProjection> Project(const Schema& expected_schema, + const ::parquet::arrow::SchemaManifest& parquet_schema) { + return NotImplemented("NYI"); +} + +Result<std::vector<int>> SelectedColumnIndices(const SchemaProjection& projection) { + return NotImplemented("NYI"); +} + +Result<bool> HasFieldIds(const ::parquet::schema::NodePtr& root_node) { + return NotImplemented("NYI"); +} + +} // namespace iceberg::parquet diff --git a/src/iceberg/parquet/parquet_schema_util_internal.h b/src/iceberg/parquet/parquet_schema_util_internal.h new file mode 100644 index 0000000..f3b0f37 --- /dev/null +++ b/src/iceberg/parquet/parquet_schema_util_internal.h @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include <parquet/arrow/schema.h> + +#include "iceberg/schema.h" +#include "iceberg/schema_util.h" + +namespace iceberg::parquet { + +/// \brief Project an Iceberg Schema onto a Parquet Schema. +/// +/// This function creates a projection from an Iceberg Schema to a Parquet schema. +/// The projection determines how to read data from the Parquet schema into the expected +/// Iceberg Schema. +/// +/// \param expected_schema The Iceberg Schema that defines the expected structure. +/// \param parquet_schema The Parquet schema to read data from. +/// \return The schema projection result with column indices of projected Parquet columns. +Result<SchemaProjection> Project(const Schema& expected_schema, + const ::parquet::arrow::SchemaManifest& parquet_schema); + +/// \brief Get the selected column indices by walking through the projection result. +/// +/// \param projection The schema projection result. +/// \return The selected column indices. +Result<std::vector<int>> SelectedColumnIndices(const SchemaProjection& projection); + +/// \brief Check whether the Parquet schema has field IDs. +/// +/// \param root_node The root node of the Parquet schema. +/// \return True if the Parquet schema has field IDs, false otherwise. Return error if +/// the Parquet schema has partial field IDs. +Result<bool> HasFieldIds(const ::parquet::schema::NodePtr& root_node); + +} // namespace iceberg::parquet