This is an automated email from the ASF dual-hosted git repository.

berkay pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 5391c98f7a Move equivalence fuzz testing to fuzz test binary (#12767)
5391c98f7a is described below

commit 5391c98f7a3fda1f8eef994591286b1596033bc5
Author: Andrew Lamb <[email protected]>
AuthorDate: Mon Oct 14 08:52:10 2024 -0400

    Move equivalence fuzz testing to fuzz test binary (#12767)
    
    * Move equivalence fuzz testing to fuzz test binary
    
    * fix license
    
    * fixup
---
 .../core/tests/fuzz_cases/{ => equivalence}/mod.rs |  14 +-
 .../core/tests/fuzz_cases/equivalence/ordering.rs  | 160 +++++++
 .../tests/fuzz_cases/equivalence/projection.rs     | 200 +++++++++
 .../tests/fuzz_cases/equivalence/properties.rs     | 105 +++++
 .../core/tests/fuzz_cases/equivalence/utils.rs     | 463 +++++++++++++++++++++
 datafusion/core/tests/fuzz_cases/mod.rs            |   2 +
 datafusion/physical-expr/src/equivalence/mod.rs    |  89 ----
 .../physical-expr/src/equivalence/ordering.rs      | 138 +-----
 .../physical-expr/src/equivalence/projection.rs    | 177 +-------
 .../physical-expr/src/equivalence/properties.rs    |  84 +---
 10 files changed, 940 insertions(+), 492 deletions(-)

diff --git a/datafusion/core/tests/fuzz_cases/mod.rs 
b/datafusion/core/tests/fuzz_cases/equivalence/mod.rs
similarity index 80%
copy from datafusion/core/tests/fuzz_cases/mod.rs
copy to datafusion/core/tests/fuzz_cases/equivalence/mod.rs
index 5bc36b963c..2f8a38200b 100644
--- a/datafusion/core/tests/fuzz_cases/mod.rs
+++ b/datafusion/core/tests/fuzz_cases/equivalence/mod.rs
@@ -15,13 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
-mod aggregate_fuzz;
-mod distinct_count_string_fuzz;
-mod join_fuzz;
-mod merge_fuzz;
-mod sort_fuzz;
+//! `EquivalenceProperties` fuzz testing
 
-mod aggregation_fuzzer;
-mod limit_fuzz;
-mod sort_preserving_repartition_fuzz;
-mod window_fuzz;
+mod ordering;
+mod projection;
+mod properties;
+mod utils;
diff --git a/datafusion/core/tests/fuzz_cases/equivalence/ordering.rs 
b/datafusion/core/tests/fuzz_cases/equivalence/ordering.rs
new file mode 100644
index 0000000000..b1ee24a7a3
--- /dev/null
+++ b/datafusion/core/tests/fuzz_cases/equivalence/ordering.rs
@@ -0,0 +1,160 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::fuzz_cases::equivalence::utils::{
+    create_random_schema, generate_table_for_eq_properties, 
is_table_same_after_sort,
+    TestScalarUDF,
+};
+use arrow_schema::SortOptions;
+use datafusion_common::{DFSchema, Result};
+use datafusion_expr::{Operator, ScalarUDF};
+use datafusion_physical_expr::expressions::{col, BinaryExpr};
+use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
+use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
+use itertools::Itertools;
+use std::sync::Arc;
+
+#[test]
+fn test_ordering_satisfy_with_equivalence_random() -> Result<()> {
+    const N_RANDOM_SCHEMA: usize = 5;
+    const N_ELEMENTS: usize = 125;
+    const N_DISTINCT: usize = 5;
+    const SORT_OPTIONS: SortOptions = SortOptions {
+        descending: false,
+        nulls_first: false,
+    };
+
+    for seed in 0..N_RANDOM_SCHEMA {
+        // Create a random schema with random properties
+        let (test_schema, eq_properties) = create_random_schema(seed as u64)?;
+        // Generate data that satisfies the given properties
+        let table_data_with_properties =
+            generate_table_for_eq_properties(&eq_properties, N_ELEMENTS, 
N_DISTINCT)?;
+        let col_exprs = [
+            col("a", &test_schema)?,
+            col("b", &test_schema)?,
+            col("c", &test_schema)?,
+            col("d", &test_schema)?,
+            col("e", &test_schema)?,
+            col("f", &test_schema)?,
+        ];
+
+        for n_req in 0..=col_exprs.len() {
+            for exprs in col_exprs.iter().combinations(n_req) {
+                let requirement = exprs
+                    .into_iter()
+                    .map(|expr| PhysicalSortExpr {
+                        expr: Arc::clone(expr),
+                        options: SORT_OPTIONS,
+                    })
+                    .collect::<Vec<_>>();
+                let expected = is_table_same_after_sort(
+                    requirement.clone(),
+                    table_data_with_properties.clone(),
+                )?;
+                let err_msg = format!(
+                    "Error in test case requirement:{:?}, expected: {:?}, 
eq_properties.oeq_class: {:?}, eq_properties.eq_group: {:?}, 
eq_properties.constants: {:?}",
+                    requirement, expected, eq_properties.oeq_class, 
eq_properties.eq_group, eq_properties.constants
+                );
+                // Check whether ordering_satisfy API result and
+                // experimental result matches.
+                assert_eq!(
+                    eq_properties.ordering_satisfy(&requirement),
+                    expected,
+                    "{}",
+                    err_msg
+                );
+            }
+        }
+    }
+
+    Ok(())
+}
+
+#[test]
+fn test_ordering_satisfy_with_equivalence_complex_random() -> Result<()> {
+    const N_RANDOM_SCHEMA: usize = 100;
+    const N_ELEMENTS: usize = 125;
+    const N_DISTINCT: usize = 5;
+    const SORT_OPTIONS: SortOptions = SortOptions {
+        descending: false,
+        nulls_first: false,
+    };
+
+    for seed in 0..N_RANDOM_SCHEMA {
+        // Create a random schema with random properties
+        let (test_schema, eq_properties) = create_random_schema(seed as u64)?;
+        // Generate data that satisfies the given properties
+        let table_data_with_properties =
+            generate_table_for_eq_properties(&eq_properties, N_ELEMENTS, 
N_DISTINCT)?;
+
+        let test_fun = ScalarUDF::new_from_impl(TestScalarUDF::new());
+        let floor_a = datafusion_physical_expr::udf::create_physical_expr(
+            &test_fun,
+            &[col("a", &test_schema)?],
+            &test_schema,
+            &[],
+            &DFSchema::empty(),
+        )?;
+        let a_plus_b = Arc::new(BinaryExpr::new(
+            col("a", &test_schema)?,
+            Operator::Plus,
+            col("b", &test_schema)?,
+        )) as Arc<dyn PhysicalExpr>;
+        let exprs = [
+            col("a", &test_schema)?,
+            col("b", &test_schema)?,
+            col("c", &test_schema)?,
+            col("d", &test_schema)?,
+            col("e", &test_schema)?,
+            col("f", &test_schema)?,
+            floor_a,
+            a_plus_b,
+        ];
+
+        for n_req in 0..=exprs.len() {
+            for exprs in exprs.iter().combinations(n_req) {
+                let requirement = exprs
+                    .into_iter()
+                    .map(|expr| PhysicalSortExpr {
+                        expr: Arc::clone(expr),
+                        options: SORT_OPTIONS,
+                    })
+                    .collect::<Vec<_>>();
+                let expected = is_table_same_after_sort(
+                    requirement.clone(),
+                    table_data_with_properties.clone(),
+                )?;
+                let err_msg = format!(
+                    "Error in test case requirement:{:?}, expected: {:?}, 
eq_properties.oeq_class: {:?}, eq_properties.eq_group: {:?}, 
eq_properties.constants: {:?}",
+                    requirement, expected, eq_properties.oeq_class, 
eq_properties.eq_group, eq_properties.constants
+                );
+                // Check whether ordering_satisfy API result and
+                // experimental result matches.
+
+                assert_eq!(
+                    eq_properties.ordering_satisfy(&requirement),
+                    (expected | false),
+                    "{}",
+                    err_msg
+                );
+            }
+        }
+    }
+
+    Ok(())
+}
diff --git a/datafusion/core/tests/fuzz_cases/equivalence/projection.rs 
b/datafusion/core/tests/fuzz_cases/equivalence/projection.rs
new file mode 100644
index 0000000000..c0c8517a61
--- /dev/null
+++ b/datafusion/core/tests/fuzz_cases/equivalence/projection.rs
@@ -0,0 +1,200 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::fuzz_cases::equivalence::utils::{
+    apply_projection, create_random_schema, generate_table_for_eq_properties,
+    is_table_same_after_sort, TestScalarUDF,
+};
+use arrow_schema::SortOptions;
+use datafusion_common::{DFSchema, Result};
+use datafusion_expr::{Operator, ScalarUDF};
+use datafusion_physical_expr::equivalence::ProjectionMapping;
+use datafusion_physical_expr::expressions::{col, BinaryExpr};
+use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
+use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
+use itertools::Itertools;
+use std::sync::Arc;
+
+#[test]
+fn project_orderings_random() -> Result<()> {
+    const N_RANDOM_SCHEMA: usize = 20;
+    const N_ELEMENTS: usize = 125;
+    const N_DISTINCT: usize = 5;
+
+    for seed in 0..N_RANDOM_SCHEMA {
+        // Create a random schema with random properties
+        let (test_schema, eq_properties) = create_random_schema(seed as u64)?;
+        // Generate data that satisfies the given properties
+        let table_data_with_properties =
+            generate_table_for_eq_properties(&eq_properties, N_ELEMENTS, 
N_DISTINCT)?;
+        // Floor(a)
+        let test_fun = ScalarUDF::new_from_impl(TestScalarUDF::new());
+        let floor_a = datafusion_physical_expr::udf::create_physical_expr(
+            &test_fun,
+            &[col("a", &test_schema)?],
+            &test_schema,
+            &[],
+            &DFSchema::empty(),
+        )?;
+        // a + b
+        let a_plus_b = Arc::new(BinaryExpr::new(
+            col("a", &test_schema)?,
+            Operator::Plus,
+            col("b", &test_schema)?,
+        )) as Arc<dyn PhysicalExpr>;
+        let proj_exprs = vec![
+            (col("a", &test_schema)?, "a_new"),
+            (col("b", &test_schema)?, "b_new"),
+            (col("c", &test_schema)?, "c_new"),
+            (col("d", &test_schema)?, "d_new"),
+            (col("e", &test_schema)?, "e_new"),
+            (col("f", &test_schema)?, "f_new"),
+            (floor_a, "floor(a)"),
+            (a_plus_b, "a+b"),
+        ];
+
+        for n_req in 0..=proj_exprs.len() {
+            for proj_exprs in proj_exprs.iter().combinations(n_req) {
+                let proj_exprs = proj_exprs
+                    .into_iter()
+                    .map(|(expr, name)| (Arc::clone(expr), name.to_string()))
+                    .collect::<Vec<_>>();
+                let (projected_batch, projected_eq) = apply_projection(
+                    proj_exprs.clone(),
+                    &table_data_with_properties,
+                    &eq_properties,
+                )?;
+
+                // Make sure each ordering after projection is valid.
+                for ordering in projected_eq.oeq_class().iter() {
+                    let err_msg = format!(
+                        "Error in test case ordering:{:?}, 
eq_properties.oeq_class: {:?}, eq_properties.eq_group: {:?}, 
eq_properties.constants: {:?}, proj_exprs: {:?}",
+                        ordering, eq_properties.oeq_class, 
eq_properties.eq_group, eq_properties.constants, proj_exprs
+                    );
+                    // Since the ordered section satisfies the schema, we expect
+                    // that the result will be the same after sort (i.e. the sort 
was unnecessary).
+                    assert!(
+                        is_table_same_after_sort(
+                            ordering.clone(),
+                            projected_batch.clone(),
+                        )?,
+                        "{}",
+                        err_msg
+                    );
+                }
+            }
+        }
+    }
+
+    Ok(())
+}
+
+#[test]
+fn ordering_satisfy_after_projection_random() -> Result<()> {
+    const N_RANDOM_SCHEMA: usize = 20;
+    const N_ELEMENTS: usize = 125;
+    const N_DISTINCT: usize = 5;
+    const SORT_OPTIONS: SortOptions = SortOptions {
+        descending: false,
+        nulls_first: false,
+    };
+
+    for seed in 0..N_RANDOM_SCHEMA {
+        // Create a random schema with random properties
+        let (test_schema, eq_properties) = create_random_schema(seed as u64)?;
+        // Generate data that satisfies the given properties
+        let table_data_with_properties =
+            generate_table_for_eq_properties(&eq_properties, N_ELEMENTS, 
N_DISTINCT)?;
+        // Floor(a)
+        let test_fun = ScalarUDF::new_from_impl(TestScalarUDF::new());
+        let floor_a = datafusion_physical_expr::udf::create_physical_expr(
+            &test_fun,
+            &[col("a", &test_schema)?],
+            &test_schema,
+            &[],
+            &DFSchema::empty(),
+        )?;
+        // a + b
+        let a_plus_b = Arc::new(BinaryExpr::new(
+            col("a", &test_schema)?,
+            Operator::Plus,
+            col("b", &test_schema)?,
+        )) as Arc<dyn PhysicalExpr>;
+        let proj_exprs = vec![
+            (col("a", &test_schema)?, "a_new"),
+            (col("b", &test_schema)?, "b_new"),
+            (col("c", &test_schema)?, "c_new"),
+            (col("d", &test_schema)?, "d_new"),
+            (col("e", &test_schema)?, "e_new"),
+            (col("f", &test_schema)?, "f_new"),
+            (floor_a, "floor(a)"),
+            (a_plus_b, "a+b"),
+        ];
+
+        for n_req in 0..=proj_exprs.len() {
+            for proj_exprs in proj_exprs.iter().combinations(n_req) {
+                let proj_exprs = proj_exprs
+                    .into_iter()
+                    .map(|(expr, name)| (Arc::clone(expr), name.to_string()))
+                    .collect::<Vec<_>>();
+                let (projected_batch, projected_eq) = apply_projection(
+                    proj_exprs.clone(),
+                    &table_data_with_properties,
+                    &eq_properties,
+                )?;
+
+                let projection_mapping =
+                    ProjectionMapping::try_new(&proj_exprs, &test_schema)?;
+
+                let projected_exprs = projection_mapping
+                    .iter()
+                    .map(|(_source, target)| Arc::clone(target))
+                    .collect::<Vec<_>>();
+
+                for n_req in 0..=projected_exprs.len() {
+                    for exprs in projected_exprs.iter().combinations(n_req) {
+                        let requirement = exprs
+                            .into_iter()
+                            .map(|expr| PhysicalSortExpr {
+                                expr: Arc::clone(expr),
+                                options: SORT_OPTIONS,
+                            })
+                            .collect::<Vec<_>>();
+                        let expected = is_table_same_after_sort(
+                            requirement.clone(),
+                            projected_batch.clone(),
+                        )?;
+                        let err_msg = format!(
+                            "Error in test case requirement:{:?}, expected: 
{:?}, eq_properties.oeq_class: {:?}, eq_properties.eq_group: {:?}, 
eq_properties.constants: {:?}, projected_eq.oeq_class: {:?}, 
projected_eq.eq_group: {:?}, projected_eq.constants: {:?}, projection_mapping: 
{:?}",
+                            requirement, expected, eq_properties.oeq_class, 
eq_properties.eq_group, eq_properties.constants, projected_eq.oeq_class, 
projected_eq.eq_group, projected_eq.constants, projection_mapping
+                        );
+                        // Check whether ordering_satisfy API result and
+                        // experimental result matches.
+                        assert_eq!(
+                            projected_eq.ordering_satisfy(&requirement),
+                            expected,
+                            "{}",
+                            err_msg
+                        );
+                    }
+                }
+            }
+        }
+    }
+
+    Ok(())
+}
diff --git a/datafusion/core/tests/fuzz_cases/equivalence/properties.rs 
b/datafusion/core/tests/fuzz_cases/equivalence/properties.rs
new file mode 100644
index 0000000000..e704fcacc3
--- /dev/null
+++ b/datafusion/core/tests/fuzz_cases/equivalence/properties.rs
@@ -0,0 +1,105 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::fuzz_cases::equivalence::utils::{
+    create_random_schema, generate_table_for_eq_properties, 
is_table_same_after_sort,
+    TestScalarUDF,
+};
+use datafusion_common::{DFSchema, Result};
+use datafusion_expr::{Operator, ScalarUDF};
+use datafusion_physical_expr::expressions::{col, BinaryExpr};
+use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
+use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
+use itertools::Itertools;
+use std::sync::Arc;
+
+#[test]
+fn test_find_longest_permutation_random() -> Result<()> {
+    const N_RANDOM_SCHEMA: usize = 100;
+    const N_ELEMENTS: usize = 125;
+    const N_DISTINCT: usize = 5;
+
+    for seed in 0..N_RANDOM_SCHEMA {
+        // Create a random schema with random properties
+        let (test_schema, eq_properties) = create_random_schema(seed as u64)?;
+        // Generate data that satisfies the given properties
+        let table_data_with_properties =
+            generate_table_for_eq_properties(&eq_properties, N_ELEMENTS, 
N_DISTINCT)?;
+
+        let test_fun = ScalarUDF::new_from_impl(TestScalarUDF::new());
+        let floor_a = datafusion_physical_expr::udf::create_physical_expr(
+            &test_fun,
+            &[col("a", &test_schema)?],
+            &test_schema,
+            &[],
+            &DFSchema::empty(),
+        )?;
+        let a_plus_b = Arc::new(BinaryExpr::new(
+            col("a", &test_schema)?,
+            Operator::Plus,
+            col("b", &test_schema)?,
+        )) as Arc<dyn PhysicalExpr>;
+        let exprs = [
+            col("a", &test_schema)?,
+            col("b", &test_schema)?,
+            col("c", &test_schema)?,
+            col("d", &test_schema)?,
+            col("e", &test_schema)?,
+            col("f", &test_schema)?,
+            floor_a,
+            a_plus_b,
+        ];
+
+        for n_req in 0..=exprs.len() {
+            for exprs in exprs.iter().combinations(n_req) {
+                let exprs = exprs.into_iter().cloned().collect::<Vec<_>>();
+                let (ordering, indices) = 
eq_properties.find_longest_permutation(&exprs);
+                // Make sure that find_longest_permutation return values are 
consistent
+                let ordering2 = indices
+                    .iter()
+                    .zip(ordering.iter())
+                    .map(|(&idx, sort_expr)| PhysicalSortExpr {
+                        expr: Arc::clone(&exprs[idx]),
+                        options: sort_expr.options,
+                    })
+                    .collect::<Vec<_>>();
+                assert_eq!(
+                    ordering, ordering2,
+                    "indices and lexicographical ordering do not match"
+                );
+
+                let err_msg = format!(
+                    "Error in test case ordering:{:?}, 
eq_properties.oeq_class: {:?}, eq_properties.eq_group: {:?}, 
eq_properties.constants: {:?}",
+                    ordering, eq_properties.oeq_class, eq_properties.eq_group, 
eq_properties.constants
+                );
+                assert_eq!(ordering.len(), indices.len(), "{}", err_msg);
+                // Since the ordered section satisfies the schema, we expect
+                // that the result will be the same after sort (i.e. the sort was 
unnecessary).
+                assert!(
+                    is_table_same_after_sort(
+                        ordering.clone(),
+                        table_data_with_properties.clone(),
+                    )?,
+                    "{}",
+                    err_msg
+                );
+            }
+        }
+    }
+
+    Ok(())
+}
diff --git a/datafusion/core/tests/fuzz_cases/equivalence/utils.rs 
b/datafusion/core/tests/fuzz_cases/equivalence/utils.rs
new file mode 100644
index 0000000000..e51dabd643
--- /dev/null
+++ b/datafusion/core/tests/fuzz_cases/equivalence/utils.rs
@@ -0,0 +1,463 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// use datafusion_physical_expr::expressions::{col, Column};
+use datafusion::physical_plan::expressions::col;
+use datafusion::physical_plan::expressions::Column;
+use datafusion_physical_expr::{ConstExpr, EquivalenceProperties, 
PhysicalSortExpr};
+use std::any::Any;
+use std::sync::Arc;
+
+use arrow::compute::{lexsort_to_indices, SortColumn};
+use arrow::datatypes::{DataType, Field, Schema};
+use arrow_array::{ArrayRef, Float32Array, Float64Array, RecordBatch, 
UInt32Array};
+use arrow_schema::{SchemaRef, SortOptions};
+use datafusion_common::{exec_err, plan_datafusion_err, DataFusionError, 
Result};
+
+use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
+use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility};
+use datafusion_physical_expr::equivalence::{EquivalenceClass, 
ProjectionMapping};
+use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
+use itertools::izip;
+use rand::prelude::*;
+
+pub fn output_schema(
+    mapping: &ProjectionMapping,
+    input_schema: &Arc<Schema>,
+) -> Result<SchemaRef> {
+    // Calculate output schema
+    let fields: Result<Vec<Field>> = mapping
+        .iter()
+        .map(|(source, target)| {
+            let name = target
+                .as_any()
+                .downcast_ref::<Column>()
+                .ok_or_else(|| plan_datafusion_err!("Expects to have column"))?
+                .name();
+            let field = Field::new(
+                name,
+                source.data_type(input_schema)?,
+                source.nullable(input_schema)?,
+            );
+
+            Ok(field)
+        })
+        .collect();
+
+    let output_schema = Arc::new(Schema::new_with_metadata(
+        fields?,
+        input_schema.metadata().clone(),
+    ));
+
+    Ok(output_schema)
+}
+
+// Generate a schema which consists of 6 columns (a, b, c, d, e, f)
+fn create_test_schema_2() -> Result<SchemaRef> {
+    let a = Field::new("a", DataType::Float64, true);
+    let b = Field::new("b", DataType::Float64, true);
+    let c = Field::new("c", DataType::Float64, true);
+    let d = Field::new("d", DataType::Float64, true);
+    let e = Field::new("e", DataType::Float64, true);
+    let f = Field::new("f", DataType::Float64, true);
+    let schema = Arc::new(Schema::new(vec![a, b, c, d, e, f]));
+
+    Ok(schema)
+}
+
+/// Construct a schema with random ordering
+/// among column a, b, c, d
+/// where
+/// Column [a=f] (e.g they are aliases).
+/// Column e is constant.
+pub fn create_random_schema(seed: u64) -> Result<(SchemaRef, 
EquivalenceProperties)> {
+    let test_schema = create_test_schema_2()?;
+    let col_a = &col("a", &test_schema)?;
+    let col_b = &col("b", &test_schema)?;
+    let col_c = &col("c", &test_schema)?;
+    let col_d = &col("d", &test_schema)?;
+    let col_e = &col("e", &test_schema)?;
+    let col_f = &col("f", &test_schema)?;
+    let col_exprs = [col_a, col_b, col_c, col_d, col_e, col_f];
+
+    let mut eq_properties = 
EquivalenceProperties::new(Arc::clone(&test_schema));
+    // Define a and f are aliases
+    eq_properties.add_equal_conditions(col_a, col_f)?;
+    // Column e has constant value.
+    eq_properties = eq_properties.with_constants([ConstExpr::from(col_e)]);
+
+    // Randomly order columns for sorting
+    let mut rng = StdRng::seed_from_u64(seed);
+    let mut remaining_exprs = col_exprs[0..4].to_vec(); // only a, b, c, d are 
sorted
+
+    let options_asc = SortOptions {
+        descending: false,
+        nulls_first: false,
+    };
+
+    while !remaining_exprs.is_empty() {
+        let n_sort_expr = rng.gen_range(0..remaining_exprs.len() + 1);
+        remaining_exprs.shuffle(&mut rng);
+
+        let ordering = remaining_exprs
+            .drain(0..n_sort_expr)
+            .map(|expr| PhysicalSortExpr {
+                expr: Arc::clone(expr),
+                options: options_asc,
+            })
+            .collect();
+
+        eq_properties.add_new_orderings([ordering]);
+    }
+
+    Ok((test_schema, eq_properties))
+}
+
+// Apply projection to the input_data, return projected equivalence properties 
and record batch
+pub fn apply_projection(
+    proj_exprs: Vec<(Arc<dyn PhysicalExpr>, String)>,
+    input_data: &RecordBatch,
+    input_eq_properties: &EquivalenceProperties,
+) -> Result<(RecordBatch, EquivalenceProperties)> {
+    let input_schema = input_data.schema();
+    let projection_mapping = ProjectionMapping::try_new(&proj_exprs, 
&input_schema)?;
+
+    let output_schema = output_schema(&projection_mapping, &input_schema)?;
+    let num_rows = input_data.num_rows();
+    // Apply projection to the input record batch.
+    let projected_values = projection_mapping
+        .iter()
+        .map(|(source, _target)| 
source.evaluate(input_data)?.into_array(num_rows))
+        .collect::<Result<Vec<_>>>()?;
+    let projected_batch = if projected_values.is_empty() {
+        RecordBatch::new_empty(Arc::clone(&output_schema))
+    } else {
+        RecordBatch::try_new(Arc::clone(&output_schema), projected_values)?
+    };
+
+    let projected_eq = input_eq_properties.project(&projection_mapping, 
output_schema);
+    Ok((projected_batch, projected_eq))
+}
+
+#[test]
+fn add_equal_conditions_test() -> Result<()> {
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("a", DataType::Int64, true),
+        Field::new("b", DataType::Int64, true),
+        Field::new("c", DataType::Int64, true),
+        Field::new("x", DataType::Int64, true),
+        Field::new("y", DataType::Int64, true),
+    ]));
+
+    let mut eq_properties = EquivalenceProperties::new(schema);
+    let col_a_expr = Arc::new(Column::new("a", 0)) as Arc<dyn PhysicalExpr>;
+    let col_b_expr = Arc::new(Column::new("b", 1)) as Arc<dyn PhysicalExpr>;
+    let col_c_expr = Arc::new(Column::new("c", 2)) as Arc<dyn PhysicalExpr>;
+    let col_x_expr = Arc::new(Column::new("x", 3)) as Arc<dyn PhysicalExpr>;
+    let col_y_expr = Arc::new(Column::new("y", 4)) as Arc<dyn PhysicalExpr>;
+
+    // a and b are aliases
+    eq_properties.add_equal_conditions(&col_a_expr, &col_b_expr)?;
+    assert_eq!(eq_properties.eq_group().len(), 1);
+
+    // This new entry is redundant, size shouldn't increase
+    eq_properties.add_equal_conditions(&col_b_expr, &col_a_expr)?;
+    assert_eq!(eq_properties.eq_group().len(), 1);
+    let eq_groups = &eq_properties.eq_group().classes[0];
+    assert_eq!(eq_groups.len(), 2);
+    assert!(eq_groups.contains(&col_a_expr));
+    assert!(eq_groups.contains(&col_b_expr));
+
+    // b and c are aliases. Existing equivalence class should expand;
+    // however, there shouldn't be any new equivalence class
+    eq_properties.add_equal_conditions(&col_b_expr, &col_c_expr)?;
+    assert_eq!(eq_properties.eq_group().len(), 1);
+    let eq_groups = &eq_properties.eq_group().classes[0];
+    assert_eq!(eq_groups.len(), 3);
+    assert!(eq_groups.contains(&col_a_expr));
+    assert!(eq_groups.contains(&col_b_expr));
+    assert!(eq_groups.contains(&col_c_expr));
+
+    // This is a new set of equality. Hence equivalent class count should be 2.
+    eq_properties.add_equal_conditions(&col_x_expr, &col_y_expr)?;
+    assert_eq!(eq_properties.eq_group().len(), 2);
+
+    // This equality bridges distinct equality sets.
+    // Hence equivalent class count should decrease from 2 to 1.
+    eq_properties.add_equal_conditions(&col_x_expr, &col_a_expr)?;
+    assert_eq!(eq_properties.eq_group().len(), 1);
+    let eq_groups = &eq_properties.eq_group().classes[0];
+    assert_eq!(eq_groups.len(), 5);
+    assert!(eq_groups.contains(&col_a_expr));
+    assert!(eq_groups.contains(&col_b_expr));
+    assert!(eq_groups.contains(&col_c_expr));
+    assert!(eq_groups.contains(&col_x_expr));
+    assert!(eq_groups.contains(&col_y_expr));
+
+    Ok(())
+}
+
+/// Checks if the table (RecordBatch) remains unchanged when sorted according 
to the provided `required_ordering`.
+///
+/// The function works by adding a unique column of ascending integers to the 
original table. This column ensures
+/// that rows that are otherwise indistinguishable (e.g., if they have the 
same values in all other columns) can
+/// still be differentiated. When sorting the extended table, the unique 
column acts as a tie-breaker to produce
+/// deterministic sorting results.
+///
+/// If the table remains the same after sorting with the added unique column, 
it indicates that the table was
+/// already sorted according to `required_ordering` to begin with.
+pub fn is_table_same_after_sort(
+    mut required_ordering: Vec<PhysicalSortExpr>,
+    batch: RecordBatch,
+) -> Result<bool> {
+    // Clone the original schema and columns
+    let original_schema = batch.schema();
+    let mut columns = batch.columns().to_vec();
+
+    // Create a new unique column
+    let n_row = batch.num_rows();
+    let vals: Vec<usize> = (0..n_row).collect::<Vec<_>>();
+    let vals: Vec<f64> = vals.into_iter().map(|val| val as f64).collect();
+    let unique_col = Arc::new(Float64Array::from_iter_values(vals)) as 
ArrayRef;
+    columns.push(Arc::clone(&unique_col));
+
+    // Create a new schema with the added unique column
+    let unique_col_name = "unique";
+    let unique_field = Arc::new(Field::new(unique_col_name, DataType::Float64, 
false));
+    let fields: Vec<_> = original_schema
+        .fields()
+        .iter()
+        .cloned()
+        .chain(std::iter::once(unique_field))
+        .collect();
+    let schema = Arc::new(Schema::new(fields));
+
+    // Create a new batch with the added column
+    let new_batch = RecordBatch::try_new(Arc::clone(&schema), columns)?;
+
+    // Add the unique column to the required ordering to ensure deterministic 
results
+    required_ordering.push(PhysicalSortExpr {
+        expr: Arc::new(Column::new(unique_col_name, 
original_schema.fields().len())),
+        options: Default::default(),
+    });
+
+    // Convert the required ordering to a list of SortColumn
+    let sort_columns = required_ordering
+        .iter()
+        .map(|order_expr| {
+            let expr_result = order_expr.expr.evaluate(&new_batch)?;
+            let values = expr_result.into_array(new_batch.num_rows())?;
+            Ok(SortColumn {
+                values,
+                options: Some(order_expr.options),
+            })
+        })
+        .collect::<Result<Vec<_>>>()?;
+
+    // Check if the indices after sorting match the initial ordering
+    let sorted_indices = lexsort_to_indices(&sort_columns, None)?;
+    let original_indices = UInt32Array::from_iter_values(0..n_row as u32);
+
+    Ok(sorted_indices == original_indices)
+}
+
+// If we already generated a random result for one of the
+// expressions in the equivalence classes. For other expressions in the same
+// equivalence class use same result. This util gets already calculated 
result, when available.
+fn get_representative_arr(
+    eq_group: &EquivalenceClass,
+    existing_vec: &[Option<ArrayRef>],
+    schema: SchemaRef,
+) -> Option<ArrayRef> {
+    for expr in eq_group.iter() {
+        let col = expr.as_any().downcast_ref::<Column>().unwrap();
+        let (idx, _field) = schema.column_with_name(col.name()).unwrap();
+        if let Some(res) = &existing_vec[idx] {
+            return Some(Arc::clone(res));
+        }
+    }
+    None
+}
+
+// Generate a table that satisfies the given equivalence properties; i.e.
+// equivalences, ordering equivalences, and constants.
+pub fn generate_table_for_eq_properties(
+    eq_properties: &EquivalenceProperties,
+    n_elem: usize,
+    n_distinct: usize,
+) -> Result<RecordBatch> {
+    let mut rng = StdRng::seed_from_u64(23);
+
+    let schema = eq_properties.schema();
+    let mut schema_vec = vec![None; schema.fields.len()];
+
+    // Utility closure to generate random array
+    let mut generate_random_array = |num_elems: usize, max_val: usize| -> 
ArrayRef {
+        let values: Vec<f64> = (0..num_elems)
+            .map(|_| rng.gen_range(0..max_val) as f64 / 2.0)
+            .collect();
+        Arc::new(Float64Array::from_iter_values(values))
+    };
+
+    // Fill constant columns
+    for constant in &eq_properties.constants {
+        let col = constant.expr().as_any().downcast_ref::<Column>().unwrap();
+        let (idx, _field) = schema.column_with_name(col.name()).unwrap();
+        let arr =
+            Arc::new(Float64Array::from_iter_values(vec![0 as f64; n_elem])) 
as ArrayRef;
+        schema_vec[idx] = Some(arr);
+    }
+
+    // Fill columns based on ordering equivalences
+    for ordering in eq_properties.oeq_class.iter() {
+        let (sort_columns, indices): (Vec<_>, Vec<_>) = ordering
+            .iter()
+            .map(|PhysicalSortExpr { expr, options }| {
+                let col = expr.as_any().downcast_ref::<Column>().unwrap();
+                let (idx, _field) = 
schema.column_with_name(col.name()).unwrap();
+                let arr = generate_random_array(n_elem, n_distinct);
+                (
+                    SortColumn {
+                        values: arr,
+                        options: Some(*options),
+                    },
+                    idx,
+                )
+            })
+            .unzip();
+
+        let sort_arrs = arrow::compute::lexsort(&sort_columns, None)?;
+        for (idx, arr) in izip!(indices, sort_arrs) {
+            schema_vec[idx] = Some(arr);
+        }
+    }
+
+    // Fill columns based on equivalence groups
+    for eq_group in eq_properties.eq_group.iter() {
+        let representative_array =
+            get_representative_arr(eq_group, &schema_vec, Arc::clone(schema))
+                .unwrap_or_else(|| generate_random_array(n_elem, n_distinct));
+
+        for expr in eq_group.iter() {
+            let col = expr.as_any().downcast_ref::<Column>().unwrap();
+            let (idx, _field) = schema.column_with_name(col.name()).unwrap();
+            schema_vec[idx] = Some(Arc::clone(&representative_array));
+        }
+    }
+
+    let res: Vec<_> = schema_vec
+        .into_iter()
+        .zip(schema.fields.iter())
+        .map(|(elem, field)| {
+            (
+                field.name(),
+                // Generate random values for columns that do not occur in any 
of the groups (equivalence, ordering equivalence, constants)
+                elem.unwrap_or_else(|| generate_random_array(n_elem, 
n_distinct)),
+            )
+        })
+        .collect();
+
+    Ok(RecordBatch::try_from_iter(res)?)
+}
+
+#[derive(Debug, Clone)]
+pub struct TestScalarUDF {
+    pub(crate) signature: Signature,
+}
+
+impl TestScalarUDF {
+    pub fn new() -> Self {
+        use DataType::*;
+        Self {
+            signature: Signature::uniform(
+                1,
+                vec![Float64, Float32],
+                Volatility::Immutable,
+            ),
+        }
+    }
+}
+
+impl ScalarUDFImpl for TestScalarUDF {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+    fn name(&self) -> &str {
+        "test-scalar-udf"
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
+        let arg_type = &arg_types[0];
+
+        match arg_type {
+            DataType::Float32 => Ok(DataType::Float32),
+            _ => Ok(DataType::Float64),
+        }
+    }
+
+    fn output_ordering(&self, input: &[ExprProperties]) -> 
Result<SortProperties> {
+        Ok(input[0].sort_properties)
+    }
+
+    fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
+        let args = ColumnarValue::values_to_arrays(args)?;
+
+        let arr: ArrayRef = match args[0].data_type() {
+            DataType::Float64 => Arc::new({
+                let arg = 
&args[0].as_any().downcast_ref::<Float64Array>().ok_or_else(
+                    || {
+                        DataFusionError::Internal(format!(
+                            "could not cast {} to {}",
+                            self.name(),
+                            std::any::type_name::<Float64Array>()
+                        ))
+                    },
+                )?;
+
+                arg.iter()
+                    .map(|a| a.map(f64::floor))
+                    .collect::<Float64Array>()
+            }),
+            DataType::Float32 => Arc::new({
+                let arg = 
&args[0].as_any().downcast_ref::<Float32Array>().ok_or_else(
+                    || {
+                        DataFusionError::Internal(format!(
+                            "could not cast {} to {}",
+                            self.name(),
+                            std::any::type_name::<Float32Array>()
+                        ))
+                    },
+                )?;
+
+                arg.iter()
+                    .map(|a| a.map(f32::floor))
+                    .collect::<Float32Array>()
+            }),
+            other => {
+                return exec_err!(
+                    "Unsupported data type {other:?} for function {}",
+                    self.name()
+                );
+            }
+        };
+        Ok(ColumnarValue::Array(arr))
+    }
+}
diff --git a/datafusion/core/tests/fuzz_cases/mod.rs 
b/datafusion/core/tests/fuzz_cases/mod.rs
index 5bc36b963c..49db0d31a8 100644
--- a/datafusion/core/tests/fuzz_cases/mod.rs
+++ b/datafusion/core/tests/fuzz_cases/mod.rs
@@ -22,6 +22,8 @@ mod merge_fuzz;
 mod sort_fuzz;
 
 mod aggregation_fuzzer;
+mod equivalence;
+
 mod limit_fuzz;
 mod sort_preserving_repartition_fuzz;
 mod window_fuzz;
diff --git a/datafusion/physical-expr/src/equivalence/mod.rs 
b/datafusion/physical-expr/src/equivalence/mod.rs
index 38647f7ca1..7726458a46 100644
--- a/datafusion/physical-expr/src/equivalence/mod.rs
+++ b/datafusion/physical-expr/src/equivalence/mod.rs
@@ -84,7 +84,6 @@ mod tests {
 
     use itertools::izip;
     use rand::rngs::StdRng;
-    use rand::seq::SliceRandom;
     use rand::{Rng, SeedableRng};
 
     pub fn output_schema(
@@ -175,67 +174,6 @@ mod tests {
         Ok((test_schema, eq_properties))
     }
 
-    // Generate a schema which consists of 6 columns (a, b, c, d, e, f)
-    fn create_test_schema_2() -> Result<SchemaRef> {
-        let a = Field::new("a", DataType::Float64, true);
-        let b = Field::new("b", DataType::Float64, true);
-        let c = Field::new("c", DataType::Float64, true);
-        let d = Field::new("d", DataType::Float64, true);
-        let e = Field::new("e", DataType::Float64, true);
-        let f = Field::new("f", DataType::Float64, true);
-        let schema = Arc::new(Schema::new(vec![a, b, c, d, e, f]));
-
-        Ok(schema)
-    }
-
-    /// Construct a schema with random ordering
-    /// among column a, b, c, d
-    /// where
-    /// Column [a=f] (e.g they are aliases).
-    /// Column e is constant.
-    pub fn create_random_schema(seed: u64) -> Result<(SchemaRef, 
EquivalenceProperties)> {
-        let test_schema = create_test_schema_2()?;
-        let col_a = &col("a", &test_schema)?;
-        let col_b = &col("b", &test_schema)?;
-        let col_c = &col("c", &test_schema)?;
-        let col_d = &col("d", &test_schema)?;
-        let col_e = &col("e", &test_schema)?;
-        let col_f = &col("f", &test_schema)?;
-        let col_exprs = [col_a, col_b, col_c, col_d, col_e, col_f];
-
-        let mut eq_properties = 
EquivalenceProperties::new(Arc::clone(&test_schema));
-        // Define a and f are aliases
-        eq_properties.add_equal_conditions(col_a, col_f)?;
-        // Column e has constant value.
-        eq_properties = eq_properties.with_constants([ConstExpr::from(col_e)]);
-
-        // Randomly order columns for sorting
-        let mut rng = StdRng::seed_from_u64(seed);
-        let mut remaining_exprs = col_exprs[0..4].to_vec(); // only a, b, c, d 
are sorted
-
-        let options_asc = SortOptions {
-            descending: false,
-            nulls_first: false,
-        };
-
-        while !remaining_exprs.is_empty() {
-            let n_sort_expr = rng.gen_range(0..remaining_exprs.len() + 1);
-            remaining_exprs.shuffle(&mut rng);
-
-            let ordering = remaining_exprs
-                .drain(0..n_sort_expr)
-                .map(|expr| PhysicalSortExpr {
-                    expr: Arc::clone(expr),
-                    options: options_asc,
-                })
-                .collect();
-
-            eq_properties.add_new_orderings([ordering]);
-        }
-
-        Ok((test_schema, eq_properties))
-    }
-
     // Convert each tuple to PhysicalSortRequirement
     pub fn convert_to_sort_reqs(
         in_data: &[(&Arc<dyn PhysicalExpr>, Option<SortOptions>)],
@@ -294,33 +232,6 @@ mod tests {
             .collect()
     }
 
-    // Apply projection to the input_data, return projected equivalence 
properties and record batch
-    pub fn apply_projection(
-        proj_exprs: Vec<(Arc<dyn PhysicalExpr>, String)>,
-        input_data: &RecordBatch,
-        input_eq_properties: &EquivalenceProperties,
-    ) -> Result<(RecordBatch, EquivalenceProperties)> {
-        let input_schema = input_data.schema();
-        let projection_mapping = ProjectionMapping::try_new(&proj_exprs, 
&input_schema)?;
-
-        let output_schema = output_schema(&projection_mapping, &input_schema)?;
-        let num_rows = input_data.num_rows();
-        // Apply projection to the input record batch.
-        let projected_values = projection_mapping
-            .iter()
-            .map(|(source, _target)| 
source.evaluate(input_data)?.into_array(num_rows))
-            .collect::<Result<Vec<_>>>()?;
-        let projected_batch = if projected_values.is_empty() {
-            RecordBatch::new_empty(Arc::clone(&output_schema))
-        } else {
-            RecordBatch::try_new(Arc::clone(&output_schema), projected_values)?
-        };
-
-        let projected_eq =
-            input_eq_properties.project(&projection_mapping, output_schema);
-        Ok((projected_batch, projected_eq))
-    }
-
     #[test]
     fn add_equal_conditions_test() -> Result<()> {
         let schema = Arc::new(Schema::new(vec![
diff --git a/datafusion/physical-expr/src/equivalence/ordering.rs 
b/datafusion/physical-expr/src/equivalence/ordering.rs
index bb3e9218bc..a3cf8c965b 100644
--- a/datafusion/physical-expr/src/equivalence/ordering.rs
+++ b/datafusion/physical-expr/src/equivalence/ordering.rs
@@ -254,9 +254,8 @@ mod tests {
     use std::sync::Arc;
 
     use crate::equivalence::tests::{
-        convert_to_orderings, convert_to_sort_exprs, create_random_schema,
-        create_test_params, create_test_schema, 
generate_table_for_eq_properties,
-        is_table_same_after_sort,
+        convert_to_orderings, convert_to_sort_exprs, create_test_params,
+        create_test_schema, generate_table_for_eq_properties, 
is_table_same_after_sort,
     };
     use crate::equivalence::{
         EquivalenceClass, EquivalenceGroup, EquivalenceProperties,
@@ -271,8 +270,6 @@ mod tests {
     use datafusion_common::{DFSchema, Result};
     use datafusion_expr::{Operator, ScalarUDF};
 
-    use itertools::Itertools;
-
     #[test]
     fn test_ordering_satisfy() -> Result<()> {
         let input_schema = Arc::new(Schema::new(vec![
@@ -771,137 +768,6 @@ mod tests {
         Ok(())
     }
 
-    #[test]
-    fn test_ordering_satisfy_with_equivalence_random() -> Result<()> {
-        const N_RANDOM_SCHEMA: usize = 5;
-        const N_ELEMENTS: usize = 125;
-        const N_DISTINCT: usize = 5;
-        const SORT_OPTIONS: SortOptions = SortOptions {
-            descending: false,
-            nulls_first: false,
-        };
-
-        for seed in 0..N_RANDOM_SCHEMA {
-            // Create a random schema with random properties
-            let (test_schema, eq_properties) = create_random_schema(seed as 
u64)?;
-            // Generate a data that satisfies properties given
-            let table_data_with_properties =
-                generate_table_for_eq_properties(&eq_properties, N_ELEMENTS, 
N_DISTINCT)?;
-            let col_exprs = [
-                col("a", &test_schema)?,
-                col("b", &test_schema)?,
-                col("c", &test_schema)?,
-                col("d", &test_schema)?,
-                col("e", &test_schema)?,
-                col("f", &test_schema)?,
-            ];
-
-            for n_req in 0..=col_exprs.len() {
-                for exprs in col_exprs.iter().combinations(n_req) {
-                    let requirement = exprs
-                        .into_iter()
-                        .map(|expr| PhysicalSortExpr {
-                            expr: Arc::clone(expr),
-                            options: SORT_OPTIONS,
-                        })
-                        .collect::<Vec<_>>();
-                    let expected = is_table_same_after_sort(
-                        requirement.clone(),
-                        table_data_with_properties.clone(),
-                    )?;
-                    let err_msg = format!(
-                        "Error in test case requirement:{:?}, expected: {:?}, 
eq_properties.oeq_class: {:?}, eq_properties.eq_group: {:?}, 
eq_properties.constants: {:?}",
-                        requirement, expected, eq_properties.oeq_class, 
eq_properties.eq_group, eq_properties.constants
-                    );
-                    // Check whether ordering_satisfy API result and
-                    // experimental result matches.
-                    assert_eq!(
-                        eq_properties.ordering_satisfy(&requirement),
-                        expected,
-                        "{}",
-                        err_msg
-                    );
-                }
-            }
-        }
-
-        Ok(())
-    }
-
-    #[test]
-    fn test_ordering_satisfy_with_equivalence_complex_random() -> Result<()> {
-        const N_RANDOM_SCHEMA: usize = 100;
-        const N_ELEMENTS: usize = 125;
-        const N_DISTINCT: usize = 5;
-        const SORT_OPTIONS: SortOptions = SortOptions {
-            descending: false,
-            nulls_first: false,
-        };
-
-        for seed in 0..N_RANDOM_SCHEMA {
-            // Create a random schema with random properties
-            let (test_schema, eq_properties) = create_random_schema(seed as 
u64)?;
-            // Generate a data that satisfies properties given
-            let table_data_with_properties =
-                generate_table_for_eq_properties(&eq_properties, N_ELEMENTS, 
N_DISTINCT)?;
-
-            let test_fun = ScalarUDF::new_from_impl(TestScalarUDF::new());
-            let floor_a = crate::udf::create_physical_expr(
-                &test_fun,
-                &[col("a", &test_schema)?],
-                &test_schema,
-                &[],
-                &DFSchema::empty(),
-            )?;
-            let a_plus_b = Arc::new(BinaryExpr::new(
-                col("a", &test_schema)?,
-                Operator::Plus,
-                col("b", &test_schema)?,
-            )) as Arc<dyn PhysicalExpr>;
-            let exprs = [
-                col("a", &test_schema)?,
-                col("b", &test_schema)?,
-                col("c", &test_schema)?,
-                col("d", &test_schema)?,
-                col("e", &test_schema)?,
-                col("f", &test_schema)?,
-                floor_a,
-                a_plus_b,
-            ];
-
-            for n_req in 0..=exprs.len() {
-                for exprs in exprs.iter().combinations(n_req) {
-                    let requirement = exprs
-                        .into_iter()
-                        .map(|expr| PhysicalSortExpr {
-                            expr: Arc::clone(expr),
-                            options: SORT_OPTIONS,
-                        })
-                        .collect::<Vec<_>>();
-                    let expected = is_table_same_after_sort(
-                        requirement.clone(),
-                        table_data_with_properties.clone(),
-                    )?;
-                    let err_msg = format!(
-                        "Error in test case requirement:{:?}, expected: {:?}, 
eq_properties.oeq_class: {:?}, eq_properties.eq_group: {:?}, 
eq_properties.constants: {:?}",
-                        requirement, expected, eq_properties.oeq_class, 
eq_properties.eq_group, eq_properties.constants
-                    );
-                    // Check whether ordering_satisfy API result and
-                    // experimental result matches.
-
-                    assert_eq!(
-                        eq_properties.ordering_satisfy(&requirement),
-                        (expected | false),
-                        "{}",
-                        err_msg
-                    );
-                }
-            }
-        }
-
-        Ok(())
-    }
-
     #[test]
     fn test_ordering_satisfy_different_lengths() -> Result<()> {
         let test_schema = create_test_schema()?;
diff --git a/datafusion/physical-expr/src/equivalence/projection.rs 
b/datafusion/physical-expr/src/equivalence/projection.rs
index ebf26d3262..25a05a2a59 100644
--- a/datafusion/physical-expr/src/equivalence/projection.rs
+++ b/datafusion/physical-expr/src/equivalence/projection.rs
@@ -139,23 +139,18 @@ fn project_index_to_exprs(
 mod tests {
     use super::*;
     use crate::equivalence::tests::{
-        apply_projection, convert_to_orderings, convert_to_orderings_owned,
-        create_random_schema, generate_table_for_eq_properties, 
is_table_same_after_sort,
-        output_schema,
+        convert_to_orderings, convert_to_orderings_owned, output_schema,
     };
     use crate::equivalence::EquivalenceProperties;
     use crate::expressions::{col, BinaryExpr};
     use crate::udf::create_physical_expr;
     use crate::utils::tests::TestScalarUDF;
-    use crate::PhysicalSortExpr;
 
     use arrow::datatypes::{DataType, Field, Schema};
     use arrow_schema::{SortOptions, TimeUnit};
     use datafusion_common::DFSchema;
     use datafusion_expr::{Operator, ScalarUDF};
 
-    use itertools::Itertools;
-
     #[test]
     fn project_orderings() -> Result<()> {
         let schema = Arc::new(Schema::new(vec![
@@ -987,174 +982,4 @@ mod tests {
 
         Ok(())
     }
-
-    #[test]
-    fn project_orderings_random() -> Result<()> {
-        const N_RANDOM_SCHEMA: usize = 20;
-        const N_ELEMENTS: usize = 125;
-        const N_DISTINCT: usize = 5;
-
-        for seed in 0..N_RANDOM_SCHEMA {
-            // Create a random schema with random properties
-            let (test_schema, eq_properties) = create_random_schema(seed as 
u64)?;
-            // Generate a data that satisfies properties given
-            let table_data_with_properties =
-                generate_table_for_eq_properties(&eq_properties, N_ELEMENTS, 
N_DISTINCT)?;
-            // Floor(a)
-            let test_fun = ScalarUDF::new_from_impl(TestScalarUDF::new());
-            let floor_a = create_physical_expr(
-                &test_fun,
-                &[col("a", &test_schema)?],
-                &test_schema,
-                &[],
-                &DFSchema::empty(),
-            )?;
-            // a + b
-            let a_plus_b = Arc::new(BinaryExpr::new(
-                col("a", &test_schema)?,
-                Operator::Plus,
-                col("b", &test_schema)?,
-            )) as Arc<dyn PhysicalExpr>;
-            let proj_exprs = vec![
-                (col("a", &test_schema)?, "a_new"),
-                (col("b", &test_schema)?, "b_new"),
-                (col("c", &test_schema)?, "c_new"),
-                (col("d", &test_schema)?, "d_new"),
-                (col("e", &test_schema)?, "e_new"),
-                (col("f", &test_schema)?, "f_new"),
-                (floor_a, "floor(a)"),
-                (a_plus_b, "a+b"),
-            ];
-
-            for n_req in 0..=proj_exprs.len() {
-                for proj_exprs in proj_exprs.iter().combinations(n_req) {
-                    let proj_exprs = proj_exprs
-                        .into_iter()
-                        .map(|(expr, name)| (Arc::clone(expr), 
name.to_string()))
-                        .collect::<Vec<_>>();
-                    let (projected_batch, projected_eq) = apply_projection(
-                        proj_exprs.clone(),
-                        &table_data_with_properties,
-                        &eq_properties,
-                    )?;
-
-                    // Make sure each ordering after projection is valid.
-                    for ordering in projected_eq.oeq_class().iter() {
-                        let err_msg = format!(
-                            "Error in test case ordering:{:?}, 
eq_properties.oeq_class: {:?}, eq_properties.eq_group: {:?}, 
eq_properties.constants: {:?}, proj_exprs: {:?}",
-                            ordering, eq_properties.oeq_class, 
eq_properties.eq_group, eq_properties.constants, proj_exprs
-                        );
-                        // Since ordered section satisfies schema, we expect
-                        // that result will be same after sort (e.g sort was 
unnecessary).
-                        assert!(
-                            is_table_same_after_sort(
-                                ordering.clone(),
-                                projected_batch.clone(),
-                            )?,
-                            "{}",
-                            err_msg
-                        );
-                    }
-                }
-            }
-        }
-
-        Ok(())
-    }
-
-    #[test]
-    fn ordering_satisfy_after_projection_random() -> Result<()> {
-        const N_RANDOM_SCHEMA: usize = 20;
-        const N_ELEMENTS: usize = 125;
-        const N_DISTINCT: usize = 5;
-        const SORT_OPTIONS: SortOptions = SortOptions {
-            descending: false,
-            nulls_first: false,
-        };
-
-        for seed in 0..N_RANDOM_SCHEMA {
-            // Create a random schema with random properties
-            let (test_schema, eq_properties) = create_random_schema(seed as 
u64)?;
-            // Generate a data that satisfies properties given
-            let table_data_with_properties =
-                generate_table_for_eq_properties(&eq_properties, N_ELEMENTS, 
N_DISTINCT)?;
-            // Floor(a)
-            let test_fun = ScalarUDF::new_from_impl(TestScalarUDF::new());
-            let floor_a = create_physical_expr(
-                &test_fun,
-                &[col("a", &test_schema)?],
-                &test_schema,
-                &[],
-                &DFSchema::empty(),
-            )?;
-            // a + b
-            let a_plus_b = Arc::new(BinaryExpr::new(
-                col("a", &test_schema)?,
-                Operator::Plus,
-                col("b", &test_schema)?,
-            )) as Arc<dyn PhysicalExpr>;
-            let proj_exprs = vec![
-                (col("a", &test_schema)?, "a_new"),
-                (col("b", &test_schema)?, "b_new"),
-                (col("c", &test_schema)?, "c_new"),
-                (col("d", &test_schema)?, "d_new"),
-                (col("e", &test_schema)?, "e_new"),
-                (col("f", &test_schema)?, "f_new"),
-                (floor_a, "floor(a)"),
-                (a_plus_b, "a+b"),
-            ];
-
-            for n_req in 0..=proj_exprs.len() {
-                for proj_exprs in proj_exprs.iter().combinations(n_req) {
-                    let proj_exprs = proj_exprs
-                        .into_iter()
-                        .map(|(expr, name)| (Arc::clone(expr), 
name.to_string()))
-                        .collect::<Vec<_>>();
-                    let (projected_batch, projected_eq) = apply_projection(
-                        proj_exprs.clone(),
-                        &table_data_with_properties,
-                        &eq_properties,
-                    )?;
-
-                    let projection_mapping =
-                        ProjectionMapping::try_new(&proj_exprs, &test_schema)?;
-
-                    let projected_exprs = projection_mapping
-                        .iter()
-                        .map(|(_source, target)| Arc::clone(target))
-                        .collect::<Vec<_>>();
-
-                    for n_req in 0..=projected_exprs.len() {
-                        for exprs in 
projected_exprs.iter().combinations(n_req) {
-                            let requirement = exprs
-                                .into_iter()
-                                .map(|expr| PhysicalSortExpr {
-                                    expr: Arc::clone(expr),
-                                    options: SORT_OPTIONS,
-                                })
-                                .collect::<Vec<_>>();
-                            let expected = is_table_same_after_sort(
-                                requirement.clone(),
-                                projected_batch.clone(),
-                            )?;
-                            let err_msg = format!(
-                                "Error in test case requirement:{:?}, 
expected: {:?}, eq_properties.oeq_class: {:?}, eq_properties.eq_group: {:?}, 
eq_properties.constants: {:?}, projected_eq.oeq_class: {:?}, 
projected_eq.eq_group: {:?}, projected_eq.constants: {:?}, projection_mapping: 
{:?}",
-                                requirement, expected, 
eq_properties.oeq_class, eq_properties.eq_group, eq_properties.constants, 
projected_eq.oeq_class, projected_eq.eq_group, projected_eq.constants, 
projection_mapping
-                            );
-                            // Check whether ordering_satisfy API result and
-                            // experimental result matches.
-                            assert_eq!(
-                                projected_eq.ordering_satisfy(&requirement),
-                                expected,
-                                "{}",
-                                err_msg
-                            );
-                        }
-                    }
-                }
-            }
-        }
-
-        Ok(())
-    }
 }
diff --git a/datafusion/physical-expr/src/equivalence/properties.rs 
b/datafusion/physical-expr/src/equivalence/properties.rs
index 005e5776d3..a0cc29685f 100644
--- a/datafusion/physical-expr/src/equivalence/properties.rs
+++ b/datafusion/physical-expr/src/equivalence/properties.rs
@@ -2101,16 +2101,13 @@ mod tests {
     use crate::equivalence::add_offset_to_expr;
     use crate::equivalence::tests::{
         convert_to_orderings, convert_to_sort_exprs, convert_to_sort_reqs,
-        create_random_schema, create_test_params, create_test_schema,
-        generate_table_for_eq_properties, is_table_same_after_sort, 
output_schema,
+        create_test_params, create_test_schema, output_schema,
     };
     use crate::expressions::{col, BinaryExpr, Column};
-    use crate::utils::tests::TestScalarUDF;
 
     use arrow::datatypes::{DataType, Field, Schema};
     use arrow_schema::{Fields, TimeUnit};
-    use datafusion_common::DFSchema;
-    use datafusion_expr::{Operator, ScalarUDF};
+    use datafusion_expr::Operator;
 
     #[test]
     fn project_equivalence_properties_test() -> Result<()> {
@@ -2621,83 +2618,6 @@ mod tests {
         Ok(())
     }
 
-    #[test]
-    fn test_find_longest_permutation_random() -> Result<()> {
-        const N_RANDOM_SCHEMA: usize = 100;
-        const N_ELEMENTS: usize = 125;
-        const N_DISTINCT: usize = 5;
-
-        for seed in 0..N_RANDOM_SCHEMA {
-            // Create a random schema with random properties
-            let (test_schema, eq_properties) = create_random_schema(seed as 
u64)?;
-            // Generate a data that satisfies properties given
-            let table_data_with_properties =
-                generate_table_for_eq_properties(&eq_properties, N_ELEMENTS, 
N_DISTINCT)?;
-
-            let test_fun = ScalarUDF::new_from_impl(TestScalarUDF::new());
-            let floor_a = crate::udf::create_physical_expr(
-                &test_fun,
-                &[col("a", &test_schema)?],
-                &test_schema,
-                &[],
-                &DFSchema::empty(),
-            )?;
-            let a_plus_b = Arc::new(BinaryExpr::new(
-                col("a", &test_schema)?,
-                Operator::Plus,
-                col("b", &test_schema)?,
-            )) as Arc<dyn PhysicalExpr>;
-            let exprs = [
-                col("a", &test_schema)?,
-                col("b", &test_schema)?,
-                col("c", &test_schema)?,
-                col("d", &test_schema)?,
-                col("e", &test_schema)?,
-                col("f", &test_schema)?,
-                floor_a,
-                a_plus_b,
-            ];
-
-            for n_req in 0..=exprs.len() {
-                for exprs in exprs.iter().combinations(n_req) {
-                    let exprs = exprs.into_iter().cloned().collect::<Vec<_>>();
-                    let (ordering, indices) =
-                        eq_properties.find_longest_permutation(&exprs);
-                    // Make sure that find_longest_permutation return values 
are consistent
-                    let ordering2 = indices
-                        .iter()
-                        .zip(ordering.iter())
-                        .map(|(&idx, sort_expr)| PhysicalSortExpr {
-                            expr: Arc::clone(&exprs[idx]),
-                            options: sort_expr.options,
-                        })
-                        .collect::<Vec<_>>();
-                    assert_eq!(
-                        ordering, ordering2,
-                        "indices and lexicographical ordering do not match"
-                    );
-
-                    let err_msg = format!(
-                        "Error in test case ordering:{:?}, 
eq_properties.oeq_class: {:?}, eq_properties.eq_group: {:?}, 
eq_properties.constants: {:?}",
-                        ordering, eq_properties.oeq_class, 
eq_properties.eq_group, eq_properties.constants
-                    );
-                    assert_eq!(ordering.len(), indices.len(), "{}", err_msg);
-                    // Since ordered section satisfies schema, we expect
-                    // that result will be same after sort (e.g sort was 
unnecessary).
-                    assert!(
-                        is_table_same_after_sort(
-                            ordering.clone(),
-                            table_data_with_properties.clone(),
-                        )?,
-                        "{}",
-                        err_msg
-                    );
-                }
-            }
-        }
-
-        Ok(())
-    }
     #[test]
     fn test_find_longest_permutation() -> Result<()> {
         // Schema satisfies following orderings:


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to