This is an automated email from the ASF dual-hosted git repository.

comphead pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 0e221721fb Make `LexOrdering::inner` non pub, add comments, update usages (#14155)
0e221721fb is described below

commit 0e221721fb9b11300c0ac33290dc28d2eccfcc27
Author: Andrew Lamb <[email protected]>
AuthorDate: Fri Jan 17 16:58:10 2025 -0500

    Make `LexOrdering::inner` non pub, add comments, update usages (#14155)
---
 datafusion/core/src/datasource/listing/table.rs    | 12 ++---
 .../datasource/physical_plan/file_scan_config.rs   |  7 ++-
 .../src/datasource/physical_plan/statistics.rs     |  6 +--
 .../replace_with_order_preserving_variants.rs      |  5 +-
 datafusion/core/src/physical_optimizer/utils.rs    |  2 +-
 .../core/tests/fuzz_cases/equivalence/utils.rs     |  2 +-
 datafusion/core/tests/fuzz_cases/window_fuzz.rs    |  2 +-
 datafusion/physical-expr-common/src/sort_expr.rs   | 58 ++++++++++++++++++----
 .../physical-expr/src/equivalence/ordering.rs      |  6 +--
 .../physical-expr/src/equivalence/properties.rs    | 11 ++--
 datafusion/physical-plan/src/joins/utils.rs        |  7 +--
 datafusion/physical-plan/src/memory.rs             |  6 +--
 datafusion/physical-plan/src/sorts/sort.rs         |  2 +-
 datafusion/physical-plan/src/topk/mod.rs           |  2 +-
 .../proto/tests/cases/roundtrip_physical_plan.rs   | 32 ++++++------
 15 files changed, 94 insertions(+), 66 deletions(-)

diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 42a2239c59..9b9bcd22c4 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -1314,15 +1314,15 @@ mod tests {
             // ok with one column
             (
                 vec![vec![col("string_col").sort(true, false)]],
-                Ok(vec![LexOrdering {
-                        inner: vec![PhysicalSortExpr {
+                Ok(vec![LexOrdering::new(
+                        vec![PhysicalSortExpr {
                             expr: physical_col("string_col", &schema).unwrap(),
                             options: SortOptions {
                                 descending: false,
                                 nulls_first: false,
                             },
                         }],
-                    }
+                )
                 ])
             ),
             // ok with two columns, different options
@@ -1331,8 +1331,8 @@ mod tests {
                     col("string_col").sort(true, false),
                     col("int_col").sort(false, true),
                 ]],
-                Ok(vec![LexOrdering {
-                        inner: vec![
+                Ok(vec![LexOrdering::new(
+                        vec![
                             PhysicalSortExpr::new_default(physical_col("string_col", &schema).unwrap())
                                         .asc()
                                         .nulls_last(),
@@ -1340,7 +1340,7 @@ mod tests {
                                         .desc()
                                         .nulls_first()
                         ],
-                    }
+                )
                 ])
             ),
         ];
diff --git a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
index a1cd892336..5a38886bb1 100644
--- a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
+++ b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
@@ -1134,9 +1134,8 @@ mod tests {
                     ))))
                     .collect::<Vec<_>>(),
             ));
-            let sort_order = LexOrdering {
-                inner: case
-                    .sort
+            let sort_order = LexOrdering::from(
+                case.sort
                     .into_iter()
                     .map(|expr| {
                         crate::physical_planner::create_physical_sort_expr(
@@ -1146,7 +1145,7 @@ mod tests {
                         )
                     })
                     .collect::<Result<Vec<_>>>()?,
-            };
+            );
 
             let partitioned_files =
                 case.files.into_iter().map(From::from).collect::<Vec<_>>();
diff --git a/datafusion/core/src/datasource/physical_plan/statistics.rs b/datafusion/core/src/datasource/physical_plan/statistics.rs
index 5e0257022e..b4a8f377d2 100644
--- a/datafusion/core/src/datasource/physical_plan/statistics.rs
+++ b/datafusion/core/src/datasource/physical_plan/statistics.rs
@@ -119,8 +119,8 @@ impl MinMaxStatistics {
             projected_schema
                 .project(&(sort_columns.iter().map(|c| c.index()).collect::<Vec<_>>()))?,
         );
-        let min_max_sort_order = LexOrdering {
-            inner: sort_columns
+        let min_max_sort_order = LexOrdering::from(
+            sort_columns
                 .iter()
                 .zip(projected_sort_order.iter())
                 .enumerate()
@@ -129,7 +129,7 @@ impl MinMaxStatistics {
                     options: sort.options,
                 })
                 .collect::<Vec<_>>(),
-        };
+        );
 
         let (min_values, max_values): (Vec<_>, Vec<_>) = sort_columns
             .iter()
diff --git a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
index 1bbe9d483c..f32ffa8a58 100644
--- a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
+++ b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
@@ -133,10 +133,7 @@ fn plan_with_order_preserving_variants(
         if let Some(ordering) = child.output_ordering() {
             // When the input of a `CoalescePartitionsExec` has an ordering,
             // replace it with a `SortPreservingMergeExec` if appropriate:
-            let spm = SortPreservingMergeExec::new(
-                LexOrdering::new(ordering.inner.clone()),
-                Arc::clone(child),
-            );
+            let spm = SortPreservingMergeExec::new(ordering.clone(), Arc::clone(child));
             sort_input.plan = Arc::new(spm) as _;
             sort_input.children[0].data = true;
             return Ok(sort_input);
diff --git a/datafusion/core/src/physical_optimizer/utils.rs b/datafusion/core/src/physical_optimizer/utils.rs
index cdecc9d318..9f2c28d564 100644
--- a/datafusion/core/src/physical_optimizer/utils.rs
+++ b/datafusion/core/src/physical_optimizer/utils.rs
@@ -40,7 +40,7 @@ pub fn add_sort_above<T: Clone + Default>(
     fetch: Option<usize>,
 ) -> PlanContext<T> {
     let mut sort_expr = LexOrdering::from(sort_requirements);
-    sort_expr.inner.retain(|sort_expr| {
+    sort_expr.retain(|sort_expr| {
         !node
             .plan
             .equivalence_properties()
diff --git a/datafusion/core/tests/fuzz_cases/equivalence/utils.rs b/datafusion/core/tests/fuzz_cases/equivalence/utils.rs
index 4d599879df..5bf42ea688 100644
--- a/datafusion/core/tests/fuzz_cases/equivalence/utils.rs
+++ b/datafusion/core/tests/fuzz_cases/equivalence/utils.rs
@@ -444,7 +444,7 @@ pub fn generate_table_for_orderings(
 
     assert!(!orderings.is_empty());
     // Sort the inner vectors by their lengths (longest first)
-    orderings.sort_by_key(|v| std::cmp::Reverse(v.inner.len()));
+    orderings.sort_by_key(|v| std::cmp::Reverse(v.len()));
 
     let arrays = schema
         .fields
diff --git a/datafusion/core/tests/fuzz_cases/window_fuzz.rs b/datafusion/core/tests/fuzz_cases/window_fuzz.rs
index 67666f5d7a..979aa5a2da 100644
--- a/datafusion/core/tests/fuzz_cases/window_fuzz.rs
+++ b/datafusion/core/tests/fuzz_cases/window_fuzz.rs
@@ -617,7 +617,7 @@ async fn run_window_test(
             options: SortOptions::default(),
         })
     }
-    for order_by_expr in &orderby_exprs.inner {
+    for order_by_expr in &orderby_exprs {
         if !sort_keys.contains(order_by_expr) {
             sort_keys.push(order_by_expr.clone())
         }
diff --git a/datafusion/physical-expr-common/src/sort_expr.rs b/datafusion/physical-expr-common/src/sort_expr.rs
index 63397e69c0..b150d3dc9b 100644
--- a/datafusion/physical-expr-common/src/sort_expr.rs
+++ b/datafusion/physical-expr-common/src/sort_expr.rs
@@ -325,9 +325,13 @@ fn to_str(options: &SortOptions) -> &str {
 
 ///`LexOrdering` contains a `Vec<PhysicalSortExpr>`, which represents
 /// a lexicographical ordering.
+///
+/// For example, `vec![a ASC, b DESC]` represents a lexicographical ordering
+/// that first sorts by column `a` in ascending order, then by column `b` in
+/// descending order.
 #[derive(Debug, Default, Clone, PartialEq, Eq, Hash)]
 pub struct LexOrdering {
-    pub inner: Vec<PhysicalSortExpr>,
+    inner: Vec<PhysicalSortExpr>,
 }
 
 impl AsRef<LexOrdering> for LexOrdering {
@@ -337,7 +341,7 @@ impl AsRef<LexOrdering> for LexOrdering {
 }
 
 impl LexOrdering {
-    // Creates a new [`LexOrdering`] from a vector
+    /// Creates a new [`LexOrdering`] from a vector
     pub fn new(inner: Vec<PhysicalSortExpr>) -> Self {
         Self { inner }
     }
@@ -348,46 +352,61 @@ impl LexOrdering {
         &EMPTY_ORDER
     }
 
+    /// Returns the number of elements that can be stored in the LexOrdering
+    /// without reallocating.
     pub fn capacity(&self) -> usize {
         self.inner.capacity()
     }
 
+    /// Clears the LexOrdering, removing all elements.
     pub fn clear(&mut self) {
         self.inner.clear()
     }
 
+    /// Returns `true` if the LexOrdering contains `expr`
     pub fn contains(&self, expr: &PhysicalSortExpr) -> bool {
         self.inner.contains(expr)
     }
 
+    /// Add all elements from `iter` to the LexOrdering.
     pub fn extend<I: IntoIterator<Item = PhysicalSortExpr>>(&mut self, iter: I) {
         self.inner.extend(iter)
     }
 
+    /// Remove all elements from the LexOrdering where `f` evaluates to `false`.
+    pub fn retain<F>(&mut self, f: F)
+    where
+        F: FnMut(&PhysicalSortExpr) -> bool,
+    {
+        self.inner.retain(f)
+    }
+
+    /// Returns `true` if the LexOrdering contains no elements.
     pub fn is_empty(&self) -> bool {
         self.inner.is_empty()
     }
 
-    pub fn iter(&self) -> impl Iterator<Item = &PhysicalSortExpr> {
+    /// Returns an iterator over each `&PhysicalSortExpr` in the LexOrdering.
+    pub fn iter(&self) -> core::slice::Iter<PhysicalSortExpr> {
         self.inner.iter()
     }
 
+    /// Returns the number of elements in the LexOrdering.
     pub fn len(&self) -> usize {
         self.inner.len()
     }
 
+    /// Removes the last element from the LexOrdering and returns it, or `None` if it is empty.
     pub fn pop(&mut self) -> Option<PhysicalSortExpr> {
         self.inner.pop()
     }
 
+    /// Appends an element to the back of the LexOrdering.
     pub fn push(&mut self, physical_sort_expr: PhysicalSortExpr) {
         self.inner.push(physical_sort_expr)
     }
 
-    pub fn retain(&mut self, f: impl FnMut(&PhysicalSortExpr) -> bool) {
-        self.inner.retain(f)
-    }
-
+    /// Truncates the LexOrdering, keeping only the first `len` elements.
     pub fn truncate(&mut self, len: usize) {
         self.inner.truncate(len)
     }
@@ -400,9 +419,12 @@ impl LexOrdering {
 
     /// Converts a `LexRequirement` into a `LexOrdering`.
     ///
-    /// This function converts `PhysicalSortRequirement` to `PhysicalSortExpr`
-    /// for each entry in the input. If required ordering is None for an entry
-    /// default ordering `ASC, NULLS LAST` if given (see the `PhysicalSortExpr::from`).
+    /// This function converts [`PhysicalSortRequirement`] to [`PhysicalSortExpr`]
+    /// for each entry in the input.
+    ///
+    /// If the required ordering is `None` for an entry in `requirement`, the
+    /// default ordering `ASC, NULLS LAST` is used (see
+    /// [`PhysicalSortExpr::from`]).
     pub fn from_lex_requirement(requirement: LexRequirement) -> LexOrdering {
         requirement
             .into_iter()
@@ -425,6 +447,15 @@ impl LexOrdering {
         }
         output
     }
+
+    /// Transforms each `PhysicalSortExpr` in the `LexOrdering`
+    /// in place using the provided closure `f`.
+    pub fn transform<F>(&mut self, f: F)
+    where
+        F: FnMut(&mut PhysicalSortExpr),
+    {
+        self.inner.iter_mut().for_each(f);
+    }
 }
 
 impl From<Vec<PhysicalSortExpr>> for LexOrdering {
@@ -439,6 +470,13 @@ impl From<LexRequirement> for LexOrdering {
     }
 }
 
+/// Convert a `LexOrdering` into a `Arc[<PhysicalSortExpr>]` for fast copies
+impl From<LexOrdering> for Arc<[PhysicalSortExpr]> {
+    fn from(value: LexOrdering) -> Self {
+        value.inner.into()
+    }
+}
+
 impl Deref for LexOrdering {
     type Target = [PhysicalSortExpr];
 
diff --git a/datafusion/physical-expr/src/equivalence/ordering.rs b/datafusion/physical-expr/src/equivalence/ordering.rs
index 4954c21728..4e324663dc 100644
--- a/datafusion/physical-expr/src/equivalence/ordering.rs
+++ b/datafusion/physical-expr/src/equivalence/ordering.rs
@@ -207,7 +207,7 @@ impl OrderingEquivalenceClass {
             for idx in 0..n_ordering {
                 // Calculate cross product index
                 let idx = outer_idx * n_ordering + idx;
-                self.orderings[idx].inner.extend(ordering.iter().cloned());
+                self.orderings[idx].extend(ordering.iter().cloned());
             }
         }
         self
@@ -217,9 +217,9 @@ impl OrderingEquivalenceClass {
     /// ordering equivalence class.
     pub fn add_offset(&mut self, offset: usize) {
         for ordering in self.orderings.iter_mut() {
-            for sort_expr in ordering.inner.iter_mut() {
+            ordering.transform(|sort_expr| {
                 sort_expr.expr = add_offset_to_expr(Arc::clone(&sort_expr.expr), offset);
-            }
+            })
         }
     }
 
diff --git a/datafusion/physical-expr/src/equivalence/properties.rs b/datafusion/physical-expr/src/equivalence/properties.rs
index 85440e0d3e..fed33315d7 100755
--- a/datafusion/physical-expr/src/equivalence/properties.rs
+++ b/datafusion/physical-expr/src/equivalence/properties.rs
@@ -203,7 +203,6 @@ impl EquivalenceProperties {
         let mut output_ordering = self.oeq_class().output_ordering().unwrap_or_default();
         // Prune out constant expressions
         output_ordering
-            .inner
             .retain(|sort_expr| !const_exprs_contains(constants, &sort_expr.expr));
         (!output_ordering.is_empty()).then_some(output_ordering)
     }
@@ -795,7 +794,6 @@ impl EquivalenceProperties {
         // Generate all valid orderings, given substituted expressions.
         let res = new_orderings
             .into_iter()
-            .map(|ordering| ordering.inner)
             .multi_cartesian_product()
             .map(LexOrdering::new)
             .collect::<Vec<_>>();
@@ -1344,7 +1342,6 @@ impl EquivalenceProperties {
         let mut new_orderings = vec![];
         for ordering in self.oeq_class {
             let new_ordering = ordering
-                .inner
                 .into_iter()
                 .map(|mut sort_expr| {
                     sort_expr.expr = with_new_schema(sort_expr.expr, &schema)?;
@@ -1630,7 +1627,7 @@ fn generate_dependency_orderings(
                 .map(|prefixes| {
                     prefixes
                         .into_iter()
-                        .flat_map(|ordering| ordering.inner.clone())
+                        .flat_map(|ordering| ordering.clone())
                         .collect()
                 })
                 .collect::<Vec<_>>()
@@ -2300,8 +2297,8 @@ impl UnionEquivalentOrderingBuilder {
         existing_constants: &[ConstExpr],
     ) -> Option<LexOrdering> {
         let mut augmented_ordering = LexOrdering::default();
-        let mut sort_expr_iter = ordering.inner.iter().peekable();
-        let mut existing_sort_expr_iter = existing_ordering.inner.iter().peekable();
+        let mut sort_expr_iter = ordering.iter().peekable();
+        let mut existing_sort_expr_iter = existing_ordering.iter().peekable();
 
         // walk in parallel down the two orderings, trying to match them up
         while sort_expr_iter.peek().is_some() || existing_sort_expr_iter.peek().is_some()
@@ -2881,7 +2878,7 @@ mod tests {
             let leading_orderings = eq_properties
                 .oeq_class()
                 .iter()
-                .flat_map(|ordering| ordering.inner.first().cloned())
+                .flat_map(|ordering| ordering.first().cloned())
                 .collect::<Vec<_>>();
             let expr_props = eq_properties.get_expr_properties(Arc::clone(&expr));
             let err_msg = format!(
diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs
index 0210035cc2..dea4305fa6 100644
--- a/datafusion/physical-plan/src/joins/utils.rs
+++ b/datafusion/physical-plan/src/joins/utils.rs
@@ -456,7 +456,7 @@ fn replace_on_columns_of_right_ordering(
     right_ordering: &mut LexOrdering,
 ) -> Result<()> {
     for (left_col, right_col) in on_columns {
-        for item in right_ordering.inner.iter_mut() {
+        right_ordering.transform(|item| {
             let new_expr = Arc::clone(&item.expr)
                 .transform(|e| {
                     if e.eq(right_col) {
@@ -465,9 +465,10 @@ fn replace_on_columns_of_right_ordering(
                         Ok(Transformed::no(e))
                     }
                 })
-                .data()?;
+                .data()
+                .expect("closure is infallible");
             item.expr = new_expr;
-        }
+        });
     }
     Ok(())
 }
diff --git a/datafusion/physical-plan/src/memory.rs b/datafusion/physical-plan/src/memory.rs
index ed0bfa75d1..fb58a04fcc 100644
--- a/datafusion/physical-plan/src/memory.rs
+++ b/datafusion/physical-plan/src/memory.rs
@@ -353,7 +353,7 @@ impl MemoryExec {
         let fields = self.schema.fields();
         let ambiguous_column = sort_information
             .iter()
-            .flat_map(|ordering| ordering.inner.clone())
+            .flat_map(|ordering| ordering.clone())
             .flat_map(|expr| collect_columns(&expr.expr))
             .find(|col| {
                 fields
@@ -695,8 +695,8 @@ mod memory_exec_tests {
             .try_with_sort_information(sort_information)?;
 
         assert_eq!(
-            mem_exec.properties().output_ordering().unwrap().to_vec(),
-            expected_output_order.inner
+            mem_exec.properties().output_ordering().unwrap(),
+            &expected_output_order
         );
         let eq_properties = mem_exec.properties().equivalence_properties();
         assert!(eq_properties.oeq_class().contains(&sort1));
diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs
index 33c8a2b2fe..fd7e426a82 100644
--- a/datafusion/physical-plan/src/sorts/sort.rs
+++ b/datafusion/physical-plan/src/sorts/sort.rs
@@ -282,7 +282,7 @@ impl ExternalSorter {
             in_mem_batches: vec![],
             in_mem_batches_sorted: true,
             spills: vec![],
-            expr: expr.inner.into(),
+            expr: expr.into(),
             metrics,
             fetch,
             reservation,
diff --git a/datafusion/physical-plan/src/topk/mod.rs b/datafusion/physical-plan/src/topk/mod.rs
index ca154925df..ed1df6b1b8 100644
--- a/datafusion/physical-plan/src/topk/mod.rs
+++ b/datafusion/physical-plan/src/topk/mod.rs
@@ -108,7 +108,7 @@ impl TopK {
         let reservation = MemoryConsumer::new(format!("TopK[{partition_id}]"))
             .register(&runtime.memory_pool);
 
-        let expr: Arc<[PhysicalSortExpr]> = expr.inner.into();
+        let expr: Arc<[PhysicalSortExpr]> = expr.into();
 
         let sort_fields: Vec<_> = expr
             .iter()
diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
index 157711b4e2..7c34fe0680 100644
--- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -332,15 +332,13 @@ fn roundtrip_window() -> Result<()> {
     let udwf_expr = Arc::new(StandardWindowExpr::new(
         nth_value_window,
         &[col("b", &schema)?],
-        &LexOrdering {
-            inner: vec![PhysicalSortExpr {
-                expr: col("a", &schema)?,
-                options: SortOptions {
-                    descending: false,
-                    nulls_first: false,
-                },
-            }],
-        },
+        &LexOrdering::new(vec![PhysicalSortExpr {
+            expr: col("a", &schema)?,
+            options: SortOptions {
+                descending: false,
+                nulls_first: false,
+            },
+        }]),
         Arc::new(window_frame),
     ));
 
@@ -1134,15 +1132,13 @@ fn roundtrip_udwf_extension_codec() -> Result<()> {
     let udwf_expr = Arc::new(StandardWindowExpr::new(
         udwf,
         &[col("b", &schema)?],
-        &LexOrdering {
-            inner: vec![PhysicalSortExpr {
-                expr: col("a", &schema)?,
-                options: SortOptions {
-                    descending: false,
-                    nulls_first: false,
-                },
-            }],
-        },
+        &LexOrdering::new(vec![PhysicalSortExpr {
+            expr: col("a", &schema)?,
+            options: SortOptions {
+                descending: false,
+                nulls_first: false,
+            },
+        }]),
         Arc::new(window_frame),
     ));
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to