waynexia commented on code in PR #10727: URL: https://github.com/apache/datafusion/pull/10727#discussion_r1623096681
########## datafusion/core/src/datasource/physical_plan/parquet/opener.rs: ########## @@ -0,0 +1,205 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [`ParquetOpener`] for opening Parquet files + +use crate::datasource::physical_plan::parquet::page_filter::PagePruningPredicate; +use crate::datasource::physical_plan::parquet::row_groups::RowGroupSet; +use crate::datasource::physical_plan::parquet::{row_filter, should_enable_page_index}; +use crate::datasource::physical_plan::{ + FileMeta, FileOpenFuture, FileOpener, ParquetFileMetrics, ParquetFileReaderFactory, +}; +use crate::datasource::schema_adapter::SchemaAdapterFactory; +use crate::physical_optimizer::pruning::PruningPredicate; +use arrow_schema::{ArrowError, SchemaRef}; +use datafusion_physical_expr_common::physical_expr::PhysicalExpr; +use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; +use futures::{StreamExt, TryStreamExt}; +use log::debug; +use parquet::arrow::arrow_reader::ArrowReaderOptions; +use parquet::arrow::async_reader::AsyncFileReader; +use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask}; +use std::sync::Arc; + +/// Implements [`FileOpener`] for a parquet file +pub(super) struct ParquetOpener { + pub 
partition_index: usize, + pub projection: Arc<[usize]>, + pub batch_size: usize, + pub limit: Option<usize>, + pub predicate: Option<Arc<dyn PhysicalExpr>>, + pub pruning_predicate: Option<Arc<PruningPredicate>>, + pub page_pruning_predicate: Option<Arc<PagePruningPredicate>>, + pub table_schema: SchemaRef, + pub metadata_size_hint: Option<usize>, + pub metrics: ExecutionPlanMetricsSet, + pub parquet_file_reader_factory: Arc<dyn ParquetFileReaderFactory>, + pub pushdown_filters: bool, + pub reorder_filters: bool, + pub enable_page_index: bool, + pub enable_bloom_filter: bool, + pub schema_adapter_factory: Arc<dyn SchemaAdapterFactory>, +} + +impl FileOpener for ParquetOpener { + fn open(&self, file_meta: FileMeta) -> datafusion_common::Result<FileOpenFuture> { + let file_range = file_meta.range.clone(); + let file_metrics = ParquetFileMetrics::new( + self.partition_index, + file_meta.location().as_ref(), + &self.metrics, + ); + + let reader: Box<dyn AsyncFileReader> = + self.parquet_file_reader_factory.create_reader( + self.partition_index, + file_meta, + self.metadata_size_hint, + &self.metrics, + )?; + + let batch_size = self.batch_size; + let projection = self.projection.clone(); + let projected_schema = SchemaRef::from(self.table_schema.project(&projection)?); + let schema_adapter = self.schema_adapter_factory.create(projected_schema); + let predicate = self.predicate.clone(); + let pruning_predicate = self.pruning_predicate.clone(); + let page_pruning_predicate = self.page_pruning_predicate.clone(); + let table_schema = self.table_schema.clone(); + let reorder_predicates = self.reorder_filters; + let pushdown_filters = self.pushdown_filters; + let enable_page_index = should_enable_page_index( + self.enable_page_index, + &self.page_pruning_predicate, + ); + let enable_bloom_filter = self.enable_bloom_filter; + let limit = self.limit; + + Ok(Box::pin(async move { + let options = ArrowReaderOptions::new().with_page_index(enable_page_index); + let mut builder = + 
ParquetRecordBatchStreamBuilder::new_with_options(reader, options) + .await?; + + let file_schema = builder.schema().clone(); + + let (schema_mapping, adapted_projections) = + schema_adapter.map_schema(&file_schema)?; + // let predicate = predicate.map(|p| reassign_predicate_columns(p, builder.schema(), true)).transpose()?; Review Comment: We could probably remove it (though it's not added in this patch). ```suggestion ``` ########## datafusion/core/src/datasource/physical_plan/parquet/mod.rs: ########## @@ -15,66 +15,55 @@ // specific language governing permissions and limitations // under the License. -//! Execution plan for reading Parquet files +//! [`ParquetExec`] Execution plan for reading Parquet files use std::any::Any; use std::fmt::Debug; -use std::ops::Range; use std::sync::Arc; use crate::datasource::listing::PartitionedFile; -use crate::datasource::physical_plan::file_stream::{ - FileOpenFuture, FileOpener, FileStream, -}; +use crate::datasource::physical_plan::file_stream::FileStream; use crate::datasource::physical_plan::{ parquet::page_filter::PagePruningPredicate, DisplayAs, FileGroupPartitioner, - FileMeta, FileScanConfig, + FileScanConfig, }; use crate::{ config::{ConfigOptions, TableParquetOptions}, - datasource::listing::ListingTableUrl, - error::{DataFusionError, Result}, + error::Result, execution::context::TaskContext, physical_optimizer::pruning::PruningPredicate, physical_plan::{ metrics::{ExecutionPlanMetricsSet, MetricBuilder, MetricsSet}, - DisplayFormatType, ExecutionMode, ExecutionPlan, ExecutionPlanProperties, - Partitioning, PlanProperties, SendableRecordBatchStream, Statistics, + DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning, PlanProperties, + SendableRecordBatchStream, Statistics, }, }; use arrow::datatypes::{DataType, SchemaRef}; -use arrow::error::ArrowError; use datafusion_physical_expr::{EquivalenceProperties, LexOrdering, PhysicalExpr}; -use bytes::Bytes; -use futures::future::BoxFuture; -use
futures::{StreamExt, TryStreamExt}; use itertools::Itertools; use log::debug; -use object_store::buffered::BufWriter; -use object_store::path::Path; -use object_store::ObjectStore; -use parquet::arrow::arrow_reader::ArrowReaderOptions; -use parquet::arrow::async_reader::{AsyncFileReader, ParquetObjectReader}; -use parquet::arrow::{AsyncArrowWriter, ParquetRecordBatchStreamBuilder, ProjectionMask}; use parquet::basic::{ConvertedType, LogicalType}; -use parquet::file::{metadata::ParquetMetaData, properties::WriterProperties}; use parquet::schema::types::ColumnDescriptor; -use tokio::task::JoinSet; mod metrics; +mod opener; mod page_filter; +mod reader; mod row_filter; mod row_groups; mod statistics; +mod writer; Review Comment: Good division :+1: ########## datafusion/core/src/datasource/physical_plan/parquet/writer.rs: ########## @@ -0,0 +1,79 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +use crate::datasource::listing::ListingTableUrl; Review Comment: nit: empty line ```suggestion // under the License. 
use crate::datasource::listing::ListingTableUrl; ``` ########## datafusion/core/src/datasource/physical_plan/parquet/mod.rs: ########## @@ -280,6 +269,8 @@ impl ParquetExecBuilder { /// /// If a custom `ParquetFileReaderFactory` is provided, then data access /// operations will be routed to this factory instead of `ObjectStore`. Review Comment: nit: ```suggestion /// operations will be routed to this factory instead of [`ObjectStore`]. ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
