This is an automated email from the ASF dual-hosted git repository.
comphead pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 8867501dc chore: fix allocations in schema adapter for `native_datafusion` scan (#3755)
8867501dc is described below
commit 8867501dc42141a4fb7933412078ba794c8a2ea6
Author: Oleks V <[email protected]>
AuthorDate: Mon Mar 23 08:47:57 2026 -0700
chore: fix allocations in schema adapter for `native_datafusion` scan (#3755)
---
native/core/src/parquet/mod.rs | 1 +
native/core/src/parquet/parquet_exec.rs | 6 +
.../src/parquet/parquet_read_cached_factory.rs | 149 +++++++++++++++++++++
native/core/src/parquet/schema_adapter.rs | 24 ++--
4 files changed, 166 insertions(+), 14 deletions(-)
diff --git a/native/core/src/parquet/mod.rs b/native/core/src/parquet/mod.rs
index d24a6a503..a59a349bf 100644
--- a/native/core/src/parquet/mod.rs
+++ b/native/core/src/parquet/mod.rs
@@ -23,6 +23,7 @@ pub use mutable_vector::*;
#[macro_use]
pub mod util;
pub mod parquet_exec;
+pub mod parquet_read_cached_factory;
pub mod parquet_support;
pub mod read;
pub mod schema_adapter;
diff --git a/native/core/src/parquet/parquet_exec.rs b/native/core/src/parquet/parquet_exec.rs
index 2d970734b..ef4c878b9 100644
--- a/native/core/src/parquet/parquet_exec.rs
+++ b/native/core/src/parquet/parquet_exec.rs
@@ -17,6 +17,7 @@
use crate::execution::operators::ExecutionError;
use crate::parquet::encryption_support::{CometEncryptionConfig, ENCRYPTION_FACTORY_ID};
+use crate::parquet::parquet_read_cached_factory::CachingParquetReaderFactory;
use crate::parquet::parquet_support::SparkParquetOptions;
use crate::parquet::schema_adapter::SparkPhysicalExprAdapterFactory;
use arrow::datatypes::{Field, SchemaRef};
@@ -149,6 +150,11 @@ pub(crate) fn init_datasource_exec(
);
}
+ // Use caching reader factory to avoid redundant footer reads across partitions
+ let store = session_ctx.runtime_env().object_store(&object_store_url)?;
+ parquet_source = parquet_source
+ .with_parquet_file_reader_factory(Arc::new(CachingParquetReaderFactory::new(store)));
+
let expr_adapter_factory: Arc<dyn PhysicalExprAdapterFactory> = Arc::new(
SparkPhysicalExprAdapterFactory::new(spark_parquet_options, default_values),
);
diff --git a/native/core/src/parquet/parquet_read_cached_factory.rs b/native/core/src/parquet/parquet_read_cached_factory.rs
new file mode 100644
index 000000000..f90e53411
--- /dev/null
+++ b/native/core/src/parquet/parquet_read_cached_factory.rs
@@ -0,0 +1,149 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! A `ParquetFileReaderFactory` that caches parquet footer metadata across
+//! partitions within a single scan. When multiple Spark partitions read from
+//! the same parquet file (different row group ranges), this avoids redundant
+//! footer reads and parsing.
+//!
+//! The cache is scoped to the factory instance (one per scan), not global,
+//! so it does not persist across queries.
+//!
+//! Uses `tokio::sync::OnceCell` per file path so that concurrent partitions
+//! wait for the first reader to load the footer rather than all racing.
+
+use bytes::Bytes;
+use datafusion::common::Result as DataFusionResult;
+use datafusion::datasource::physical_plan::parquet::ParquetFileReaderFactory;
+use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder};
+use datafusion_datasource::PartitionedFile;
+use futures::future::BoxFuture;
+use futures::FutureExt;
+use object_store::path::Path;
+use object_store::ObjectStore;
+use parquet::arrow::arrow_reader::ArrowReaderOptions;
+use parquet::arrow::async_reader::{AsyncFileReader, ParquetObjectReader};
+use parquet::file::metadata::ParquetMetaData;
+use std::collections::HashMap;
+use std::fmt::Debug;
+use std::ops::Range;
+use std::sync::{Arc, Mutex};
+use tokio::sync::OnceCell;
+
+type MetadataCell = Arc<OnceCell<Arc<ParquetMetaData>>>;
+
+/// A `ParquetFileReaderFactory` that caches footer metadata by file path.
+/// The cache is scoped to this factory instance (shared across partitions
+/// within a single scan via Arc), not global.
+#[derive(Debug)]
+pub struct CachingParquetReaderFactory {
+ store: Arc<dyn ObjectStore>,
+ cache: Arc<Mutex<HashMap<Path, MetadataCell>>>,
+}
+
+impl CachingParquetReaderFactory {
+ pub fn new(store: Arc<dyn ObjectStore>) -> Self {
+ Self {
+ store,
+ cache: Arc::new(Mutex::new(HashMap::new())),
+ }
+ }
+}
+
+impl ParquetFileReaderFactory for CachingParquetReaderFactory {
+ fn create_reader(
+ &self,
+ partition_index: usize,
+ partitioned_file: PartitionedFile,
+ metadata_size_hint: Option<usize>,
+ metrics: &ExecutionPlanMetricsSet,
+ ) -> DataFusionResult<Box<dyn AsyncFileReader + Send>> {
+ let bytes_scanned = MetricBuilder::new(metrics).counter("bytes_scanned", partition_index);
+
+ let location = partitioned_file.object_meta.location.clone();
+
+ // Get or create the OnceCell for this file path
+ let cell = Arc::<OnceCell<Arc<ParquetMetaData>>>::clone(
+ self.cache
+ .lock()
+ .unwrap()
+ .entry(location.clone())
+ .or_insert_with(|| Arc::new(OnceCell::new())),
+ );
+
+ let mut inner = ParquetObjectReader::new(Arc::clone(&self.store), location.clone())
+ .with_file_size(partitioned_file.object_meta.size);
+
+ if let Some(hint) = metadata_size_hint {
+ inner = inner.with_footer_size_hint(hint);
+ }
+
+ Ok(Box::new(CachingParquetFileReader {
+ inner,
+ location,
+ cell,
+ bytes_scanned,
+ }))
+ }
+}
+
+struct CachingParquetFileReader {
+ inner: ParquetObjectReader,
+ location: Path,
+ cell: MetadataCell,
+ bytes_scanned: datafusion::physical_plan::metrics::Count,
+}
+
+impl AsyncFileReader for CachingParquetFileReader {
+ fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
+ self.bytes_scanned.add((range.end - range.start) as usize);
+ self.inner.get_bytes(range)
+ }
+
+ fn get_byte_ranges(
+ &mut self,
+ ranges: Vec<Range<u64>>,
+ ) -> BoxFuture<'_, parquet::errors::Result<Vec<Bytes>>>
+ where
+ Self: Send,
+ {
+ let total: u64 = ranges.iter().map(|r| r.end - r.start).sum();
+ self.bytes_scanned.add(total as usize);
+ self.inner.get_byte_ranges(ranges)
+ }
+
+ fn get_metadata<'a>(
+ &'a mut self,
+ options: Option<&'a ArrowReaderOptions>,
+ ) -> BoxFuture<'a, parquet::errors::Result<Arc<ParquetMetaData>>> {
+ let cell = Arc::clone(&self.cell);
+ let location = self.location.clone();
+
+ async move {
+ let metadata = cell
+ .get_or_try_init(|| async {
+ log::trace!("CachingParquetFileReader: loading footer for {}", location);
+ self.inner.get_metadata(options).await
+ })
+ .await?;
+
+ log::trace!("CachingParquetFileReader: cache HIT for {}", self.location);
+ Ok(Arc::clone(metadata))
+ }
+ .boxed()
+ }
+}
diff --git a/native/core/src/parquet/schema_adapter.rs b/native/core/src/parquet/schema_adapter.rs
index 0ad61df42..535ed7062 100644
--- a/native/core/src/parquet/schema_adapter.rs
+++ b/native/core/src/parquet/schema_adapter.rs
@@ -66,20 +66,18 @@ fn remap_physical_schema_names(
logical_schema: &SchemaRef,
physical_schema: &SchemaRef,
) -> SchemaRef {
- let logical_names: HashMap<String, &str> = logical_schema
- .fields()
- .iter()
- .map(|f| (f.name().to_lowercase(), f.name().as_str()))
- .collect();
-
let remapped_fields: Vec<_> = physical_schema
.fields()
.iter()
.map(|field| {
- if let Some(logical_name) = logical_names.get(&field.name().to_lowercase()) {
- if *logical_name != field.name() {
+ if let Some(logical_field) = logical_schema
+ .fields()
+ .iter()
+ .find(|lf| lf.name().eq_ignore_ascii_case(field.name()))
+ {
+ if logical_field.name() != field.name() {
Arc::new(Field::new(
- *logical_name,
+ logical_field.name(),
field.data_type().clone(),
field.is_nullable(),
))
@@ -121,7 +119,7 @@ impl PhysicalExprAdapterFactory for SparkPhysicalExprAdapterFactory {
.fields()
.iter()
.find(|pf| {
- pf.name().to_lowercase() == logical_field.name().to_lowercase()
+ pf.name().eq_ignore_ascii_case(logical_field.name())
&& pf.name() != logical_field.name()
})
.map(|pf| (logical_field.name().clone(), pf.name().clone()))
@@ -264,7 +262,7 @@ impl SparkPhysicalExprAdapter {
self.physical_file_schema
.fields()
.iter()
- .find(|f| f.name().to_lowercase() == col_name.to_lowercase())
+ .find(|f| f.name().eq_ignore_ascii_case(col_name))
};
if let (Some(logical_field), Some(physical_field)) =
(logical_field, physical_field)
@@ -530,9 +528,7 @@ mod test {
let parquet_exec = DataSourceExec::new(Arc::new(file_scan_config));
- let mut stream = parquet_exec
- .execute(0, Arc::new(TaskContext::default()))
- .unwrap();
+ let mut stream = parquet_exec.execute(0, Arc::new(TaskContext::default()))?;
stream.next().await.unwrap()
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]