This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 1f0232c995 Reattach parquet metadata cache after deserializing in datafusion-proto (#20574)
1f0232c995 is described below

commit 1f0232c995111e2a9b8d58228ca604c804d0a879
Author: nathan <[email protected]>
AuthorDate: Thu Mar 5 02:10:34 2026 -0500

    Reattach parquet metadata cache after deserializing in datafusion-proto (#20574)
    
    - Addressing: https://github.com/apache/datafusion/issues/20575
    
    ---------
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 datafusion/proto/src/physical_plan/mod.rs          | 20 ++++++--
 .../proto/tests/cases/roundtrip_physical_plan.rs   | 58 +++++++++++++++++++++-
 2 files changed, 73 insertions(+), 5 deletions(-)

diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs
index bfba715b91..fce8ac658e 100644
--- a/datafusion/proto/src/physical_plan/mod.rs
+++ b/datafusion/proto/src/physical_plan/mod.rs
@@ -42,9 +42,13 @@ use datafusion_datasource_csv::source::CsvSource;
 use datafusion_datasource_json::file_format::JsonSink;
 use datafusion_datasource_json::source::JsonSource;
 #[cfg(feature = "parquet")]
+use datafusion_datasource_parquet::CachedParquetFileReaderFactory;
+#[cfg(feature = "parquet")]
 use datafusion_datasource_parquet::file_format::ParquetSink;
 #[cfg(feature = "parquet")]
 use datafusion_datasource_parquet::source::ParquetSource;
+#[cfg(feature = "parquet")]
+use datafusion_execution::object_store::ObjectStoreUrl;
 use datafusion_execution::{FunctionRegistry, TaskContext};
 use datafusion_expr::{AggregateUDF, ScalarUDF, WindowUDF};
 use datafusion_functions_table::generate_series::{
@@ -847,9 +851,19 @@ impl protobuf::PhysicalPlanNode {
 
             // Parse table schema with partition columns
             let table_schema = parse_table_schema_from_proto(base_conf)?;
-
-            let mut source =
-                ParquetSource::new(table_schema).with_table_parquet_options(options);
+            let object_store_url = match base_conf.object_store_url.is_empty() {
+                false => ObjectStoreUrl::parse(&base_conf.object_store_url)?,
+                true => ObjectStoreUrl::local_filesystem(),
+            };
+            let store = ctx.runtime_env().object_store(object_store_url)?;
+            let metadata_cache =
+                ctx.runtime_env().cache_manager.get_file_metadata_cache();
+            let reader_factory =
+                Arc::new(CachedParquetFileReaderFactory::new(store, metadata_cache));
+
+            let mut source = ParquetSource::new(table_schema)
+                .with_parquet_file_reader_factory(reader_factory)
+                .with_table_parquet_options(options);
 
             if let Some(predicate) = predicate {
                 source = source.with_predicate(predicate);
diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
index 230727c8c1..ccee240b94 100644
--- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -36,8 +36,9 @@ use datafusion::datasource::listing::{
 };
 use datafusion::datasource::object_store::ObjectStoreUrl;
 use datafusion::datasource::physical_plan::{
-    ArrowSource, FileGroup, FileOutputMode, FileScanConfigBuilder, FileSinkConfig,
-    ParquetSource, wrap_partition_type_in_dict, wrap_partition_value_in_dict,
+    ArrowSource, FileGroup, FileOutputMode, FileScanConfig, FileScanConfigBuilder,
+    FileSinkConfig, ParquetSource, wrap_partition_type_in_dict,
+    wrap_partition_value_in_dict,
 };
 use datafusion::datasource::sink::DataSinkExec;
 use datafusion::datasource::source::DataSourceExec;
@@ -929,6 +930,59 @@ fn roundtrip_parquet_exec_with_pruning_predicate() -> Result<()> {
     roundtrip_test(DataSourceExec::from_data_source(scan_config))
 }
 
+#[test]
+fn roundtrip_parquet_exec_attaches_cached_reader_factory_after_roundtrip() -> Result<()> {
+    let file_schema =
+        Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, false)]));
+    let file_source = Arc::new(ParquetSource::new(Arc::clone(&file_schema)));
+    let scan_config =
+        FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source)
+            .with_file_groups(vec![FileGroup::new(vec![PartitionedFile::new(
+                "/path/to/file.parquet".to_string(),
+                1024,
+            )])])
+            .with_statistics(Statistics {
+                num_rows: Precision::Inexact(100),
+                total_byte_size: Precision::Inexact(1024),
+                column_statistics: Statistics::unknown_column(&file_schema),
+            })
+            .build();
+    let exec_plan = DataSourceExec::from_data_source(scan_config);
+
+    let ctx = SessionContext::new();
+    let codec = DefaultPhysicalExtensionCodec {};
+    let proto_converter = DefaultPhysicalProtoConverter {};
+    let roundtripped =
+        roundtrip_test_and_return(exec_plan, &ctx, &codec, &proto_converter)?;
+
+    let data_source = roundtripped
+        .as_any()
+        .downcast_ref::<DataSourceExec>()
+        .ok_or_else(|| {
+            internal_datafusion_err!("Expected DataSourceExec after roundtrip")
+        })?;
+    let file_scan = data_source
+        .data_source()
+        .as_any()
+        .downcast_ref::<FileScanConfig>()
+        .ok_or_else(|| {
+            internal_datafusion_err!("Expected FileScanConfig after roundtrip")
+        })?;
+    let parquet_source = file_scan
+        .file_source()
+        .as_any()
+        .downcast_ref::<ParquetSource>()
+        .ok_or_else(|| {
+            internal_datafusion_err!("Expected ParquetSource after roundtrip")
+        })?;
+
+    assert!(
+        parquet_source.parquet_file_reader_factory().is_some(),
+        "Parquet reader factory should be attached after decoding from protobuf"
+    );
+    Ok(())
+}
+
 #[test]
 fn roundtrip_arrow_scan() -> Result<()> {
     let file_schema =


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to