This is an automated email from the ASF dual-hosted git repository.
berkay pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 755ba9158a [minor]: remove same util functions from the code base. (#13026)
755ba9158a is described below
commit 755ba9158ac2125b5d5b10bb76e27ee9137f7552
Author: Mustafa Akur <[email protected]>
AuthorDate: Mon Oct 21 23:48:37 2024 -0700
[minor]: remove same util functions from the code base. (#13026)
* Initial commit
* Resolve linter errors
* Decrease diff
---
.../core/tests/fuzz_cases/equivalence/ordering.rs | 173 +++++++++++++++++++-
.../core/tests/fuzz_cases/equivalence/utils.rs | 57 +++++++
datafusion/physical-expr/src/equivalence/mod.rs | 178 ---------------------
.../physical-expr/src/equivalence/ordering.rs | 171 +-------------------
4 files changed, 230 insertions(+), 349 deletions(-)
diff --git a/datafusion/core/tests/fuzz_cases/equivalence/ordering.rs b/datafusion/core/tests/fuzz_cases/equivalence/ordering.rs
index 604d1a1000..94157e1170 100644
--- a/datafusion/core/tests/fuzz_cases/equivalence/ordering.rs
+++ b/datafusion/core/tests/fuzz_cases/equivalence/ordering.rs
@@ -16,7 +16,7 @@
// under the License.
use crate::fuzz_cases::equivalence::utils::{
- convert_to_orderings, create_random_schema, create_test_schema_2,
+ convert_to_orderings, create_random_schema, create_test_params,
create_test_schema_2,
generate_table_for_eq_properties, generate_table_for_orderings,
is_table_same_after_sort, TestScalarUDF,
};
@@ -160,6 +160,177 @@ fn test_ordering_satisfy_with_equivalence_complex_random() -> Result<()> {
Ok(())
}
+#[test]
+fn test_ordering_satisfy_with_equivalence() -> Result<()> {
+ // Schema satisfies following orderings:
+ // [a ASC], [d ASC, b ASC], [e DESC, f ASC, g ASC]
+ // and
+ // Column [a=c] (e.g they are aliases).
+ let (test_schema, eq_properties) = create_test_params()?;
+ let col_a = &col("a", &test_schema)?;
+ let col_b = &col("b", &test_schema)?;
+ let col_c = &col("c", &test_schema)?;
+ let col_d = &col("d", &test_schema)?;
+ let col_e = &col("e", &test_schema)?;
+ let col_f = &col("f", &test_schema)?;
+ let col_g = &col("g", &test_schema)?;
+
+ let option_asc = SortOptions {
+ descending: false,
+ nulls_first: false,
+ };
+
+ let option_desc = SortOptions {
+ descending: true,
+ nulls_first: true,
+ };
+ let table_data_with_properties =
+ generate_table_for_eq_properties(&eq_properties, 625, 5)?;
+
+ // First element in the tuple stores vector of requirement, second element is the expected return value for ordering_satisfy function
+ let requirements = vec![
+ // `a ASC NULLS LAST`, expects `ordering_satisfy` to be `true`, since existing ordering `a ASC NULLS LAST, b ASC NULLS LAST` satisfies it
+ (vec![(col_a, option_asc)], true),
+ (vec![(col_a, option_desc)], false),
+ // Test whether equivalence works as expected
+ (vec![(col_c, option_asc)], true),
+ (vec![(col_c, option_desc)], false),
+ // Test whether ordering equivalence works as expected
+ (vec![(col_d, option_asc)], true),
+ (vec![(col_d, option_asc), (col_b, option_asc)], true),
+ (vec![(col_d, option_desc), (col_b, option_asc)], false),
+ (
+ vec![
+ (col_e, option_desc),
+ (col_f, option_asc),
+ (col_g, option_asc),
+ ],
+ true,
+ ),
+ (vec![(col_e, option_desc), (col_f, option_asc)], true),
+ (vec![(col_e, option_asc), (col_f, option_asc)], false),
+ (vec![(col_e, option_desc), (col_b, option_asc)], false),
+ (vec![(col_e, option_asc), (col_b, option_asc)], false),
+ (
+ vec![
+ (col_d, option_asc),
+ (col_b, option_asc),
+ (col_d, option_asc),
+ (col_b, option_asc),
+ ],
+ true,
+ ),
+ (
+ vec![
+ (col_d, option_asc),
+ (col_b, option_asc),
+ (col_e, option_desc),
+ (col_f, option_asc),
+ ],
+ true,
+ ),
+ (
+ vec![
+ (col_d, option_asc),
+ (col_b, option_asc),
+ (col_e, option_desc),
+ (col_b, option_asc),
+ ],
+ true,
+ ),
+ (
+ vec![
+ (col_d, option_asc),
+ (col_b, option_asc),
+ (col_d, option_desc),
+ (col_b, option_asc),
+ ],
+ true,
+ ),
+ (
+ vec![
+ (col_d, option_asc),
+ (col_b, option_asc),
+ (col_e, option_asc),
+ (col_f, option_asc),
+ ],
+ false,
+ ),
+ (
+ vec![
+ (col_d, option_asc),
+ (col_b, option_asc),
+ (col_e, option_asc),
+ (col_b, option_asc),
+ ],
+ false,
+ ),
+ (vec![(col_d, option_asc), (col_e, option_desc)], true),
+ (
+ vec![
+ (col_d, option_asc),
+ (col_c, option_asc),
+ (col_b, option_asc),
+ ],
+ true,
+ ),
+ (
+ vec![
+ (col_d, option_asc),
+ (col_e, option_desc),
+ (col_f, option_asc),
+ (col_b, option_asc),
+ ],
+ true,
+ ),
+ (
+ vec![
+ (col_d, option_asc),
+ (col_e, option_desc),
+ (col_c, option_asc),
+ (col_b, option_asc),
+ ],
+ true,
+ ),
+ (
+ vec![
+ (col_d, option_asc),
+ (col_e, option_desc),
+ (col_b, option_asc),
+ (col_f, option_asc),
+ ],
+ true,
+ ),
+ ];
+
+ for (cols, expected) in requirements {
+ let err_msg = format!("Error in test case:{cols:?}");
+ let required = cols
+ .into_iter()
+ .map(|(expr, options)| PhysicalSortExpr {
+ expr: Arc::clone(expr),
+ options,
+ })
+ .collect::<Vec<_>>();
+
+ // Check expected result with experimental result.
+ assert_eq!(
+ is_table_same_after_sort(
+ required.clone(),
+ table_data_with_properties.clone()
+ )?,
+ expected
+ );
+ assert_eq!(
+ eq_properties.ordering_satisfy(&required),
+ expected,
+ "{err_msg}"
+ );
+ }
+
+ Ok(())
+}
+
// This test checks given a table is ordered with `[a ASC, b ASC, c ASC, d ASC]` and `[a ASC, c ASC, b ASC, d ASC]`
// whether the table is also ordered with `[a ASC, b ASC, d ASC]` and `[a ASC, c ASC, d ASC]`
// Since these orderings cannot be deduced, these orderings shouldn't be satisfied by the table generated.
diff --git a/datafusion/core/tests/fuzz_cases/equivalence/utils.rs b/datafusion/core/tests/fuzz_cases/equivalence/utils.rs
index ce3afba81e..61691311fe 100644
--- a/datafusion/core/tests/fuzz_cases/equivalence/utils.rs
+++ b/datafusion/core/tests/fuzz_cases/equivalence/utils.rs
@@ -299,6 +299,63 @@ fn get_representative_arr(
None
}
+// Generate a schema which consists of 8 columns (a, b, c, d, e, f, g, h)
+pub fn create_test_schema() -> Result<SchemaRef> {
+ let a = Field::new("a", DataType::Int32, true);
+ let b = Field::new("b", DataType::Int32, true);
+ let c = Field::new("c", DataType::Int32, true);
+ let d = Field::new("d", DataType::Int32, true);
+ let e = Field::new("e", DataType::Int32, true);
+ let f = Field::new("f", DataType::Int32, true);
+ let g = Field::new("g", DataType::Int32, true);
+ let h = Field::new("h", DataType::Int32, true);
+ let schema = Arc::new(Schema::new(vec![a, b, c, d, e, f, g, h]));
+
+ Ok(schema)
+}
+
+/// Construct a schema with following properties
+/// Schema satisfies following orderings:
+/// [a ASC], [d ASC, b ASC], [e DESC, f ASC, g ASC]
+/// and
+/// Column [a=c] (e.g they are aliases).
+pub fn create_test_params() -> Result<(SchemaRef, EquivalenceProperties)> {
+ let test_schema = create_test_schema()?;
+ let col_a = &col("a", &test_schema)?;
+ let col_b = &col("b", &test_schema)?;
+ let col_c = &col("c", &test_schema)?;
+ let col_d = &col("d", &test_schema)?;
+ let col_e = &col("e", &test_schema)?;
+ let col_f = &col("f", &test_schema)?;
+ let col_g = &col("g", &test_schema)?;
+ let mut eq_properties = EquivalenceProperties::new(Arc::clone(&test_schema));
+ eq_properties.add_equal_conditions(col_a, col_c)?;
+
+ let option_asc = SortOptions {
+ descending: false,
+ nulls_first: false,
+ };
+ let option_desc = SortOptions {
+ descending: true,
+ nulls_first: true,
+ };
+ let orderings = vec![
+ // [a ASC]
+ vec![(col_a, option_asc)],
+ // [d ASC, b ASC]
+ vec![(col_d, option_asc), (col_b, option_asc)],
+ // [e DESC, f ASC, g ASC]
+ vec![
+ (col_e, option_desc),
+ (col_f, option_asc),
+ (col_g, option_asc),
+ ],
+ ];
+ let orderings = convert_to_orderings(&orderings);
+ eq_properties.add_new_orderings(orderings);
+ Ok((test_schema, eq_properties))
+}
+
// Generate a table that satisfies the given equivalence properties; i.e.
// equivalences, ordering equivalences, and constants.
pub fn generate_table_for_eq_properties(
diff --git a/datafusion/physical-expr/src/equivalence/mod.rs b/datafusion/physical-expr/src/equivalence/mod.rs
index 253f119649..95bb93d6ca 100644
--- a/datafusion/physical-expr/src/equivalence/mod.rs
+++ b/datafusion/physical-expr/src/equivalence/mod.rs
@@ -77,16 +77,10 @@ mod tests {
use crate::expressions::col;
use crate::PhysicalSortExpr;
- use arrow::compute::{lexsort_to_indices, SortColumn};
use arrow::datatypes::{DataType, Field, Schema};
- use arrow_array::{ArrayRef, Float64Array, RecordBatch, UInt32Array};
use arrow_schema::{SchemaRef, SortOptions};
use datafusion_common::{plan_datafusion_err, Result};
- use itertools::izip;
- use rand::rngs::StdRng;
- use rand::{Rng, SeedableRng};
-
pub fn output_schema(
mapping: &ProjectionMapping,
input_schema: &Arc<Schema>,
@@ -290,176 +284,4 @@ mod tests {
Ok(())
}
-
- /// Checks if the table (RecordBatch) remains unchanged when sorted
according to the provided `required_ordering`.
- ///
- /// The function works by adding a unique column of ascending integers to
the original table. This column ensures
- /// that rows that are otherwise indistinguishable (e.g., if they have the
same values in all other columns) can
- /// still be differentiated. When sorting the extended table, the unique
column acts as a tie-breaker to produce
- /// deterministic sorting results.
- ///
- /// If the table remains the same after sorting with the added unique
column, it indicates that the table was
- /// already sorted according to `required_ordering` to begin with.
- pub fn is_table_same_after_sort(
- mut required_ordering: Vec<PhysicalSortExpr>,
- batch: RecordBatch,
- ) -> Result<bool> {
- // Clone the original schema and columns
- let original_schema = batch.schema();
- let mut columns = batch.columns().to_vec();
-
- // Create a new unique column
- let n_row = batch.num_rows();
- let vals: Vec<usize> = (0..n_row).collect::<Vec<_>>();
- let vals: Vec<f64> = vals.into_iter().map(|val| val as f64).collect();
- let unique_col = Arc::new(Float64Array::from_iter_values(vals)) as
ArrayRef;
- columns.push(Arc::clone(&unique_col));
-
- // Create a new schema with the added unique column
- let unique_col_name = "unique";
- let unique_field =
- Arc::new(Field::new(unique_col_name, DataType::Float64, false));
- let fields: Vec<_> = original_schema
- .fields()
- .iter()
- .cloned()
- .chain(std::iter::once(unique_field))
- .collect();
- let schema = Arc::new(Schema::new(fields));
-
- // Create a new batch with the added column
- let new_batch = RecordBatch::try_new(Arc::clone(&schema), columns)?;
-
- // Add the unique column to the required ordering to ensure
deterministic results
- required_ordering.push(PhysicalSortExpr {
- expr: Arc::new(Column::new(unique_col_name,
original_schema.fields().len())),
- options: Default::default(),
- });
-
- // Convert the required ordering to a list of SortColumn
- let sort_columns = required_ordering
- .iter()
- .map(|order_expr| {
- let expr_result = order_expr.expr.evaluate(&new_batch)?;
- let values = expr_result.into_array(new_batch.num_rows())?;
- Ok(SortColumn {
- values,
- options: Some(order_expr.options),
- })
- })
- .collect::<Result<Vec<_>>>()?;
-
- // Check if the indices after sorting match the initial ordering
- let sorted_indices = lexsort_to_indices(&sort_columns, None)?;
- let original_indices = UInt32Array::from_iter_values(0..n_row as u32);
-
- Ok(sorted_indices == original_indices)
- }
-
- // If we already generated a random result for one of the
- // expressions in the equivalence classes. For other expressions in the
same
- // equivalence class use same result. This util gets already calculated
result, when available.
- fn get_representative_arr(
- eq_group: &EquivalenceClass,
- existing_vec: &[Option<ArrayRef>],
- schema: SchemaRef,
- ) -> Option<ArrayRef> {
- for expr in eq_group.iter() {
- let col = expr.as_any().downcast_ref::<Column>().unwrap();
- let (idx, _field) = schema.column_with_name(col.name()).unwrap();
- if let Some(res) = &existing_vec[idx] {
- return Some(Arc::clone(res));
- }
- }
- None
- }
-
- // Generate a table that satisfies the given equivalence properties; i.e.
- // equivalences, ordering equivalences, and constants.
- pub fn generate_table_for_eq_properties(
- eq_properties: &EquivalenceProperties,
- n_elem: usize,
- n_distinct: usize,
- ) -> Result<RecordBatch> {
- let mut rng = StdRng::seed_from_u64(23);
-
- let schema = eq_properties.schema();
- let mut schema_vec = vec![None; schema.fields.len()];
-
- // Fill constant columns
- for constant in &eq_properties.constants {
- let col =
constant.expr().as_any().downcast_ref::<Column>().unwrap();
- let (idx, _field) = schema.column_with_name(col.name()).unwrap();
- let arr = Arc::new(Float64Array::from_iter_values(vec![0 as f64;
n_elem]))
- as ArrayRef;
- schema_vec[idx] = Some(arr);
- }
-
- // Fill columns based on ordering equivalences
- for ordering in eq_properties.oeq_class.iter() {
- let (sort_columns, indices): (Vec<_>, Vec<_>) = ordering
- .iter()
- .map(|PhysicalSortExpr { expr, options }| {
- let col = expr.as_any().downcast_ref::<Column>().unwrap();
- let (idx, _field) =
schema.column_with_name(col.name()).unwrap();
- let arr = generate_random_f64_array(n_elem, n_distinct,
&mut rng);
- (
- SortColumn {
- values: arr,
- options: Some(*options),
- },
- idx,
- )
- })
- .unzip();
-
- let sort_arrs = arrow::compute::lexsort(&sort_columns, None)?;
- for (idx, arr) in izip!(indices, sort_arrs) {
- schema_vec[idx] = Some(arr);
- }
- }
-
- // Fill columns based on equivalence groups
- for eq_group in eq_properties.eq_group.iter() {
- let representative_array =
- get_representative_arr(eq_group, &schema_vec,
Arc::clone(schema))
- .unwrap_or_else(|| {
- generate_random_f64_array(n_elem, n_distinct, &mut rng)
- });
-
- for expr in eq_group.iter() {
- let col = expr.as_any().downcast_ref::<Column>().unwrap();
- let (idx, _field) =
schema.column_with_name(col.name()).unwrap();
- schema_vec[idx] = Some(Arc::clone(&representative_array));
- }
- }
-
- let res: Vec<_> = schema_vec
- .into_iter()
- .zip(schema.fields.iter())
- .map(|(elem, field)| {
- (
- field.name(),
- // Generate random values for columns that do not occur in
any of the groups (equivalence, ordering equivalence, constants)
- elem.unwrap_or_else(|| {
- generate_random_f64_array(n_elem, n_distinct, &mut rng)
- }),
- )
- })
- .collect();
-
- Ok(RecordBatch::try_from_iter(res)?)
- }
-
- // Utility function to generate random f64 array
- fn generate_random_f64_array(
- n_elems: usize,
- n_distinct: usize,
- rng: &mut StdRng,
- ) -> ArrayRef {
- let values: Vec<f64> = (0..n_elems)
- .map(|_| rng.gen_range(0..n_distinct) as f64 / 2.0)
- .collect();
- Arc::new(Float64Array::from_iter_values(values))
- }
}
diff --git a/datafusion/physical-expr/src/equivalence/ordering.rs b/datafusion/physical-expr/src/equivalence/ordering.rs
index a3cf8c965b..d71f3b037f 100644
--- a/datafusion/physical-expr/src/equivalence/ordering.rs
+++ b/datafusion/physical-expr/src/equivalence/ordering.rs
@@ -254,8 +254,7 @@ mod tests {
use std::sync::Arc;
use crate::equivalence::tests::{
- convert_to_orderings, convert_to_sort_exprs, create_test_params,
- create_test_schema, generate_table_for_eq_properties,
is_table_same_after_sort,
+ convert_to_orderings, convert_to_sort_exprs, create_test_schema,
};
use crate::equivalence::{
EquivalenceClass, EquivalenceGroup, EquivalenceProperties,
@@ -600,174 +599,6 @@ mod tests {
Ok(())
}
- #[test]
- fn test_ordering_satisfy_with_equivalence() -> Result<()> {
- // Schema satisfies following orderings:
- // [a ASC], [d ASC, b ASC], [e DESC, f ASC, g ASC]
- // and
- // Column [a=c] (e.g they are aliases).
- let (test_schema, eq_properties) = create_test_params()?;
- let col_a = &col("a", &test_schema)?;
- let col_b = &col("b", &test_schema)?;
- let col_c = &col("c", &test_schema)?;
- let col_d = &col("d", &test_schema)?;
- let col_e = &col("e", &test_schema)?;
- let col_f = &col("f", &test_schema)?;
- let col_g = &col("g", &test_schema)?;
- let option_asc = SortOptions {
- descending: false,
- nulls_first: false,
- };
- let option_desc = SortOptions {
- descending: true,
- nulls_first: true,
- };
- let table_data_with_properties =
- generate_table_for_eq_properties(&eq_properties, 625, 5)?;
-
- // First element in the tuple stores vector of requirement, second
element is the expected return value for ordering_satisfy function
- let requirements = vec![
- // `a ASC NULLS LAST`, expects `ordering_satisfy` to be `true`,
since existing ordering `a ASC NULLS LAST, b ASC NULLS LAST` satisfies it
- (vec![(col_a, option_asc)], true),
- (vec![(col_a, option_desc)], false),
- // Test whether equivalence works as expected
- (vec![(col_c, option_asc)], true),
- (vec![(col_c, option_desc)], false),
- // Test whether ordering equivalence works as expected
- (vec![(col_d, option_asc)], true),
- (vec![(col_d, option_asc), (col_b, option_asc)], true),
- (vec![(col_d, option_desc), (col_b, option_asc)], false),
- (
- vec![
- (col_e, option_desc),
- (col_f, option_asc),
- (col_g, option_asc),
- ],
- true,
- ),
- (vec![(col_e, option_desc), (col_f, option_asc)], true),
- (vec![(col_e, option_asc), (col_f, option_asc)], false),
- (vec![(col_e, option_desc), (col_b, option_asc)], false),
- (vec![(col_e, option_asc), (col_b, option_asc)], false),
- (
- vec![
- (col_d, option_asc),
- (col_b, option_asc),
- (col_d, option_asc),
- (col_b, option_asc),
- ],
- true,
- ),
- (
- vec![
- (col_d, option_asc),
- (col_b, option_asc),
- (col_e, option_desc),
- (col_f, option_asc),
- ],
- true,
- ),
- (
- vec![
- (col_d, option_asc),
- (col_b, option_asc),
- (col_e, option_desc),
- (col_b, option_asc),
- ],
- true,
- ),
- (
- vec![
- (col_d, option_asc),
- (col_b, option_asc),
- (col_d, option_desc),
- (col_b, option_asc),
- ],
- true,
- ),
- (
- vec![
- (col_d, option_asc),
- (col_b, option_asc),
- (col_e, option_asc),
- (col_f, option_asc),
- ],
- false,
- ),
- (
- vec![
- (col_d, option_asc),
- (col_b, option_asc),
- (col_e, option_asc),
- (col_b, option_asc),
- ],
- false,
- ),
- (vec![(col_d, option_asc), (col_e, option_desc)], true),
- (
- vec![
- (col_d, option_asc),
- (col_c, option_asc),
- (col_b, option_asc),
- ],
- true,
- ),
- (
- vec![
- (col_d, option_asc),
- (col_e, option_desc),
- (col_f, option_asc),
- (col_b, option_asc),
- ],
- true,
- ),
- (
- vec![
- (col_d, option_asc),
- (col_e, option_desc),
- (col_c, option_asc),
- (col_b, option_asc),
- ],
- true,
- ),
- (
- vec![
- (col_d, option_asc),
- (col_e, option_desc),
- (col_b, option_asc),
- (col_f, option_asc),
- ],
- true,
- ),
- ];
-
- for (cols, expected) in requirements {
- let err_msg = format!("Error in test case:{cols:?}");
- let required = cols
- .into_iter()
- .map(|(expr, options)| PhysicalSortExpr {
- expr: Arc::clone(expr),
- options,
- })
- .collect::<Vec<_>>();
-
- // Check expected result with experimental result.
- assert_eq!(
- is_table_same_after_sort(
- required.clone(),
- table_data_with_properties.clone()
- )?,
- expected
- );
- assert_eq!(
- eq_properties.ordering_satisfy(&required),
- expected,
- "{err_msg}"
- );
- }
- Ok(())
- }
-
#[test]
fn test_ordering_satisfy_different_lengths() -> Result<()> {
let test_schema = create_test_schema()?;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]