This is an automated email from the ASF dual-hosted git repository.
dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new dae1efb00e Add multi-column topk fuzz tests (#7898)
dae1efb00e is described below
commit dae1efb00ee47a766261160e8087545551929801
Author: Andrew Lamb <[email protected]>
AuthorDate: Sun Oct 22 12:59:49 2023 -0400
Add multi-column topk fuzz tests (#7898)
* Add multi-column topk tests
* clippy
* fix validation
* Update docs
---
datafusion/core/tests/fuzz_cases/limit_fuzz.rs | 349 +++++++++++++++++++++++++
datafusion/core/tests/fuzz_cases/mod.rs | 2 +
datafusion/core/tests/fuzz_cases/sort_fuzz.rs | 221 +---------------
test-utils/src/lib.rs | 2 +-
4 files changed, 355 insertions(+), 219 deletions(-)
diff --git a/datafusion/core/tests/fuzz_cases/limit_fuzz.rs
b/datafusion/core/tests/fuzz_cases/limit_fuzz.rs
new file mode 100644
index 0000000000..9889ce2ae5
--- /dev/null
+++ b/datafusion/core/tests/fuzz_cases/limit_fuzz.rs
@@ -0,0 +1,349 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Fuzz Test for Sort + Fetch/Limit (TopK!)
+
+use arrow::compute::concat_batches;
+use arrow::util::pretty::pretty_format_batches;
+use arrow::{array::Int32Array, record_batch::RecordBatch};
+use arrow_array::{Float64Array, Int64Array, StringArray};
+use arrow_schema::SchemaRef;
+use datafusion::datasource::MemTable;
+use datafusion::prelude::SessionContext;
+use datafusion_common::assert_contains;
+use rand::{thread_rng, Rng};
+use std::sync::Arc;
+use test_utils::stagger_batch;
+
+/// TopK fuzz test over a single `Int32` column
+#[tokio::test]
+async fn test_sort_topk_i32() {
+ run_limit_fuzz_test(SortedData::new_i32).await
+}
+
+/// TopK fuzz test over a single `Float64` column
+#[tokio::test]
+async fn test_sort_topk_f64() {
+ run_limit_fuzz_test(SortedData::new_f64).await
+}
+
+/// TopK fuzz test over a single string column
+#[tokio::test]
+async fn test_sort_topk_str() {
+ run_limit_fuzz_test(SortedData::new_str).await
+}
+
+/// TopK fuzz test over two columns, `(i64, string)`, sorted on both
+#[tokio::test]
+async fn test_sort_topk_i64str() {
+ run_limit_fuzz_test(SortedData::new_i64str).await
+}
+
+/// Run TopK fuzz tests on data produced by `make_data`, for several input
+/// sizes and, for each size, several limits.
+///
+/// Each data type has its own test function calling this helper, so the
+/// different types can run in parallel.
+async fn run_limit_fuzz_test<F>(make_data: F)
+where
+    F: Fn(usize) -> SortedData,
+{
+    let mut rng = thread_rng();
+    // a range of distinct input sizes, from tiny to large
+    for size in [10, 1_000, 10_000, 100_000] {
+        let data = make_data(size);
+        // test various limits including a random one; the limit can be
+        // larger than the number of rows in the input
+        for limit in [1, 3, 7, 17, 10000, rng.gen_range(1..size * 2)] {
+            run_limit_test(limit, &data).await;
+        }
+    }
+}
+
+/// The data column(s) to use for the TopK test
+///
+/// Each variant stores the input batches and the expected sorted values,
+/// which are used to compute the expected output for a given fetch (limit) value.
+#[derive(Debug)]
+enum SortedData {
+ /// Single Int32 column
+ I32 {
+ batches: Vec<RecordBatch>,
+ sorted: Vec<Option<i32>>,
+ },
+ /// Single Float64 column
+ F64 {
+ batches: Vec<RecordBatch>,
+ sorted: Vec<Option<f64>>,
+ },
+ /// Single String column
+ Str {
+ batches: Vec<RecordBatch>,
+ sorted: Vec<Option<String>>,
+ },
+ /// (i64, string) columns
+ I64Str {
+ batches: Vec<RecordBatch>,
+ sorted: Vec<(Option<i64>, Option<String>)>,
+ },
+}
+
+impl SortedData {
+    /// Build random `Int32` test data with `size` rows, together with the
+    /// same values in default (ascending) sort order
+    fn new_i32(size: usize) -> Self {
+        let mut rng = thread_rng();
+        // values are drawn from only `size / 3` distinct integers so the
+        // input contains repeats
+        let upper = size as i32 / 3;
+        let mut values: Vec<Option<i32>> = Vec::with_capacity(size);
+        for _ in 0..size {
+            // no nulls for now
+            values.push(Some(rng.gen_range(0..upper)));
+        }
+
+        let batches = stagger_batch(int32_batch(values.iter().copied()));
+
+        // the expected order is simply the sorted input values
+        values.sort_unstable();
+
+        Self::I32 {
+            batches,
+            sorted: values,
+        }
+    }
+
+    /// Build random `Float64` test data with `size` rows, together with the
+    /// same values in ascending order
+    fn new_f64(size: usize) -> Self {
+        let mut rng = thread_rng();
+        let mut values: Vec<Option<f64>> = Vec::with_capacity(size);
+        // the first third of the rows are fresh random values in `[0, 1)`
+        for _ in 0..size / 3 {
+            // no nulls for now
+            values.push(Some(rng.gen_range(0.0..1.0f64)));
+        }
+
+        // the remaining rows repeat a randomly chosen earlier row
+        while values.len() < size {
+            let pick = rng.gen_range(0..values.len());
+            let repeated = values[pick];
+            values.push(repeated);
+        }
+
+        let batches = stagger_batch(f64_batch(values.iter().copied()));
+
+        // `f64` is not `Ord`; the values are never NaN so `partial_cmp` succeeds
+        values.sort_by(|lhs, rhs| lhs.partial_cmp(rhs).unwrap());
+
+        Self::F64 {
+            batches,
+            sorted: values,
+        }
+    }
+
+    /// Build random string test data with `size` rows, together with the
+    /// same values in ascending order
+    fn new_str(size: usize) -> Self {
+        let mut rng = thread_rng();
+        let mut values: Vec<Option<String>> = Vec::with_capacity(size);
+        // the first third of the rows are fresh random 16 character strings
+        for _ in 0..size / 3 {
+            // no nulls for now
+            values.push(Some(get_random_string(16)));
+        }
+
+        // the remaining rows repeat a randomly chosen earlier row
+        while values.len() < size {
+            let pick = rng.gen_range(0..values.len());
+            let repeated = values[pick].clone();
+            values.push(repeated);
+        }
+
+        let batches = stagger_batch(string_batch(values.iter()));
+
+        // the expected order is simply the sorted input values
+        values.sort_unstable();
+
+        Self::Str {
+            batches,
+            sorted: values,
+        }
+    }
+
+    /// Build random two column `(i64, string)` test data with `size` rows,
+    /// together with the same rows in ascending `(int, string)` order
+    fn new_i64str(size: usize) -> Self {
+        let mut rng = thread_rng();
+
+        // pool of 100 random strings (no nulls for now)
+        let mut strings: Vec<Option<String>> = Vec::with_capacity(100);
+        for _ in 0..100 {
+            strings.push(Some(get_random_string(16)));
+        }
+
+        // only 10 distinct integer values, so many rows tie on the first
+        // column and the second column must be compared
+        let mut rows: Vec<(Option<i64>, Option<String>)> = Vec::with_capacity(size);
+        for _ in 0..size {
+            let int_value = Some(rng.gen_range(0..10));
+            let str_value = strings[rng.gen_range(0..strings.len())].clone();
+            rows.push((int_value, str_value));
+        }
+
+        let batches = stagger_batch(i64string_batch(rows.iter()));
+
+        // tuples order by the integer first, then by the string
+        rows.sort_unstable();
+
+        Self::I64Str {
+            batches,
+            sorted: rows,
+        }
+    }
+
+    /// Return the first `limit` expected (sorted) rows as a single
+    /// `RecordBatch`; if `limit` exceeds the row count all rows are returned
+    fn topk_values(&self, limit: usize) -> RecordBatch {
+        match self {
+            Self::I32 { sorted, .. } => {
+                let top = sorted.iter().take(limit).copied();
+                int32_batch(top)
+            }
+            Self::F64 { sorted, .. } => {
+                let top = sorted.iter().take(limit).copied();
+                f64_batch(top)
+            }
+            Self::Str { sorted, .. } => {
+                let top = sorted.iter().take(limit);
+                string_batch(top)
+            }
+            Self::I64Str { sorted, .. } => {
+                let top = sorted.iter().take(limit);
+                i64string_batch(top)
+            }
+        }
+    }
+
+    /// Return a copy of the input batches to be sorted
+    fn batches(&self) -> Vec<RecordBatch> {
+        // every variant carries a `batches` field of the same type
+        let (Self::I32 { batches, .. }
+        | Self::F64 { batches, .. }
+        | Self::Str { batches, .. }
+        | Self::I64Str { batches, .. }) = self;
+        batches.clone()
+    }
+
+    /// Return the schema of the input data, taken from the first input batch
+    fn schema(&self) -> SchemaRef {
+        let first_batch = match self {
+            Self::I32 { batches, .. }
+            | Self::F64 { batches, .. }
+            | Self::Str { batches, .. }
+            | Self::I64Str { batches, .. } => &batches[0],
+        };
+        first_batch.schema()
+    }
+
+    /// Return the sort expression(s) to use for this data: column "x"
+    /// and, for the two column data, also column "y"
+    fn sort_expr(&self) -> Vec<datafusion_expr::Expr> {
+        // every sort key uses the same options: `sort(true, true)`
+        // (ascending, nulls first)
+        let sort_on = |name: &str| datafusion_expr::col(name).sort(true, true);
+        match self {
+            Self::I64Str { .. } => vec![sort_on("x"), sort_on("y")],
+            Self::I32 { .. } | Self::F64 { .. } | Self::Str { .. } => vec![sort_on("x")],
+        }
+    }
+}
+
+/// Build a `RecordBatch` holding `values` in one `Int32` column named "x"
+fn int32_batch(values: impl IntoIterator<Item = Option<i32>>) -> RecordBatch {
+    let column: Int32Array = values.into_iter().collect();
+    RecordBatch::try_from_iter(vec![("x", Arc::new(column) as _)]).unwrap()
+}
+
+/// Build a `RecordBatch` holding `values` in one `Float64` column named "x"
+fn f64_batch(values: impl IntoIterator<Item = Option<f64>>) -> RecordBatch {
+    let column: Float64Array = values.into_iter().collect();
+    RecordBatch::try_from_iter(vec![("x", Arc::new(column) as _)]).unwrap()
+}
+
+/// Build a `RecordBatch` holding `values` in one `StringArray` column named "x"
+fn string_batch<'a>(values: impl IntoIterator<Item = &'a Option<String>>) -> RecordBatch {
+    let column: StringArray = values.into_iter().collect();
+    RecordBatch::try_from_iter(vec![("x", Arc::new(column) as _)]).unwrap()
+}
+
+/// Build a `RecordBatch` from `(i64, string)` rows, with the integers in
+/// column "x" and the strings in column "y"
+fn i64string_batch<'a>(
+    values: impl IntoIterator<Item = &'a (Option<i64>, Option<String>)> + Clone,
+) -> RecordBatch {
+    // `values` is walked twice (once per column), hence the `Clone` bound
+    let x_column: Int64Array = values.clone().into_iter().map(|row| row.0).collect();
+    let y_column: StringArray = values.into_iter().map(|row| &row.1).collect();
+    RecordBatch::try_from_iter(vec![
+        ("x", Arc::new(x_column) as _),
+        ("y", Arc::new(y_column) as _),
+    ])
+    .unwrap()
+}
+
+/// Run the TopK test, sorting the input batches with the specified fetch
+/// (limit) and comparing the results to the expected values.
+async fn run_limit_test(fetch: usize, data: &SortedData) {
+ let input = data.batches();
+ let schema = data.schema();
+
+ let table = MemTable::try_new(schema, vec![input]).unwrap();
+
+ let ctx = SessionContext::new();
+ let df = ctx
+ .read_table(Arc::new(table))
+ .unwrap()
+ .sort(data.sort_expr())
+ .unwrap()
+ .limit(0, Some(fetch))
+ .unwrap();
+
+ // Verify the plan contains a TopK node with the requested fetch
+ {
+ let explain = df
+ .clone()
+ .explain(false, false)
+ .unwrap()
+ .collect()
+ .await
+ .unwrap();
+ let plan_text = pretty_format_batches(&explain).unwrap().to_string();
+ let expected = format!("TopK(fetch={fetch})");
+ assert_contains!(plan_text, expected);
+ }
+
+ let results = df.collect().await.unwrap();
+ let expected = data.topk_values(fetch);
+
+ // Verify that all output batches conform to the configured batch size
+ let max_batch_size = ctx.copied_config().batch_size();
+ for batch in &results {
+ assert!(batch.num_rows() <= max_batch_size);
+ }
+
+ let results = concat_batches(&results[0].schema(), &results).unwrap();
+
+ let results = [results];
+ let expected = [expected];
+
+ assert_eq!(
+ &expected,
+ &results,
+ "TopK mismatch fetch {fetch} \n\
+ expected rows {}, actual rows {}.\
+ \n\nExpected:\n{}\n\nActual:\n{}",
+ expected[0].num_rows(),
+ results[0].num_rows(),
+ pretty_format_batches(&expected).unwrap(),
+ pretty_format_batches(&results).unwrap(),
+ );
+}
+
+/// Return a random string of `len` ASCII alphanumeric characters
+fn get_random_string(len: usize) -> String {
+    let random_bytes = rand::thread_rng()
+        .sample_iter(rand::distributions::Alphanumeric)
+        .take(len);
+    let mut text = String::with_capacity(len);
+    text.extend(random_bytes.map(char::from));
+    text
+}
diff --git a/datafusion/core/tests/fuzz_cases/mod.rs
b/datafusion/core/tests/fuzz_cases/mod.rs
index 140cf7e5c7..83ec928ae2 100644
--- a/datafusion/core/tests/fuzz_cases/mod.rs
+++ b/datafusion/core/tests/fuzz_cases/mod.rs
@@ -19,5 +19,7 @@ mod aggregate_fuzz;
mod join_fuzz;
mod merge_fuzz;
mod sort_fuzz;
+
+mod limit_fuzz;
mod sort_preserving_repartition_fuzz;
mod window_fuzz;
diff --git a/datafusion/core/tests/fuzz_cases/sort_fuzz.rs
b/datafusion/core/tests/fuzz_cases/sort_fuzz.rs
index 2615abfd3c..f4b4f16aa1 100644
--- a/datafusion/core/tests/fuzz_cases/sort_fuzz.rs
+++ b/datafusion/core/tests/fuzz_cases/sort_fuzz.rs
@@ -22,25 +22,17 @@ use arrow::{
compute::SortOptions,
record_batch::RecordBatch,
};
-use arrow_array::{Float64Array, StringArray};
+use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::physical_plan::expressions::PhysicalSortExpr;
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::physical_plan::sorts::sort::SortExec;
use datafusion::physical_plan::{collect, ExecutionPlan};
use datafusion::prelude::{SessionConfig, SessionContext};
-use datafusion::{
- datasource::MemTable,
- execution::runtime_env::{RuntimeConfig, RuntimeEnv},
-};
-use datafusion_common::{
- cast::{as_float64_array, as_string_array},
- TableReference,
-};
use datafusion_execution::memory_pool::GreedyMemoryPool;
use datafusion_physical_expr::expressions::col;
-use rand::{rngs::StdRng, Rng, SeedableRng};
+use rand::Rng;
use std::sync::Arc;
-use test_utils::{batches_to_vec, partitions_to_sorted_vec, stagger_batch};
+use test_utils::{batches_to_vec, partitions_to_sorted_vec};
const KB: usize = 1 << 10;
#[tokio::test]
@@ -80,44 +72,6 @@ async fn test_sort_unlimited_mem() {
.await;
}
}
-
-#[tokio::test]
-async fn test_sort_topk() {
- for size in [10, 100, 1000, 10000, 1000000] {
- let mut topk_scenario = TopKScenario::new()
- .with_limit(10)
- .with_table_name("t")
- .with_col_name("x");
-
- // test topk with i32
- let collected_i32 = SortTest::new()
- .with_input(topk_scenario.batches(size, ColType::I32))
- .run_with_limit(&topk_scenario)
- .await;
- let actual = batches_to_vec(&collected_i32);
- let excepted_i32 = topk_scenario.excepted_i32();
- assert_eq!(actual, excepted_i32);
-
- // test topk with f64
- let collected_f64 = SortTest::new()
- .with_input(topk_scenario.batches(size, ColType::F64))
- .run_with_limit(&topk_scenario)
- .await;
- let actual: Vec<Option<f64>> = batches_to_f64_vec(&collected_f64);
- let excepted_f64 = topk_scenario.excepted_f64();
- assert_eq!(actual, excepted_f64);
-
- // test topk with str
- let collected_str = SortTest::new()
- .with_input(topk_scenario.batches(size, ColType::Str))
- .run_with_limit(&topk_scenario)
- .await;
- let actual: Vec<Option<&str>> = batches_to_str_vec(&collected_str);
- let excepted_str = topk_scenario.excepted_str();
- assert_eq!(actual, excepted_str);
- }
-}
-
#[derive(Debug, Default)]
struct SortTest {
input: Vec<Vec<RecordBatch>>,
@@ -132,11 +86,6 @@ impl SortTest {
Default::default()
}
- fn with_input(mut self, batches: Vec<Vec<RecordBatch>>) -> Self {
- self.input = batches.clone();
- self
- }
-
/// Create batches of int32 values of rows
fn with_int32_batches(mut self, rows: usize) -> Self {
self.input = vec![make_staggered_i32_batches(rows)];
@@ -154,44 +103,6 @@ impl SortTest {
self
}
- async fn run_with_limit<'a>(
- &self,
- topk_scenario: &TopKScenario<'a>,
- ) -> Vec<RecordBatch> {
- let input = self.input.clone();
- let schema = input
- .iter()
- .flat_map(|p| p.iter())
- .next()
- .expect("at least one batch")
- .schema();
-
- let table = MemTable::try_new(schema, input.clone()).unwrap();
-
- let ctx = SessionContext::new();
-
- ctx.register_table(
- TableReference::Bare {
- table: topk_scenario.table_name.into(),
- },
- Arc::new(table),
- )
- .unwrap();
-
- let df = ctx
- .table(topk_scenario.table_name)
- .await
- .unwrap()
- .sort(vec![
- datafusion_expr::col(topk_scenario.col_name).sort(true, true)
- ])
- .unwrap()
- .limit(0, Some(topk_scenario.limit))
- .unwrap();
-
- df.collect().await.unwrap()
- }
-
/// Sort the input using SortExec and ensure the results are
/// correct according to `Vec::sort` both with and without spilling
async fn run(&self) {
@@ -262,109 +173,6 @@ impl SortTest {
}
}
-enum ColType {
- I32,
- F64,
- Str,
-}
-
-struct TopKScenario<'a> {
- limit: usize,
- batches: Vec<Vec<RecordBatch>>,
- table_name: &'a str,
- col_name: &'a str,
-}
-
-impl<'a> TopKScenario<'a> {
- fn new() -> Self {
- TopKScenario {
- limit: 0,
- batches: vec![],
- table_name: "",
- col_name: "",
- }
- }
-
- fn with_limit(mut self, limit: usize) -> Self {
- self.limit = limit;
- self
- }
-
- fn with_table_name(mut self, table_name: &'a str) -> Self {
- self.table_name = table_name;
- self
- }
-
- fn with_col_name(mut self, col_name: &'a str) -> Self {
- self.col_name = col_name;
- self
- }
-
- fn batches(&mut self, len: usize, t: ColType) -> Vec<Vec<RecordBatch>> {
- let batches = match t {
- ColType::I32 => make_staggered_i32_batches(len),
- ColType::F64 => make_staggered_f64_batches(len),
- ColType::Str => make_staggered_str_batches(len),
- };
- self.batches = vec![batches];
- self.batches.clone()
- }
-
- fn excepted_i32(&self) -> Vec<Option<i32>> {
- let excepted = partitions_to_sorted_vec(&self.batches);
- excepted[0..self.limit].into()
- }
-
- fn excepted_f64(&self) -> Vec<Option<f64>> {
- let mut excepted: Vec<Option<f64>> = self
- .batches
- .iter()
- .flat_map(|batches| batches_to_f64_vec(batches).into_iter())
- .collect();
- excepted.sort_by(|a, b| a.partial_cmp(b).unwrap());
- excepted[0..self.limit].into()
- }
-
- fn excepted_str(&self) -> Vec<Option<&str>> {
- let mut excepted: Vec<Option<&str>> = self
- .batches
- .iter()
- .flat_map(|batches| batches_to_str_vec(batches).into_iter())
- .collect();
- excepted.sort_unstable();
- excepted[0..self.limit].into()
- }
-}
-
-impl Default for TopKScenario<'_> {
- fn default() -> Self {
- Self::new()
- }
-}
-
-fn make_staggered_f64_batches(len: usize) -> Vec<RecordBatch> {
- let mut rng = StdRng::seed_from_u64(100);
- let remainder = RecordBatch::try_from_iter(vec![(
- "x",
- Arc::new(Float64Array::from_iter_values(
- (0..len).map(|_| rng.gen_range(0.0..1000.7)),
- )) as ArrayRef,
- )])
- .unwrap();
- stagger_batch(remainder)
-}
-
-fn make_staggered_str_batches(len: usize) -> Vec<RecordBatch> {
- let remainder = RecordBatch::try_from_iter(vec![(
- "x",
- Arc::new(StringArray::from_iter_values(
- (0..len).map(|_| get_random_string(6)),
- )) as ArrayRef,
- )])
- .unwrap();
- stagger_batch(remainder)
-}
-
/// Return randomly sized record batches in a field named 'x' of type `Int32`
/// with randomized i32 content
fn make_staggered_i32_batches(len: usize) -> Vec<RecordBatch> {
@@ -389,26 +197,3 @@ fn make_staggered_i32_batches(len: usize) ->
Vec<RecordBatch> {
}
batches
}
-
-/// Return random ASCII String with len
-fn get_random_string(len: usize) -> String {
- rand::thread_rng()
- .sample_iter(rand::distributions::Alphanumeric)
- .take(len)
- .map(char::from)
- .collect()
-}
-
-fn batches_to_f64_vec(batches: &[RecordBatch]) -> Vec<Option<f64>> {
- batches
- .iter()
- .flat_map(|batch| as_float64_array(batch.column(0)).unwrap().iter())
- .collect()
-}
-
-fn batches_to_str_vec(batches: &[RecordBatch]) -> Vec<Option<&str>> {
- batches
- .iter()
- .flat_map(|batch| as_string_array(batch.column(0)).unwrap().iter())
- .collect()
-}
diff --git a/test-utils/src/lib.rs b/test-utils/src/lib.rs
index e3c96d16ee..0c3668d2f8 100644
--- a/test-utils/src/lib.rs
+++ b/test-utils/src/lib.rs
@@ -38,7 +38,7 @@ pub fn batches_to_vec(batches: &[RecordBatch]) ->
Vec<Option<i32>> {
.collect()
}
-/// extract values from batches and sort them
+/// extract i32 values from batches and sort them
pub fn partitions_to_sorted_vec(partitions: &[Vec<RecordBatch>]) ->
Vec<Option<i32>> {
let mut values: Vec<_> = partitions
.iter()