This is an automated email from the ASF dual-hosted git repository.

comphead pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 8867501dc chore: fix allocations in schema adapter for 
`native_datafusion` scan (#3755)
8867501dc is described below

commit 8867501dc42141a4fb7933412078ba794c8a2ea6
Author: Oleks V <[email protected]>
AuthorDate: Mon Mar 23 08:47:57 2026 -0700

    chore: fix allocations in schema adapter for `native_datafusion` scan 
(#3755)
---
 native/core/src/parquet/mod.rs                     |   1 +
 native/core/src/parquet/parquet_exec.rs            |   6 +
 .../src/parquet/parquet_read_cached_factory.rs     | 149 +++++++++++++++++++++
 native/core/src/parquet/schema_adapter.rs          |  24 ++--
 4 files changed, 166 insertions(+), 14 deletions(-)

diff --git a/native/core/src/parquet/mod.rs b/native/core/src/parquet/mod.rs
index d24a6a503..a59a349bf 100644
--- a/native/core/src/parquet/mod.rs
+++ b/native/core/src/parquet/mod.rs
@@ -23,6 +23,7 @@ pub use mutable_vector::*;
 #[macro_use]
 pub mod util;
 pub mod parquet_exec;
+pub mod parquet_read_cached_factory;
 pub mod parquet_support;
 pub mod read;
 pub mod schema_adapter;
diff --git a/native/core/src/parquet/parquet_exec.rs 
b/native/core/src/parquet/parquet_exec.rs
index 2d970734b..ef4c878b9 100644
--- a/native/core/src/parquet/parquet_exec.rs
+++ b/native/core/src/parquet/parquet_exec.rs
@@ -17,6 +17,7 @@
 
 use crate::execution::operators::ExecutionError;
 use crate::parquet::encryption_support::{CometEncryptionConfig, 
ENCRYPTION_FACTORY_ID};
+use crate::parquet::parquet_read_cached_factory::CachingParquetReaderFactory;
 use crate::parquet::parquet_support::SparkParquetOptions;
 use crate::parquet::schema_adapter::SparkPhysicalExprAdapterFactory;
 use arrow::datatypes::{Field, SchemaRef};
@@ -149,6 +150,11 @@ pub(crate) fn init_datasource_exec(
         );
     }
 
+    // Use caching reader factory to avoid redundant footer reads across 
partitions
+    let store = session_ctx.runtime_env().object_store(&object_store_url)?;
+    parquet_source = parquet_source
+        
.with_parquet_file_reader_factory(Arc::new(CachingParquetReaderFactory::new(store)));
+
     let expr_adapter_factory: Arc<dyn PhysicalExprAdapterFactory> = Arc::new(
         SparkPhysicalExprAdapterFactory::new(spark_parquet_options, 
default_values),
     );
diff --git a/native/core/src/parquet/parquet_read_cached_factory.rs 
b/native/core/src/parquet/parquet_read_cached_factory.rs
new file mode 100644
index 000000000..f90e53411
--- /dev/null
+++ b/native/core/src/parquet/parquet_read_cached_factory.rs
@@ -0,0 +1,149 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! A `ParquetFileReaderFactory` that caches parquet footer metadata across
+//! partitions within a single scan. When multiple Spark partitions read from
+//! the same parquet file (different row group ranges), this avoids redundant
+//! footer reads and parsing.
+//!
+//! The cache is scoped to the factory instance (one per scan), not global,
+//! so it does not persist across queries.
+//!
+//! Uses `tokio::sync::OnceCell` per file path so that concurrent partitions
+//! wait for the first reader to load the footer rather than all racing.
+
+use bytes::Bytes;
+use datafusion::common::Result as DataFusionResult;
+use datafusion::datasource::physical_plan::parquet::ParquetFileReaderFactory;
+use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, 
MetricBuilder};
+use datafusion_datasource::PartitionedFile;
+use futures::future::BoxFuture;
+use futures::FutureExt;
+use object_store::path::Path;
+use object_store::ObjectStore;
+use parquet::arrow::arrow_reader::ArrowReaderOptions;
+use parquet::arrow::async_reader::{AsyncFileReader, ParquetObjectReader};
+use parquet::file::metadata::ParquetMetaData;
+use std::collections::HashMap;
+use std::fmt::Debug;
+use std::ops::Range;
+use std::sync::{Arc, Mutex};
+use tokio::sync::OnceCell;
+
+type MetadataCell = Arc<OnceCell<Arc<ParquetMetaData>>>;
+
+/// A `ParquetFileReaderFactory` that caches footer metadata by file path.
+/// The cache is scoped to this factory instance (shared across partitions
+/// within a single scan via Arc), not global.
+#[derive(Debug)]
+pub struct CachingParquetReaderFactory {
+    store: Arc<dyn ObjectStore>,
+    cache: Arc<Mutex<HashMap<Path, MetadataCell>>>,
+}
+
+impl CachingParquetReaderFactory {
+    pub fn new(store: Arc<dyn ObjectStore>) -> Self {
+        Self {
+            store,
+            cache: Arc::new(Mutex::new(HashMap::new())),
+        }
+    }
+}
+
+impl ParquetFileReaderFactory for CachingParquetReaderFactory {
+    fn create_reader(
+        &self,
+        partition_index: usize,
+        partitioned_file: PartitionedFile,
+        metadata_size_hint: Option<usize>,
+        metrics: &ExecutionPlanMetricsSet,
+    ) -> DataFusionResult<Box<dyn AsyncFileReader + Send>> {
+        let bytes_scanned = 
MetricBuilder::new(metrics).counter("bytes_scanned", partition_index);
+
+        let location = partitioned_file.object_meta.location.clone();
+
+        // Get or create the OnceCell for this file path
+        let cell = Arc::<OnceCell<Arc<ParquetMetaData>>>::clone(
+            self.cache
+                .lock()
+                .unwrap()
+                .entry(location.clone())
+                .or_insert_with(|| Arc::new(OnceCell::new())),
+        );
+
+        let mut inner = ParquetObjectReader::new(Arc::clone(&self.store), 
location.clone())
+            .with_file_size(partitioned_file.object_meta.size);
+
+        if let Some(hint) = metadata_size_hint {
+            inner = inner.with_footer_size_hint(hint);
+        }
+
+        Ok(Box::new(CachingParquetFileReader {
+            inner,
+            location,
+            cell,
+            bytes_scanned,
+        }))
+    }
+}
+
+struct CachingParquetFileReader {
+    inner: ParquetObjectReader,
+    location: Path,
+    cell: MetadataCell,
+    bytes_scanned: datafusion::physical_plan::metrics::Count,
+}
+
+impl AsyncFileReader for CachingParquetFileReader {
+    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, 
parquet::errors::Result<Bytes>> {
+        self.bytes_scanned.add((range.end - range.start) as usize);
+        self.inner.get_bytes(range)
+    }
+
+    fn get_byte_ranges(
+        &mut self,
+        ranges: Vec<Range<u64>>,
+    ) -> BoxFuture<'_, parquet::errors::Result<Vec<Bytes>>>
+    where
+        Self: Send,
+    {
+        let total: u64 = ranges.iter().map(|r| r.end - r.start).sum();
+        self.bytes_scanned.add(total as usize);
+        self.inner.get_byte_ranges(ranges)
+    }
+
+    fn get_metadata<'a>(
+        &'a mut self,
+        options: Option<&'a ArrowReaderOptions>,
+    ) -> BoxFuture<'a, parquet::errors::Result<Arc<ParquetMetaData>>> {
+        let cell = Arc::clone(&self.cell);
+        let location = self.location.clone();
+
+        async move {
+            let metadata = cell
+                .get_or_try_init(|| async {
+                    log::trace!("CachingParquetFileReader: loading footer for 
{}", location);
+                    self.inner.get_metadata(options).await
+                })
+                .await?;
+
+            log::trace!("CachingParquetFileReader: cache HIT for {}", 
self.location);
+            Ok(Arc::clone(metadata))
+        }
+        .boxed()
+    }
+}
diff --git a/native/core/src/parquet/schema_adapter.rs 
b/native/core/src/parquet/schema_adapter.rs
index 0ad61df42..535ed7062 100644
--- a/native/core/src/parquet/schema_adapter.rs
+++ b/native/core/src/parquet/schema_adapter.rs
@@ -66,20 +66,18 @@ fn remap_physical_schema_names(
     logical_schema: &SchemaRef,
     physical_schema: &SchemaRef,
 ) -> SchemaRef {
-    let logical_names: HashMap<String, &str> = logical_schema
-        .fields()
-        .iter()
-        .map(|f| (f.name().to_lowercase(), f.name().as_str()))
-        .collect();
-
     let remapped_fields: Vec<_> = physical_schema
         .fields()
         .iter()
         .map(|field| {
-            if let Some(logical_name) = 
logical_names.get(&field.name().to_lowercase()) {
-                if *logical_name != field.name() {
+            if let Some(logical_field) = logical_schema
+                .fields()
+                .iter()
+                .find(|lf| lf.name().eq_ignore_ascii_case(field.name()))
+            {
+                if logical_field.name() != field.name() {
                     Arc::new(Field::new(
-                        *logical_name,
+                        logical_field.name(),
                         field.data_type().clone(),
                         field.is_nullable(),
                     ))
@@ -121,7 +119,7 @@ impl PhysicalExprAdapterFactory for 
SparkPhysicalExprAdapterFactory {
                             .fields()
                             .iter()
                             .find(|pf| {
-                                pf.name().to_lowercase() == 
logical_field.name().to_lowercase()
+                                
pf.name().eq_ignore_ascii_case(logical_field.name())
                                     && pf.name() != logical_field.name()
                             })
                             .map(|pf| (logical_field.name().clone(), 
pf.name().clone()))
@@ -264,7 +262,7 @@ impl SparkPhysicalExprAdapter {
                     self.physical_file_schema
                         .fields()
                         .iter()
-                        .find(|f| f.name().to_lowercase() == 
col_name.to_lowercase())
+                        .find(|f| f.name().eq_ignore_ascii_case(col_name))
                 };
 
                 if let (Some(logical_field), Some(physical_field)) = 
(logical_field, physical_field)
@@ -530,9 +528,7 @@ mod test {
 
         let parquet_exec = DataSourceExec::new(Arc::new(file_scan_config));
 
-        let mut stream = parquet_exec
-            .execute(0, Arc::new(TaskContext::default()))
-            .unwrap();
+        let mut stream = parquet_exec.execute(0, 
Arc::new(TaskContext::default()))?;
         stream.next().await.unwrap()
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to