This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new a7f0156  Fuzz test for spillable sort (#1706)
a7f0156 is described below

commit a7f0156b33be22c6c3fa66db3754a56844b3c99f
Author: Yijie Shen <[email protected]>
AuthorDate: Sun Jan 30 21:17:47 2022 +0800

    Fuzz test for spillable sort (#1706)
---
 datafusion/Cargo.toml                      |   1 +
 datafusion/fuzz-utils/Cargo.toml           |  28 +++++++
 datafusion/fuzz-utils/src/lib.rs           |  73 +++++++++++++++++
 datafusion/src/execution/memory_manager.rs |   3 +-
 datafusion/src/execution/mod.rs            |   4 +-
 datafusion/src/physical_plan/sorts/sort.rs |   6 +-
 datafusion/tests/merge_fuzz.rs             |  50 +-----------
 datafusion/tests/order_spill_fuzz.rs       | 121 +++++++++++++++++++++++++++++
 8 files changed, 234 insertions(+), 52 deletions(-)

diff --git a/datafusion/Cargo.toml b/datafusion/Cargo.toml
index e0e880d..422a776 100644
--- a/datafusion/Cargo.toml
+++ b/datafusion/Cargo.toml
@@ -82,6 +82,7 @@ tempfile = "3"
 [dev-dependencies]
 criterion = "0.3"
 doc-comment = "0.3"
+fuzz-utils = { path = "fuzz-utils" }
 
 [[bench]]
 name = "aggregate_query_sql"
diff --git a/datafusion/fuzz-utils/Cargo.toml b/datafusion/fuzz-utils/Cargo.toml
new file mode 100644
index 0000000..304cbfe
--- /dev/null
+++ b/datafusion/fuzz-utils/Cargo.toml
@@ -0,0 +1,28 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "fuzz-utils"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+arrow = { version = "8.0.0", features = ["prettyprint"] }
+rand = "0.8"
+env_logger = "0.9.0"
diff --git a/datafusion/fuzz-utils/src/lib.rs b/datafusion/fuzz-utils/src/lib.rs
new file mode 100644
index 0000000..e021f55
--- /dev/null
+++ b/datafusion/fuzz-utils/src/lib.rs
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Common utils for fuzz tests
+use arrow::{array::Int32Array, record_batch::RecordBatch};
+use rand::prelude::StdRng;
+use rand::Rng;
+
+pub use env_logger;
+
+/// Extracts the i32 values from the set of batches and returns them as a single Vec
+pub fn batches_to_vec(batches: &[RecordBatch]) -> Vec<Option<i32>> {
+    batches
+        .iter()
+        .map(|batch| {
+            assert_eq!(batch.num_columns(), 1);
+            batch
+                .column(0)
+                .as_any()
+                .downcast_ref::<Int32Array>()
+                .unwrap()
+                .iter()
+        })
+        .flatten()
+        .collect()
+}
+
+/// extract values from batches and sort them
+pub fn partitions_to_sorted_vec(partitions: &[Vec<RecordBatch>]) -> Vec<Option<i32>> {
+    let mut values: Vec<_> = partitions
+        .iter()
+        .map(|batches| batches_to_vec(batches).into_iter())
+        .flatten()
+        .collect();
+
+    values.sort_unstable();
+    values
+}
+
+/// Adds a random number of empty record batches into the stream
+pub fn add_empty_batches(
+    batches: Vec<RecordBatch>,
+    rng: &mut StdRng,
+) -> Vec<RecordBatch> {
+    let schema = batches[0].schema();
+
+    batches
+        .into_iter()
+        .map(|batch| {
+            // insert 0, or 1 empty batches before and after the current batch
+            let empty_batch = RecordBatch::new_empty(schema.clone());
+            std::iter::repeat(empty_batch.clone())
+                .take(rng.gen_range(0..2))
+                .chain(std::iter::once(batch))
+                .chain(std::iter::repeat(empty_batch).take(rng.gen_range(0..2)))
+        })
+        .flatten()
+        .collect()
+}
diff --git a/datafusion/src/execution/memory_manager.rs b/datafusion/src/execution/memory_manager.rs
index 0fb3cfb..5015f46 100644
--- a/datafusion/src/execution/memory_manager.rs
+++ b/datafusion/src/execution/memory_manager.rs
@@ -392,7 +392,8 @@ const GB: u64 = 1 << 30;
 const MB: u64 = 1 << 20;
 const KB: u64 = 1 << 10;
 
-fn human_readable_size(size: usize) -> String {
+/// Present size in human readable form
+pub fn human_readable_size(size: usize) -> String {
     let size = size as u64;
     let (value, unit) = {
         if size >= 2 * TB {
diff --git a/datafusion/src/execution/mod.rs b/datafusion/src/execution/mod.rs
index e3b42ae..427c539 100644
--- a/datafusion/src/execution/mod.rs
+++ b/datafusion/src/execution/mod.rs
@@ -25,4 +25,6 @@ pub mod options;
 pub mod runtime_env;
 
 pub use disk_manager::DiskManager;
-pub use memory_manager::{MemoryConsumer, MemoryConsumerId, MemoryManager};
+pub use memory_manager::{
+    human_readable_size, MemoryConsumer, MemoryConsumerId, MemoryManager,
+};
diff --git a/datafusion/src/physical_plan/sorts/sort.rs b/datafusion/src/physical_plan/sorts/sort.rs
index 7266b6c..7f7f581 100644
--- a/datafusion/src/physical_plan/sorts/sort.rs
+++ b/datafusion/src/physical_plan/sorts/sort.rs
@@ -21,7 +21,7 @@
 
 use crate::error::{DataFusionError, Result};
 use crate::execution::memory_manager::{
-    ConsumerType, MemoryConsumer, MemoryConsumerId, MemoryManager,
+    human_readable_size, ConsumerType, MemoryConsumer, MemoryConsumerId, MemoryManager,
 };
 use crate::execution::runtime_env::RuntimeEnv;
 use crate::physical_plan::common::{batch_byte_size, IPCWriter, SizedRecordBatchStream};
@@ -348,7 +348,9 @@ fn write_sorted(
     writer.finish()?;
     debug!(
         "Spilled {} batches of total {} rows to disk, memory released {}",
-        writer.num_batches, writer.num_rows, writer.num_bytes
+        writer.num_batches,
+        writer.num_rows,
+        human_readable_size(writer.num_bytes as usize),
     );
     Ok(())
 }
diff --git a/datafusion/tests/merge_fuzz.rs b/datafusion/tests/merge_fuzz.rs
index 8192054..6821c6b 100644
--- a/datafusion/tests/merge_fuzz.rs
+++ b/datafusion/tests/merge_fuzz.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Fuzz Test for various corner cases merging streams of RecordBatchs
+//! Fuzz Test for various corner cases merging streams of RecordBatches
 use std::sync::Arc;
 
 use arrow::{
@@ -32,6 +32,7 @@ use datafusion::{
         sorts::sort_preserving_merge::SortPreservingMergeExec,
     },
 };
+use fuzz_utils::{add_empty_batches, batches_to_vec, partitions_to_sorted_vec};
 use rand::{prelude::StdRng, Rng, SeedableRng};
 
 #[tokio::test]
@@ -147,35 +148,6 @@ async fn run_merge_test(input: Vec<Vec<RecordBatch>>) {
     }
 }
 
-/// Extracts the i32 values from the set of batches and returns them as a single Vec
-fn batches_to_vec(batches: &[RecordBatch]) -> Vec<Option<i32>> {
-    batches
-        .iter()
-        .map(|batch| {
-            assert_eq!(batch.num_columns(), 1);
-            batch
-                .column(0)
-                .as_any()
-                .downcast_ref::<Int32Array>()
-                .unwrap()
-                .iter()
-        })
-        .flatten()
-        .collect()
-}
-
-// extract values from batches and sort them
-fn partitions_to_sorted_vec(partitions: &[Vec<RecordBatch>]) -> Vec<Option<i32>> {
-    let mut values: Vec<_> = partitions
-        .iter()
-        .map(|batches| batches_to_vec(batches).into_iter())
-        .flatten()
-        .collect();
-
-    values.sort_unstable();
-    values
-}
-
 /// Return the values `low..high` in order, in randomly sized
 /// record batches in a field named 'x' of type `Int32`
 fn make_staggered_batches(low: i32, high: i32, seed: u64) -> Vec<RecordBatch> {
@@ -199,24 +171,6 @@ fn make_staggered_batches(low: i32, high: i32, seed: u64) -> Vec<RecordBatch> {
     add_empty_batches(batches, &mut rng)
 }
 
-/// Adds a random number of empty record batches into the stream
-fn add_empty_batches(batches: Vec<RecordBatch>, rng: &mut StdRng) -> Vec<RecordBatch> {
-    let schema = batches[0].schema();
-
-    batches
-        .into_iter()
-        .map(|batch| {
-            // insert 0, or 1 empty batches before and after the current batch
-            let empty_batch = RecordBatch::new_empty(schema.clone());
-            std::iter::repeat(empty_batch.clone())
-                .take(rng.gen_range(0..2))
-                .chain(std::iter::once(batch))
-                .chain(std::iter::repeat(empty_batch).take(rng.gen_range(0..2)))
-        })
-        .flatten()
-        .collect()
-}
-
 fn concat(mut v1: Vec<RecordBatch>, v2: Vec<RecordBatch>) -> Vec<RecordBatch> {
     v1.extend(v2);
     v1
diff --git a/datafusion/tests/order_spill_fuzz.rs b/datafusion/tests/order_spill_fuzz.rs
new file mode 100644
index 0000000..049fe6a
--- /dev/null
+++ b/datafusion/tests/order_spill_fuzz.rs
@@ -0,0 +1,121 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Fuzz Test for various corner cases sorting RecordBatches exceeds available memory and should spill
+
+use arrow::{
+    array::{ArrayRef, Int32Array},
+    compute::SortOptions,
+    record_batch::RecordBatch,
+};
+use datafusion::execution::memory_manager::MemoryManagerConfig;
+use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
+use datafusion::physical_plan::expressions::{col, PhysicalSortExpr};
+use datafusion::physical_plan::memory::MemoryExec;
+use datafusion::physical_plan::sorts::sort::SortExec;
+use datafusion::physical_plan::{collect, ExecutionPlan};
+use fuzz_utils::{add_empty_batches, batches_to_vec, partitions_to_sorted_vec};
+use rand::prelude::StdRng;
+use rand::{Rng, SeedableRng};
+use std::sync::Arc;
+
+#[tokio::test]
+async fn test_sort_1k_mem() {
+    run_sort(1024, vec![(5, false), (2000, true), (1000000, true)]).await
+}
+
+#[tokio::test]
+async fn test_sort_100k_mem() {
+    run_sort(102400, vec![(5, false), (2000, false), (1000000, true)]).await
+}
+
+#[tokio::test]
+async fn test_sort_unlimited_mem() {
+    run_sort(
+        usize::MAX,
+        vec![(5, false), (2000, false), (1000000, false)],
+    )
+    .await
+}
+
+/// Sort the input using SortExec and ensure the results are correct according to `Vec::sort`
+async fn run_sort(pool_size: usize, size_spill: Vec<(usize, bool)>) {
+    for (size, spill) in size_spill {
+        let input = vec![make_staggered_batches(size)];
+        let first_batch = input
+            .iter()
+            .map(|p| p.iter())
+            .flatten()
+            .next()
+            .expect("at least one batch");
+        let schema = first_batch.schema();
+
+        let sort = vec![PhysicalSortExpr {
+            expr: col("x", &schema).unwrap(),
+            options: SortOptions {
+                descending: false,
+                nulls_first: true,
+            },
+        }];
+
+        let exec = MemoryExec::try_new(&input, schema, None).unwrap();
+        let sort = Arc::new(SortExec::try_new(sort, Arc::new(exec)).unwrap());
+
+        let runtime_config = RuntimeConfig::new().with_memory_manager(
+            MemoryManagerConfig::try_new_limit(pool_size, 1.0).unwrap(),
+        );
+        let runtime = Arc::new(RuntimeEnv::new(runtime_config).unwrap());
+        let collected = collect(sort.clone(), runtime).await.unwrap();
+
+        let expected = partitions_to_sorted_vec(&input);
+        let actual = batches_to_vec(&collected);
+
+        if spill {
+            assert_ne!(sort.metrics().unwrap().spill_count().unwrap(), 0);
+        } else {
+            assert_eq!(sort.metrics().unwrap().spill_count().unwrap(), 0);
+        }
+
+        assert_eq!(expected, actual, "failure in @ pool_size {}", pool_size);
+    }
+}
+
+/// Return randomly sized record batches in a field named 'x' of type `Int32`
+/// with randomized i32 content
+fn make_staggered_batches(len: usize) -> Vec<RecordBatch> {
+    let mut rng = rand::thread_rng();
+    let mut input: Vec<i32> = vec![0; len];
+    rng.fill(&mut input[..]);
+    let input = Int32Array::from_iter_values(input.into_iter());
+
+    // split into several record batches
+    let mut remainder =
+        RecordBatch::try_from_iter(vec![("x", Arc::new(input) as ArrayRef)]).unwrap();
+
+    let mut batches = vec![];
+
+    // use a random number generator to pick a random sized output
+    let mut rng = StdRng::seed_from_u64(42);
+    while remainder.num_rows() > 0 {
+        let batch_size = rng.gen_range(0..remainder.num_rows() + 1);
+
+        batches.push(remainder.slice(0, batch_size));
+        remainder = remainder.slice(batch_size, remainder.num_rows() - batch_size);
+    }
+
+    add_empty_batches(batches, &mut rng)
+}

Reply via email to