This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 07e8793e62 allow passing in metadata_size_hint on a per-file basis
(#13213)
07e8793e62 is described below
commit 07e8793e625aecaaee80e5d6849527e51e138c35
Author: Adrian Garcia Badaracco <[email protected]>
AuthorDate: Wed Nov 6 05:23:43 2024 -0600
allow passing in metadata_size_hint on a per-file basis (#13213)
* allow passing in metadata_size_hint on a per-file basis
* fmt, lint
* revert
* Add test
* Add comment
---
datafusion/core/src/datasource/file_format/mod.rs | 1 +
datafusion/core/src/datasource/listing/helpers.rs | 1 +
datafusion/core/src/datasource/listing/mod.rs | 13 ++
.../datasource/physical_plan/file_scan_config.rs | 1 +
.../src/datasource/physical_plan/file_stream.rs | 1 +
.../core/src/datasource/physical_plan/mod.rs | 4 +
.../src/datasource/physical_plan/parquet/mod.rs | 137 ++++++++++++++++++++-
.../src/datasource/physical_plan/parquet/opener.rs | 4 +-
datafusion/core/src/datasource/schema_adapter.rs | 1 +
datafusion/core/src/test_util/parquet.rs | 1 +
datafusion/core/tests/parquet/custom_reader.rs | 1 +
datafusion/core/tests/parquet/page_pruning.rs | 1 +
datafusion/proto/src/physical_plan/from_proto.rs | 1 +
datafusion/substrait/src/physical_plan/consumer.rs | 1 +
14 files changed, 166 insertions(+), 2 deletions(-)
diff --git a/datafusion/core/src/datasource/file_format/mod.rs
b/datafusion/core/src/datasource/file_format/mod.rs
index 24f1111517..b0e1df5183 100644
--- a/datafusion/core/src/datasource/file_format/mod.rs
+++ b/datafusion/core/src/datasource/file_format/mod.rs
@@ -422,6 +422,7 @@ pub(crate) mod test_util {
range: None,
statistics: None,
extensions: None,
+ metadata_size_hint: None,
}]];
let exec = format
diff --git a/datafusion/core/src/datasource/listing/helpers.rs
b/datafusion/core/src/datasource/listing/helpers.rs
index 1b3588d9a2..04c64156b1 100644
--- a/datafusion/core/src/datasource/listing/helpers.rs
+++ b/datafusion/core/src/datasource/listing/helpers.rs
@@ -468,6 +468,7 @@ pub async fn pruned_partition_list<'a>(
range: None,
statistics: None,
extensions: None,
+ metadata_size_hint: None,
})
}));
diff --git a/datafusion/core/src/datasource/listing/mod.rs
b/datafusion/core/src/datasource/listing/mod.rs
index c5a441aacf..f11653ce1e 100644
--- a/datafusion/core/src/datasource/listing/mod.rs
+++ b/datafusion/core/src/datasource/listing/mod.rs
@@ -81,6 +81,8 @@ pub struct PartitionedFile {
pub statistics: Option<Statistics>,
/// An optional field for user defined per object metadata
pub extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
+ /// The estimated size of the parquet metadata, in bytes
+ pub metadata_size_hint: Option<usize>,
}
impl PartitionedFile {
@@ -98,6 +100,7 @@ impl PartitionedFile {
range: None,
statistics: None,
extensions: None,
+ metadata_size_hint: None,
}
}
@@ -115,10 +118,19 @@ impl PartitionedFile {
range: Some(FileRange { start, end }),
statistics: None,
extensions: None,
+ metadata_size_hint: None,
}
.with_range(start, end)
}
+ /// Provide a hint to the size of the file metadata. If a hint is provided
+ the reader will try to fetch the last `metadata_size_hint` bytes of the
parquet file optimistically.
+ /// Without an appropriate hint, two reads may be required to fetch the
metadata.
+ pub fn with_metadata_size_hint(mut self, metadata_size_hint: usize) ->
Self {
+ self.metadata_size_hint = Some(metadata_size_hint);
+ self
+ }
+
/// Return a file reference from the given path
pub fn from_path(path: String) -> Result<Self> {
let size = std::fs::metadata(path.clone())?.len();
@@ -156,6 +168,7 @@ impl From<ObjectMeta> for PartitionedFile {
range: None,
statistics: None,
extensions: None,
+ metadata_size_hint: None,
}
}
}
diff --git a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
index 6a162c97b6..fd14be08c7 100644
--- a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
+++ b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
@@ -1209,6 +1209,7 @@ mod tests {
.collect::<Vec<_>>(),
}),
extensions: None,
+ metadata_size_hint: None,
}
}
}
diff --git a/datafusion/core/src/datasource/physical_plan/file_stream.rs
b/datafusion/core/src/datasource/physical_plan/file_stream.rs
index 6f354b31ae..9d78a0f2e3 100644
--- a/datafusion/core/src/datasource/physical_plan/file_stream.rs
+++ b/datafusion/core/src/datasource/physical_plan/file_stream.rs
@@ -295,6 +295,7 @@ impl<F: FileOpener> FileStream<F> {
object_meta: part_file.object_meta,
range: part_file.range,
extensions: part_file.extensions,
+ metadata_size_hint: part_file.metadata_size_hint,
};
Some(
diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs
b/datafusion/core/src/datasource/physical_plan/mod.rs
index 9971e87282..2b50458bb5 100644
--- a/datafusion/core/src/datasource/physical_plan/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/mod.rs
@@ -248,6 +248,8 @@ pub struct FileMeta {
pub range: Option<FileRange>,
/// An optional field for user defined per object metadata
pub extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
+ /// Size hint for the metadata of this file
+ pub metadata_size_hint: Option<usize>,
}
impl FileMeta {
@@ -263,6 +265,7 @@ impl From<ObjectMeta> for FileMeta {
object_meta,
range: None,
extensions: None,
+ metadata_size_hint: None,
}
}
}
@@ -777,6 +780,7 @@ mod tests {
range: None,
statistics: None,
extensions: None,
+ metadata_size_hint: None,
}
}
}
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
index 059f86ce11..980f796a53 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -883,6 +883,7 @@ mod tests {
// See also `parquet_exec` integration test
use std::fs::{self, File};
use std::io::Write;
+ use std::sync::Mutex;
use super::*;
use crate::dataframe::DataFrameWriteOptions;
@@ -908,6 +909,7 @@ mod tests {
use arrow::datatypes::{Field, Schema, SchemaBuilder};
use arrow::record_batch::RecordBatch;
use arrow_schema::{DataType, Fields};
+ use bytes::{BufMut, BytesMut};
use datafusion_common::{assert_contains, ScalarValue};
use datafusion_expr::{col, lit, when, Expr};
use datafusion_physical_expr::planner::logical2physical;
@@ -917,7 +919,7 @@ mod tests {
use futures::StreamExt;
use object_store::local::LocalFileSystem;
use object_store::path::Path;
- use object_store::ObjectMeta;
+ use object_store::{ObjectMeta, ObjectStore};
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;
use tempfile::TempDir;
@@ -1656,6 +1658,7 @@ mod tests {
range: Some(FileRange { start, end }),
statistics: None,
extensions: None,
+ metadata_size_hint: None,
}
}
@@ -1748,6 +1751,7 @@ mod tests {
range: None,
statistics: None,
extensions: None,
+ metadata_size_hint: None,
};
let expected_schema = Schema::new(vec![
@@ -1835,6 +1839,7 @@ mod tests {
range: None,
statistics: None,
extensions: None,
+ metadata_size_hint: None,
};
let file_schema = Arc::new(Schema::empty());
@@ -2378,4 +2383,134 @@ mod tests {
writer.flush().unwrap();
writer.close().unwrap();
}
+
+ /// Write out a batch to a parquet file and return the total size of the
file
+ async fn write_batch(
+ path: &str,
+ store: Arc<dyn ObjectStore>,
+ batch: RecordBatch,
+ ) -> usize {
+ let mut writer =
+ ArrowWriter::try_new(BytesMut::new().writer(), batch.schema(),
None).unwrap();
+ writer.write(&batch).unwrap();
+ writer.flush().unwrap();
+ let bytes = writer.into_inner().unwrap().into_inner().freeze();
+ let total_size = bytes.len();
+ let path = Path::from(path);
+ let payload = object_store::PutPayload::from_bytes(bytes);
+ store
+ .put_opts(&path, payload, object_store::PutOptions::default())
+ .await
+ .unwrap();
+ total_size
+ }
+
+ /// A ParquetFileReaderFactory that tracks the metadata_size_hint passed
to it
+ #[derive(Debug, Clone)]
+ struct TrackingParquetFileReaderFactory {
+ inner: Arc<dyn ParquetFileReaderFactory>,
+ metadata_size_hint_calls: Arc<Mutex<Vec<Option<usize>>>>,
+ }
+
+ impl TrackingParquetFileReaderFactory {
+ fn new(store: Arc<dyn ObjectStore>) -> Self {
+ Self {
+ inner: Arc::new(DefaultParquetFileReaderFactory::new(store))
as _,
+ metadata_size_hint_calls: Arc::new(Mutex::new(vec![])),
+ }
+ }
+ }
+
+ impl ParquetFileReaderFactory for TrackingParquetFileReaderFactory {
+ fn create_reader(
+ &self,
+ partition_index: usize,
+ file_meta: crate::datasource::physical_plan::FileMeta,
+ metadata_size_hint: Option<usize>,
+ metrics: &ExecutionPlanMetricsSet,
+ ) -> Result<Box<dyn parquet::arrow::async_reader::AsyncFileReader +
Send>>
+ {
+ self.metadata_size_hint_calls
+ .lock()
+ .unwrap()
+ .push(metadata_size_hint);
+ self.inner.create_reader(
+ partition_index,
+ file_meta,
+ metadata_size_hint,
+ metrics,
+ )
+ }
+ }
+
+ /// Test passing `metadata_size_hint` to either a single file or the whole
exec
+ #[tokio::test]
+ async fn test_metadata_size_hint() {
+ let store =
+ Arc::new(object_store::memory::InMemory::new()) as Arc<dyn
ObjectStore>;
+ let store_url = ObjectStoreUrl::parse("memory://test").unwrap();
+
+ let ctx = SessionContext::new();
+ ctx.register_object_store(store_url.as_ref(), store.clone());
+
+ // write some data out, it doesn't matter what it is
+ let c1: ArrayRef = Arc::new(Int32Array::from(vec![Some(1)]));
+ let batch = create_batch(vec![("c1", c1)]);
+ let schema = batch.schema();
+ let name_1 = "test1.parquet";
+ let name_2 = "test2.parquet";
+ let total_size_1 = write_batch(name_1, store.clone(),
batch.clone()).await;
+ let total_size_2 = write_batch(name_2, store.clone(),
batch.clone()).await;
+
+ let reader_factory =
+ Arc::new(TrackingParquetFileReaderFactory::new(store.clone()));
+
+ let size_hint_calls = reader_factory.metadata_size_hint_calls.clone();
+
+ let exec = ParquetExec::builder(
+ FileScanConfig::new(store_url, schema)
+ .with_file(
+ PartitionedFile {
+ object_meta: ObjectMeta {
+ location: Path::from(name_1),
+ last_modified: Utc::now(),
+ size: total_size_1,
+ e_tag: None,
+ version: None,
+ },
+ partition_values: vec![],
+ range: None,
+ statistics: None,
+ extensions: None,
+ metadata_size_hint: None,
+ }
+ .with_metadata_size_hint(123),
+ )
+ .with_file(PartitionedFile {
+ object_meta: ObjectMeta {
+ location: Path::from(name_2),
+ last_modified: Utc::now(),
+ size: total_size_2,
+ e_tag: None,
+ version: None,
+ },
+ partition_values: vec![],
+ range: None,
+ statistics: None,
+ extensions: None,
+ metadata_size_hint: None,
+ }),
+ )
+ .with_parquet_file_reader_factory(reader_factory)
+ .with_metadata_size_hint(456)
+ .build();
+
+ let exec = Arc::new(exec);
+ let res = collect(exec, ctx.task_ctx()).await.unwrap();
+ assert_eq!(res.len(), 2);
+
+ let calls = size_hint_calls.lock().unwrap().clone();
+ assert_eq!(calls.len(), 2);
+ assert_eq!(calls, vec![Some(123), Some(456)]);
+ }
}
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/opener.rs
b/datafusion/core/src/datasource/physical_plan/parquet/opener.rs
index 4990cb4dd7..3492bcc85f 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/opener.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/opener.rs
@@ -89,11 +89,13 @@ impl FileOpener for ParquetOpener {
let file_metrics =
ParquetFileMetrics::new(self.partition_index, &file_name,
&self.metrics);
+ let metadata_size_hint =
file_meta.metadata_size_hint.or(self.metadata_size_hint);
+
let mut reader: Box<dyn AsyncFileReader> =
self.parquet_file_reader_factory.create_reader(
self.partition_index,
file_meta,
- self.metadata_size_hint,
+ metadata_size_hint,
&self.metrics,
)?;
diff --git a/datafusion/core/src/datasource/schema_adapter.rs
b/datafusion/core/src/datasource/schema_adapter.rs
index 5ba597e4b5..71f947e2c0 100644
--- a/datafusion/core/src/datasource/schema_adapter.rs
+++ b/datafusion/core/src/datasource/schema_adapter.rs
@@ -493,6 +493,7 @@ mod tests {
range: None,
statistics: None,
extensions: None,
+ metadata_size_hint: None,
};
let f1 = Field::new("id", DataType::Int32, true);
diff --git a/datafusion/core/src/test_util/parquet.rs
b/datafusion/core/src/test_util/parquet.rs
index 9f06ad9308..4e4a374720 100644
--- a/datafusion/core/src/test_util/parquet.rs
+++ b/datafusion/core/src/test_util/parquet.rs
@@ -151,6 +151,7 @@ impl TestParquetFile {
range: None,
statistics: None,
extensions: None,
+ metadata_size_hint: None,
});
let df_schema = self.schema.clone().to_dfschema_ref()?;
diff --git a/datafusion/core/tests/parquet/custom_reader.rs
b/datafusion/core/tests/parquet/custom_reader.rs
index 7c1e199ceb..dc57ba1e44 100644
--- a/datafusion/core/tests/parquet/custom_reader.rs
+++ b/datafusion/core/tests/parquet/custom_reader.rs
@@ -71,6 +71,7 @@ async fn
route_data_access_ops_to_parquet_file_reader_factory() {
range: None,
statistics: None,
extensions:
Some(Arc::new(String::from(EXPECTED_USER_DEFINED_METADATA))),
+ metadata_size_hint: None,
})
.collect();
diff --git a/datafusion/core/tests/parquet/page_pruning.rs
b/datafusion/core/tests/parquet/page_pruning.rs
index d201ed3a84..6f03ff3169 100644
--- a/datafusion/core/tests/parquet/page_pruning.rs
+++ b/datafusion/core/tests/parquet/page_pruning.rs
@@ -64,6 +64,7 @@ async fn get_parquet_exec(state: &SessionState, filter: Expr)
-> ParquetExec {
range: None,
statistics: None,
extensions: None,
+ metadata_size_hint: None,
};
let df_schema = schema.clone().to_dfschema().unwrap();
diff --git a/datafusion/proto/src/physical_plan/from_proto.rs
b/datafusion/proto/src/physical_plan/from_proto.rs
index 316166042f..31b59c2a94 100644
--- a/datafusion/proto/src/physical_plan/from_proto.rs
+++ b/datafusion/proto/src/physical_plan/from_proto.rs
@@ -559,6 +559,7 @@ impl TryFrom<&protobuf::PartitionedFile> for
PartitionedFile {
range: val.range.as_ref().map(|v| v.try_into()).transpose()?,
statistics: val.statistics.as_ref().map(|v|
v.try_into()).transpose()?,
extensions: None,
+ metadata_size_hint: None,
})
}
}
diff --git a/datafusion/substrait/src/physical_plan/consumer.rs
b/datafusion/substrait/src/physical_plan/consumer.rs
index a8f8ce048e..f4548df430 100644
--- a/datafusion/substrait/src/physical_plan/consumer.rs
+++ b/datafusion/substrait/src/physical_plan/consumer.rs
@@ -127,6 +127,7 @@ pub async fn from_substrait_rel(
range: None,
statistics: None,
extensions: None,
+ metadata_size_hint: None,
};
let part_index = file.partition_index as usize;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]