Gabriel39 commented on code in PR #63929: URL: https://github.com/apache/doris/pull/63929#discussion_r3338252498
########## be/src/format/new_parquet/reader/column_reader.cpp: ########## @@ -0,0 +1,426 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "format/new_parquet/reader/column_reader.h" + +#include <parquet/api/reader.h> +#include <parquet/api/schema.h> + +#include <algorithm> +#include <cstddef> +#include <cstdint> +#include <exception> +#include <memory> +#include <string> +#include <utility> +#include <vector> + +#include "core/data_type/data_type_array.h" +#include "core/data_type/data_type_map.h" +#include "core/data_type/data_type_nullable.h" +#include "core/data_type/data_type_number.h" +#include "core/data_type/data_type_struct.h" +#include "format/new_parquet/parquet_column_schema.h" +#include "format/new_parquet/reader/list_column_reader.h" +#include "format/new_parquet/reader/map_column_reader.h" +#include "format/new_parquet/reader/scalar_column_reader.h" +#include "format/new_parquet/reader/shape_only_column_reader.h" +#include "format/new_parquet/reader/struct_column_reader.h" +#include "format/reader/file_reader.h" + +namespace doris::parquet { +namespace { + +bool supports_nested_scalar_record_reader(const ParquetColumnSchema& column_schema) { + if 
(supports_record_reader(column_schema.type_descriptor)) { + return true; + } + const auto& type_descriptor = column_schema.type_descriptor; + if (type_descriptor.extra_type_info != ParquetExtraTypeInfo::NONE || + type_descriptor.is_decimal || type_descriptor.is_timestamp || + type_descriptor.is_string_like) { + return false; + } + if (type_descriptor.converted_type != ::parquet::ConvertedType::NONE && + type_descriptor.converted_type != ::parquet::ConvertedType::UNDEFINED) { + return false; + } + switch (type_descriptor.physical_type) { + case ::parquet::Type::BOOLEAN: + case ::parquet::Type::INT32: + case ::parquet::Type::INT64: + case ::parquet::Type::FLOAT: + case ::parquet::Type::DOUBLE: + return true; + default: + return false; + } +} + +const reader::FieldProjection* find_child_projection(const reader::FieldProjection* projection, + const ParquetColumnSchema& child_schema) { + if (projection == nullptr || projection->project_all_children) { + return nullptr; + } + auto it = std::find_if(projection->children.begin(), projection->children.end(), + [&](const reader::FieldProjection& child_projection) { + return child_projection.file_path == child_schema.file_path; + }); + return it == projection->children.end() ? 
nullptr : &*it; +} + +} // namespace + +Status ParquetColumnReader::skip(int64_t rows) { + return Status::NotSupported("Parquet column skip is not implemented, rows={}", rows); +} + +Status ParquetColumnReader::select(const SelectionVector& sel, uint16_t selected_rows, + int64_t batch_rows, MutableColumnPtr& column) { + if (column.get() == nullptr) { + return Status::InvalidArgument("Parquet selected read result is null for column {}", + name()); + } + RETURN_IF_ERROR(sel.verify(selected_rows, batch_rows)); + + const auto ranges = selection_to_ranges(sel, selected_rows); + int64_t cursor = 0; + for (const auto& range : ranges) { + if (range.start < cursor || range.start + range.length > batch_rows) { + return Status::InvalidArgument("Invalid parquet selection range [{}, {}) for column {}", + range.start, range.start + range.length, name()); + } + RETURN_IF_ERROR(skip(range.start - cursor)); + + int64_t range_rows_read = 0; + RETURN_IF_ERROR(read(range.length, column, &range_rows_read)); + if (range_rows_read != range.length) { + return Status::Corruption( + "Parquet selected read returned {} rows, expected {} rows for column {}", + range_rows_read, range.length, name()); + } + cursor = range.start + range.length; + } + RETURN_IF_ERROR(skip(batch_rows - cursor)); + return Status::OK(); +} + +ParquetColumnReaderFactory::ParquetColumnReaderFactory( + std::shared_ptr<::parquet::RowGroupReader> row_group, int num_leaf_columns) + : _row_group(std::move(row_group)), + _record_readers(static_cast<size_t>(num_leaf_columns)) {} + +reader::SchemaField ParquetColumnReaderFactory::row_position_schema_field() { + reader::SchemaField field; + field.id = ROW_POSITION_COLUMN_ID; + field.name = ROW_POSITION_COLUMN_NAME; + field.type = std::make_shared<DataTypeInt64>(); + field.column_type = reader::ColumnType::ROW_NUMBER; + return field; +} + +std::unique_ptr<ParquetColumnReader> ParquetColumnReaderFactory::create_row_position_column_reader( + int64_t row_group_first_row) const { + 
return std::make_unique<RowPositionColumnReader>(row_group_first_row); +} + +Status ParquetColumnReaderFactory::create_scalar_reader( + int parquet_leaf_column_id, const ParquetTypeDescriptor& type_descriptor, + const ::parquet::ColumnDescriptor* descriptor, DataTypePtr type, std::string name, + std::shared_ptr<::parquet::internal::RecordReader> record_reader, + std::unique_ptr<ParquetColumnReader>* reader) const { + if (reader == nullptr) { + return Status::InvalidArgument("reader is null"); + } + if (descriptor == nullptr || type == nullptr || record_reader == nullptr) { + return Status::InvalidArgument("Invalid parquet column reader arguments for column {}", + name); + } + *reader = std::make_unique<ScalarColumnReader>(parquet_leaf_column_id, descriptor, + type_descriptor, std::move(type), + std::move(name), std::move(record_reader)); + return Status::OK(); +} + +Status ParquetColumnReaderFactory::create_scalar_column_reader( + const ParquetColumnSchema& column_schema, + std::unique_ptr<ParquetColumnReader>* reader) const { + if (reader == nullptr) { + return Status::InvalidArgument("reader is null"); + } + if (column_schema.leaf_column_id < 0 || + column_schema.leaf_column_id >= static_cast<int>(_record_readers.size())) { + return Status::InvalidArgument("Invalid parquet leaf column id {} for column {}", + column_schema.leaf_column_id, column_schema.name); + } + if (!supports_record_reader(column_schema.type_descriptor)) { + return Status::NotSupported( + "Current parquet reader only supports primitive columns without repetition; " + "column {} is not supported", + column_schema.name); + } + if (column_schema.descriptor == nullptr || + column_schema.descriptor->max_repetition_level() != 0 || + column_schema.descriptor->max_definition_level() > 1) { + return Status::NotSupported( + "Current parquet scalar reader only supports flat primitive columns; column {} is " + "not supported", + column_schema.name); + } + std::shared_ptr<::parquet::internal::RecordReader> 
record_reader; + RETURN_IF_ERROR(get_record_reader(column_schema.leaf_column_id, column_schema.descriptor, + column_schema.name, &record_reader)); + return create_scalar_reader(column_schema.leaf_column_id, column_schema.type_descriptor, + column_schema.descriptor, column_schema.type, column_schema.name, + std::move(record_reader), reader); +} + +Status ParquetColumnReaderFactory::create_nested_scalar_column_reader( + const ParquetColumnSchema& column_schema, + std::unique_ptr<ParquetColumnReader>* reader) const { + if (reader == nullptr) { + return Status::InvalidArgument("reader is null"); + } + if (column_schema.kind != ParquetColumnSchemaKind::PRIMITIVE) { + return Status::InvalidArgument("Parquet nested scalar reader requires primitive column {}", + column_schema.name); + } + if (column_schema.leaf_column_id < 0 || + column_schema.leaf_column_id >= static_cast<int>(_record_readers.size())) { + return Status::InvalidArgument("Invalid parquet leaf column id {} for column {}", + column_schema.leaf_column_id, column_schema.name); + } + if (!supports_nested_scalar_record_reader(column_schema)) { + return Status::NotSupported( + "Current parquet nested scalar reader does not support column {}", + column_schema.name); + } + std::shared_ptr<::parquet::internal::RecordReader> record_reader; + RETURN_IF_ERROR(get_record_reader(column_schema.leaf_column_id, column_schema.descriptor, + column_schema.name, &record_reader)); + return create_scalar_reader(column_schema.leaf_column_id, column_schema.type_descriptor, + column_schema.descriptor, column_schema.type, column_schema.name, + std::move(record_reader), reader); +} + +Status ParquetColumnReaderFactory::get_record_reader( + int leaf_column_id, const ::parquet::ColumnDescriptor* descriptor, const std::string& name, + std::shared_ptr<::parquet::internal::RecordReader>* reader) const { + if (reader == nullptr) { + return Status::InvalidArgument("reader is null"); + } + if (_row_group == nullptr) { + return 
Status::InternalError("Parquet row group reader is not initialized for column {}", + name); + } + if (leaf_column_id < 0 || leaf_column_id >= static_cast<int>(_record_readers.size())) { + return Status::InvalidArgument("Invalid parquet leaf column id {} for column {}", + leaf_column_id, name); + } + if (descriptor == nullptr) { + return Status::InvalidArgument("Parquet column descriptor is null for column {}", name); + } + if (_record_readers[leaf_column_id] == nullptr) { + try { + _record_readers[leaf_column_id] = + _row_group->RecordReader(leaf_column_id, /*read_dictionary=*/false); + } catch (const ::parquet::ParquetException& e) { + return Status::Corruption("Failed to create parquet record reader for column {}: {}", + name, e.what()); + } catch (const std::exception& e) { + return Status::InternalError("Failed to create parquet record reader for column {}: {}", + name, e.what()); + } + } + if (_record_readers[leaf_column_id] == nullptr) { + return Status::Corruption("Failed to create parquet record reader for column {}", name); + } + *reader = _record_readers[leaf_column_id]; + return Status::OK(); +} + +Status ParquetColumnReaderFactory::create_struct_column_reader( + const ParquetColumnSchema& column_schema, const reader::FieldProjection* projection, + std::unique_ptr<ParquetColumnReader>* reader) const { + if (reader == nullptr) { + return Status::InvalidArgument("reader is null"); + } + std::vector<std::unique_ptr<ParquetColumnReader>> child_readers; + child_readers.reserve(column_schema.children.size()); + std::vector<int> child_output_indices; + child_output_indices.reserve(column_schema.children.size()); + DataTypes projected_child_types; + Strings projected_child_names; + for (size_t child_idx = 0; child_idx < column_schema.children.size(); ++child_idx) { + const auto& child_schema = column_schema.children[child_idx]; + const auto* child_projection = find_child_projection(projection, *child_schema); + const bool child_is_projected = projection == 
nullptr || projection->project_all_children || + child_projection != nullptr; + std::unique_ptr<ParquetColumnReader> child_reader; + if (child_schema->kind == ParquetColumnSchemaKind::PRIMITIVE) { + RETURN_IF_ERROR(create_nested_scalar_column_reader(*child_schema, &child_reader)); + } else { + RETURN_IF_ERROR(create(*child_schema, child_projection, &child_reader)); + } + if (child_is_projected) { + child_output_indices.push_back(static_cast<int>(projected_child_types.size())); + projected_child_types.push_back(child_reader->type()); + projected_child_names.push_back(child_reader->name()); + } else { + if (child_schema->kind != ParquetColumnSchemaKind::PRIMITIVE) { + child_reader = std::make_unique<ShapeOnlyColumnReader>(std::move(child_reader)); + } + child_output_indices.push_back(-1); + } + child_readers.push_back(std::move(child_reader)); + } + if (projected_child_types.empty() && !column_schema.children.empty()) { + return Status::NotSupported("Parquet STRUCT projection for column {} contains no children", + column_schema.name); + } + DataTypePtr type = column_schema.type; + if (projection != nullptr && !projection->project_all_children) { + type = std::make_shared<DataTypeStruct>(projected_child_types, projected_child_names); + if (column_schema.type != nullptr && column_schema.type->is_nullable()) { + type = make_nullable(type); + } + } + *reader = std::make_unique<StructColumnReader>(column_schema, std::move(type), + std::move(child_readers), + std::move(child_output_indices)); + return Status::OK(); +} + +Status ParquetColumnReaderFactory::create_list_column_reader( + const ParquetColumnSchema& column_schema, const reader::FieldProjection* projection, + std::unique_ptr<ParquetColumnReader>* reader) const { + if (reader == nullptr) { + return Status::InvalidArgument("reader is null"); + } + if (column_schema.children.size() != 1) { + return Status::NotSupported("Unsupported parquet LIST layout for column {}", + column_schema.name); + } + 
std::unique_ptr<ParquetColumnReader> element_reader; + const auto& element_schema = *column_schema.children[0]; + const auto* element_projection = find_child_projection(projection, element_schema); + if (projection != nullptr && !projection->project_all_children && + element_projection == nullptr) { + return Status::NotSupported("Parquet LIST projection for column {} contains no element", + column_schema.name); + } + if (element_schema.kind == ParquetColumnSchemaKind::PRIMITIVE) { + if (element_projection != nullptr && !element_projection->project_all_children) { + return Status::InvalidArgument( + "Parquet LIST scalar element projection is invalid for column {}", + column_schema.name); + } + RETURN_IF_ERROR(create_nested_scalar_column_reader(element_schema, &element_reader)); + } else if (element_schema.kind == ParquetColumnSchemaKind::STRUCT) { + RETURN_IF_ERROR( + create_struct_column_reader(element_schema, element_projection, &element_reader)); + } else if (element_schema.kind == ParquetColumnSchemaKind::LIST) { + RETURN_IF_ERROR( + create_list_column_reader(element_schema, element_projection, &element_reader)); + } else { + return Status::NotSupported( + "Current parquet LIST reader does not support nested complex element column {}", Review Comment: Wrong message? The STRUCT and LIST branches above already handle nested complex elements, so this `else` branch is reached only for the remaining element kinds (presumably MAP); saying the LIST reader "does not support nested complex element" is misleading here, and the message should name the element kind that is actually unsupported. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
