This is an automated email from the ASF dual-hosted git repository.
comphead pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 0e221721fb Make `LexOrdering::inner` non pub, add comments, update
usages (#14155)
0e221721fb is described below
commit 0e221721fb9b11300c0ac33290dc28d2eccfcc27
Author: Andrew Lamb <[email protected]>
AuthorDate: Fri Jan 17 16:58:10 2025 -0500
Make `LexOrdering::inner` non pub, add comments, update usages (#14155)
---
datafusion/core/src/datasource/listing/table.rs | 12 ++---
.../datasource/physical_plan/file_scan_config.rs | 7 ++-
.../src/datasource/physical_plan/statistics.rs | 6 +--
.../replace_with_order_preserving_variants.rs | 5 +-
datafusion/core/src/physical_optimizer/utils.rs | 2 +-
.../core/tests/fuzz_cases/equivalence/utils.rs | 2 +-
datafusion/core/tests/fuzz_cases/window_fuzz.rs | 2 +-
datafusion/physical-expr-common/src/sort_expr.rs | 58 ++++++++++++++++++----
.../physical-expr/src/equivalence/ordering.rs | 6 +--
.../physical-expr/src/equivalence/properties.rs | 11 ++--
datafusion/physical-plan/src/joins/utils.rs | 7 +--
datafusion/physical-plan/src/memory.rs | 6 +--
datafusion/physical-plan/src/sorts/sort.rs | 2 +-
datafusion/physical-plan/src/topk/mod.rs | 2 +-
.../proto/tests/cases/roundtrip_physical_plan.rs | 32 ++++++------
15 files changed, 94 insertions(+), 66 deletions(-)
diff --git a/datafusion/core/src/datasource/listing/table.rs
b/datafusion/core/src/datasource/listing/table.rs
index 42a2239c59..9b9bcd22c4 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -1314,15 +1314,15 @@ mod tests {
// ok with one column
(
vec![vec![col("string_col").sort(true, false)]],
- Ok(vec![LexOrdering {
- inner: vec![PhysicalSortExpr {
+ Ok(vec![LexOrdering::new(
+ vec![PhysicalSortExpr {
expr: physical_col("string_col", &schema).unwrap(),
options: SortOptions {
descending: false,
nulls_first: false,
},
}],
- }
+ )
])
),
// ok with two columns, different options
@@ -1331,8 +1331,8 @@ mod tests {
col("string_col").sort(true, false),
col("int_col").sort(false, true),
]],
- Ok(vec![LexOrdering {
- inner: vec![
+ Ok(vec![LexOrdering::new(
+ vec![
PhysicalSortExpr::new_default(physical_col("string_col", &schema).unwrap())
.asc()
.nulls_last(),
@@ -1340,7 +1340,7 @@ mod tests {
.desc()
.nulls_first()
],
- }
+ )
])
),
];
diff --git a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
index a1cd892336..5a38886bb1 100644
--- a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
+++ b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
@@ -1134,9 +1134,8 @@ mod tests {
))))
.collect::<Vec<_>>(),
));
- let sort_order = LexOrdering {
- inner: case
- .sort
+ let sort_order = LexOrdering::from(
+ case.sort
.into_iter()
.map(|expr| {
crate::physical_planner::create_physical_sort_expr(
@@ -1146,7 +1145,7 @@ mod tests {
)
})
.collect::<Result<Vec<_>>>()?,
- };
+ );
let partitioned_files =
case.files.into_iter().map(From::from).collect::<Vec<_>>();
diff --git a/datafusion/core/src/datasource/physical_plan/statistics.rs
b/datafusion/core/src/datasource/physical_plan/statistics.rs
index 5e0257022e..b4a8f377d2 100644
--- a/datafusion/core/src/datasource/physical_plan/statistics.rs
+++ b/datafusion/core/src/datasource/physical_plan/statistics.rs
@@ -119,8 +119,8 @@ impl MinMaxStatistics {
projected_schema
.project(&(sort_columns.iter().map(|c|
c.index()).collect::<Vec<_>>()))?,
);
- let min_max_sort_order = LexOrdering {
- inner: sort_columns
+ let min_max_sort_order = LexOrdering::from(
+ sort_columns
.iter()
.zip(projected_sort_order.iter())
.enumerate()
@@ -129,7 +129,7 @@ impl MinMaxStatistics {
options: sort.options,
})
.collect::<Vec<_>>(),
- };
+ );
let (min_values, max_values): (Vec<_>, Vec<_>) = sort_columns
.iter()
diff --git
a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
index 1bbe9d483c..f32ffa8a58 100644
---
a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
+++
b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
@@ -133,10 +133,7 @@ fn plan_with_order_preserving_variants(
if let Some(ordering) = child.output_ordering() {
// When the input of a `CoalescePartitionsExec` has an ordering,
// replace it with a `SortPreservingMergeExec` if appropriate:
- let spm = SortPreservingMergeExec::new(
- LexOrdering::new(ordering.inner.clone()),
- Arc::clone(child),
- );
+ let spm = SortPreservingMergeExec::new(ordering.clone(),
Arc::clone(child));
sort_input.plan = Arc::new(spm) as _;
sort_input.children[0].data = true;
return Ok(sort_input);
diff --git a/datafusion/core/src/physical_optimizer/utils.rs
b/datafusion/core/src/physical_optimizer/utils.rs
index cdecc9d318..9f2c28d564 100644
--- a/datafusion/core/src/physical_optimizer/utils.rs
+++ b/datafusion/core/src/physical_optimizer/utils.rs
@@ -40,7 +40,7 @@ pub fn add_sort_above<T: Clone + Default>(
fetch: Option<usize>,
) -> PlanContext<T> {
let mut sort_expr = LexOrdering::from(sort_requirements);
- sort_expr.inner.retain(|sort_expr| {
+ sort_expr.retain(|sort_expr| {
!node
.plan
.equivalence_properties()
diff --git a/datafusion/core/tests/fuzz_cases/equivalence/utils.rs
b/datafusion/core/tests/fuzz_cases/equivalence/utils.rs
index 4d599879df..5bf42ea688 100644
--- a/datafusion/core/tests/fuzz_cases/equivalence/utils.rs
+++ b/datafusion/core/tests/fuzz_cases/equivalence/utils.rs
@@ -444,7 +444,7 @@ pub fn generate_table_for_orderings(
assert!(!orderings.is_empty());
// Sort the inner vectors by their lengths (longest first)
- orderings.sort_by_key(|v| std::cmp::Reverse(v.inner.len()));
+ orderings.sort_by_key(|v| std::cmp::Reverse(v.len()));
let arrays = schema
.fields
diff --git a/datafusion/core/tests/fuzz_cases/window_fuzz.rs
b/datafusion/core/tests/fuzz_cases/window_fuzz.rs
index 67666f5d7a..979aa5a2da 100644
--- a/datafusion/core/tests/fuzz_cases/window_fuzz.rs
+++ b/datafusion/core/tests/fuzz_cases/window_fuzz.rs
@@ -617,7 +617,7 @@ async fn run_window_test(
options: SortOptions::default(),
})
}
- for order_by_expr in &orderby_exprs.inner {
+ for order_by_expr in &orderby_exprs {
if !sort_keys.contains(order_by_expr) {
sort_keys.push(order_by_expr.clone())
}
diff --git a/datafusion/physical-expr-common/src/sort_expr.rs
b/datafusion/physical-expr-common/src/sort_expr.rs
index 63397e69c0..b150d3dc9b 100644
--- a/datafusion/physical-expr-common/src/sort_expr.rs
+++ b/datafusion/physical-expr-common/src/sort_expr.rs
@@ -325,9 +325,13 @@ fn to_str(options: &SortOptions) -> &str {
///`LexOrdering` contains a `Vec<PhysicalSortExpr>`, which represents
/// a lexicographical ordering.
+///
+/// For example, `vec![a ASC, b DESC]` represents a lexicographical ordering
+/// that first sorts by column `a` in ascending order, then by column `b` in
+/// descending order.
#[derive(Debug, Default, Clone, PartialEq, Eq, Hash)]
pub struct LexOrdering {
- pub inner: Vec<PhysicalSortExpr>,
+ inner: Vec<PhysicalSortExpr>,
}
impl AsRef<LexOrdering> for LexOrdering {
@@ -337,7 +341,7 @@ impl AsRef<LexOrdering> for LexOrdering {
}
impl LexOrdering {
- // Creates a new [`LexOrdering`] from a vector
+ /// Creates a new [`LexOrdering`] from a vector
pub fn new(inner: Vec<PhysicalSortExpr>) -> Self {
Self { inner }
}
@@ -348,46 +352,61 @@ impl LexOrdering {
&EMPTY_ORDER
}
+ /// Returns the number of elements that can be stored in the LexOrdering
+ /// without reallocating.
pub fn capacity(&self) -> usize {
self.inner.capacity()
}
+ /// Clears the LexOrdering, removing all elements.
pub fn clear(&mut self) {
self.inner.clear()
}
+ /// Returns `true` if the LexOrdering contains `expr`
pub fn contains(&self, expr: &PhysicalSortExpr) -> bool {
self.inner.contains(expr)
}
+ /// Add all elements from `iter` to the LexOrdering.
pub fn extend<I: IntoIterator<Item = PhysicalSortExpr>>(&mut self, iter:
I) {
self.inner.extend(iter)
}
+ /// Remove all elements from the LexOrdering where `f` evaluates to `false`.
+ pub fn retain<F>(&mut self, f: F)
+ where
+ F: FnMut(&PhysicalSortExpr) -> bool,
+ {
+ self.inner.retain(f)
+ }
+
+ /// Returns `true` if the LexOrdering contains no elements.
pub fn is_empty(&self) -> bool {
self.inner.is_empty()
}
- pub fn iter(&self) -> impl Iterator<Item = &PhysicalSortExpr> {
+ /// Returns an iterator over each `&PhysicalSortExpr` in the LexOrdering.
+ pub fn iter(&self) -> core::slice::Iter<PhysicalSortExpr> {
self.inner.iter()
}
+ /// Returns the number of elements in the LexOrdering.
pub fn len(&self) -> usize {
self.inner.len()
}
+ /// Removes the last element from the LexOrdering and returns it, or `None` if it is empty.
pub fn pop(&mut self) -> Option<PhysicalSortExpr> {
self.inner.pop()
}
+ /// Appends an element to the back of the LexOrdering.
pub fn push(&mut self, physical_sort_expr: PhysicalSortExpr) {
self.inner.push(physical_sort_expr)
}
- pub fn retain(&mut self, f: impl FnMut(&PhysicalSortExpr) -> bool) {
- self.inner.retain(f)
- }
-
+ /// Truncates the LexOrdering, keeping only the first `len` elements.
pub fn truncate(&mut self, len: usize) {
self.inner.truncate(len)
}
@@ -400,9 +419,12 @@ impl LexOrdering {
/// Converts a `LexRequirement` into a `LexOrdering`.
///
- /// This function converts `PhysicalSortRequirement` to `PhysicalSortExpr`
- /// for each entry in the input. If required ordering is None for an entry
- /// default ordering `ASC, NULLS LAST` if given (see the `PhysicalSortExpr::from`).
+ /// This function converts [`PhysicalSortRequirement`] to [`PhysicalSortExpr`]
+ /// for each entry in the input.
+ ///
+ /// If the required ordering is `None` for an entry in `requirement`, the
+ /// default ordering `ASC, NULLS LAST` is used (see
+ /// [`PhysicalSortExpr::from`]).
pub fn from_lex_requirement(requirement: LexRequirement) -> LexOrdering {
requirement
.into_iter()
@@ -425,6 +447,15 @@ impl LexOrdering {
}
output
}
+
+ /// Transforms each `PhysicalSortExpr` in the `LexOrdering`
+ /// in place using the provided closure `f`.
+ pub fn transform<F>(&mut self, f: F)
+ where
+ F: FnMut(&mut PhysicalSortExpr),
+ {
+ self.inner.iter_mut().for_each(f);
+ }
}
impl From<Vec<PhysicalSortExpr>> for LexOrdering {
@@ -439,6 +470,13 @@ impl From<LexRequirement> for LexOrdering {
}
}
+/// Convert a `LexOrdering` into an `Arc<[PhysicalSortExpr]>` for fast copies
+impl From<LexOrdering> for Arc<[PhysicalSortExpr]> {
+ fn from(value: LexOrdering) -> Self {
+ value.inner.into()
+ }
+}
+
impl Deref for LexOrdering {
type Target = [PhysicalSortExpr];
diff --git a/datafusion/physical-expr/src/equivalence/ordering.rs
b/datafusion/physical-expr/src/equivalence/ordering.rs
index 4954c21728..4e324663dc 100644
--- a/datafusion/physical-expr/src/equivalence/ordering.rs
+++ b/datafusion/physical-expr/src/equivalence/ordering.rs
@@ -207,7 +207,7 @@ impl OrderingEquivalenceClass {
for idx in 0..n_ordering {
// Calculate cross product index
let idx = outer_idx * n_ordering + idx;
- self.orderings[idx].inner.extend(ordering.iter().cloned());
+ self.orderings[idx].extend(ordering.iter().cloned());
}
}
self
@@ -217,9 +217,9 @@ impl OrderingEquivalenceClass {
/// ordering equivalence class.
pub fn add_offset(&mut self, offset: usize) {
for ordering in self.orderings.iter_mut() {
- for sort_expr in ordering.inner.iter_mut() {
+ ordering.transform(|sort_expr| {
sort_expr.expr =
add_offset_to_expr(Arc::clone(&sort_expr.expr), offset);
- }
+ })
}
}
diff --git a/datafusion/physical-expr/src/equivalence/properties.rs
b/datafusion/physical-expr/src/equivalence/properties.rs
index 85440e0d3e..fed33315d7 100755
--- a/datafusion/physical-expr/src/equivalence/properties.rs
+++ b/datafusion/physical-expr/src/equivalence/properties.rs
@@ -203,7 +203,6 @@ impl EquivalenceProperties {
let mut output_ordering =
self.oeq_class().output_ordering().unwrap_or_default();
// Prune out constant expressions
output_ordering
- .inner
.retain(|sort_expr| !const_exprs_contains(constants,
&sort_expr.expr));
(!output_ordering.is_empty()).then_some(output_ordering)
}
@@ -795,7 +794,6 @@ impl EquivalenceProperties {
// Generate all valid orderings, given substituted expressions.
let res = new_orderings
.into_iter()
- .map(|ordering| ordering.inner)
.multi_cartesian_product()
.map(LexOrdering::new)
.collect::<Vec<_>>();
@@ -1344,7 +1342,6 @@ impl EquivalenceProperties {
let mut new_orderings = vec![];
for ordering in self.oeq_class {
let new_ordering = ordering
- .inner
.into_iter()
.map(|mut sort_expr| {
sort_expr.expr = with_new_schema(sort_expr.expr, &schema)?;
@@ -1630,7 +1627,7 @@ fn generate_dependency_orderings(
.map(|prefixes| {
prefixes
.into_iter()
- .flat_map(|ordering| ordering.inner.clone())
+ .flat_map(|ordering| ordering.clone())
.collect()
})
.collect::<Vec<_>>()
@@ -2300,8 +2297,8 @@ impl UnionEquivalentOrderingBuilder {
existing_constants: &[ConstExpr],
) -> Option<LexOrdering> {
let mut augmented_ordering = LexOrdering::default();
- let mut sort_expr_iter = ordering.inner.iter().peekable();
- let mut existing_sort_expr_iter =
existing_ordering.inner.iter().peekable();
+ let mut sort_expr_iter = ordering.iter().peekable();
+ let mut existing_sort_expr_iter = existing_ordering.iter().peekable();
// walk in parallel down the two orderings, trying to match them up
while sort_expr_iter.peek().is_some() ||
existing_sort_expr_iter.peek().is_some()
@@ -2881,7 +2878,7 @@ mod tests {
let leading_orderings = eq_properties
.oeq_class()
.iter()
- .flat_map(|ordering| ordering.inner.first().cloned())
+ .flat_map(|ordering| ordering.first().cloned())
.collect::<Vec<_>>();
let expr_props =
eq_properties.get_expr_properties(Arc::clone(&expr));
let err_msg = format!(
diff --git a/datafusion/physical-plan/src/joins/utils.rs
b/datafusion/physical-plan/src/joins/utils.rs
index 0210035cc2..dea4305fa6 100644
--- a/datafusion/physical-plan/src/joins/utils.rs
+++ b/datafusion/physical-plan/src/joins/utils.rs
@@ -456,7 +456,7 @@ fn replace_on_columns_of_right_ordering(
right_ordering: &mut LexOrdering,
) -> Result<()> {
for (left_col, right_col) in on_columns {
- for item in right_ordering.inner.iter_mut() {
+ right_ordering.transform(|item| {
let new_expr = Arc::clone(&item.expr)
.transform(|e| {
if e.eq(right_col) {
@@ -465,9 +465,10 @@ fn replace_on_columns_of_right_ordering(
Ok(Transformed::no(e))
}
})
- .data()?;
+ .data()
+ .expect("closure is infallible");
item.expr = new_expr;
- }
+ });
}
Ok(())
}
diff --git a/datafusion/physical-plan/src/memory.rs
b/datafusion/physical-plan/src/memory.rs
index ed0bfa75d1..fb58a04fcc 100644
--- a/datafusion/physical-plan/src/memory.rs
+++ b/datafusion/physical-plan/src/memory.rs
@@ -353,7 +353,7 @@ impl MemoryExec {
let fields = self.schema.fields();
let ambiguous_column = sort_information
.iter()
- .flat_map(|ordering| ordering.inner.clone())
+ .flat_map(|ordering| ordering.clone())
.flat_map(|expr| collect_columns(&expr.expr))
.find(|col| {
fields
@@ -695,8 +695,8 @@ mod memory_exec_tests {
.try_with_sort_information(sort_information)?;
assert_eq!(
- mem_exec.properties().output_ordering().unwrap().to_vec(),
- expected_output_order.inner
+ mem_exec.properties().output_ordering().unwrap(),
+ &expected_output_order
);
let eq_properties = mem_exec.properties().equivalence_properties();
assert!(eq_properties.oeq_class().contains(&sort1));
diff --git a/datafusion/physical-plan/src/sorts/sort.rs
b/datafusion/physical-plan/src/sorts/sort.rs
index 33c8a2b2fe..fd7e426a82 100644
--- a/datafusion/physical-plan/src/sorts/sort.rs
+++ b/datafusion/physical-plan/src/sorts/sort.rs
@@ -282,7 +282,7 @@ impl ExternalSorter {
in_mem_batches: vec![],
in_mem_batches_sorted: true,
spills: vec![],
- expr: expr.inner.into(),
+ expr: expr.into(),
metrics,
fetch,
reservation,
diff --git a/datafusion/physical-plan/src/topk/mod.rs
b/datafusion/physical-plan/src/topk/mod.rs
index ca154925df..ed1df6b1b8 100644
--- a/datafusion/physical-plan/src/topk/mod.rs
+++ b/datafusion/physical-plan/src/topk/mod.rs
@@ -108,7 +108,7 @@ impl TopK {
let reservation = MemoryConsumer::new(format!("TopK[{partition_id}]"))
.register(&runtime.memory_pool);
- let expr: Arc<[PhysicalSortExpr]> = expr.inner.into();
+ let expr: Arc<[PhysicalSortExpr]> = expr.into();
let sort_fields: Vec<_> = expr
.iter()
diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
index 157711b4e2..7c34fe0680 100644
--- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -332,15 +332,13 @@ fn roundtrip_window() -> Result<()> {
let udwf_expr = Arc::new(StandardWindowExpr::new(
nth_value_window,
&[col("b", &schema)?],
- &LexOrdering {
- inner: vec![PhysicalSortExpr {
- expr: col("a", &schema)?,
- options: SortOptions {
- descending: false,
- nulls_first: false,
- },
- }],
- },
+ &LexOrdering::new(vec![PhysicalSortExpr {
+ expr: col("a", &schema)?,
+ options: SortOptions {
+ descending: false,
+ nulls_first: false,
+ },
+ }]),
Arc::new(window_frame),
));
@@ -1134,15 +1132,13 @@ fn roundtrip_udwf_extension_codec() -> Result<()> {
let udwf_expr = Arc::new(StandardWindowExpr::new(
udwf,
&[col("b", &schema)?],
- &LexOrdering {
- inner: vec![PhysicalSortExpr {
- expr: col("a", &schema)?,
- options: SortOptions {
- descending: false,
- nulls_first: false,
- },
- }],
- },
+ &LexOrdering::new(vec![PhysicalSortExpr {
+ expr: col("a", &schema)?,
+ options: SortOptions {
+ descending: false,
+ nulls_first: false,
+ },
+ }]),
Arc::new(window_frame),
));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]