This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new acd7106fa4 Minor: Split physical_plan/parquet/mod.rs into smaller 
modules (#10727)
acd7106fa4 is described below

commit acd7106fa40fad58f50ae06227971c51073d8f48
Author: Andrew Lamb <[email protected]>
AuthorDate: Sat Jun 1 06:57:40 2024 -0400

    Minor: Split physical_plan/parquet/mod.rs into smaller modules (#10727)
    
    * Minor: Split physical_plan/parquet/mod.rs into smaller modules
    
    * doc tweaks
    
    * Add object store docs
    
    * Apply suggestions from code review
    
    Co-authored-by: Ruihang Xia <[email protected]>
    
    ---------
    
    Co-authored-by: Ruihang Xia <[email protected]>
---
 .../src/datasource/physical_plan/parquet/mod.rs    | 376 ++-------------------
 .../src/datasource/physical_plan/parquet/opener.rs | 204 +++++++++++
 .../src/datasource/physical_plan/parquet/reader.rs | 140 ++++++++
 .../datasource/physical_plan/parquet/row_groups.rs |   2 +-
 .../src/datasource/physical_plan/parquet/writer.rs |  80 +++++
 5 files changed, 445 insertions(+), 357 deletions(-)

diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs 
b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
index ac7c39bbdb..f0328098b4 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -15,66 +15,55 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Execution plan for reading Parquet files
+//! [`ParquetExec`] Execution plan for reading Parquet files
 
 use std::any::Any;
 use std::fmt::Debug;
-use std::ops::Range;
 use std::sync::Arc;
 
 use crate::datasource::listing::PartitionedFile;
-use crate::datasource::physical_plan::file_stream::{
-    FileOpenFuture, FileOpener, FileStream,
-};
+use crate::datasource::physical_plan::file_stream::FileStream;
 use crate::datasource::physical_plan::{
     parquet::page_filter::PagePruningPredicate, DisplayAs, 
FileGroupPartitioner,
-    FileMeta, FileScanConfig,
+    FileScanConfig,
 };
 use crate::{
     config::{ConfigOptions, TableParquetOptions},
-    datasource::listing::ListingTableUrl,
-    error::{DataFusionError, Result},
+    error::Result,
     execution::context::TaskContext,
     physical_optimizer::pruning::PruningPredicate,
     physical_plan::{
         metrics::{ExecutionPlanMetricsSet, MetricBuilder, MetricsSet},
-        DisplayFormatType, ExecutionMode, ExecutionPlan, 
ExecutionPlanProperties,
-        Partitioning, PlanProperties, SendableRecordBatchStream, Statistics,
+        DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning, 
PlanProperties,
+        SendableRecordBatchStream, Statistics,
     },
 };
 
 use arrow::datatypes::{DataType, SchemaRef};
-use arrow::error::ArrowError;
 use datafusion_physical_expr::{EquivalenceProperties, LexOrdering, 
PhysicalExpr};
 
-use bytes::Bytes;
-use futures::future::BoxFuture;
-use futures::{StreamExt, TryStreamExt};
 use itertools::Itertools;
 use log::debug;
-use object_store::buffered::BufWriter;
-use object_store::path::Path;
-use object_store::ObjectStore;
-use parquet::arrow::arrow_reader::ArrowReaderOptions;
-use parquet::arrow::async_reader::{AsyncFileReader, ParquetObjectReader};
-use parquet::arrow::{AsyncArrowWriter, ParquetRecordBatchStreamBuilder, 
ProjectionMask};
 use parquet::basic::{ConvertedType, LogicalType};
-use parquet::file::{metadata::ParquetMetaData, properties::WriterProperties};
 use parquet::schema::types::ColumnDescriptor;
-use tokio::task::JoinSet;
 
 mod metrics;
+mod opener;
 mod page_filter;
+mod reader;
 mod row_filter;
 mod row_groups;
 mod statistics;
+mod writer;
 
-use crate::datasource::physical_plan::parquet::row_groups::RowGroupSet;
 use crate::datasource::schema_adapter::{
     DefaultSchemaAdapterFactory, SchemaAdapterFactory,
 };
 pub use metrics::ParquetFileMetrics;
+use opener::ParquetOpener;
+pub use reader::{DefaultParquetFileReaderFactory, ParquetFileReaderFactory};
 pub use statistics::{RequestedStatistics, StatisticsConverter};
+pub use writer::plan_to_parquet;
 
 /// Execution plan for reading one or more Parquet files.
 ///
@@ -201,7 +190,7 @@ pub struct ParquetExec {
     schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
 }
 
-/// [`ParquetExecBuilder`]`, builder for [`ParquetExec`].
+/// [`ParquetExecBuilder`], builder for [`ParquetExec`].
 ///
 /// See example on [`ParquetExec`].
 pub struct ParquetExecBuilder {
@@ -279,7 +268,9 @@ impl ParquetExecBuilder {
     /// instance using individual I/O operations for the footer and each page.
     ///
     /// If a custom `ParquetFileReaderFactory` is provided, then data access
-    /// operations will be routed to this factory instead of `ObjectStore`.
+    /// operations will be routed to this factory instead of [`ObjectStore`].
+    ///
+    /// [`ObjectStore`]: object_store::ObjectStore
     pub fn with_parquet_file_reader_factory(
         mut self,
         parquet_file_reader_factory: Arc<dyn ParquetFileReaderFactory>,
@@ -698,175 +689,6 @@ impl ExecutionPlan for ParquetExec {
     }
 }
 
-/// Implements [`FileOpener`] for a parquet file
-struct ParquetOpener {
-    partition_index: usize,
-    projection: Arc<[usize]>,
-    batch_size: usize,
-    limit: Option<usize>,
-    predicate: Option<Arc<dyn PhysicalExpr>>,
-    pruning_predicate: Option<Arc<PruningPredicate>>,
-    page_pruning_predicate: Option<Arc<PagePruningPredicate>>,
-    table_schema: SchemaRef,
-    metadata_size_hint: Option<usize>,
-    metrics: ExecutionPlanMetricsSet,
-    parquet_file_reader_factory: Arc<dyn ParquetFileReaderFactory>,
-    pushdown_filters: bool,
-    reorder_filters: bool,
-    enable_page_index: bool,
-    enable_bloom_filter: bool,
-    schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
-}
-
-impl FileOpener for ParquetOpener {
-    fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture> {
-        let file_range = file_meta.range.clone();
-        let file_metrics = ParquetFileMetrics::new(
-            self.partition_index,
-            file_meta.location().as_ref(),
-            &self.metrics,
-        );
-
-        let reader: Box<dyn AsyncFileReader> =
-            self.parquet_file_reader_factory.create_reader(
-                self.partition_index,
-                file_meta,
-                self.metadata_size_hint,
-                &self.metrics,
-            )?;
-
-        let batch_size = self.batch_size;
-        let projection = self.projection.clone();
-        let projected_schema = 
SchemaRef::from(self.table_schema.project(&projection)?);
-        let schema_adapter = 
self.schema_adapter_factory.create(projected_schema);
-        let predicate = self.predicate.clone();
-        let pruning_predicate = self.pruning_predicate.clone();
-        let page_pruning_predicate = self.page_pruning_predicate.clone();
-        let table_schema = self.table_schema.clone();
-        let reorder_predicates = self.reorder_filters;
-        let pushdown_filters = self.pushdown_filters;
-        let enable_page_index = should_enable_page_index(
-            self.enable_page_index,
-            &self.page_pruning_predicate,
-        );
-        let enable_bloom_filter = self.enable_bloom_filter;
-        let limit = self.limit;
-
-        Ok(Box::pin(async move {
-            let options = 
ArrowReaderOptions::new().with_page_index(enable_page_index);
-            let mut builder =
-                ParquetRecordBatchStreamBuilder::new_with_options(reader, 
options)
-                    .await?;
-
-            let file_schema = builder.schema().clone();
-
-            let (schema_mapping, adapted_projections) =
-                schema_adapter.map_schema(&file_schema)?;
-            // let predicate = predicate.map(|p| reassign_predicate_columns(p, 
builder.schema(), true)).transpose()?;
-
-            let mask = ProjectionMask::roots(
-                builder.parquet_schema(),
-                adapted_projections.iter().cloned(),
-            );
-
-            // Filter pushdown: evaluate predicates during scan
-            if let Some(predicate) = 
pushdown_filters.then_some(predicate).flatten() {
-                let row_filter = row_filter::build_row_filter(
-                    &predicate,
-                    &file_schema,
-                    &table_schema,
-                    builder.metadata(),
-                    reorder_predicates,
-                    &file_metrics,
-                );
-
-                match row_filter {
-                    Ok(Some(filter)) => {
-                        builder = builder.with_row_filter(filter);
-                    }
-                    Ok(None) => {}
-                    Err(e) => {
-                        debug!(
-                            "Ignoring error building row filter for '{:?}': 
{}",
-                            predicate, e
-                        );
-                    }
-                };
-            };
-
-            // Determine which row groups to actually read. The idea is to skip
-            // as many row groups as possible based on the metadata and query
-            let file_metadata = builder.metadata().clone();
-            let predicate = pruning_predicate.as_ref().map(|p| p.as_ref());
-            let rg_metadata = file_metadata.row_groups();
-            // track which row groups to actually read
-            let mut row_groups = RowGroupSet::new(rg_metadata.len());
-            // if there is a range restricting what parts of the file to read
-            if let Some(range) = file_range.as_ref() {
-                row_groups.prune_by_range(rg_metadata, range);
-            }
-            // If there is a predicate that can be evaluated against the 
metadata
-            if let Some(predicate) = predicate.as_ref() {
-                row_groups.prune_by_statistics(
-                    &file_schema,
-                    builder.parquet_schema(),
-                    rg_metadata,
-                    predicate,
-                    &file_metrics,
-                );
-
-                if enable_bloom_filter && !row_groups.is_empty() {
-                    row_groups
-                        .prune_by_bloom_filters(
-                            &file_schema,
-                            &mut builder,
-                            predicate,
-                            &file_metrics,
-                        )
-                        .await;
-                }
-            }
-
-            // page index pruning: if all data on individual pages can
-            // be ruled using page metadata, rows from other columns
-            // with that range can be skipped as well
-            if enable_page_index && !row_groups.is_empty() {
-                if let Some(p) = page_pruning_predicate {
-                    let pruned = p.prune(
-                        &file_schema,
-                        builder.parquet_schema(),
-                        &row_groups,
-                        file_metadata.as_ref(),
-                        &file_metrics,
-                    )?;
-                    if let Some(row_selection) = pruned {
-                        builder = builder.with_row_selection(row_selection);
-                    }
-                }
-            }
-
-            if let Some(limit) = limit {
-                builder = builder.with_limit(limit)
-            }
-
-            let stream = builder
-                .with_projection(mask)
-                .with_batch_size(batch_size)
-                .with_row_groups(row_groups.indexes())
-                .build()?;
-
-            let adapted = stream
-                .map_err(|e| ArrowError::ExternalError(Box::new(e)))
-                .map(move |maybe_batch| {
-                    maybe_batch
-                        .and_then(|b| 
schema_mapping.map_batch(b).map_err(Into::into))
-                });
-
-            Ok(adapted.boxed())
-        }))
-    }
-}
-
 fn should_enable_page_index(
     enable_page_index: bool,
     page_pruning_predicate: &Option<Arc<PagePruningPredicate>>,
@@ -879,168 +701,6 @@ fn should_enable_page_index(
             .unwrap_or(false)
 }
 
-/// Interface for reading parquet files.
-///
-/// The combined implementations of [`ParquetFileReaderFactory`] and
-/// [`AsyncFileReader`] can be used to provide custom data access operations
-/// such as pre-cached data, I/O coalescing, etc.
-///
-/// See [`DefaultParquetFileReaderFactory`] for a simple implementation.
-pub trait ParquetFileReaderFactory: Debug + Send + Sync + 'static {
-    /// Provides an `AsyncFileReader` for reading data from a parquet file 
specified
-    ///
-    /// # Arguments
-    /// * partition_index - Index of the partition (for reporting metrics)
-    /// * file_meta - The file to be read
-    /// * metadata_size_hint - If specified, the first IO reads this many 
bytes from the footer
-    /// * metrics - Execution metrics
-    fn create_reader(
-        &self,
-        partition_index: usize,
-        file_meta: FileMeta,
-        metadata_size_hint: Option<usize>,
-        metrics: &ExecutionPlanMetricsSet,
-    ) -> Result<Box<dyn AsyncFileReader + Send>>;
-}
-
-/// Default implementation of [`ParquetFileReaderFactory`]
-///
-/// This implementation:
-/// 1. Reads parquet directly from an underlying [`ObjectStore`] instance.
-/// 2. Reads the footer and page metadata on demand.
-/// 3. Does not cache metadata or coalesce I/O operations.
-#[derive(Debug)]
-pub struct DefaultParquetFileReaderFactory {
-    store: Arc<dyn ObjectStore>,
-}
-
-impl DefaultParquetFileReaderFactory {
-    /// Create a new `DefaultParquetFileReaderFactory`.
-    pub fn new(store: Arc<dyn ObjectStore>) -> Self {
-        Self { store }
-    }
-}
-
-/// Implements [`AsyncFileReader`] for a parquet file in object storage.
-///
-/// This implementation uses the [`ParquetObjectReader`] to read data from the
-/// object store on demand, as required, tracking the number of bytes read.
-///
-/// This implementation does not coalesce I/O operations or cache bytes. Such
-/// optimizations can be done either at the object store level or by providing 
a
-/// custom implementation of [`ParquetFileReaderFactory`].
-pub(crate) struct ParquetFileReader {
-    file_metrics: ParquetFileMetrics,
-    inner: ParquetObjectReader,
-}
-
-impl AsyncFileReader for ParquetFileReader {
-    fn get_bytes(
-        &mut self,
-        range: Range<usize>,
-    ) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
-        self.file_metrics.bytes_scanned.add(range.end - range.start);
-        self.inner.get_bytes(range)
-    }
-
-    fn get_byte_ranges(
-        &mut self,
-        ranges: Vec<Range<usize>>,
-    ) -> BoxFuture<'_, parquet::errors::Result<Vec<Bytes>>>
-    where
-        Self: Send,
-    {
-        let total = ranges.iter().map(|r| r.end - r.start).sum();
-        self.file_metrics.bytes_scanned.add(total);
-        self.inner.get_byte_ranges(ranges)
-    }
-
-    fn get_metadata(
-        &mut self,
-    ) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
-        self.inner.get_metadata()
-    }
-}
-
-impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
-    fn create_reader(
-        &self,
-        partition_index: usize,
-        file_meta: FileMeta,
-        metadata_size_hint: Option<usize>,
-        metrics: &ExecutionPlanMetricsSet,
-    ) -> Result<Box<dyn AsyncFileReader + Send>> {
-        let file_metrics = ParquetFileMetrics::new(
-            partition_index,
-            file_meta.location().as_ref(),
-            metrics,
-        );
-        let store = Arc::clone(&self.store);
-        let mut inner = ParquetObjectReader::new(store, file_meta.object_meta);
-
-        if let Some(hint) = metadata_size_hint {
-            inner = inner.with_footer_size_hint(hint)
-        };
-
-        Ok(Box::new(ParquetFileReader {
-            inner,
-            file_metrics,
-        }))
-    }
-}
-
-/// Executes a query and writes the results to a partitioned Parquet file.
-pub async fn plan_to_parquet(
-    task_ctx: Arc<TaskContext>,
-    plan: Arc<dyn ExecutionPlan>,
-    path: impl AsRef<str>,
-    writer_properties: Option<WriterProperties>,
-) -> Result<()> {
-    let path = path.as_ref();
-    let parsed = ListingTableUrl::parse(path)?;
-    let object_store_url = parsed.object_store();
-    let store = task_ctx.runtime_env().object_store(&object_store_url)?;
-    let mut join_set = JoinSet::new();
-    for i in 0..plan.output_partitioning().partition_count() {
-        let plan: Arc<dyn ExecutionPlan> = plan.clone();
-        let filename = format!("{}/part-{i}.parquet", parsed.prefix());
-        let file = Path::parse(filename)?;
-        let propclone = writer_properties.clone();
-
-        let storeref = store.clone();
-        let buf_writer = BufWriter::new(storeref, file.clone());
-        let mut stream = plan.execute(i, task_ctx.clone())?;
-        join_set.spawn(async move {
-            let mut writer =
-                AsyncArrowWriter::try_new(buf_writer, plan.schema(), 
propclone)?;
-            while let Some(next_batch) = stream.next().await {
-                let batch = next_batch?;
-                writer.write(&batch).await?;
-            }
-            writer
-                .close()
-                .await
-                .map_err(DataFusionError::from)
-                .map(|_| ())
-        });
-    }
-
-    while let Some(result) = join_set.join_next().await {
-        match result {
-            Ok(res) => res?,
-            Err(e) => {
-                if e.is_panic() {
-                    std::panic::resume_unwind(e.into_panic());
-                } else {
-                    unreachable!();
-                }
-            }
-        }
-    }
-
-    Ok(())
-}
-
 // Convert parquet column schema to arrow data type, and just consider the
 // decimal data type.
 pub(crate) fn parquet_to_arrow_decimal_type(
@@ -1098,9 +758,13 @@ mod tests {
     use datafusion_physical_expr::create_physical_expr;
 
     use chrono::{TimeZone, Utc};
+    use datafusion_physical_plan::ExecutionPlanProperties;
+    use futures::StreamExt;
     use object_store::local::LocalFileSystem;
+    use object_store::path::Path;
     use object_store::ObjectMeta;
     use parquet::arrow::ArrowWriter;
+    use parquet::file::properties::WriterProperties;
     use tempfile::TempDir;
     use url::Url;
 
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/opener.rs 
b/datafusion/core/src/datasource/physical_plan/parquet/opener.rs
new file mode 100644
index 0000000000..3aec1e1d20
--- /dev/null
+++ b/datafusion/core/src/datasource/physical_plan/parquet/opener.rs
@@ -0,0 +1,204 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`ParquetOpener`] for opening Parquet files
+
+use 
crate::datasource::physical_plan::parquet::page_filter::PagePruningPredicate;
+use crate::datasource::physical_plan::parquet::row_groups::RowGroupSet;
+use crate::datasource::physical_plan::parquet::{row_filter, 
should_enable_page_index};
+use crate::datasource::physical_plan::{
+    FileMeta, FileOpenFuture, FileOpener, ParquetFileMetrics, 
ParquetFileReaderFactory,
+};
+use crate::datasource::schema_adapter::SchemaAdapterFactory;
+use crate::physical_optimizer::pruning::PruningPredicate;
+use arrow_schema::{ArrowError, SchemaRef};
+use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
+use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
+use futures::{StreamExt, TryStreamExt};
+use log::debug;
+use parquet::arrow::arrow_reader::ArrowReaderOptions;
+use parquet::arrow::async_reader::AsyncFileReader;
+use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
+use std::sync::Arc;
+
+/// Implements [`FileOpener`] for a parquet file
+pub(super) struct ParquetOpener {
+    pub partition_index: usize,
+    pub projection: Arc<[usize]>,
+    pub batch_size: usize,
+    pub limit: Option<usize>,
+    pub predicate: Option<Arc<dyn PhysicalExpr>>,
+    pub pruning_predicate: Option<Arc<PruningPredicate>>,
+    pub page_pruning_predicate: Option<Arc<PagePruningPredicate>>,
+    pub table_schema: SchemaRef,
+    pub metadata_size_hint: Option<usize>,
+    pub metrics: ExecutionPlanMetricsSet,
+    pub parquet_file_reader_factory: Arc<dyn ParquetFileReaderFactory>,
+    pub pushdown_filters: bool,
+    pub reorder_filters: bool,
+    pub enable_page_index: bool,
+    pub enable_bloom_filter: bool,
+    pub schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
+}
+
+impl FileOpener for ParquetOpener {
+    fn open(&self, file_meta: FileMeta) -> 
datafusion_common::Result<FileOpenFuture> {
+        let file_range = file_meta.range.clone();
+        let file_metrics = ParquetFileMetrics::new(
+            self.partition_index,
+            file_meta.location().as_ref(),
+            &self.metrics,
+        );
+
+        let reader: Box<dyn AsyncFileReader> =
+            self.parquet_file_reader_factory.create_reader(
+                self.partition_index,
+                file_meta,
+                self.metadata_size_hint,
+                &self.metrics,
+            )?;
+
+        let batch_size = self.batch_size;
+        let projection = self.projection.clone();
+        let projected_schema = 
SchemaRef::from(self.table_schema.project(&projection)?);
+        let schema_adapter = 
self.schema_adapter_factory.create(projected_schema);
+        let predicate = self.predicate.clone();
+        let pruning_predicate = self.pruning_predicate.clone();
+        let page_pruning_predicate = self.page_pruning_predicate.clone();
+        let table_schema = self.table_schema.clone();
+        let reorder_predicates = self.reorder_filters;
+        let pushdown_filters = self.pushdown_filters;
+        let enable_page_index = should_enable_page_index(
+            self.enable_page_index,
+            &self.page_pruning_predicate,
+        );
+        let enable_bloom_filter = self.enable_bloom_filter;
+        let limit = self.limit;
+
+        Ok(Box::pin(async move {
+            let options = 
ArrowReaderOptions::new().with_page_index(enable_page_index);
+            let mut builder =
+                ParquetRecordBatchStreamBuilder::new_with_options(reader, 
options)
+                    .await?;
+
+            let file_schema = builder.schema().clone();
+
+            let (schema_mapping, adapted_projections) =
+                schema_adapter.map_schema(&file_schema)?;
+
+            let mask = ProjectionMask::roots(
+                builder.parquet_schema(),
+                adapted_projections.iter().cloned(),
+            );
+
+            // Filter pushdown: evaluate predicates during scan
+            if let Some(predicate) = 
pushdown_filters.then_some(predicate).flatten() {
+                let row_filter = row_filter::build_row_filter(
+                    &predicate,
+                    &file_schema,
+                    &table_schema,
+                    builder.metadata(),
+                    reorder_predicates,
+                    &file_metrics,
+                );
+
+                match row_filter {
+                    Ok(Some(filter)) => {
+                        builder = builder.with_row_filter(filter);
+                    }
+                    Ok(None) => {}
+                    Err(e) => {
+                        debug!(
+                            "Ignoring error building row filter for '{:?}': 
{}",
+                            predicate, e
+                        );
+                    }
+                };
+            };
+
+            // Determine which row groups to actually read. The idea is to skip
+            // as many row groups as possible based on the metadata and query
+            let file_metadata = builder.metadata().clone();
+            let predicate = pruning_predicate.as_ref().map(|p| p.as_ref());
+            let rg_metadata = file_metadata.row_groups();
+            // track which row groups to actually read
+            let mut row_groups = RowGroupSet::new(rg_metadata.len());
+            // if there is a range restricting what parts of the file to read
+            if let Some(range) = file_range.as_ref() {
+                row_groups.prune_by_range(rg_metadata, range);
+            }
+            // If there is a predicate that can be evaluated against the 
metadata
+            if let Some(predicate) = predicate.as_ref() {
+                row_groups.prune_by_statistics(
+                    &file_schema,
+                    builder.parquet_schema(),
+                    rg_metadata,
+                    predicate,
+                    &file_metrics,
+                );
+
+                if enable_bloom_filter && !row_groups.is_empty() {
+                    row_groups
+                        .prune_by_bloom_filters(
+                            &file_schema,
+                            &mut builder,
+                            predicate,
+                            &file_metrics,
+                        )
+                        .await;
+                }
+            }
+
+            // page index pruning: if all data on individual pages can
+            // be ruled using page metadata, rows from other columns
+            // with that range can be skipped as well
+            if enable_page_index && !row_groups.is_empty() {
+                if let Some(p) = page_pruning_predicate {
+                    let pruned = p.prune(
+                        &file_schema,
+                        builder.parquet_schema(),
+                        &row_groups,
+                        file_metadata.as_ref(),
+                        &file_metrics,
+                    )?;
+                    if let Some(row_selection) = pruned {
+                        builder = builder.with_row_selection(row_selection);
+                    }
+                }
+            }
+
+            if let Some(limit) = limit {
+                builder = builder.with_limit(limit)
+            }
+
+            let stream = builder
+                .with_projection(mask)
+                .with_batch_size(batch_size)
+                .with_row_groups(row_groups.indexes())
+                .build()?;
+
+            let adapted = stream
+                .map_err(|e| ArrowError::ExternalError(Box::new(e)))
+                .map(move |maybe_batch| {
+                    maybe_batch
+                        .and_then(|b| 
schema_mapping.map_batch(b).map_err(Into::into))
+                });
+
+            Ok(adapted.boxed())
+        }))
+    }
+}
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/reader.rs 
b/datafusion/core/src/datasource/physical_plan/parquet/reader.rs
new file mode 100644
index 0000000000..265fb9d570
--- /dev/null
+++ b/datafusion/core/src/datasource/physical_plan/parquet/reader.rs
@@ -0,0 +1,140 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`ParquetFileReaderFactory`] and [`DefaultParquetFileReaderFactory`] for
+//! creating parquet file readers
+
+use crate::datasource::physical_plan::{FileMeta, ParquetFileMetrics};
+use bytes::Bytes;
+use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
+use futures::future::BoxFuture;
+use object_store::ObjectStore;
+use parquet::arrow::async_reader::{AsyncFileReader, ParquetObjectReader};
+use parquet::file::metadata::ParquetMetaData;
+use std::fmt::Debug;
+use std::ops::Range;
+use std::sync::Arc;
+
+/// Interface for reading parquet files.
+///
+/// The combined implementations of [`ParquetFileReaderFactory`] and
+/// [`AsyncFileReader`] can be used to provide custom data access operations
+/// such as pre-cached data, I/O coalescing, etc.
+///
+/// See [`DefaultParquetFileReaderFactory`] for a simple implementation.
+pub trait ParquetFileReaderFactory: Debug + Send + Sync + 'static {
+    /// Provides an `AsyncFileReader` for reading data from a parquet file 
specified
+    ///
+    /// # Arguments
+    /// * partition_index - Index of the partition (for reporting metrics)
+    /// * file_meta - The file to be read
+    /// * metadata_size_hint - If specified, the first IO reads this many 
bytes from the footer
+    /// * metrics - Execution metrics
+    fn create_reader(
+        &self,
+        partition_index: usize,
+        file_meta: FileMeta,
+        metadata_size_hint: Option<usize>,
+        metrics: &ExecutionPlanMetricsSet,
+    ) -> datafusion_common::Result<Box<dyn AsyncFileReader + Send>>;
+}
+
+/// Default implementation of [`ParquetFileReaderFactory`]
+///
+/// This implementation:
+/// 1. Reads parquet directly from an underlying [`ObjectStore`] instance.
+/// 2. Reads the footer and page metadata on demand.
+/// 3. Does not cache metadata or coalesce I/O operations.
+#[derive(Debug)]
+pub struct DefaultParquetFileReaderFactory {
+    store: Arc<dyn ObjectStore>,
+}
+
+impl DefaultParquetFileReaderFactory {
+    /// Create a new `DefaultParquetFileReaderFactory`.
+    pub fn new(store: Arc<dyn ObjectStore>) -> Self {
+        Self { store }
+    }
+}
+
+/// Implements [`AsyncFileReader`] for a parquet file in object storage.
+///
+/// This implementation uses the [`ParquetObjectReader`] to read data from the
+/// object store on demand, as required, tracking the number of bytes read.
+///
+/// This implementation does not coalesce I/O operations or cache bytes. Such
+/// optimizations can be done either at the object store level or by providing 
a
+/// custom implementation of [`ParquetFileReaderFactory`].
+pub(crate) struct ParquetFileReader {
+    pub file_metrics: ParquetFileMetrics,
+    pub inner: ParquetObjectReader,
+}
+
+impl AsyncFileReader for ParquetFileReader {
+    fn get_bytes(
+        &mut self,
+        range: Range<usize>,
+    ) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
+        self.file_metrics.bytes_scanned.add(range.end - range.start);
+        self.inner.get_bytes(range)
+    }
+
+    fn get_byte_ranges(
+        &mut self,
+        ranges: Vec<Range<usize>>,
+    ) -> BoxFuture<'_, parquet::errors::Result<Vec<Bytes>>>
+    where
+        Self: Send,
+    {
+        let total = ranges.iter().map(|r| r.end - r.start).sum();
+        self.file_metrics.bytes_scanned.add(total);
+        self.inner.get_byte_ranges(ranges)
+    }
+
+    fn get_metadata(
+        &mut self,
+    ) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
+        self.inner.get_metadata()
+    }
+}
+
+impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
+    fn create_reader(
+        &self,
+        partition_index: usize,
+        file_meta: FileMeta,
+        metadata_size_hint: Option<usize>,
+        metrics: &ExecutionPlanMetricsSet,
+    ) -> datafusion_common::Result<Box<dyn AsyncFileReader + Send>> {
+        let file_metrics = ParquetFileMetrics::new(
+            partition_index,
+            file_meta.location().as_ref(),
+            metrics,
+        );
+        let store = Arc::clone(&self.store);
+        let mut inner = ParquetObjectReader::new(store, file_meta.object_meta);
+
+        if let Some(hint) = metadata_size_hint {
+            inner = inner.with_footer_size_hint(hint)
+        };
+
+        Ok(Box::new(ParquetFileReader {
+            inner,
+            file_metrics,
+        }))
+    }
+}
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs 
b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
index 0a0ca4369d..7dd91d3d4e 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
@@ -417,7 +417,7 @@ impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::datasource::physical_plan::parquet::ParquetFileReader;
+    use crate::datasource::physical_plan::parquet::reader::ParquetFileReader;
     use crate::physical_plan::metrics::ExecutionPlanMetricsSet;
     use arrow::datatypes::DataType::Decimal128;
     use arrow::datatypes::{DataType, Field};
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/writer.rs b/datafusion/core/src/datasource/physical_plan/parquet/writer.rs
new file mode 100644
index 0000000000..0c0c546910
--- /dev/null
+++ b/datafusion/core/src/datasource/physical_plan/parquet/writer.rs
@@ -0,0 +1,80 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::datasource::listing::ListingTableUrl;
+use datafusion_common::DataFusionError;
+use datafusion_execution::TaskContext;
+use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
+use futures::StreamExt;
+use object_store::buffered::BufWriter;
+use object_store::path::Path;
+use parquet::arrow::AsyncArrowWriter;
+use parquet::file::properties::WriterProperties;
+use std::sync::Arc;
+use tokio::task::JoinSet;
+
+/// Executes a query and writes the results to a partitioned Parquet file.
+///
+/// One output file is written per output partition of `plan`, named
+/// `part-<i>.parquet` under the directory given by `path`; each partition is
+/// written concurrently on its own spawned task.
+///
+/// # Errors
+///
+/// Returns an error if `path` cannot be parsed as a listing-table URL, if no
+/// object store is registered for it, or if executing the plan or writing any
+/// batch fails. A panic inside a writer task is re-raised in the caller.
+pub async fn plan_to_parquet(
+    task_ctx: Arc<TaskContext>,
+    plan: Arc<dyn ExecutionPlan>,
+    path: impl AsRef<str>,
+    writer_properties: Option<WriterProperties>,
+) -> datafusion_common::Result<()> {
+    let path = path.as_ref();
+    let parsed = ListingTableUrl::parse(path)?;
+    let object_store_url = parsed.object_store();
+    let store = task_ctx.runtime_env().object_store(&object_store_url)?;
+    let mut join_set = JoinSet::new();
+    // Spawn one writer task per output partition of the plan.
+    for i in 0..plan.output_partitioning().partition_count() {
+        let plan: Arc<dyn ExecutionPlan> = plan.clone();
+        let filename = format!("{}/part-{i}.parquet", parsed.prefix());
+        let file = Path::parse(filename)?;
+        let propclone = writer_properties.clone();
+
+        let storeref = store.clone();
+        // Buffered writer over the object store for this partition's file.
+        let buf_writer = BufWriter::new(storeref, file.clone());
+        let mut stream = plan.execute(i, task_ctx.clone())?;
+        join_set.spawn(async move {
+            let mut writer =
+                AsyncArrowWriter::try_new(buf_writer, plan.schema(), propclone)?;
+            // Drain the partition's record-batch stream into the writer.
+            while let Some(next_batch) = stream.next().await {
+                let batch = next_batch?;
+                writer.write(&batch).await?;
+            }
+            // `close()` flushes remaining data and finalizes the file;
+            // discard the returned file metadata.
+            writer
+                .close()
+                .await
+                .map_err(DataFusionError::from)
+                .map(|_| ())
+        });
+    }
+
+    // Await all writer tasks, surfacing the first error. Panics are re-raised
+    // here; tasks are never aborted in this function, so a cancelled join
+    // result cannot occur and the `else` branch is unreachable.
+    while let Some(result) = join_set.join_next().await {
+        match result {
+            Ok(res) => res?,
+            Err(e) => {
+                if e.is_panic() {
+                    std::panic::resume_unwind(e.into_panic());
+                } else {
+                    unreachable!();
+                }
+            }
+        }
+    }
+
+    Ok(())
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to