Dandandan commented on code in PR #20839:
URL: https://github.com/apache/datafusion/pull/20839#discussion_r2909864347


##########
datafusion/datasource-parquet/src/push_opener.rs:
##########
@@ -0,0 +1,328 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`PushParquetOpener`] for reading Parquet files using [`ParquetPushDecoder`]
+
+use crate::opener::build_pruning_predicates;
+use crate::row_filter;
+use crate::row_group_filter::RowGroupAccessPlanFilter;
+use crate::{
+    ParquetAccessPlan, ParquetFileMetrics, ParquetFileReaderFactory,
+    apply_file_schema_type_coercions,
+};
+use arrow::record_batch::RecordBatch;
+use datafusion_common::{DataFusionError, Result, ScalarValue};
+use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
+use datafusion_datasource::{PartitionedFile, TableSchema};
+use datafusion_physical_expr::projection::ProjectionExprs;
+use datafusion_physical_expr::utils::reassign_expr_columns;
+use datafusion_physical_expr_adapter::{
+    PhysicalExprAdapterFactory, replace_columns_with_literals,
+};
+use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
+use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder};
+use futures::StreamExt;
+use parquet::DecodeResult;
+use parquet::arrow::ProjectionMask;
+use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
+use parquet::arrow::async_reader::AsyncFileReader;
+use parquet::arrow::push_decoder::ParquetPushDecoderBuilder;
+use std::collections::HashMap;
+use std::sync::Arc;
+
+/// A [`FileOpener`] for Parquet files that uses [`ParquetPushDecoder`] for
+/// decoding. This gives fine-grained control over IO vs CPU scheduling.
+pub(super) struct PushParquetOpener {
+    pub partition_index: usize,
+    pub projection: ProjectionExprs,
+    pub batch_size: usize,
+    pub limit: Option<usize>,
+    pub predicate: Option<Arc<dyn PhysicalExpr>>,
+    pub table_schema: TableSchema,
+    pub metadata_size_hint: Option<usize>,
+    pub metrics: ExecutionPlanMetricsSet,
+    pub parquet_file_reader_factory: Arc<dyn ParquetFileReaderFactory>,
+    pub pushdown_filters: bool,
+    pub reorder_filters: bool,
+    pub enable_row_group_stats_pruning: bool,
+    pub expr_adapter_factory: Arc<dyn PhysicalExprAdapterFactory>,
+}
+
+impl FileOpener for PushParquetOpener {
+    fn open(&self, partitioned_file: PartitionedFile) -> Result<FileOpenFuture> {
+        let file_name = partitioned_file.object_meta.location.to_string();
+        let file_metrics =
+            ParquetFileMetrics::new(self.partition_index, &file_name, &self.metrics);
+
+        let metadata_size_hint = partitioned_file
+            .metadata_size_hint
+            .or(self.metadata_size_hint);
+
+        let mut async_file_reader: Box<dyn AsyncFileReader> =
+            self.parquet_file_reader_factory.create_reader(
+                self.partition_index,
+                partitioned_file.clone(),
+                metadata_size_hint,
+                &self.metrics,
+            )?;
+
+        let batch_size = self.batch_size;
+        let limit = self.limit;
+        let projection = self.projection.clone();
+        let logical_file_schema = Arc::clone(self.table_schema.file_schema());
+        let predicate = self.predicate.clone();
+        let pushdown_filters = self.pushdown_filters;
+        let reorder_filters = self.reorder_filters;
+        let enable_row_group_stats_pruning = self.enable_row_group_stats_pruning;
+        let expr_adapter_factory = Arc::clone(&self.expr_adapter_factory);
+        let extensions = partitioned_file.extensions.clone();
+        let predicate_creation_errors = MetricBuilder::new(&self.metrics)
+            .global_counter("num_predicate_creation_errors");
+
+        let output_schema = Arc::new(
+            self.projection
+                .project_schema(self.table_schema.table_schema())?,
+        );
+
+        // Build literal column map for partition columns
+        let literal_columns: HashMap<String, ScalarValue> = self
+            .table_schema
+            .table_partition_cols()
+            .iter()
+            .zip(partitioned_file.partition_values.iter())
+            .map(|(field, value)| (field.name().clone(), value.clone()))
+            .collect();
+
+        Ok(Box::pin(async move {
+            // Replace partition column references with literal values
+            let mut projection = projection;
+            let mut predicate = predicate;
+            if !literal_columns.is_empty() {
+                projection = projection.try_map_exprs(|expr| {
+                    replace_columns_with_literals(Arc::clone(&expr), &literal_columns)
+                })?;
+                predicate = predicate
+                    .map(|p| replace_columns_with_literals(p, &literal_columns))
+                    .transpose()?;
+            }
+
+            // Load metadata
+            let options = ArrowReaderOptions::new();
+            let reader_metadata =
+                ArrowReaderMetadata::load_async(&mut async_file_reader, options.clone())
+                    .await?;
+
+            let mut physical_file_schema = Arc::clone(reader_metadata.schema());
+
+            // Apply schema coercions (e.g. Utf8 -> Utf8View)
+            if let Some(merged) = apply_file_schema_type_coercions(
+                &logical_file_schema,
+                &physical_file_schema,
+            ) {
+                physical_file_schema = Arc::new(merged);
+            }
+
+            let options = options.with_schema(Arc::clone(&physical_file_schema));
+            let reader_metadata = ArrowReaderMetadata::try_new(
+                Arc::clone(reader_metadata.metadata()),
+                options,
+            )?;
+
+            // Adapt predicate and projection to the physical file schema
+            let rewriter = expr_adapter_factory.create(
+                Arc::clone(&logical_file_schema),
+                Arc::clone(&physical_file_schema),
+            )?;
+            let predicate = predicate.map(|p| rewriter.rewrite(p)).transpose()?;
+            let projection = projection.try_map_exprs(|p| rewriter.rewrite(p))?;
+
+            // Build pruning predicates
+            let (pruning_predicate, _page_pruning_predicate) = build_pruning_predicates(
+                predicate.as_ref(),
+                &physical_file_schema,
+                &predicate_creation_errors,
+            );
+
+            // Determine which row groups to read
+            let file_metadata = Arc::clone(reader_metadata.metadata());
+            let rg_metadata = file_metadata.row_groups();
+            let num_row_groups = rg_metadata.len();
+
+            let access_plan = match extensions {
+                Some(ext) => ext
+                    .downcast_ref::<ParquetAccessPlan>()
+                    .cloned()
+                    .unwrap_or_else(|| ParquetAccessPlan::new_all(num_row_groups)),
+                None => ParquetAccessPlan::new_all(num_row_groups),
+            };
+
+            let mut row_groups_filter = RowGroupAccessPlanFilter::new(access_plan);
+
+            // Prune row groups by statistics
+            if let Some(ref pred) = pruning_predicate {
+                if enable_row_group_stats_pruning {
+                    row_groups_filter.prune_by_statistics(
+                        &physical_file_schema,
+                        reader_metadata.parquet_schema(),
+                        rg_metadata,
+                        pred,
+                        &file_metrics,
+                    );
+                }
+            }
+
+            let access_plan = row_groups_filter.build();
+            let row_group_indexes = access_plan.row_group_indexes();
+
+            // Build the push decoder
+            let mut builder =
+                ParquetPushDecoderBuilder::new_with_metadata(reader_metadata.clone())
+                    .with_batch_size(batch_size);
+
+            // Set row filter for predicate pushdown
+            if let Some(ref predicate) = predicate {
+                if pushdown_filters {
+                    let row_filter = row_filter::build_row_filter(
+                        predicate,
+                        &physical_file_schema,
+                        file_metadata.as_ref(),
+                        reorder_filters,
+                        &file_metrics,
+                    );
+                    match row_filter {
+                        Ok(Some(filter)) => {
+                            builder = builder.with_row_filter(filter);
+                        }
+                        Ok(None) => {}
+                        Err(e) => {
+                            log::debug!(
+                                "Ignoring error building row filter for 
'{predicate:?}': {e}"
+                            );
+                        }
+                    }
+                }
+            }
+
+            if !row_group_indexes.is_empty() {
+                builder = builder.with_row_groups(row_group_indexes);
+            } else {
+                // All row groups pruned - return empty stream
+                return Ok(futures::stream::empty().boxed());
+            }
+
+            // Set row selection from the access plan
+            let row_selection = access_plan.into_overall_row_selection(rg_metadata)?;
+            if let Some(selection) = row_selection {
+                builder = builder.with_row_selection(selection);
+            }
+
+            if let Some(limit) = limit {
+                builder = builder.with_limit(limit);
+            }
+
+            // Apply projection
+            let projection = projection.clone();
+            let indices = projection.column_indices();
+            let mask = ProjectionMask::roots(reader_metadata.parquet_schema(), indices);
+            builder = builder.with_projection(mask);
+
+            let decoder = builder.build()?;
+
+            // Create a stream that drives the decoder by fetching data as needed
+            let stream = futures::stream::unfold(
+                (decoder, async_file_reader, projection, output_schema),
+                |(mut decoder, mut reader, projection, output_schema)| async move {
+                    loop {
+                        match decoder.try_decode() {
+                            Ok(DecodeResult::NeedsData(ranges)) => {
+                                match reader.get_byte_ranges(ranges.clone()).await {

Review Comment:
   I think this is what you meant @alamb: this way, instead of calling `get_byte_ranges`
   here, we can push these ranges onto an IO morsel queue instead (and possibly do
   prefetching / more coalescing / split IO requests / etc.)
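
   A minimal sketch of that idea (not part of this PR): the decode loop would hand the
   ranges from `DecodeResult::NeedsData` to a dedicated IO task over a channel instead of
   awaiting `get_byte_ranges` inline, and that task could coalesce, prefetch, or split the
   requests. The `IoRequest` type, the `io_task` function, and the channel wiring below are
   hypothetical names for illustration only.

   ```rust
   use std::ops::Range;

   use bytes::Bytes;
   use parquet::arrow::async_reader::AsyncFileReader;
   use tokio::sync::{mpsc, oneshot};

   /// One unit of IO work requested by the decode loop (hypothetical type).
   struct IoRequest {
       /// Byte ranges the decoder asked for via `DecodeResult::NeedsData`.
       ranges: Vec<Range<u64>>,
       /// Channel used to send the fetched buffers back to the decode loop.
       reply: oneshot::Sender<parquet::errors::Result<Vec<Bytes>>>,
   }

   /// IO task that owns the file reader and serves requests from the morsel queue.
   /// Coalescing / prefetching / request-splitting policies would live here rather
   /// than in the opener itself.
   async fn io_task(
       mut reader: Box<dyn AsyncFileReader>,
       mut requests: mpsc::Receiver<IoRequest>,
   ) {
       while let Some(IoRequest { ranges, reply }) = requests.recv().await {
           // A real implementation could merge adjacent ranges, issue them
           // concurrently, or read ahead for the next request before fetching.
           let result = reader.get_byte_ranges(ranges).await;
           // Ignore send errors: the decode loop may have been cancelled.
           let _ = reply.send(result);
       }
   }
   ```

   The decode loop would then send an `IoRequest` and await the `oneshot` reply at the
   point where this diff currently calls `get_byte_ranges` directly.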



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

