This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new acd7106fa4 Minor: Split physical_plan/parquet/mod.rs into smaller modules (#10727)
acd7106fa4 is described below
commit acd7106fa40fad58f50ae06227971c51073d8f48
Author: Andrew Lamb <[email protected]>
AuthorDate: Sat Jun 1 06:57:40 2024 -0400
Minor: Split physical_plan/parquet/mod.rs into smaller modules (#10727)
* Minor: Split physical_plan/parquet/mod.rs into smaller modules
* doc tweaks
* Add object store docs
* Apply suggestions from code review
Co-authored-by: Ruihang Xia <[email protected]>
---------
Co-authored-by: Ruihang Xia <[email protected]>
---
.../src/datasource/physical_plan/parquet/mod.rs | 376 ++-------------------
.../src/datasource/physical_plan/parquet/opener.rs | 204 +++++++++++
.../src/datasource/physical_plan/parquet/reader.rs | 140 ++++++++
.../datasource/physical_plan/parquet/row_groups.rs | 2 +-
.../src/datasource/physical_plan/parquet/writer.rs | 80 +++++
5 files changed, 445 insertions(+), 357 deletions(-)
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
index ac7c39bbdb..f0328098b4 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -15,66 +15,55 @@
// specific language governing permissions and limitations
// under the License.
-//! Execution plan for reading Parquet files
+//! [`ParquetExec`] Execution plan for reading Parquet files
use std::any::Any;
use std::fmt::Debug;
-use std::ops::Range;
use std::sync::Arc;
use crate::datasource::listing::PartitionedFile;
-use crate::datasource::physical_plan::file_stream::{
- FileOpenFuture, FileOpener, FileStream,
-};
+use crate::datasource::physical_plan::file_stream::FileStream;
use crate::datasource::physical_plan::{
parquet::page_filter::PagePruningPredicate, DisplayAs, FileGroupPartitioner,
- FileMeta, FileScanConfig,
+ FileScanConfig,
};
use crate::{
config::{ConfigOptions, TableParquetOptions},
- datasource::listing::ListingTableUrl,
- error::{DataFusionError, Result},
+ error::Result,
execution::context::TaskContext,
physical_optimizer::pruning::PruningPredicate,
physical_plan::{
metrics::{ExecutionPlanMetricsSet, MetricBuilder, MetricsSet},
- DisplayFormatType, ExecutionMode, ExecutionPlan, ExecutionPlanProperties,
- Partitioning, PlanProperties, SendableRecordBatchStream, Statistics,
+ DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning, PlanProperties,
+ SendableRecordBatchStream, Statistics,
},
};
use arrow::datatypes::{DataType, SchemaRef};
-use arrow::error::ArrowError;
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering, PhysicalExpr};
-use bytes::Bytes;
-use futures::future::BoxFuture;
-use futures::{StreamExt, TryStreamExt};
use itertools::Itertools;
use log::debug;
-use object_store::buffered::BufWriter;
-use object_store::path::Path;
-use object_store::ObjectStore;
-use parquet::arrow::arrow_reader::ArrowReaderOptions;
-use parquet::arrow::async_reader::{AsyncFileReader, ParquetObjectReader};
-use parquet::arrow::{AsyncArrowWriter, ParquetRecordBatchStreamBuilder, ProjectionMask};
use parquet::basic::{ConvertedType, LogicalType};
-use parquet::file::{metadata::ParquetMetaData, properties::WriterProperties};
use parquet::schema::types::ColumnDescriptor;
-use tokio::task::JoinSet;
mod metrics;
+mod opener;
mod page_filter;
+mod reader;
mod row_filter;
mod row_groups;
mod statistics;
+mod writer;
-use crate::datasource::physical_plan::parquet::row_groups::RowGroupSet;
use crate::datasource::schema_adapter::{
DefaultSchemaAdapterFactory, SchemaAdapterFactory,
};
pub use metrics::ParquetFileMetrics;
+use opener::ParquetOpener;
+pub use reader::{DefaultParquetFileReaderFactory, ParquetFileReaderFactory};
pub use statistics::{RequestedStatistics, StatisticsConverter};
+pub use writer::plan_to_parquet;
/// Execution plan for reading one or more Parquet files.
///
@@ -201,7 +190,7 @@ pub struct ParquetExec {
schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
}
-/// [`ParquetExecBuilder`]`, builder for [`ParquetExec`].
+/// [`ParquetExecBuilder`], builder for [`ParquetExec`].
///
/// See example on [`ParquetExec`].
pub struct ParquetExecBuilder {
@@ -279,7 +268,9 @@ impl ParquetExecBuilder {
/// instance using individual I/O operations for the footer and each page.
///
/// If a custom `ParquetFileReaderFactory` is provided, then data access
- /// operations will be routed to this factory instead of `ObjectStore`.
+ /// operations will be routed to this factory instead of [`ObjectStore`].
+ ///
+ /// [`ObjectStore`]: object_store::ObjectStore
pub fn with_parquet_file_reader_factory(
mut self,
parquet_file_reader_factory: Arc<dyn ParquetFileReaderFactory>,
@@ -698,175 +689,6 @@ impl ExecutionPlan for ParquetExec {
}
}
-/// Implements [`FileOpener`] for a parquet file
-struct ParquetOpener {
- partition_index: usize,
- projection: Arc<[usize]>,
- batch_size: usize,
- limit: Option<usize>,
- predicate: Option<Arc<dyn PhysicalExpr>>,
- pruning_predicate: Option<Arc<PruningPredicate>>,
- page_pruning_predicate: Option<Arc<PagePruningPredicate>>,
- table_schema: SchemaRef,
- metadata_size_hint: Option<usize>,
- metrics: ExecutionPlanMetricsSet,
- parquet_file_reader_factory: Arc<dyn ParquetFileReaderFactory>,
- pushdown_filters: bool,
- reorder_filters: bool,
- enable_page_index: bool,
- enable_bloom_filter: bool,
- schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
-}
-
-impl FileOpener for ParquetOpener {
- fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture> {
- let file_range = file_meta.range.clone();
- let file_metrics = ParquetFileMetrics::new(
- self.partition_index,
- file_meta.location().as_ref(),
- &self.metrics,
- );
-
- let reader: Box<dyn AsyncFileReader> =
- self.parquet_file_reader_factory.create_reader(
- self.partition_index,
- file_meta,
- self.metadata_size_hint,
- &self.metrics,
- )?;
-
- let batch_size = self.batch_size;
- let projection = self.projection.clone();
- let projected_schema = SchemaRef::from(self.table_schema.project(&projection)?);
- let schema_adapter = self.schema_adapter_factory.create(projected_schema);
- let predicate = self.predicate.clone();
- let pruning_predicate = self.pruning_predicate.clone();
- let page_pruning_predicate = self.page_pruning_predicate.clone();
- let table_schema = self.table_schema.clone();
- let reorder_predicates = self.reorder_filters;
- let pushdown_filters = self.pushdown_filters;
- let enable_page_index = should_enable_page_index(
- self.enable_page_index,
- &self.page_pruning_predicate,
- );
- let enable_bloom_filter = self.enable_bloom_filter;
- let limit = self.limit;
-
- Ok(Box::pin(async move {
- let options = ArrowReaderOptions::new().with_page_index(enable_page_index);
- let mut builder =
- ParquetRecordBatchStreamBuilder::new_with_options(reader, options)
- .await?;
-
- let file_schema = builder.schema().clone();
-
- let (schema_mapping, adapted_projections) =
- schema_adapter.map_schema(&file_schema)?;
- // let predicate = predicate.map(|p| reassign_predicate_columns(p, builder.schema(), true)).transpose()?;
-
- let mask = ProjectionMask::roots(
- builder.parquet_schema(),
- adapted_projections.iter().cloned(),
- );
-
- // Filter pushdown: evaluate predicates during scan
- if let Some(predicate) = pushdown_filters.then_some(predicate).flatten() {
- let row_filter = row_filter::build_row_filter(
- &predicate,
- &file_schema,
- &table_schema,
- builder.metadata(),
- reorder_predicates,
- &file_metrics,
- );
-
- match row_filter {
- Ok(Some(filter)) => {
- builder = builder.with_row_filter(filter);
- }
- Ok(None) => {}
- Err(e) => {
- debug!(
- "Ignoring error building row filter for '{:?}':
{}",
- predicate, e
- );
- }
- };
- };
-
- // Determine which row groups to actually read. The idea is to skip
- // as many row groups as possible based on the metadata and query
- let file_metadata = builder.metadata().clone();
- let predicate = pruning_predicate.as_ref().map(|p| p.as_ref());
- let rg_metadata = file_metadata.row_groups();
- // track which row groups to actually read
- let mut row_groups = RowGroupSet::new(rg_metadata.len());
- // if there is a range restricting what parts of the file to read
- if let Some(range) = file_range.as_ref() {
- row_groups.prune_by_range(rg_metadata, range);
- }
- // If there is a predicate that can be evaluated against the metadata
- if let Some(predicate) = predicate.as_ref() {
- row_groups.prune_by_statistics(
- &file_schema,
- builder.parquet_schema(),
- rg_metadata,
- predicate,
- &file_metrics,
- );
-
- if enable_bloom_filter && !row_groups.is_empty() {
- row_groups
- .prune_by_bloom_filters(
- &file_schema,
- &mut builder,
- predicate,
- &file_metrics,
- )
- .await;
- }
- }
-
- // page index pruning: if all data on individual pages can
- // be ruled out using page metadata, rows from other columns
- // with that range can be skipped as well
- if enable_page_index && !row_groups.is_empty() {
- if let Some(p) = page_pruning_predicate {
- let pruned = p.prune(
- &file_schema,
- builder.parquet_schema(),
- &row_groups,
- file_metadata.as_ref(),
- &file_metrics,
- )?;
- if let Some(row_selection) = pruned {
- builder = builder.with_row_selection(row_selection);
- }
- }
- }
-
- if let Some(limit) = limit {
- builder = builder.with_limit(limit)
- }
-
- let stream = builder
- .with_projection(mask)
- .with_batch_size(batch_size)
- .with_row_groups(row_groups.indexes())
- .build()?;
-
- let adapted = stream
- .map_err(|e| ArrowError::ExternalError(Box::new(e)))
- .map(move |maybe_batch| {
- maybe_batch
- .and_then(|b| schema_mapping.map_batch(b).map_err(Into::into))
- });
-
- Ok(adapted.boxed())
- }))
- }
-}
-
fn should_enable_page_index(
enable_page_index: bool,
page_pruning_predicate: &Option<Arc<PagePruningPredicate>>,
@@ -879,168 +701,6 @@ fn should_enable_page_index(
.unwrap_or(false)
}
-/// Interface for reading parquet files.
-///
-/// The combined implementations of [`ParquetFileReaderFactory`] and
-/// [`AsyncFileReader`] can be used to provide custom data access operations
-/// such as pre-cached data, I/O coalescing, etc.
-///
-/// See [`DefaultParquetFileReaderFactory`] for a simple implementation.
-pub trait ParquetFileReaderFactory: Debug + Send + Sync + 'static {
- /// Provides an `AsyncFileReader` for reading data from the specified parquet file
- ///
- /// # Arguments
- /// * partition_index - Index of the partition (for reporting metrics)
- /// * file_meta - The file to be read
- /// * metadata_size_hint - If specified, the first IO reads this many bytes from the footer
- /// * metrics - Execution metrics
- fn create_reader(
- &self,
- partition_index: usize,
- file_meta: FileMeta,
- metadata_size_hint: Option<usize>,
- metrics: &ExecutionPlanMetricsSet,
- ) -> Result<Box<dyn AsyncFileReader + Send>>;
-}
-
-/// Default implementation of [`ParquetFileReaderFactory`]
-///
-/// This implementation:
-/// 1. Reads parquet directly from an underlying [`ObjectStore`] instance.
-/// 2. Reads the footer and page metadata on demand.
-/// 3. Does not cache metadata or coalesce I/O operations.
-#[derive(Debug)]
-pub struct DefaultParquetFileReaderFactory {
- store: Arc<dyn ObjectStore>,
-}
-
-impl DefaultParquetFileReaderFactory {
- /// Create a new `DefaultParquetFileReaderFactory`.
- pub fn new(store: Arc<dyn ObjectStore>) -> Self {
- Self { store }
- }
-}
-
-/// Implements [`AsyncFileReader`] for a parquet file in object storage.
-///
-/// This implementation uses the [`ParquetObjectReader`] to read data from the
-/// object store on demand, as required, tracking the number of bytes read.
-///
-/// This implementation does not coalesce I/O operations or cache bytes. Such
-/// optimizations can be done either at the object store level or by providing a
-/// custom implementation of [`ParquetFileReaderFactory`].
-pub(crate) struct ParquetFileReader {
- file_metrics: ParquetFileMetrics,
- inner: ParquetObjectReader,
-}
-
-impl AsyncFileReader for ParquetFileReader {
- fn get_bytes(
- &mut self,
- range: Range<usize>,
- ) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
- self.file_metrics.bytes_scanned.add(range.end - range.start);
- self.inner.get_bytes(range)
- }
-
- fn get_byte_ranges(
- &mut self,
- ranges: Vec<Range<usize>>,
- ) -> BoxFuture<'_, parquet::errors::Result<Vec<Bytes>>>
- where
- Self: Send,
- {
- let total = ranges.iter().map(|r| r.end - r.start).sum();
- self.file_metrics.bytes_scanned.add(total);
- self.inner.get_byte_ranges(ranges)
- }
-
- fn get_metadata(
- &mut self,
- ) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
- self.inner.get_metadata()
- }
-}
-
-impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
- fn create_reader(
- &self,
- partition_index: usize,
- file_meta: FileMeta,
- metadata_size_hint: Option<usize>,
- metrics: &ExecutionPlanMetricsSet,
- ) -> Result<Box<dyn AsyncFileReader + Send>> {
- let file_metrics = ParquetFileMetrics::new(
- partition_index,
- file_meta.location().as_ref(),
- metrics,
- );
- let store = Arc::clone(&self.store);
- let mut inner = ParquetObjectReader::new(store, file_meta.object_meta);
-
- if let Some(hint) = metadata_size_hint {
- inner = inner.with_footer_size_hint(hint)
- };
-
- Ok(Box::new(ParquetFileReader {
- inner,
- file_metrics,
- }))
- }
-}
-
-/// Executes a query and writes the results to a partitioned Parquet file.
-pub async fn plan_to_parquet(
- task_ctx: Arc<TaskContext>,
- plan: Arc<dyn ExecutionPlan>,
- path: impl AsRef<str>,
- writer_properties: Option<WriterProperties>,
-) -> Result<()> {
- let path = path.as_ref();
- let parsed = ListingTableUrl::parse(path)?;
- let object_store_url = parsed.object_store();
- let store = task_ctx.runtime_env().object_store(&object_store_url)?;
- let mut join_set = JoinSet::new();
- for i in 0..plan.output_partitioning().partition_count() {
- let plan: Arc<dyn ExecutionPlan> = plan.clone();
- let filename = format!("{}/part-{i}.parquet", parsed.prefix());
- let file = Path::parse(filename)?;
- let propclone = writer_properties.clone();
-
- let storeref = store.clone();
- let buf_writer = BufWriter::new(storeref, file.clone());
- let mut stream = plan.execute(i, task_ctx.clone())?;
- join_set.spawn(async move {
- let mut writer =
- AsyncArrowWriter::try_new(buf_writer, plan.schema(), propclone)?;
- while let Some(next_batch) = stream.next().await {
- let batch = next_batch?;
- writer.write(&batch).await?;
- }
- writer
- .close()
- .await
- .map_err(DataFusionError::from)
- .map(|_| ())
- });
- }
-
- while let Some(result) = join_set.join_next().await {
- match result {
- Ok(res) => res?,
- Err(e) => {
- if e.is_panic() {
- std::panic::resume_unwind(e.into_panic());
- } else {
- unreachable!();
- }
- }
- }
- }
-
- Ok(())
-}
-
// Convert parquet column schema to arrow data type, and just consider the
// decimal data type.
pub(crate) fn parquet_to_arrow_decimal_type(
@@ -1098,9 +758,13 @@ mod tests {
use datafusion_physical_expr::create_physical_expr;
use chrono::{TimeZone, Utc};
+ use datafusion_physical_plan::ExecutionPlanProperties;
+ use futures::StreamExt;
use object_store::local::LocalFileSystem;
+ use object_store::path::Path;
use object_store::ObjectMeta;
use parquet::arrow::ArrowWriter;
+ use parquet::file::properties::WriterProperties;
use tempfile::TempDir;
use url::Url;
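
For orientation, a minimal sketch of how the builder API kept in mod.rs can be combined with the newly re-exported reader factory. Only `with_parquet_file_reader_factory` appears in this diff; `ParquetExecBuilder::new` and `build` are assumed from the surrounding module, and the helper name is hypothetical:

    use std::sync::Arc;
    use object_store::ObjectStore;

    // Hypothetical helper: `config` and `store` are supplied by the caller.
    fn parquet_exec_with_factory(
        config: FileScanConfig,
        store: Arc<dyn ObjectStore>,
    ) -> ParquetExec {
        ParquetExecBuilder::new(config)
            // Route footer and page reads through the factory instead of
            // the default ObjectStore-backed reader.
            .with_parquet_file_reader_factory(Arc::new(
                DefaultParquetFileReaderFactory::new(store),
            ))
            .build()
    }
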
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/opener.rs b/datafusion/core/src/datasource/physical_plan/parquet/opener.rs
new file mode 100644
index 0000000000..3aec1e1d20
--- /dev/null
+++ b/datafusion/core/src/datasource/physical_plan/parquet/opener.rs
@@ -0,0 +1,204 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`ParquetOpener`] for opening Parquet files
+
+use crate::datasource::physical_plan::parquet::page_filter::PagePruningPredicate;
+use crate::datasource::physical_plan::parquet::row_groups::RowGroupSet;
+use crate::datasource::physical_plan::parquet::{row_filter, should_enable_page_index};
+use crate::datasource::physical_plan::{
+ FileMeta, FileOpenFuture, FileOpener, ParquetFileMetrics, ParquetFileReaderFactory,
+};
+use crate::datasource::schema_adapter::SchemaAdapterFactory;
+use crate::physical_optimizer::pruning::PruningPredicate;
+use arrow_schema::{ArrowError, SchemaRef};
+use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
+use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
+use futures::{StreamExt, TryStreamExt};
+use log::debug;
+use parquet::arrow::arrow_reader::ArrowReaderOptions;
+use parquet::arrow::async_reader::AsyncFileReader;
+use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
+use std::sync::Arc;
+
+/// Implements [`FileOpener`] for a parquet file
+pub(super) struct ParquetOpener {
+ pub partition_index: usize,
+ pub projection: Arc<[usize]>,
+ pub batch_size: usize,
+ pub limit: Option<usize>,
+ pub predicate: Option<Arc<dyn PhysicalExpr>>,
+ pub pruning_predicate: Option<Arc<PruningPredicate>>,
+ pub page_pruning_predicate: Option<Arc<PagePruningPredicate>>,
+ pub table_schema: SchemaRef,
+ pub metadata_size_hint: Option<usize>,
+ pub metrics: ExecutionPlanMetricsSet,
+ pub parquet_file_reader_factory: Arc<dyn ParquetFileReaderFactory>,
+ pub pushdown_filters: bool,
+ pub reorder_filters: bool,
+ pub enable_page_index: bool,
+ pub enable_bloom_filter: bool,
+ pub schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
+}
+
+impl FileOpener for ParquetOpener {
+ fn open(&self, file_meta: FileMeta) -> datafusion_common::Result<FileOpenFuture> {
+ let file_range = file_meta.range.clone();
+ let file_metrics = ParquetFileMetrics::new(
+ self.partition_index,
+ file_meta.location().as_ref(),
+ &self.metrics,
+ );
+
+ let reader: Box<dyn AsyncFileReader> =
+ self.parquet_file_reader_factory.create_reader(
+ self.partition_index,
+ file_meta,
+ self.metadata_size_hint,
+ &self.metrics,
+ )?;
+
+ let batch_size = self.batch_size;
+ let projection = self.projection.clone();
+ let projected_schema = SchemaRef::from(self.table_schema.project(&projection)?);
+ let schema_adapter = self.schema_adapter_factory.create(projected_schema);
+ let predicate = self.predicate.clone();
+ let pruning_predicate = self.pruning_predicate.clone();
+ let page_pruning_predicate = self.page_pruning_predicate.clone();
+ let table_schema = self.table_schema.clone();
+ let reorder_predicates = self.reorder_filters;
+ let pushdown_filters = self.pushdown_filters;
+ let enable_page_index = should_enable_page_index(
+ self.enable_page_index,
+ &self.page_pruning_predicate,
+ );
+ let enable_bloom_filter = self.enable_bloom_filter;
+ let limit = self.limit;
+
+ Ok(Box::pin(async move {
+ let options = ArrowReaderOptions::new().with_page_index(enable_page_index);
+ let mut builder =
+ ParquetRecordBatchStreamBuilder::new_with_options(reader, options)
+ .await?;
+
+ let file_schema = builder.schema().clone();
+
+ let (schema_mapping, adapted_projections) =
+ schema_adapter.map_schema(&file_schema)?;
+
+ let mask = ProjectionMask::roots(
+ builder.parquet_schema(),
+ adapted_projections.iter().cloned(),
+ );
+
+ // Filter pushdown: evaluate predicates during scan
+ if let Some(predicate) = pushdown_filters.then_some(predicate).flatten() {
+ let row_filter = row_filter::build_row_filter(
+ &predicate,
+ &file_schema,
+ &table_schema,
+ builder.metadata(),
+ reorder_predicates,
+ &file_metrics,
+ );
+
+ match row_filter {
+ Ok(Some(filter)) => {
+ builder = builder.with_row_filter(filter);
+ }
+ Ok(None) => {}
+ Err(e) => {
+ debug!(
+ "Ignoring error building row filter for '{:?}':
{}",
+ predicate, e
+ );
+ }
+ };
+ };
+
+ // Determine which row groups to actually read. The idea is to skip
+ // as many row groups as possible based on the metadata and query
+ let file_metadata = builder.metadata().clone();
+ let predicate = pruning_predicate.as_ref().map(|p| p.as_ref());
+ let rg_metadata = file_metadata.row_groups();
+ // track which row groups to actually read
+ let mut row_groups = RowGroupSet::new(rg_metadata.len());
+ // if there is a range restricting what parts of the file to read
+ if let Some(range) = file_range.as_ref() {
+ row_groups.prune_by_range(rg_metadata, range);
+ }
+ // If there is a predicate that can be evaluated against the metadata
+ if let Some(predicate) = predicate.as_ref() {
+ row_groups.prune_by_statistics(
+ &file_schema,
+ builder.parquet_schema(),
+ rg_metadata,
+ predicate,
+ &file_metrics,
+ );
+
+ if enable_bloom_filter && !row_groups.is_empty() {
+ row_groups
+ .prune_by_bloom_filters(
+ &file_schema,
+ &mut builder,
+ predicate,
+ &file_metrics,
+ )
+ .await;
+ }
+ }
+
+ // page index pruning: if all data on individual pages can
+ // be ruled out using page metadata, rows from other columns
+ // with that range can be skipped as well
+ if enable_page_index && !row_groups.is_empty() {
+ if let Some(p) = page_pruning_predicate {
+ let pruned = p.prune(
+ &file_schema,
+ builder.parquet_schema(),
+ &row_groups,
+ file_metadata.as_ref(),
+ &file_metrics,
+ )?;
+ if let Some(row_selection) = pruned {
+ builder = builder.with_row_selection(row_selection);
+ }
+ }
+ }
+
+ if let Some(limit) = limit {
+ builder = builder.with_limit(limit)
+ }
+
+ let stream = builder
+ .with_projection(mask)
+ .with_batch_size(batch_size)
+ .with_row_groups(row_groups.indexes())
+ .build()?;
+
+ let adapted = stream
+ .map_err(|e| ArrowError::ExternalError(Box::new(e)))
+ .map(move |maybe_batch| {
+ maybe_batch
+ .and_then(|b| schema_mapping.map_batch(b).map_err(Into::into))
+ });
+
+ Ok(adapted.boxed())
+ }))
+ }
+}
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/reader.rs b/datafusion/core/src/datasource/physical_plan/parquet/reader.rs
new file mode 100644
index 0000000000..265fb9d570
--- /dev/null
+++ b/datafusion/core/src/datasource/physical_plan/parquet/reader.rs
@@ -0,0 +1,140 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`ParquetFileReaderFactory`] and [`DefaultParquetFileReaderFactory`] for
+//! creating parquet file readers
+
+use crate::datasource::physical_plan::{FileMeta, ParquetFileMetrics};
+use bytes::Bytes;
+use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
+use futures::future::BoxFuture;
+use object_store::ObjectStore;
+use parquet::arrow::async_reader::{AsyncFileReader, ParquetObjectReader};
+use parquet::file::metadata::ParquetMetaData;
+use std::fmt::Debug;
+use std::ops::Range;
+use std::sync::Arc;
+
+/// Interface for reading parquet files.
+///
+/// The combined implementations of [`ParquetFileReaderFactory`] and
+/// [`AsyncFileReader`] can be used to provide custom data access operations
+/// such as pre-cached data, I/O coalescing, etc.
+///
+/// See [`DefaultParquetFileReaderFactory`] for a simple implementation.
+pub trait ParquetFileReaderFactory: Debug + Send + Sync + 'static {
+ /// Provides an `AsyncFileReader` for reading data from the specified parquet file
+ ///
+ /// # Arguments
+ /// * partition_index - Index of the partition (for reporting metrics)
+ /// * file_meta - The file to be read
+ /// * metadata_size_hint - If specified, the first IO reads this many bytes from the footer
+ /// * metrics - Execution metrics
+ fn create_reader(
+ &self,
+ partition_index: usize,
+ file_meta: FileMeta,
+ metadata_size_hint: Option<usize>,
+ metrics: &ExecutionPlanMetricsSet,
+ ) -> datafusion_common::Result<Box<dyn AsyncFileReader + Send>>;
+}
+
+/// Default implementation of [`ParquetFileReaderFactory`]
+///
+/// This implementation:
+/// 1. Reads parquet directly from an underlying [`ObjectStore`] instance.
+/// 2. Reads the footer and page metadata on demand.
+/// 3. Does not cache metadata or coalesce I/O operations.
+#[derive(Debug)]
+pub struct DefaultParquetFileReaderFactory {
+ store: Arc<dyn ObjectStore>,
+}
+
+impl DefaultParquetFileReaderFactory {
+ /// Create a new `DefaultParquetFileReaderFactory`.
+ pub fn new(store: Arc<dyn ObjectStore>) -> Self {
+ Self { store }
+ }
+}
+
+/// Implements [`AsyncFileReader`] for a parquet file in object storage.
+///
+/// This implementation uses the [`ParquetObjectReader`] to read data from the
+/// object store on demand, as required, tracking the number of bytes read.
+///
+/// This implementation does not coalesce I/O operations or cache bytes. Such
+/// optimizations can be done either at the object store level or by providing a
+/// custom implementation of [`ParquetFileReaderFactory`].
+pub(crate) struct ParquetFileReader {
+ pub file_metrics: ParquetFileMetrics,
+ pub inner: ParquetObjectReader,
+}
+
+impl AsyncFileReader for ParquetFileReader {
+ fn get_bytes(
+ &mut self,
+ range: Range<usize>,
+ ) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
+ self.file_metrics.bytes_scanned.add(range.end - range.start);
+ self.inner.get_bytes(range)
+ }
+
+ fn get_byte_ranges(
+ &mut self,
+ ranges: Vec<Range<usize>>,
+ ) -> BoxFuture<'_, parquet::errors::Result<Vec<Bytes>>>
+ where
+ Self: Send,
+ {
+ let total = ranges.iter().map(|r| r.end - r.start).sum();
+ self.file_metrics.bytes_scanned.add(total);
+ self.inner.get_byte_ranges(ranges)
+ }
+
+ fn get_metadata(
+ &mut self,
+ ) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
+ self.inner.get_metadata()
+ }
+}
+
+impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
+ fn create_reader(
+ &self,
+ partition_index: usize,
+ file_meta: FileMeta,
+ metadata_size_hint: Option<usize>,
+ metrics: &ExecutionPlanMetricsSet,
+ ) -> datafusion_common::Result<Box<dyn AsyncFileReader + Send>> {
+ let file_metrics = ParquetFileMetrics::new(
+ partition_index,
+ file_meta.location().as_ref(),
+ metrics,
+ );
+ let store = Arc::clone(&self.store);
+ let mut inner = ParquetObjectReader::new(store, file_meta.object_meta);
+
+ if let Some(hint) = metadata_size_hint {
+ inner = inner.with_footer_size_hint(hint)
+ };
+
+ Ok(Box::new(ParquetFileReader {
+ inner,
+ file_metrics,
+ }))
+ }
+}
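
For orientation, a minimal sketch of a custom `ParquetFileReaderFactory` against the trait shown above. The delegating `ClampedHintFactory` and its hint-capping behavior are hypothetical, not part of this commit; imports mirror those at the top of reader.rs:

    // Hypothetical factory that delegates to the default implementation
    // but caps the metadata size hint before each read.
    #[derive(Debug)]
    struct ClampedHintFactory {
        inner: DefaultParquetFileReaderFactory,
        max_hint: usize,
    }

    impl ParquetFileReaderFactory for ClampedHintFactory {
        fn create_reader(
            &self,
            partition_index: usize,
            file_meta: FileMeta,
            metadata_size_hint: Option<usize>,
            metrics: &ExecutionPlanMetricsSet,
        ) -> datafusion_common::Result<Box<dyn AsyncFileReader + Send>> {
            // Cap the speculative footer read, then delegate; byte-level
            // metrics are still tracked by the inner ParquetFileReader.
            let hint = metadata_size_hint.map(|h| h.min(self.max_hint));
            self.inner
                .create_reader(partition_index, file_meta, hint, metrics)
        }
    }
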
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
index 0a0ca4369d..7dd91d3d4e 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
@@ -417,7 +417,7 @@ impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> {
#[cfg(test)]
mod tests {
use super::*;
- use crate::datasource::physical_plan::parquet::ParquetFileReader;
+ use crate::datasource::physical_plan::parquet::reader::ParquetFileReader;
use crate::physical_plan::metrics::ExecutionPlanMetricsSet;
use arrow::datatypes::DataType::Decimal128;
use arrow::datatypes::{DataType, Field};
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/writer.rs b/datafusion/core/src/datasource/physical_plan/parquet/writer.rs
new file mode 100644
index 0000000000..0c0c546910
--- /dev/null
+++ b/datafusion/core/src/datasource/physical_plan/parquet/writer.rs
@@ -0,0 +1,80 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::datasource::listing::ListingTableUrl;
+use datafusion_common::DataFusionError;
+use datafusion_execution::TaskContext;
+use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
+use futures::StreamExt;
+use object_store::buffered::BufWriter;
+use object_store::path::Path;
+use parquet::arrow::AsyncArrowWriter;
+use parquet::file::properties::WriterProperties;
+use std::sync::Arc;
+use tokio::task::JoinSet;
+
+/// Executes a query and writes the results to a partitioned Parquet file.
+pub async fn plan_to_parquet(
+ task_ctx: Arc<TaskContext>,
+ plan: Arc<dyn ExecutionPlan>,
+ path: impl AsRef<str>,
+ writer_properties: Option<WriterProperties>,
+) -> datafusion_common::Result<()> {
+ let path = path.as_ref();
+ let parsed = ListingTableUrl::parse(path)?;
+ let object_store_url = parsed.object_store();
+ let store = task_ctx.runtime_env().object_store(&object_store_url)?;
+ let mut join_set = JoinSet::new();
+ for i in 0..plan.output_partitioning().partition_count() {
+ let plan: Arc<dyn ExecutionPlan> = plan.clone();
+ let filename = format!("{}/part-{i}.parquet", parsed.prefix());
+ let file = Path::parse(filename)?;
+ let propclone = writer_properties.clone();
+
+ let storeref = store.clone();
+ let buf_writer = BufWriter::new(storeref, file.clone());
+ let mut stream = plan.execute(i, task_ctx.clone())?;
+ join_set.spawn(async move {
+ let mut writer =
+ AsyncArrowWriter::try_new(buf_writer, plan.schema(), propclone)?;
+ while let Some(next_batch) = stream.next().await {
+ let batch = next_batch?;
+ writer.write(&batch).await?;
+ }
+ writer
+ .close()
+ .await
+ .map_err(DataFusionError::from)
+ .map(|_| ())
+ });
+ }
+
+ while let Some(result) = join_set.join_next().await {
+ match result {
+ Ok(res) => res?,
+ Err(e) => {
+ if e.is_panic() {
+ std::panic::resume_unwind(e.into_panic());
+ } else {
+ unreachable!();
+ }
+ }
+ }
+ }
+
+ Ok(())
+}
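
For orientation, a small usage sketch for the relocated `plan_to_parquet`. The `SessionContext` with its `task_ctx()` accessor, the driver function, and the output path are assumptions for illustration, not part of this commit:

    use std::sync::Arc;

    // Hypothetical driver; `ctx` (a SessionContext) and `plan` come from
    // the caller, and the output path is illustrative.
    async fn write_partitioned(
        ctx: &SessionContext,
        plan: Arc<dyn ExecutionPlan>,
    ) -> datafusion_common::Result<()> {
        let props = Some(WriterProperties::builder().build());
        // One part-{i}.parquet file is written per output partition of `plan`.
        plan_to_parquet(ctx.task_ctx(), plan, "/tmp/query_output", props).await
    }
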
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]