zhuqi-lucas commented on code in PR #19433:
URL: https://github.com/apache/datafusion/pull/19433#discussion_r2654860488
##########
datafusion/catalog-listing/src/table.rs:
##########
@@ -336,17 +336,64 @@ impl ListingTable {
self.options.format.file_source(table_schema)
}
- /// If file_sort_order is specified, creates the appropriate physical
expressions
+ /// Creates output ordering from user-specified file_sort_order or derives
+ /// from file orderings when user doesn't specify.
+ ///
+ /// If user specified `file_sort_order`, that takes precedence.
+ /// Otherwise, attempts to derive common ordering from file orderings in
+ /// the provided file groups.
pub fn try_create_output_ordering(
&self,
execution_props: &ExecutionProps,
+ file_groups: &[FileGroup],
) -> datafusion_common::Result<Vec<LexOrdering>> {
- create_lex_ordering(
- &self.table_schema,
- &self.options.file_sort_order,
- execution_props,
- )
+ // If user specified sort order, use that
+ if !self.options.file_sort_order.is_empty() {
+ return create_lex_ordering(
+ &self.table_schema,
+ &self.options.file_sort_order,
+ execution_props,
+ );
+ }
+
+ // Otherwise, try to derive from file orderings
+ Ok(derive_common_ordering_from_files(file_groups))
+ }
+}
+
+/// Derives a common ordering from file orderings across all file groups.
+///
+/// Returns the common ordering if all files have compatible orderings,
+/// otherwise returns an empty Vec (no ordering).
+fn derive_common_ordering_from_files(file_groups: &[FileGroup]) ->
Vec<LexOrdering> {
+ // Collect all file orderings
+ let mut all_orderings: Vec<&LexOrdering> = Vec::new();
+ for group in file_groups {
+ for file in group.iter() {
+ if let Some(ordering) = &file.ordering {
+ all_orderings.push(ordering);
+ } else {
+ // If any file has no ordering, we can't derive a common
ordering
Review Comment:
We could add a debug or trace log here:
```suggestion
// If any file has no ordering, we can't derive a common
ordering
tracing!(
"Cannot derive common ordering: file {} has different ordering
({:?}) than first file ({:?})",
file.object_meta.location, ordering, first
);
```
##########
datafusion/catalog-listing/src/table.rs:
##########
@@ -336,17 +336,64 @@ impl ListingTable {
self.options.format.file_source(table_schema)
}
- /// If file_sort_order is specified, creates the appropriate physical
expressions
+ /// Creates output ordering from user-specified file_sort_order or derives
+ /// from file orderings when user doesn't specify.
+ ///
+ /// If user specified `file_sort_order`, that takes precedence.
+ /// Otherwise, attempts to derive common ordering from file orderings in
+ /// the provided file groups.
pub fn try_create_output_ordering(
&self,
execution_props: &ExecutionProps,
+ file_groups: &[FileGroup],
) -> datafusion_common::Result<Vec<LexOrdering>> {
- create_lex_ordering(
- &self.table_schema,
- &self.options.file_sort_order,
- execution_props,
- )
+ // If user specified sort order, use that
+ if !self.options.file_sort_order.is_empty() {
+ return create_lex_ordering(
+ &self.table_schema,
+ &self.options.file_sort_order,
+ execution_props,
+ );
+ }
+
+ // Otherwise, try to derive from file orderings
+ Ok(derive_common_ordering_from_files(file_groups))
+ }
+}
+
+/// Derives a common ordering from file orderings across all file groups.
+///
+/// Returns the common ordering if all files have compatible orderings,
+/// otherwise returns an empty Vec (no ordering).
+fn derive_common_ordering_from_files(file_groups: &[FileGroup]) ->
Vec<LexOrdering> {
+ // Collect all file orderings
+ let mut all_orderings: Vec<&LexOrdering> = Vec::new();
+ for group in file_groups {
+ for file in group.iter() {
+ if let Some(ordering) = &file.ordering {
+ all_orderings.push(ordering);
+ } else {
+ // If any file has no ordering, we can't derive a common
ordering
+ return vec![];
+ }
+ }
+ }
+
+ if all_orderings.is_empty() {
+ return vec![];
}
+
+ // Check that all orderings are identical
+ let first = all_orderings[0];
+ for ordering in &all_orderings[1..] {
+ if *ordering != first {
Review Comment:
Is it possible to use `ordering_satisfy` here instead of requiring the
orderings to be strictly identical?
##########
datafusion/datasource/src/file_format.rs:
##########
@@ -41,6 +42,35 @@ use object_store::{ObjectMeta, ObjectStore};
/// Default max records to scan to infer the schema
pub const DEFAULT_SCHEMA_INFER_MAX_RECORD: usize = 1000;
+/// Metadata fetched from a file, including statistics and ordering.
+///
+/// This struct is returned by [`FileFormat::infer_stats_and_ordering`] to
+/// provide all metadata in a single read, avoiding duplicate I/O operations.
Review Comment:
```suggestion
/// Metadata fetched from a file, including statistics and ordering.
///
/// This struct is returned by [`FileFormat::infer_stats_and_ordering`] to
/// provide all metadata in a single read, avoiding duplicate I/O operations.
///
/// Note: Individual components (statistics and ordering) are typically
cached
/// separately by [`FileStatisticsCache`] implementations to enable partial
cache hits.
```
##########
datafusion/catalog-listing/src/table.rs:
##########
@@ -694,42 +748,54 @@ impl ListingTable {
})
}
- /// Collects statistics for a given partitioned file.
+ /// Collects statistics and ordering for a given partitioned file.
///
- /// This method first checks if the statistics for the given file are
already cached.
- /// If they are, it returns the cached statistics.
- /// If they are not, it infers the statistics from the file and stores
them in the cache.
- async fn do_collect_statistics(
+ /// This method checks if both statistics and ordering are cached.
+ /// If both are cached, returns them without any file access.
+ /// If only statistics are cached, infers ordering and caches it.
+ /// If neither is cached, infers both in a single metadata read.
+ async fn do_collect_statistics_and_ordering(
&self,
ctx: &dyn Session,
store: &Arc<dyn ObjectStore>,
part_file: &PartitionedFile,
- ) -> datafusion_common::Result<Arc<Statistics>> {
- match self
- .collected_statistics
- .get_with_extra(&part_file.object_meta.location,
&part_file.object_meta)
- {
- Some(statistics) => Ok(statistics),
- None => {
- let statistics = self
- .options
- .format
- .infer_stats(
- ctx,
- store,
- Arc::clone(&self.file_schema),
- &part_file.object_meta,
- )
- .await?;
- let statistics = Arc::new(statistics);
- self.collected_statistics.put_with_extra(
- &part_file.object_meta.location,
- Arc::clone(&statistics),
- &part_file.object_meta,
- );
- Ok(statistics)
+ ) -> datafusion_common::Result<(Arc<Statistics>, Option<LexOrdering>)> {
+ let path = &part_file.object_meta.location;
+ let meta = &part_file.object_meta;
+
+ // Check if statistics are cached
+ if let Some(statistics) =
self.collected_statistics.get_with_extra(path, meta) {
+ // Statistics cache hit - check if ordering is also cached
+ if let Some(ordering) =
self.collected_statistics.get_ordering(path, meta) {
+ // Both cached - return without any file access
+ return Ok((statistics, ordering));
}
+
+ // Statistics cached but ordering not - infer ordering and cache it
+ let ordering = self
+ .options
+ .format
+ .infer_ordering(ctx, store, Arc::clone(&self.file_schema),
meta)
Review Comment:
Here we get the cached statistics, but we still need to compute the ordering
info.
If possible, could we keep a single cache entry per file, so that it is either
cached or not, and have that entry include the computed ordering info?
##########
datafusion/execution/src/cache/cache_manager.rs:
##########
@@ -43,6 +44,33 @@ pub trait FileStatisticsCache:
{
/// Retrieves the information about the entries currently cached.
fn list_entries(&self) -> HashMap<Path, FileStatisticsCacheEntry>;
Review Comment:
Would it be useful to add a `has_ordering` field to
`FileStatisticsCacheEntry`? For example:
FileStatisticsCacheEntry {
object_meta: object_meta.clone(),
num_rows: stats.num_rows,
num_columns: stats.column_statistics.len(),
table_size_bytes: stats.total_byte_size,
statistics_size_bytes: 0,
has_ordering, // NEW: indicates if ordering is cached
},
##########
datafusion/datasource-parquet/src/file_format.rs:
##########
@@ -500,7 +542,22 @@ impl FileFormat for ParquetFormat {
return not_impl_err!("Overwrites are not implemented yet for
Parquet");
}
- let sink = Arc::new(ParquetSink::new(conf, self.options.clone()));
+ // Convert ordering requirements to Parquet SortingColumns for file
metadata
+ let sorting_columns = if let Some(ref requirements) =
order_requirements {
+ let ordering: LexOrdering = requirements.clone().into();
+ // In cases like `COPY (... ORDER BY ...) TO ...` the ORDER BY
clause
+ // may not be compatible with Parquet sorting columns (e.g.
ordering on `random()`).
+ // So if we cannot create a Parquet sorting column from the
ordering requirement,
+ // we skip setting sorting columns on the Parquet sink.
+ lex_ordering_to_sorting_columns(&ordering).ok()
Review Comment:
Great point!
##########
datafusion/execution/src/cache/cache_unit.rs:
##########
@@ -31,14 +32,54 @@ pub use crate::cache::DefaultFilesMetadataCache;
/// Default implementation of [`FileStatisticsCache`]
///
-/// Stores collected statistics for files
+/// Stores collected statistics and file orderings for files.
///
-/// Cache is invalided when file size or last modification has changed
+/// Cache is invalidated when file size or last modification has changed.
///
/// [`FileStatisticsCache`]: crate::cache::cache_manager::FileStatisticsCache
#[derive(Default)]
pub struct DefaultFileStatisticsCache {
statistics: DashMap<Path, (ObjectMeta, Arc<Statistics>)>,
+ /// Cached file orderings, keyed by file path.
+ /// Stored separately from statistics to maintain backwards compatibility
+ /// with the FileStatisticsCache trait interface.
+ orderings: DashMap<Path, (ObjectMeta, Option<LexOrdering>)>,
Review Comment:
Could we use just one map instead?
```rust
pub struct DefaultFileStatisticsCache {
// Store both stats and ordering together
cache: DashMap<Path, CacheEntry>,
}
struct CacheEntry {
meta: ObjectMeta,
statistics: Arc<Statistics>,
ordering: Option<LexOrdering>, // Initially None until fetched
}
```
##########
datafusion/physical-expr-common/src/sort_expr.rs:
##########
@@ -803,6 +818,130 @@ impl DerefMut for OrderingRequirements {
#[cfg(test)]
mod tests {
use super::*;
+ use arrow::datatypes::{DataType, FieldRef, Schema};
+ use arrow::record_batch::RecordBatch;
+ use datafusion_common::Result;
+ use datafusion_expr_common::columnar_value::ColumnarValue;
+ use std::any::Any;
+ use std::fmt::{Display, Formatter};
+
+ /// A simple mock PhysicalExpr for testing, identified by name.
+ #[derive(Debug, Clone, PartialEq, Eq, Hash)]
+ struct TestExpr {
+ name: &'static str,
+ }
+
+ impl TestExpr {
+ fn new_expr(name: &'static str) -> Arc<dyn PhysicalExpr> {
+ Arc::new(Self { name })
+ }
+ }
+
+ impl Display for TestExpr {
+ fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+ write!(f, "{}", self.name)
+ }
+ }
+
+ impl PhysicalExpr for TestExpr {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn data_type(&self, _input_schema: &Schema) -> Result<DataType> {
+ Ok(DataType::Int32)
+ }
+
+ fn nullable(&self, _input_schema: &Schema) -> Result<bool> {
+ Ok(true)
+ }
+
+ fn evaluate(&self, _batch: &RecordBatch) -> Result<ColumnarValue> {
+ unimplemented!("TestExpr::evaluate is not needed for sort tests")
+ }
+
+ fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
+ vec![]
+ }
+
+ fn with_new_children(
+ self: Arc<Self>,
+ _children: Vec<Arc<dyn PhysicalExpr>>,
+ ) -> Result<Arc<dyn PhysicalExpr>> {
+ Ok(self)
+ }
+
+ fn fmt_sql(&self, f: &mut Formatter<'_>) -> fmt::Result {
+ write!(f, "{}", self.name)
+ }
+
+ fn return_field(&self, _input_schema: &Schema) -> Result<FieldRef> {
+ unimplemented!("TestExpr::return_field is not needed for sort
tests")
+ }
+ }
+
+ /// Helper to create a PhysicalSortExpr with the given expression and
options.
+ fn sort_expr(expr: Arc<dyn PhysicalExpr>, options: SortOptions) ->
PhysicalSortExpr {
+ PhysicalSortExpr::new(expr, options)
+ }
+
+ #[test]
+ fn test_is_prefix() {
+ let asc = SortOptions {
+ descending: false,
+ nulls_first: false,
+ };
+ let desc = SortOptions {
+ descending: true,
+ nulls_first: true,
+ };
+
+ let a = TestExpr::new_expr("a");
+ let b = TestExpr::new_expr("b");
+ let c = TestExpr::new_expr("c");
+
+ // [a ASC] is a prefix of [a ASC] (exact match)
+ let ordering1 = LexOrdering::new([sort_expr(Arc::clone(&a),
asc)]).unwrap();
+ let ordering2 = LexOrdering::new([sort_expr(Arc::clone(&a),
asc)]).unwrap();
+ assert!(ordering1.is_prefix(&ordering2));
+
+ // [a ASC] is a prefix of [a ASC, b DESC]
+ let ordering_ab = LexOrdering::new([
+ sort_expr(Arc::clone(&a), asc),
+ sort_expr(Arc::clone(&b), desc),
+ ])
+ .unwrap();
+ assert!(ordering_ab.is_prefix(&ordering1));
+
+ // [a ASC, b DESC] is NOT a prefix of [a ASC] (other is longer)
+ assert!(!ordering1.is_prefix(&ordering_ab));
+
+ // [a DESC] is NOT a prefix of [a ASC] (different sort options)
+ let ordering_a_desc =
+ LexOrdering::new([sort_expr(Arc::clone(&a), desc)]).unwrap();
+ assert!(!ordering1.is_prefix(&ordering_a_desc));
+
+ // [b ASC] is NOT a prefix of [a ASC] (different expressions)
+ let ordering_b = LexOrdering::new([sort_expr(Arc::clone(&b),
asc)]).unwrap();
+ assert!(!ordering1.is_prefix(&ordering_b));
+
+ // [a ASC, b ASC] is a prefix of [a ASC, b ASC, c ASC]
+ let ordering_ab_asc = LexOrdering::new([
+ sort_expr(Arc::clone(&a), asc),
+ sort_expr(Arc::clone(&b), asc),
+ ])
+ .unwrap();
+ let ordering_abc = LexOrdering::new([
+ sort_expr(Arc::clone(&a), asc),
+ sort_expr(Arc::clone(&b), asc),
+ sort_expr(Arc::clone(&c), asc),
+ ])
+ .unwrap();
+ assert!(ordering_abc.is_prefix(&ordering_ab_asc));
+
+ // [a ASC, b DESC] is NOT a prefix of [a ASC, b ASC, c ASC] (mismatch
in middle)
+ assert!(!ordering_abc.is_prefix(&ordering_ab));
+ }
Review Comment:
Maybe add some tests for cache invalidation on file modification:
```rust
#[tokio::test]
async fn test_ordering_cache_invalidation_on_file_modification() {
let cache = DefaultFileStatisticsCache::default();
let path = Path::from("test.parquet");
// Cache with original metadata
let meta_v1 = ObjectMeta {
location: path.clone(),
last_modified: Utc.timestamp_nanos(1000),
size: 100,
e_tag: None,
version: None,
};
let ordering_v1 = Some(LexOrdering::new(vec![...]).unwrap());
cache.put_ordering(&path, ordering_v1.clone(), &meta_v1);
// Verify cached
assert_eq!(cache.get_ordering(&path, &meta_v1),
Some(ordering_v1.clone()));
// File modified (size changed)
let meta_v2 = ObjectMeta {
last_modified: Utc.timestamp_nanos(2000),
size: 200, // ← changed
..meta_v1.clone()
};
// Should return None (cache miss due to size mismatch)
assert_eq!(cache.get_ordering(&path, &meta_v2), None);
// Cache new version
let ordering_v2 = Some(LexOrdering::new(vec![...]).unwrap());
cache.put_ordering(&path, ordering_v2.clone(), &meta_v2);
// Old metadata should still be invalid
assert_eq!(cache.get_ordering(&path, &meta_v1), None);
// New metadata should work
assert_eq!(cache.get_ordering(&path, &meta_v2), Some(ordering_v2));
}
```
##########
datafusion/datasource-parquet/src/metadata.rs:
##########
@@ -373,6 +397,194 @@ impl<'a> DFParquetMetadata<'a> {
Ok(statistics)
}
+
+ /// Extract LexOrdering from Parquet sorting_columns metadata.
+ ///
+ /// Returns `Ok(None)` if:
+ /// - No row groups exist
+ /// - No row group has sorting_columns
+ /// - Row groups have inconsistent sorting_columns
+ /// - A sorting column cannot be mapped to the Arrow schema
+ ///
+ /// # Arguments
+ /// * `metadata` - The Parquet file metadata
+ /// * `arrow_schema` - The Arrow schema to map column indices to
+ pub(crate) fn ordering_from_parquet_metadata(
+ metadata: &ParquetMetaData,
+ arrow_schema: &SchemaRef,
+ ) -> Result<Option<LexOrdering>> {
+ let row_groups = metadata.row_groups();
+ if row_groups.is_empty() {
+ return Ok(None);
+ }
+
+ // Get sorting_columns from first row group
+ let first_sorting = match row_groups[0].sorting_columns() {
+ Some(cols) if !cols.is_empty() => cols,
+ _ => return Ok(None),
+ };
+
+ // Verify all row groups have identical sorting_columns
+ for rg in &row_groups[1..] {
+ match rg.sorting_columns() {
+ Some(cols) if cols == first_sorting => {}
+ _ => {
+ debug!(
+ "Row groups have inconsistent sorting_columns,
treating as unordered"
+ );
+ return Ok(None);
+ }
+ }
+ }
+
+ // Get the Parquet schema descriptor for column name lookup
+ let file_metadata = metadata.file_metadata();
+ let parquet_schema = file_metadata.schema_descr();
+
+ // Convert Parquet schema to Arrow schema for column mapping
+ let parquet_arrow_schema =
+ parquet_to_arrow_schema(parquet_schema,
file_metadata.key_value_metadata())?;
+
+ // Convert each SortingColumn to PhysicalSortExpr
+ let sort_exprs: Vec<PhysicalSortExpr> = first_sorting
+ .iter()
+ .filter_map(|sorting_col| {
+ sorting_column_to_sort_expr(
+ sorting_col,
+ parquet_schema,
+ &parquet_arrow_schema,
+ arrow_schema,
+ )
+ .ok()
+ .flatten()
+ })
+ .collect();
+
+ // If we couldn't map any columns, return None
+ if sort_exprs.is_empty() {
+ return Ok(None);
+ }
+
+ // If we couldn't map all columns, the ordering is incomplete
+ // Only return the ordering if we mapped all sorting columns
+ if sort_exprs.len() != first_sorting.len() {
+ debug!(
+ "Could only map {}/{} sorting columns to Arrow schema",
+ sort_exprs.len(),
+ first_sorting.len()
+ );
+ return Ok(None);
+ }
+
+ Ok(LexOrdering::new(sort_exprs))
+ }
+}
+
+/// Convert a Parquet SortingColumn to a PhysicalSortExpr.
+///
+/// Returns `Ok(None)` if the column cannot be mapped to the Arrow schema.
+fn sorting_column_to_sort_expr(
+ sorting_col: &SortingColumn,
+ parquet_schema: &SchemaDescriptor,
+ parquet_arrow_schema: &Schema,
+ arrow_schema: &SchemaRef,
+) -> Result<Option<PhysicalSortExpr>> {
+ let column_idx = sorting_col.column_idx as usize;
+
+ // Get the column path from the Parquet schema
+ // The column_idx in SortingColumn refers to leaf columns
Review Comment:
Should we add a test for nested types, since they can also be sorted?
I am not sure whether that's possible.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]