This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 07e8793e62 allow passing in metadata_size_hint on a per-file basis (#13213)
07e8793e62 is described below

commit 07e8793e625aecaaee80e5d6849527e51e138c35
Author: Adrian Garcia Badaracco <[email protected]>
AuthorDate: Wed Nov 6 05:23:43 2024 -0600

    allow passing in metadata_size_hint on a per-file basis (#13213)
    
    * allow passing in metadata_size_hint on a per-file basis
    
    * fmt, lint
    
    * revert
    
    * Add test
    
    * Add comment
---
 datafusion/core/src/datasource/file_format/mod.rs  |   1 +
 datafusion/core/src/datasource/listing/helpers.rs  |   1 +
 datafusion/core/src/datasource/listing/mod.rs      |  13 ++
 .../datasource/physical_plan/file_scan_config.rs   |   1 +
 .../src/datasource/physical_plan/file_stream.rs    |   1 +
 .../core/src/datasource/physical_plan/mod.rs       |   4 +
 .../src/datasource/physical_plan/parquet/mod.rs    | 137 ++++++++++++++++++++-
 .../src/datasource/physical_plan/parquet/opener.rs |   4 +-
 datafusion/core/src/datasource/schema_adapter.rs   |   1 +
 datafusion/core/src/test_util/parquet.rs           |   1 +
 datafusion/core/tests/parquet/custom_reader.rs     |   1 +
 datafusion/core/tests/parquet/page_pruning.rs      |   1 +
 datafusion/proto/src/physical_plan/from_proto.rs   |   1 +
 datafusion/substrait/src/physical_plan/consumer.rs |   1 +
 14 files changed, 166 insertions(+), 2 deletions(-)

diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs
index 24f1111517..b0e1df5183 100644
--- a/datafusion/core/src/datasource/file_format/mod.rs
+++ b/datafusion/core/src/datasource/file_format/mod.rs
@@ -422,6 +422,7 @@ pub(crate) mod test_util {
             range: None,
             statistics: None,
             extensions: None,
+            metadata_size_hint: None,
         }]];
 
         let exec = format
diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs
index 1b3588d9a2..04c64156b1 100644
--- a/datafusion/core/src/datasource/listing/helpers.rs
+++ b/datafusion/core/src/datasource/listing/helpers.rs
@@ -468,6 +468,7 @@ pub async fn pruned_partition_list<'a>(
                     range: None,
                     statistics: None,
                     extensions: None,
+                    metadata_size_hint: None,
                 })
             }));
 
diff --git a/datafusion/core/src/datasource/listing/mod.rs b/datafusion/core/src/datasource/listing/mod.rs
index c5a441aacf..f11653ce1e 100644
--- a/datafusion/core/src/datasource/listing/mod.rs
+++ b/datafusion/core/src/datasource/listing/mod.rs
@@ -81,6 +81,8 @@ pub struct PartitionedFile {
     pub statistics: Option<Statistics>,
     /// An optional field for user defined per object metadata
     pub extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
+    /// The estimated size of the parquet metadata, in bytes
+    pub metadata_size_hint: Option<usize>,
 }
 
 impl PartitionedFile {
@@ -98,6 +100,7 @@ impl PartitionedFile {
             range: None,
             statistics: None,
             extensions: None,
+            metadata_size_hint: None,
         }
     }
 
@@ -115,10 +118,19 @@ impl PartitionedFile {
             range: Some(FileRange { start, end }),
             statistics: None,
             extensions: None,
+            metadata_size_hint: None,
         }
         .with_range(start, end)
     }
 
+    /// Provide a hint as to the size of the file metadata. If a hint is provided
+    /// the reader will try to fetch the last `size_hint` bytes of the parquet file optimistically.
+    /// Without an appropriate hint, two reads may be required to fetch the metadata.
+    pub fn with_metadata_size_hint(mut self, metadata_size_hint: usize) -> Self {
+        self.metadata_size_hint = Some(metadata_size_hint);
+        self
+    }
+
     /// Return a file reference from the given path
     pub fn from_path(path: String) -> Result<Self> {
         let size = std::fs::metadata(path.clone())?.len();
@@ -156,6 +168,7 @@ impl From<ObjectMeta> for PartitionedFile {
             range: None,
             statistics: None,
             extensions: None,
+            metadata_size_hint: None,
         }
     }
 }
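
For orientation, a minimal usage sketch of the new builder method above (not part of this patch; the path and sizes are illustrative values):

    // Hypothetical caller code: attach a per-file footer-size hint.
    let file = PartitionedFile::new("data/part-0.parquet".to_string(), 1024 * 1024)
        // Optimistically fetch the last 64 KiB of the file when reading metadata.
        .with_metadata_size_hint(64 * 1024);
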
diff --git a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
index 6a162c97b6..fd14be08c7 100644
--- a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
+++ b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
@@ -1209,6 +1209,7 @@ mod tests {
                             .collect::<Vec<_>>(),
                     }),
                     extensions: None,
+                    metadata_size_hint: None,
                 }
             }
         }
diff --git a/datafusion/core/src/datasource/physical_plan/file_stream.rs b/datafusion/core/src/datasource/physical_plan/file_stream.rs
index 6f354b31ae..9d78a0f2e3 100644
--- a/datafusion/core/src/datasource/physical_plan/file_stream.rs
+++ b/datafusion/core/src/datasource/physical_plan/file_stream.rs
@@ -295,6 +295,7 @@ impl<F: FileOpener> FileStream<F> {
             object_meta: part_file.object_meta,
             range: part_file.range,
             extensions: part_file.extensions,
+            metadata_size_hint: part_file.metadata_size_hint,
         };
 
         Some(
diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs
index 9971e87282..2b50458bb5 100644
--- a/datafusion/core/src/datasource/physical_plan/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/mod.rs
@@ -248,6 +248,8 @@ pub struct FileMeta {
     pub range: Option<FileRange>,
     /// An optional field for user defined per object metadata
     pub extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
+    /// Size hint for the metadata of this file
+    pub metadata_size_hint: Option<usize>,
 }
 
 impl FileMeta {
@@ -263,6 +265,7 @@ impl From<ObjectMeta> for FileMeta {
             object_meta,
             range: None,
             extensions: None,
+            metadata_size_hint: None,
         }
     }
 }
@@ -777,6 +780,7 @@ mod tests {
             range: None,
             statistics: None,
             extensions: None,
+            metadata_size_hint: None,
         }
     }
 }
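
Code that constructs a `FileMeta` directly (for example a custom `ParquetFileReaderFactory` or `FileOpener`) now has one more field to populate; a hedged sketch with illustrative values:

    // Sketch only: building a FileMeta with the new field.
    // `object_meta` is assumed to be an object_store::ObjectMeta for the file.
    let file_meta = FileMeta {
        object_meta,
        range: None,
        extensions: None,
        metadata_size_hint: Some(64 * 1024), // per-file footer prefetch hint
    };
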
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
index 059f86ce11..980f796a53 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -883,6 +883,7 @@ mod tests {
     // See also `parquet_exec` integration test
     use std::fs::{self, File};
     use std::io::Write;
+    use std::sync::Mutex;
 
     use super::*;
     use crate::dataframe::DataFrameWriteOptions;
@@ -908,6 +909,7 @@ mod tests {
     use arrow::datatypes::{Field, Schema, SchemaBuilder};
     use arrow::record_batch::RecordBatch;
     use arrow_schema::{DataType, Fields};
+    use bytes::{BufMut, BytesMut};
     use datafusion_common::{assert_contains, ScalarValue};
     use datafusion_expr::{col, lit, when, Expr};
     use datafusion_physical_expr::planner::logical2physical;
@@ -917,7 +919,7 @@ mod tests {
     use futures::StreamExt;
     use object_store::local::LocalFileSystem;
     use object_store::path::Path;
-    use object_store::ObjectMeta;
+    use object_store::{ObjectMeta, ObjectStore};
     use parquet::arrow::ArrowWriter;
     use parquet::file::properties::WriterProperties;
     use tempfile::TempDir;
@@ -1656,6 +1658,7 @@ mod tests {
                 range: Some(FileRange { start, end }),
                 statistics: None,
                 extensions: None,
+                metadata_size_hint: None,
             }
         }
 
@@ -1748,6 +1751,7 @@ mod tests {
             range: None,
             statistics: None,
             extensions: None,
+            metadata_size_hint: None,
         };
 
         let expected_schema = Schema::new(vec![
@@ -1835,6 +1839,7 @@ mod tests {
             range: None,
             statistics: None,
             extensions: None,
+            metadata_size_hint: None,
         };
 
         let file_schema = Arc::new(Schema::empty());
@@ -2378,4 +2383,134 @@ mod tests {
         writer.flush().unwrap();
         writer.close().unwrap();
     }
+
+    /// Write out a batch to a parquet file and return the total size of the file
+    async fn write_batch(
+        path: &str,
+        store: Arc<dyn ObjectStore>,
+        batch: RecordBatch,
+    ) -> usize {
+        let mut writer =
+            ArrowWriter::try_new(BytesMut::new().writer(), batch.schema(), None).unwrap();
+        writer.write(&batch).unwrap();
+        writer.flush().unwrap();
+        let bytes = writer.into_inner().unwrap().into_inner().freeze();
+        let total_size = bytes.len();
+        let path = Path::from(path);
+        let payload = object_store::PutPayload::from_bytes(bytes);
+        store
+            .put_opts(&path, payload, object_store::PutOptions::default())
+            .await
+            .unwrap();
+        total_size
+    }
+
+    /// A ParquetFileReaderFactory that tracks the metadata_size_hint passed to it
+    #[derive(Debug, Clone)]
+    struct TrackingParquetFileReaderFactory {
+        inner: Arc<dyn ParquetFileReaderFactory>,
+        metadata_size_hint_calls: Arc<Mutex<Vec<Option<usize>>>>,
+    }
+
+    impl TrackingParquetFileReaderFactory {
+        fn new(store: Arc<dyn ObjectStore>) -> Self {
+            Self {
+                inner: Arc::new(DefaultParquetFileReaderFactory::new(store)) as _,
+                metadata_size_hint_calls: Arc::new(Mutex::new(vec![])),
+            }
+        }
+    }
+
+    impl ParquetFileReaderFactory for TrackingParquetFileReaderFactory {
+        fn create_reader(
+            &self,
+            partition_index: usize,
+            file_meta: crate::datasource::physical_plan::FileMeta,
+            metadata_size_hint: Option<usize>,
+            metrics: &ExecutionPlanMetricsSet,
+        ) -> Result<Box<dyn parquet::arrow::async_reader::AsyncFileReader + Send>>
+        {
+            self.metadata_size_hint_calls
+                .lock()
+                .unwrap()
+                .push(metadata_size_hint);
+            self.inner.create_reader(
+                partition_index,
+                file_meta,
+                metadata_size_hint,
+                metrics,
+            )
+        }
+    }
+
+    /// Test passing `metadata_size_hint` to either a single file or the whole exec
+    #[tokio::test]
+    async fn test_metadata_size_hint() {
+        let store =
+            Arc::new(object_store::memory::InMemory::new()) as Arc<dyn ObjectStore>;
+        let store_url = ObjectStoreUrl::parse("memory://test").unwrap();
+
+        let ctx = SessionContext::new();
+        ctx.register_object_store(store_url.as_ref(), store.clone());
+
+        // write some data out, it doesn't matter what it is
+        let c1: ArrayRef = Arc::new(Int32Array::from(vec![Some(1)]));
+        let batch = create_batch(vec![("c1", c1)]);
+        let schema = batch.schema();
+        let name_1 = "test1.parquet";
+        let name_2 = "test2.parquet";
+        let total_size_1 = write_batch(name_1, store.clone(), batch.clone()).await;
+        let total_size_2 = write_batch(name_2, store.clone(), batch.clone()).await;
+
+        let reader_factory =
+            Arc::new(TrackingParquetFileReaderFactory::new(store.clone()));
+
+        let size_hint_calls = reader_factory.metadata_size_hint_calls.clone();
+
+        let exec = ParquetExec::builder(
+            FileScanConfig::new(store_url, schema)
+                .with_file(
+                    PartitionedFile {
+                        object_meta: ObjectMeta {
+                            location: Path::from(name_1),
+                            last_modified: Utc::now(),
+                            size: total_size_1,
+                            e_tag: None,
+                            version: None,
+                        },
+                        partition_values: vec![],
+                        range: None,
+                        statistics: None,
+                        extensions: None,
+                        metadata_size_hint: None,
+                    }
+                    .with_metadata_size_hint(123),
+                )
+                .with_file(PartitionedFile {
+                    object_meta: ObjectMeta {
+                        location: Path::from(name_2),
+                        last_modified: Utc::now(),
+                        size: total_size_2,
+                        e_tag: None,
+                        version: None,
+                    },
+                    partition_values: vec![],
+                    range: None,
+                    statistics: None,
+                    extensions: None,
+                    metadata_size_hint: None,
+                }),
+        )
+        .with_parquet_file_reader_factory(reader_factory)
+        .with_metadata_size_hint(456)
+        .build();
+
+        let exec = Arc::new(exec);
+        let res = collect(exec, ctx.task_ctx()).await.unwrap();
+        assert_eq!(res.len(), 2);
+
+        let calls = size_hint_calls.lock().unwrap().clone();
+        assert_eq!(calls.len(), 2);
+        assert_eq!(calls, vec![Some(123), Some(456)]);
+    }
 }
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/opener.rs b/datafusion/core/src/datasource/physical_plan/parquet/opener.rs
index 4990cb4dd7..3492bcc85f 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/opener.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/opener.rs
@@ -89,11 +89,13 @@ impl FileOpener for ParquetOpener {
         let file_metrics =
             ParquetFileMetrics::new(self.partition_index, &file_name, &self.metrics);
 
+        let metadata_size_hint = file_meta.metadata_size_hint.or(self.metadata_size_hint);
+
         let mut reader: Box<dyn AsyncFileReader> =
             self.parquet_file_reader_factory.create_reader(
                 self.partition_index,
                 file_meta,
-                self.metadata_size_hint,
+                metadata_size_hint,
                 &self.metrics,
             )?;
 
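
The `or` in the hunk above gives the per-file hint precedence, falling back to the exec-wide `metadata_size_hint` only when the file carries none; a self-contained sketch of that `Option::or` behaviour:

    fn main() {
        // Mirrors `file_meta.metadata_size_hint.or(self.metadata_size_hint)`:
        // the per-file value wins, the exec-wide value is only a fallback.
        let per_file: Option<usize> = Some(123);
        let exec_wide: Option<usize> = Some(456);
        assert_eq!(per_file.or(exec_wide), Some(123));
        assert_eq!(None::<usize>.or(exec_wide), Some(456));
    }
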
diff --git a/datafusion/core/src/datasource/schema_adapter.rs b/datafusion/core/src/datasource/schema_adapter.rs
index 5ba597e4b5..71f947e2c0 100644
--- a/datafusion/core/src/datasource/schema_adapter.rs
+++ b/datafusion/core/src/datasource/schema_adapter.rs
@@ -493,6 +493,7 @@ mod tests {
             range: None,
             statistics: None,
             extensions: None,
+            metadata_size_hint: None,
         };
 
         let f1 = Field::new("id", DataType::Int32, true);
diff --git a/datafusion/core/src/test_util/parquet.rs b/datafusion/core/src/test_util/parquet.rs
index 9f06ad9308..4e4a374720 100644
--- a/datafusion/core/src/test_util/parquet.rs
+++ b/datafusion/core/src/test_util/parquet.rs
@@ -151,6 +151,7 @@ impl TestParquetFile {
                     range: None,
                     statistics: None,
                     extensions: None,
+                    metadata_size_hint: None,
                 });
 
         let df_schema = self.schema.clone().to_dfschema_ref()?;
diff --git a/datafusion/core/tests/parquet/custom_reader.rs b/datafusion/core/tests/parquet/custom_reader.rs
index 7c1e199ceb..dc57ba1e44 100644
--- a/datafusion/core/tests/parquet/custom_reader.rs
+++ b/datafusion/core/tests/parquet/custom_reader.rs
@@ -71,6 +71,7 @@ async fn route_data_access_ops_to_parquet_file_reader_factory() {
             range: None,
             statistics: None,
             extensions: Some(Arc::new(String::from(EXPECTED_USER_DEFINED_METADATA))),
+            metadata_size_hint: None,
         })
         .collect();
 
diff --git a/datafusion/core/tests/parquet/page_pruning.rs b/datafusion/core/tests/parquet/page_pruning.rs
index d201ed3a84..6f03ff3169 100644
--- a/datafusion/core/tests/parquet/page_pruning.rs
+++ b/datafusion/core/tests/parquet/page_pruning.rs
@@ -64,6 +64,7 @@ async fn get_parquet_exec(state: &SessionState, filter: Expr) -> ParquetExec {
         range: None,
         statistics: None,
         extensions: None,
+        metadata_size_hint: None,
     };
 
     let df_schema = schema.clone().to_dfschema().unwrap();
diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs
index 316166042f..31b59c2a94 100644
--- a/datafusion/proto/src/physical_plan/from_proto.rs
+++ b/datafusion/proto/src/physical_plan/from_proto.rs
@@ -559,6 +559,7 @@ impl TryFrom<&protobuf::PartitionedFile> for PartitionedFile {
             range: val.range.as_ref().map(|v| v.try_into()).transpose()?,
             statistics: val.statistics.as_ref().map(|v| v.try_into()).transpose()?,
             extensions: None,
+            metadata_size_hint: None,
         })
     }
 }
diff --git a/datafusion/substrait/src/physical_plan/consumer.rs b/datafusion/substrait/src/physical_plan/consumer.rs
index a8f8ce048e..f4548df430 100644
--- a/datafusion/substrait/src/physical_plan/consumer.rs
+++ b/datafusion/substrait/src/physical_plan/consumer.rs
@@ -127,6 +127,7 @@ pub async fn from_substrait_rel(
                             range: None,
                             statistics: None,
                             extensions: None,
+                            metadata_size_hint: None,
                         };
 
                         let part_index = file.partition_index as usize;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
