This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new dae1efb00e Add multi-column topk fuzz tests (#7898)
dae1efb00e is described below

commit dae1efb00ee47a766261160e8087545551929801
Author: Andrew Lamb <[email protected]>
AuthorDate: Sun Oct 22 12:59:49 2023 -0400

    Add multi-column topk fuzz tests (#7898)
    
    * Add multi-column topk tests
    
    * clippy
    
    * fix validation
    
    * Update docs
---
 datafusion/core/tests/fuzz_cases/limit_fuzz.rs | 349 +++++++++++++++++++++++++
 datafusion/core/tests/fuzz_cases/mod.rs        |   2 +
 datafusion/core/tests/fuzz_cases/sort_fuzz.rs  | 221 +---------------
 test-utils/src/lib.rs                          |   2 +-
 4 files changed, 355 insertions(+), 219 deletions(-)

diff --git a/datafusion/core/tests/fuzz_cases/limit_fuzz.rs 
b/datafusion/core/tests/fuzz_cases/limit_fuzz.rs
new file mode 100644
index 0000000000..9889ce2ae5
--- /dev/null
+++ b/datafusion/core/tests/fuzz_cases/limit_fuzz.rs
@@ -0,0 +1,349 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Fuzz Test for Sort + Fetch/Limit (TopK!)
+
+use arrow::compute::concat_batches;
+use arrow::util::pretty::pretty_format_batches;
+use arrow::{array::Int32Array, record_batch::RecordBatch};
+use arrow_array::{Float64Array, Int64Array, StringArray};
+use arrow_schema::SchemaRef;
+use datafusion::datasource::MemTable;
+use datafusion::prelude::SessionContext;
+use datafusion_common::assert_contains;
+use rand::{thread_rng, Rng};
+use std::sync::Arc;
+use test_utils::stagger_batch;
+
+#[tokio::test]
+async fn test_sort_topk_i32() {
+    run_limit_fuzz_test(SortedData::new_i32).await
+}
+
+#[tokio::test]
+async fn test_sort_topk_f64() {
+    run_limit_fuzz_test(SortedData::new_f64).await
+}
+
+#[tokio::test]
+async fn test_sort_topk_str() {
+    run_limit_fuzz_test(SortedData::new_str).await
+}
+
+#[tokio::test]
+async fn test_sort_topk_i64str() {
+    run_limit_fuzz_test(SortedData::new_i64str).await
+}
+
+/// Run TopK fuzz tests on the specified input data (split into
+/// different test functions so they can run in parallel)
+async fn run_limit_fuzz_test<F>(make_data: F)
+where
+    F: Fn(usize) -> SortedData,
+{
+    let mut rng = thread_rng();
+    for size in [10, 1_0000, 10_000, 100_000] {
+        let data = make_data(size);
+        // test various limits including some random ones
+        for limit in [1, 3, 7, 17, 10000, rng.gen_range(1..size * 2)] {
+            //  limit can be larger than the number of rows in the input
+            run_limit_test(limit, &data).await;
+        }
+    }
+}
+
+/// The data column(s) to use for the TopK test
+///
+/// Each variant stores the input batches and the expected sorted values, used to
+/// compute the expected output for a given fetch (limit) value.
+#[derive(Debug)]
+enum SortedData {
+    // single Int32 column
+    I32 {
+        batches: Vec<RecordBatch>,
+        sorted: Vec<Option<i32>>,
+    },
+    /// Single Float64 column
+    F64 {
+        batches: Vec<RecordBatch>,
+        sorted: Vec<Option<f64>>,
+    },
+    /// Single sorted String column
+    Str {
+        batches: Vec<RecordBatch>,
+        sorted: Vec<Option<String>>,
+    },
+    /// (i64, string) columns
+    I64Str {
+        batches: Vec<RecordBatch>,
+        sorted: Vec<(Option<i64>, Option<String>)>,
+    },
+}
+
+impl SortedData {
+    /// Create an i32 column of random values, with the specified number of
+    /// rows, sorted the default
+    fn new_i32(size: usize) -> Self {
+        let mut rng = thread_rng();
+        // have some repeats (approximately 1/3 of the values are the same)
+        let max = size as i32 / 3;
+        let data: Vec<Option<i32>> = (0..size)
+            .map(|_| {
+                // no nulls for now
+                Some(rng.gen_range(0..max))
+            })
+            .collect();
+
+        let batches = stagger_batch(int32_batch(data.iter().cloned()));
+
+        let mut sorted = data;
+        sorted.sort_unstable();
+
+        Self::I32 { batches, sorted }
+    }
+
+    /// Create an f64 column of random values, with the specified number of
+    /// rows, sorted the default
+    fn new_f64(size: usize) -> Self {
+        let mut rng = thread_rng();
+        let mut data: Vec<Option<f64>> = (0..size / 3)
+            .map(|_| {
+                // no nulls for now
+                Some(rng.gen_range(0.0..1.0f64))
+            })
+            .collect();
+
+        // have some repeats (approximately 1/3 of the values are the same)
+        while data.len() < size {
+            data.push(data[rng.gen_range(0..data.len())]);
+        }
+
+        let batches = stagger_batch(f64_batch(data.iter().cloned()));
+
+        let mut sorted = data;
+        sorted.sort_by(|a, b| a.partial_cmp(b).unwrap());
+
+        Self::F64 { batches, sorted }
+    }
+
+    /// Create a string column of random values, with the specified number of
+    /// rows, sorted the default
+    fn new_str(size: usize) -> Self {
+        let mut rng = thread_rng();
+        let mut data: Vec<Option<String>> = (0..size / 3)
+            .map(|_| {
+                // no nulls for now
+                Some(get_random_string(16))
+            })
+            .collect();
+
+        // have some repeats (approximately 1/3 of the values are the same)
+        while data.len() < size {
+            data.push(data[rng.gen_range(0..data.len())].clone());
+        }
+
+        let batches = stagger_batch(string_batch(data.iter()));
+
+        let mut sorted = data;
+        sorted.sort_unstable();
+
+        Self::Str { batches, sorted }
+    }
+
+    /// Create two columns of random values (int64, string), with the 
specified number of
+    /// rows, sorted the default
+    fn new_i64str(size: usize) -> Self {
+        let mut rng = thread_rng();
+
+        // 100 distinct values
+        let strings: Vec<Option<String>> = (0..100)
+            .map(|_| {
+                // no nulls for now
+                Some(get_random_string(16))
+            })
+            .collect();
+
+        // form inputs, with only 10 distinct integer values , to force 
collision checks
+        let data = (0..size)
+            .map(|_| {
+                (
+                    Some(rng.gen_range(0..10)),
+                    strings[rng.gen_range(0..strings.len())].clone(),
+                )
+            })
+            .collect::<Vec<_>>();
+
+        let batches = stagger_batch(i64string_batch(data.iter()));
+
+        let mut sorted = data;
+        sorted.sort_unstable();
+
+        Self::I64Str { batches, sorted }
+    }
+
+    /// Return the top `limit` values as a RecordBatch
+    fn topk_values(&self, limit: usize) -> RecordBatch {
+        match self {
+            Self::I32 { sorted, .. } => 
int32_batch(sorted.iter().take(limit).cloned()),
+            Self::F64 { sorted, .. } => 
f64_batch(sorted.iter().take(limit).cloned()),
+            Self::Str { sorted, .. } => 
string_batch(sorted.iter().take(limit)),
+            Self::I64Str { sorted, .. } => 
i64string_batch(sorted.iter().take(limit)),
+        }
+    }
+
+    /// Return the input data to sort
+    fn batches(&self) -> Vec<RecordBatch> {
+        match self {
+            Self::I32 { batches, .. } => batches.clone(),
+            Self::F64 { batches, .. } => batches.clone(),
+            Self::Str { batches, .. } => batches.clone(),
+            Self::I64Str { batches, .. } => batches.clone(),
+        }
+    }
+
+    /// Return the schema of the input data
+    fn schema(&self) -> SchemaRef {
+        match self {
+            Self::I32 { batches, .. } => batches[0].schema(),
+            Self::F64 { batches, .. } => batches[0].schema(),
+            Self::Str { batches, .. } => batches[0].schema(),
+            Self::I64Str { batches, .. } => batches[0].schema(),
+        }
+    }
+
+    /// Return the sort expression to use for this data, depending on the type
+    fn sort_expr(&self) -> Vec<datafusion_expr::Expr> {
+        match self {
+            Self::I32 { .. } | Self::F64 { .. } | Self::Str { .. } => {
+                vec![datafusion_expr::col("x").sort(true, true)]
+            }
+            Self::I64Str { .. } => {
+                vec![
+                    datafusion_expr::col("x").sort(true, true),
+                    datafusion_expr::col("y").sort(true, true),
+                ]
+            }
+        }
+    }
+}
+
+/// Create a record batch with a single column of type `Int32` named "x"
+fn int32_batch(values: impl IntoIterator<Item = Option<i32>>) -> RecordBatch {
+    RecordBatch::try_from_iter(vec![(
+        "x",
+        Arc::new(Int32Array::from_iter(values.into_iter())) as _,
+    )])
+    .unwrap()
+}
+
+/// Create a record batch with a single column of type `Float64` named "x"
+fn f64_batch(values: impl IntoIterator<Item = Option<f64>>) -> RecordBatch {
+    RecordBatch::try_from_iter(vec![(
+        "x",
+        Arc::new(Float64Array::from_iter(values.into_iter())) as _,
+    )])
+    .unwrap()
+}
+
+/// Create a record batch with a single column of type `StringArray` named "x"
+fn string_batch<'a>(values: impl IntoIterator<Item = &'a Option<String>>) -> 
RecordBatch {
+    RecordBatch::try_from_iter(vec![(
+        "x",
+        Arc::new(StringArray::from_iter(values.into_iter())) as _,
+    )])
+    .unwrap()
+}
+
+/// Create a record batch with i64 column "x" and utf8 column "y"
+fn i64string_batch<'a>(
+    values: impl IntoIterator<Item = &'a (Option<i64>, Option<String>)> + 
Clone,
+) -> RecordBatch {
+    let ints = values.clone().into_iter().map(|(i, _)| *i);
+    let strings = values.into_iter().map(|(_, s)| s);
+    RecordBatch::try_from_iter(vec![
+        ("x", Arc::new(Int64Array::from_iter(ints)) as _),
+        ("y", Arc::new(StringArray::from_iter(strings)) as _),
+    ])
+    .unwrap()
+}
+
+/// Run the TopK test, sorting the input batches with the specified fetch
+/// (limit) and compares the results to the expected values.
+async fn run_limit_test(fetch: usize, data: &SortedData) {
+    let input = data.batches();
+    let schema = data.schema();
+
+    let table = MemTable::try_new(schema, vec![input]).unwrap();
+
+    let ctx = SessionContext::new();
+    let df = ctx
+        .read_table(Arc::new(table))
+        .unwrap()
+        .sort(data.sort_expr())
+        .unwrap()
+        .limit(0, Some(fetch))
+        .unwrap();
+
+    // Verify the plan contains a TopK node
+    {
+        let explain = df
+            .clone()
+            .explain(false, false)
+            .unwrap()
+            .collect()
+            .await
+            .unwrap();
+        let plan_text = pretty_format_batches(&explain).unwrap().to_string();
+        let expected = format!("TopK(fetch={fetch})");
+        assert_contains!(plan_text, expected);
+    }
+
+    let results = df.collect().await.unwrap();
+    let expected = data.topk_values(fetch);
+
+    // Verify that all output batches conform to the specified batch size
+    let max_batch_size = ctx.copied_config().batch_size();
+    for batch in &results {
+        assert!(batch.num_rows() <= max_batch_size);
+    }
+
+    let results = concat_batches(&results[0].schema(), &results).unwrap();
+
+    let results = [results];
+    let expected = [expected];
+
+    assert_eq!(
+        &expected,
+        &results,
+        "TopK mismatch fetch {fetch} \n\
+                expected rows {}, actual rows {}.\
+                \n\nExpected:\n{}\n\nActual:\n{}",
+        expected[0].num_rows(),
+        results[0].num_rows(),
+        pretty_format_batches(&expected).unwrap(),
+        pretty_format_batches(&results).unwrap(),
+    );
+}
+
+/// Return random ASCII String with len
+fn get_random_string(len: usize) -> String {
+    rand::thread_rng()
+        .sample_iter(rand::distributions::Alphanumeric)
+        .take(len)
+        .map(char::from)
+        .collect()
+}
diff --git a/datafusion/core/tests/fuzz_cases/mod.rs 
b/datafusion/core/tests/fuzz_cases/mod.rs
index 140cf7e5c7..83ec928ae2 100644
--- a/datafusion/core/tests/fuzz_cases/mod.rs
+++ b/datafusion/core/tests/fuzz_cases/mod.rs
@@ -19,5 +19,7 @@ mod aggregate_fuzz;
 mod join_fuzz;
 mod merge_fuzz;
 mod sort_fuzz;
+
+mod limit_fuzz;
 mod sort_preserving_repartition_fuzz;
 mod window_fuzz;
diff --git a/datafusion/core/tests/fuzz_cases/sort_fuzz.rs 
b/datafusion/core/tests/fuzz_cases/sort_fuzz.rs
index 2615abfd3c..f4b4f16aa1 100644
--- a/datafusion/core/tests/fuzz_cases/sort_fuzz.rs
+++ b/datafusion/core/tests/fuzz_cases/sort_fuzz.rs
@@ -22,25 +22,17 @@ use arrow::{
     compute::SortOptions,
     record_batch::RecordBatch,
 };
-use arrow_array::{Float64Array, StringArray};
+use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
 use datafusion::physical_plan::expressions::PhysicalSortExpr;
 use datafusion::physical_plan::memory::MemoryExec;
 use datafusion::physical_plan::sorts::sort::SortExec;
 use datafusion::physical_plan::{collect, ExecutionPlan};
 use datafusion::prelude::{SessionConfig, SessionContext};
-use datafusion::{
-    datasource::MemTable,
-    execution::runtime_env::{RuntimeConfig, RuntimeEnv},
-};
-use datafusion_common::{
-    cast::{as_float64_array, as_string_array},
-    TableReference,
-};
 use datafusion_execution::memory_pool::GreedyMemoryPool;
 use datafusion_physical_expr::expressions::col;
-use rand::{rngs::StdRng, Rng, SeedableRng};
+use rand::Rng;
 use std::sync::Arc;
-use test_utils::{batches_to_vec, partitions_to_sorted_vec, stagger_batch};
+use test_utils::{batches_to_vec, partitions_to_sorted_vec};
 
 const KB: usize = 1 << 10;
 #[tokio::test]
@@ -80,44 +72,6 @@ async fn test_sort_unlimited_mem() {
             .await;
     }
 }
-
-#[tokio::test]
-async fn test_sort_topk() {
-    for size in [10, 100, 1000, 10000, 1000000] {
-        let mut topk_scenario = TopKScenario::new()
-            .with_limit(10)
-            .with_table_name("t")
-            .with_col_name("x");
-
-        // test topk with i32
-        let collected_i32 = SortTest::new()
-            .with_input(topk_scenario.batches(size, ColType::I32))
-            .run_with_limit(&topk_scenario)
-            .await;
-        let actual = batches_to_vec(&collected_i32);
-        let excepted_i32 = topk_scenario.excepted_i32();
-        assert_eq!(actual, excepted_i32);
-
-        // test topk with f64
-        let collected_f64 = SortTest::new()
-            .with_input(topk_scenario.batches(size, ColType::F64))
-            .run_with_limit(&topk_scenario)
-            .await;
-        let actual: Vec<Option<f64>> = batches_to_f64_vec(&collected_f64);
-        let excepted_f64 = topk_scenario.excepted_f64();
-        assert_eq!(actual, excepted_f64);
-
-        // test topk with str
-        let collected_str = SortTest::new()
-            .with_input(topk_scenario.batches(size, ColType::Str))
-            .run_with_limit(&topk_scenario)
-            .await;
-        let actual: Vec<Option<&str>> = batches_to_str_vec(&collected_str);
-        let excepted_str = topk_scenario.excepted_str();
-        assert_eq!(actual, excepted_str);
-    }
-}
-
 #[derive(Debug, Default)]
 struct SortTest {
     input: Vec<Vec<RecordBatch>>,
@@ -132,11 +86,6 @@ impl SortTest {
         Default::default()
     }
 
-    fn with_input(mut self, batches: Vec<Vec<RecordBatch>>) -> Self {
-        self.input = batches.clone();
-        self
-    }
-
     /// Create batches of int32 values of rows
     fn with_int32_batches(mut self, rows: usize) -> Self {
         self.input = vec![make_staggered_i32_batches(rows)];
@@ -154,44 +103,6 @@ impl SortTest {
         self
     }
 
-    async fn run_with_limit<'a>(
-        &self,
-        topk_scenario: &TopKScenario<'a>,
-    ) -> Vec<RecordBatch> {
-        let input = self.input.clone();
-        let schema = input
-            .iter()
-            .flat_map(|p| p.iter())
-            .next()
-            .expect("at least one batch")
-            .schema();
-
-        let table = MemTable::try_new(schema, input.clone()).unwrap();
-
-        let ctx = SessionContext::new();
-
-        ctx.register_table(
-            TableReference::Bare {
-                table: topk_scenario.table_name.into(),
-            },
-            Arc::new(table),
-        )
-        .unwrap();
-
-        let df = ctx
-            .table(topk_scenario.table_name)
-            .await
-            .unwrap()
-            .sort(vec![
-                datafusion_expr::col(topk_scenario.col_name).sort(true, true)
-            ])
-            .unwrap()
-            .limit(0, Some(topk_scenario.limit))
-            .unwrap();
-
-        df.collect().await.unwrap()
-    }
-
     /// Sort the input using SortExec and ensure the results are
     /// correct according to `Vec::sort` both with and without spilling
     async fn run(&self) {
@@ -262,109 +173,6 @@ impl SortTest {
     }
 }
 
-enum ColType {
-    I32,
-    F64,
-    Str,
-}
-
-struct TopKScenario<'a> {
-    limit: usize,
-    batches: Vec<Vec<RecordBatch>>,
-    table_name: &'a str,
-    col_name: &'a str,
-}
-
-impl<'a> TopKScenario<'a> {
-    fn new() -> Self {
-        TopKScenario {
-            limit: 0,
-            batches: vec![],
-            table_name: "",
-            col_name: "",
-        }
-    }
-
-    fn with_limit(mut self, limit: usize) -> Self {
-        self.limit = limit;
-        self
-    }
-
-    fn with_table_name(mut self, table_name: &'a str) -> Self {
-        self.table_name = table_name;
-        self
-    }
-
-    fn with_col_name(mut self, col_name: &'a str) -> Self {
-        self.col_name = col_name;
-        self
-    }
-
-    fn batches(&mut self, len: usize, t: ColType) -> Vec<Vec<RecordBatch>> {
-        let batches = match t {
-            ColType::I32 => make_staggered_i32_batches(len),
-            ColType::F64 => make_staggered_f64_batches(len),
-            ColType::Str => make_staggered_str_batches(len),
-        };
-        self.batches = vec![batches];
-        self.batches.clone()
-    }
-
-    fn excepted_i32(&self) -> Vec<Option<i32>> {
-        let excepted = partitions_to_sorted_vec(&self.batches);
-        excepted[0..self.limit].into()
-    }
-
-    fn excepted_f64(&self) -> Vec<Option<f64>> {
-        let mut excepted: Vec<Option<f64>> = self
-            .batches
-            .iter()
-            .flat_map(|batches| batches_to_f64_vec(batches).into_iter())
-            .collect();
-        excepted.sort_by(|a, b| a.partial_cmp(b).unwrap());
-        excepted[0..self.limit].into()
-    }
-
-    fn excepted_str(&self) -> Vec<Option<&str>> {
-        let mut excepted: Vec<Option<&str>> = self
-            .batches
-            .iter()
-            .flat_map(|batches| batches_to_str_vec(batches).into_iter())
-            .collect();
-        excepted.sort_unstable();
-        excepted[0..self.limit].into()
-    }
-}
-
-impl Default for TopKScenario<'_> {
-    fn default() -> Self {
-        Self::new()
-    }
-}
-
-fn make_staggered_f64_batches(len: usize) -> Vec<RecordBatch> {
-    let mut rng = StdRng::seed_from_u64(100);
-    let remainder = RecordBatch::try_from_iter(vec![(
-        "x",
-        Arc::new(Float64Array::from_iter_values(
-            (0..len).map(|_| rng.gen_range(0.0..1000.7)),
-        )) as ArrayRef,
-    )])
-    .unwrap();
-    stagger_batch(remainder)
-}
-
-fn make_staggered_str_batches(len: usize) -> Vec<RecordBatch> {
-    let remainder = RecordBatch::try_from_iter(vec![(
-        "x",
-        Arc::new(StringArray::from_iter_values(
-            (0..len).map(|_| get_random_string(6)),
-        )) as ArrayRef,
-    )])
-    .unwrap();
-    stagger_batch(remainder)
-}
-
 /// Return randomly sized record batches in a field named 'x' of type `Int32`
 /// with randomized i32 content
 fn make_staggered_i32_batches(len: usize) -> Vec<RecordBatch> {
@@ -389,26 +197,3 @@ fn make_staggered_i32_batches(len: usize) -> 
Vec<RecordBatch> {
     }
     batches
 }
-
-/// Return random ASCII String with len
-fn get_random_string(len: usize) -> String {
-    rand::thread_rng()
-        .sample_iter(rand::distributions::Alphanumeric)
-        .take(len)
-        .map(char::from)
-        .collect()
-}
-
-fn batches_to_f64_vec(batches: &[RecordBatch]) -> Vec<Option<f64>> {
-    batches
-        .iter()
-        .flat_map(|batch| as_float64_array(batch.column(0)).unwrap().iter())
-        .collect()
-}
-
-fn batches_to_str_vec(batches: &[RecordBatch]) -> Vec<Option<&str>> {
-    batches
-        .iter()
-        .flat_map(|batch| as_string_array(batch.column(0)).unwrap().iter())
-        .collect()
-}
diff --git a/test-utils/src/lib.rs b/test-utils/src/lib.rs
index e3c96d16ee..0c3668d2f8 100644
--- a/test-utils/src/lib.rs
+++ b/test-utils/src/lib.rs
@@ -38,7 +38,7 @@ pub fn batches_to_vec(batches: &[RecordBatch]) -> 
Vec<Option<i32>> {
         .collect()
 }
 
-/// extract values from batches and sort them
+/// extract i32 values from batches and sort them
 pub fn partitions_to_sorted_vec(partitions: &[Vec<RecordBatch>]) -> 
Vec<Option<i32>> {
     let mut values: Vec<_> = partitions
         .iter()

Reply via email to