This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new a7f0156 Fuzz test for spillable sort (#1706)
a7f0156 is described below
commit a7f0156b33be22c6c3fa66db3754a56844b3c99f
Author: Yijie Shen <[email protected]>
AuthorDate: Sun Jan 30 21:17:47 2022 +0800
Fuzz test for spillable sort (#1706)
---
datafusion/Cargo.toml | 1 +
datafusion/fuzz-utils/Cargo.toml | 28 +++++++
datafusion/fuzz-utils/src/lib.rs | 73 +++++++++++++++++
datafusion/src/execution/memory_manager.rs | 3 +-
datafusion/src/execution/mod.rs | 4 +-
datafusion/src/physical_plan/sorts/sort.rs | 6 +-
datafusion/tests/merge_fuzz.rs | 50 +-----------
datafusion/tests/order_spill_fuzz.rs | 121 +++++++++++++++++++++++++++++
8 files changed, 234 insertions(+), 52 deletions(-)
diff --git a/datafusion/Cargo.toml b/datafusion/Cargo.toml
index e0e880d..422a776 100644
--- a/datafusion/Cargo.toml
+++ b/datafusion/Cargo.toml
@@ -82,6 +82,7 @@ tempfile = "3"
[dev-dependencies]
criterion = "0.3"
doc-comment = "0.3"
+fuzz-utils = { path = "fuzz-utils" }
[[bench]]
name = "aggregate_query_sql"
diff --git a/datafusion/fuzz-utils/Cargo.toml b/datafusion/fuzz-utils/Cargo.toml
new file mode 100644
index 0000000..304cbfe
--- /dev/null
+++ b/datafusion/fuzz-utils/Cargo.toml
@@ -0,0 +1,28 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "fuzz-utils"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+arrow = { version = "8.0.0", features = ["prettyprint"] }
+rand = "0.8"
+env_logger = "0.9.0"
diff --git a/datafusion/fuzz-utils/src/lib.rs b/datafusion/fuzz-utils/src/lib.rs
new file mode 100644
index 0000000..e021f55
--- /dev/null
+++ b/datafusion/fuzz-utils/src/lib.rs
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Common utils for fuzz tests
+use arrow::{array::Int32Array, record_batch::RecordBatch};
+use rand::prelude::StdRng;
+use rand::Rng;
+
+pub use env_logger;
+
+/// Extracts the i32 values from the set of batches and returns them as a single Vec
+///
+/// Values are returned in batch order, then row order; nulls are kept as `None`.
+///
+/// # Panics
+///
+/// Panics if any batch does not have exactly one column, or if that column is
+/// not an `Int32Array`.
+pub fn batches_to_vec(batches: &[RecordBatch]) -> Vec<Option<i32>> {
+    batches
+        .iter()
+        .flat_map(|batch| {
+            assert_eq!(batch.num_columns(), 1);
+            batch
+                .column(0)
+                .as_any()
+                .downcast_ref::<Int32Array>()
+                .expect("column 0 must be an Int32Array")
+                .iter()
+        })
+        .collect()
+}
+
+/// Extracts the i32 values from every partition and returns them sorted.
+///
+/// `None` (null) orders before every `Some`, so nulls come first. This is the
+/// expected output for a sort with `nulls_first: true`.
+///
+/// # Panics
+///
+/// Panics under the same conditions as [`batches_to_vec`].
+pub fn partitions_to_sorted_vec(partitions: &[Vec<RecordBatch>]) -> Vec<Option<i32>> {
+    let mut values: Vec<_> = partitions
+        .iter()
+        .flat_map(|batches| batches_to_vec(batches))
+        .collect();
+
+    values.sort_unstable();
+    values
+}
+
+/// Adds a random number of empty record batches into the stream
+///
+/// Before and after every input batch, 0 or 1 empty batches are inserted.
+/// The empty batches share the schema of the first input batch. An empty
+/// input is returned unchanged, because there is no schema to build empty
+/// batches from.
+pub fn add_empty_batches(
+    batches: Vec<RecordBatch>,
+    rng: &mut StdRng,
+) -> Vec<RecordBatch> {
+    // indexing `batches[0]` would panic here; with no batches there is nothing to pad
+    if batches.is_empty() {
+        return batches;
+    }
+    let schema = batches[0].schema();
+    // built once and cloned, rather than rebuilt for every input batch
+    let empty_batch = RecordBatch::new_empty(schema);
+
+    batches
+        .into_iter()
+        .flat_map(|batch| {
+            // draw the "before" count first and the "after" count second, so a
+            // seeded `rng` produces the same layout as the previous implementation
+            let before = rng.gen_range(0..2);
+            let after = rng.gen_range(0..2);
+            std::iter::repeat(empty_batch.clone())
+                .take(before)
+                .chain(std::iter::once(batch))
+                .chain(std::iter::repeat(empty_batch.clone()).take(after))
+        })
+        .collect()
+}
diff --git a/datafusion/src/execution/memory_manager.rs b/datafusion/src/execution/memory_manager.rs
index 0fb3cfb..5015f46 100644
--- a/datafusion/src/execution/memory_manager.rs
+++ b/datafusion/src/execution/memory_manager.rs
@@ -392,7 +392,8 @@ const GB: u64 = 1 << 30;
const MB: u64 = 1 << 20;
const KB: u64 = 1 << 10;
-fn human_readable_size(size: usize) -> String {
+/// Present size in human readable form
+pub fn human_readable_size(size: usize) -> String {
let size = size as u64;
let (value, unit) = {
if size >= 2 * TB {
diff --git a/datafusion/src/execution/mod.rs b/datafusion/src/execution/mod.rs
index e3b42ae..427c539 100644
--- a/datafusion/src/execution/mod.rs
+++ b/datafusion/src/execution/mod.rs
@@ -25,4 +25,6 @@ pub mod options;
pub mod runtime_env;
pub use disk_manager::DiskManager;
-pub use memory_manager::{MemoryConsumer, MemoryConsumerId, MemoryManager};
+pub use memory_manager::{
+ human_readable_size, MemoryConsumer, MemoryConsumerId, MemoryManager,
+};
diff --git a/datafusion/src/physical_plan/sorts/sort.rs b/datafusion/src/physical_plan/sorts/sort.rs
index 7266b6c..7f7f581 100644
--- a/datafusion/src/physical_plan/sorts/sort.rs
+++ b/datafusion/src/physical_plan/sorts/sort.rs
@@ -21,7 +21,7 @@
use crate::error::{DataFusionError, Result};
use crate::execution::memory_manager::{
- ConsumerType, MemoryConsumer, MemoryConsumerId, MemoryManager,
+ human_readable_size, ConsumerType, MemoryConsumer, MemoryConsumerId,
MemoryManager,
};
use crate::execution::runtime_env::RuntimeEnv;
use crate::physical_plan::common::{batch_byte_size, IPCWriter,
SizedRecordBatchStream};
@@ -348,7 +348,9 @@ fn write_sorted(
writer.finish()?;
debug!(
"Spilled {} batches of total {} rows to disk, memory released {}",
- writer.num_batches, writer.num_rows, writer.num_bytes
+ writer.num_batches,
+ writer.num_rows,
+ human_readable_size(writer.num_bytes as usize),
);
Ok(())
}
diff --git a/datafusion/tests/merge_fuzz.rs b/datafusion/tests/merge_fuzz.rs
index 8192054..6821c6b 100644
--- a/datafusion/tests/merge_fuzz.rs
+++ b/datafusion/tests/merge_fuzz.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-//! Fuzz Test for various corner cases merging streams of RecordBatchs
+//! Fuzz Test for various corner cases merging streams of RecordBatches
use std::sync::Arc;
use arrow::{
@@ -32,6 +32,7 @@ use datafusion::{
sorts::sort_preserving_merge::SortPreservingMergeExec,
},
};
+use fuzz_utils::{add_empty_batches, batches_to_vec, partitions_to_sorted_vec};
use rand::{prelude::StdRng, Rng, SeedableRng};
#[tokio::test]
@@ -147,35 +148,6 @@ async fn run_merge_test(input: Vec<Vec<RecordBatch>>) {
}
}
-/// Extracts the i32 values from the set of batches and returns them as a
single Vec
-fn batches_to_vec(batches: &[RecordBatch]) -> Vec<Option<i32>> {
- batches
- .iter()
- .map(|batch| {
- assert_eq!(batch.num_columns(), 1);
- batch
- .column(0)
- .as_any()
- .downcast_ref::<Int32Array>()
- .unwrap()
- .iter()
- })
- .flatten()
- .collect()
-}
-
-// extract values from batches and sort them
-fn partitions_to_sorted_vec(partitions: &[Vec<RecordBatch>]) ->
Vec<Option<i32>> {
- let mut values: Vec<_> = partitions
- .iter()
- .map(|batches| batches_to_vec(batches).into_iter())
- .flatten()
- .collect();
-
- values.sort_unstable();
- values
-}
-
/// Return the values `low..high` in order, in randomly sized
/// record batches in a field named 'x' of type `Int32`
fn make_staggered_batches(low: i32, high: i32, seed: u64) -> Vec<RecordBatch> {
@@ -199,24 +171,6 @@ fn make_staggered_batches(low: i32, high: i32, seed: u64) -> Vec<RecordBatch> {
add_empty_batches(batches, &mut rng)
}
-/// Adds a random number of empty record batches into the stream
-fn add_empty_batches(batches: Vec<RecordBatch>, rng: &mut StdRng) ->
Vec<RecordBatch> {
- let schema = batches[0].schema();
-
- batches
- .into_iter()
- .map(|batch| {
- // insert 0, or 1 empty batches before and after the current batch
- let empty_batch = RecordBatch::new_empty(schema.clone());
- std::iter::repeat(empty_batch.clone())
- .take(rng.gen_range(0..2))
- .chain(std::iter::once(batch))
-
.chain(std::iter::repeat(empty_batch).take(rng.gen_range(0..2)))
- })
- .flatten()
- .collect()
-}
-
fn concat(mut v1: Vec<RecordBatch>, v2: Vec<RecordBatch>) -> Vec<RecordBatch> {
v1.extend(v2);
v1
diff --git a/datafusion/tests/order_spill_fuzz.rs b/datafusion/tests/order_spill_fuzz.rs
new file mode 100644
index 0000000..049fe6a
--- /dev/null
+++ b/datafusion/tests/order_spill_fuzz.rs
@@ -0,0 +1,121 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Fuzz tests for sorting RecordBatches under various memory limits, including
+//! inputs that exceed the available memory and therefore must spill to disk
+
+use arrow::{
+ array::{ArrayRef, Int32Array},
+ compute::SortOptions,
+ record_batch::RecordBatch,
+};
+use datafusion::execution::memory_manager::MemoryManagerConfig;
+use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
+use datafusion::physical_plan::expressions::{col, PhysicalSortExpr};
+use datafusion::physical_plan::memory::MemoryExec;
+use datafusion::physical_plan::sorts::sort::SortExec;
+use datafusion::physical_plan::{collect, ExecutionPlan};
+use fuzz_utils::{add_empty_batches, batches_to_vec, partitions_to_sorted_vec};
+use rand::prelude::StdRng;
+use rand::{Rng, SeedableRng};
+use std::sync::Arc;
+
+/// Memory limit of 1024: only the 5-row input fits; larger inputs must spill.
+#[tokio::test]
+async fn test_sort_1k_mem() {
+    let cases = vec![(5, false), (2_000, true), (1_000_000, true)];
+    run_sort(1024, cases).await
+}
+
+/// Memory limit of 100 * 1024: only the 1M-row input is expected to spill.
+#[tokio::test]
+async fn test_sort_100k_mem() {
+    let cases = vec![(5, false), (2_000, false), (1_000_000, true)];
+    run_sort(100 * 1024, cases).await
+}
+
+/// No effective memory limit: no input size may trigger a spill.
+#[tokio::test]
+async fn test_sort_unlimited_mem() {
+    let cases = vec![(5, false), (2_000, false), (1_000_000, false)];
+    run_sort(usize::MAX, cases).await
+}
+
+/// Sort the input using SortExec and ensure the results are correct according
to `Vec::sort`
+async fn run_sort(pool_size: usize, size_spill: Vec<(usize, bool)>) {
+ for (size, spill) in size_spill {
+ let input = vec![make_staggered_batches(size)];
+ let first_batch = input
+ .iter()
+ .map(|p| p.iter())
+ .flatten()
+ .next()
+ .expect("at least one batch");
+ let schema = first_batch.schema();
+
+ let sort = vec![PhysicalSortExpr {
+ expr: col("x", &schema).unwrap(),
+ options: SortOptions {
+ descending: false,
+ nulls_first: true,
+ },
+ }];
+
+ let exec = MemoryExec::try_new(&input, schema, None).unwrap();
+ let sort = Arc::new(SortExec::try_new(sort, Arc::new(exec)).unwrap());
+
+ let runtime_config = RuntimeConfig::new().with_memory_manager(
+ MemoryManagerConfig::try_new_limit(pool_size, 1.0).unwrap(),
+ );
+ let runtime = Arc::new(RuntimeEnv::new(runtime_config).unwrap());
+ let collected = collect(sort.clone(), runtime).await.unwrap();
+
+ let expected = partitions_to_sorted_vec(&input);
+ let actual = batches_to_vec(&collected);
+
+ if spill {
+ assert_ne!(sort.metrics().unwrap().spill_count().unwrap(), 0);
+ } else {
+ assert_eq!(sort.metrics().unwrap().spill_count().unwrap(), 0);
+ }
+
+ assert_eq!(expected, actual, "failure in @ pool_size {}", pool_size);
+ }
+}
+
+/// Return randomly sized record batches in a field named 'x' of type `Int32`
+/// with randomized i32 content
+///
+/// A fresh data seed is drawn on every call, so each run fuzzes new values.
+/// The seed is printed so that a failing run can be replayed by pinning it.
+/// Batch boundaries come from a fixed seed (42), and empty batches are
+/// interleaved via `add_empty_batches`.
+fn make_staggered_batches(len: usize) -> Vec<RecordBatch> {
+    // `thread_rng` alone is unreproducible; derive the data from a reported seed
+    let seed: u64 = rand::thread_rng().gen();
+    println!("make_staggered_batches: len {} using data seed {}", len, seed);
+    let mut data_rng = StdRng::seed_from_u64(seed);
+    let mut input: Vec<i32> = vec![0; len];
+    data_rng.fill(&mut input[..]);
+    let input = Int32Array::from_iter_values(input.into_iter());
+
+    // split into several record batches
+    let mut remainder =
+        RecordBatch::try_from_iter(vec![("x", Arc::new(input) as ArrayRef)]).unwrap();
+
+    let mut batches = vec![];
+
+    // use a random number generator to pick a random sized output
+    let mut rng = StdRng::seed_from_u64(42);
+    while remainder.num_rows() > 0 {
+        let batch_size = rng.gen_range(0..remainder.num_rows() + 1);
+
+        batches.push(remainder.slice(0, batch_size));
+        remainder = remainder.slice(batch_size, remainder.num_rows() - batch_size);
+    }
+
+    add_empty_batches(batches, &mut rng)
+}