This is an automated email from the ASF dual-hosted git repository.
jayzhan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 908ca0e684 Migrate physical plan tests to `insta` (Part-3 / Final) (#15399)
908ca0e684 is described below
commit 908ca0e684f3184075ef799128ee3f58f1e9db86
Author: Shreyas (Lua) <[email protected]>
AuthorDate: Tue Mar 25 05:19:50 2025 +0530
Migrate physical plan tests to `insta` (Part-3 / Final) (#15399)
* Migrated tests to insta in sorts/partial_sort.rs
* Migrated tests to insta in sorts/sort.rs and sorts/sort_preserving_merge.rs
* Migrated tests to insta in windows/bounded_window_agg_exec.rs
* Removed unused imports
* Fixes for failing tests
---
datafusion/physical-plan/src/sorts/partial_sort.rs | 116 +++++++--------
datafusion/physical-plan/src/sorts/sort.rs | 35 ++---
.../src/sorts/sort_preserving_merge.rs | 157 ++++++++++-----------
.../src/windows/bounded_window_agg_exec.rs | 66 +++++----
4 files changed, 182 insertions(+), 192 deletions(-)
diff --git a/datafusion/physical-plan/src/sorts/partial_sort.rs b/datafusion/physical-plan/src/sorts/partial_sort.rs
index 5277a50b85..320fa21c86 100644
--- a/datafusion/physical-plan/src/sorts/partial_sort.rs
+++ b/datafusion/physical-plan/src/sorts/partial_sort.rs
@@ -467,11 +467,12 @@ mod tests {
use arrow::array::*;
use arrow::compute::SortOptions;
use arrow::datatypes::*;
+ use datafusion_common::test_util::batches_to_string;
use futures::FutureExt;
+ use insta::allow_duplicates;
+ use insta::assert_snapshot;
use itertools::Itertools;
- use datafusion_common::assert_batches_eq;
-
use crate::collect;
use crate::expressions::col;
use crate::expressions::PhysicalSortExpr;
@@ -522,20 +523,21 @@ mod tests {
let result = collect(partial_sort_exec, Arc::clone(&task_ctx)).await?;
- let expected_after_sort = [
- "+---+---+---+",
- "| a | b | c |",
- "+---+---+---+",
- "| 0 | 1 | 0 |",
- "| 0 | 1 | 1 |",
- "| 0 | 2 | 5 |",
- "| 1 | 2 | 4 |",
- "| 1 | 3 | 2 |",
- "| 1 | 3 | 3 |",
- "+---+---+---+",
- ];
assert_eq!(2, result.len());
- assert_batches_eq!(expected_after_sort, &result);
+ allow_duplicates! {
+ assert_snapshot!(batches_to_string(&result), @r#"
+ +---+---+---+
+ | a | b | c |
+ +---+---+---+
+ | 0 | 1 | 0 |
+ | 0 | 1 | 1 |
+ | 0 | 2 | 5 |
+ | 1 | 2 | 4 |
+ | 1 | 3 | 2 |
+ | 1 | 3 | 3 |
+ +---+---+---+
+ "#);
+ }
assert_eq!(
task_ctx.runtime_env().memory_pool.reserved(),
0,
@@ -588,18 +590,19 @@ mod tests {
let result = collect(partial_sort_exec, Arc::clone(&task_ctx)).await?;
- let expected_after_sort = [
- "+---+---+---+",
- "| a | b | c |",
- "+---+---+---+",
- "| 0 | 1 | 4 |",
- "| 0 | 2 | 3 |",
- "| 1 | 2 | 2 |",
- "| 1 | 3 | 0 |",
- "+---+---+---+",
- ];
assert_eq!(2, result.len());
- assert_batches_eq!(expected_after_sort, &result);
+ allow_duplicates! {
+ assert_snapshot!(batches_to_string(&result), @r#"
+ +---+---+---+
+ | a | b | c |
+ +---+---+---+
+ | 0 | 1 | 4 |
+ | 0 | 2 | 3 |
+ | 1 | 2 | 2 |
+ | 1 | 3 | 0 |
+ +---+---+---+
+ "#);
+ }
assert_eq!(
task_ctx.runtime_env().memory_pool.reserved(),
0,
@@ -663,21 +666,22 @@ mod tests {
0,
"The sort should have returned all memory used back to the memory manager"
);
- let expected = [
- "+---+---+---+",
- "| a | b | c |",
- "+---+---+---+",
- "| 0 | 1 | 6 |",
- "| 0 | 1 | 7 |",
- "| 0 | 3 | 4 |",
- "| 0 | 3 | 5 |",
- "| 1 | 2 | 0 |",
- "| 1 | 2 | 1 |",
- "| 1 | 4 | 2 |",
- "| 1 | 4 | 3 |",
- "+---+---+---+",
- ];
- assert_batches_eq!(expected, &result);
+ allow_duplicates! {
+ assert_snapshot!(batches_to_string(&result), @r#"
+ +---+---+---+
+ | a | b | c |
+ +---+---+---+
+ | 0 | 1 | 6 |
+ | 0 | 1 | 7 |
+ | 0 | 3 | 4 |
+ | 0 | 3 | 5 |
+ | 1 | 2 | 0 |
+ | 1 | 2 | 1 |
+ | 1 | 4 | 2 |
+ | 1 | 4 | 3 |
+ +---+---+---+
+ "#);
+ }
}
Ok(())
}
@@ -1000,21 +1004,6 @@ mod tests {
2,
));
- let expected = [
- "+-----+------+-------+",
- "| a | b | c |",
- "+-----+------+-------+",
- "| 1.0 | 20.0 | 20.0 |",
- "| 1.0 | 20.0 | 10.0 |",
- "| 1.0 | 40.0 | 10.0 |",
- "| 2.0 | 40.0 | 100.0 |",
- "| 2.0 | NaN | NaN |",
- "| 3.0 | | |",
- "| 3.0 | | 100.0 |",
- "| 3.0 | NaN | NaN |",
- "+-----+------+-------+",
- ];
-
assert_eq!(
DataType::Float32,
*partial_sort_exec.schema().field(0).data_type()
@@ -1033,7 +1022,20 @@ mod tests {
task_ctx,
)
.await?;
- assert_batches_eq!(expected, &result);
+ assert_snapshot!(batches_to_string(&result), @r#"
+ +-----+------+-------+
+ | a | b | c |
+ +-----+------+-------+
+ | 1.0 | 20.0 | 20.0 |
+ | 1.0 | 20.0 | 10.0 |
+ | 1.0 | 40.0 | 10.0 |
+ | 2.0 | 40.0 | 100.0 |
+ | 2.0 | NaN | NaN |
+ | 3.0 | | |
+ | 3.0 | | 100.0 |
+ | 3.0 | NaN | NaN |
+ +-----+------+-------+
+ "#);
assert_eq!(result.len(), 2);
let metrics = partial_sort_exec.metrics().unwrap();
assert!(metrics.elapsed_compute().unwrap() > 0);
diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs
index e2d665e1d8..731e24b53c 100644
--- a/datafusion/physical-plan/src/sorts/sort.rs
+++ b/datafusion/physical-plan/src/sorts/sort.rs
@@ -1244,7 +1244,8 @@ mod tests {
use arrow::compute::SortOptions;
use arrow::datatypes::*;
use datafusion_common::cast::as_primitive_array;
- use datafusion_common::{assert_batches_eq, Result, ScalarValue};
+ use datafusion_common::test_util::batches_to_string;
+ use datafusion_common::{Result, ScalarValue};
use datafusion_execution::config::SessionConfig;
use datafusion_execution::runtime_env::RuntimeEnvBuilder;
use datafusion_execution::RecordBatchStream;
@@ -1252,6 +1253,7 @@ mod tests {
use datafusion_physical_expr::EquivalenceProperties;
use futures::{FutureExt, Stream};
+ use insta::assert_snapshot;
#[derive(Debug, Clone)]
pub struct SortedUnboundedExec {
@@ -1913,22 +1915,21 @@ mod tests {
plan = plan.with_fetch(Some(9));
let batches = collect(Arc::new(plan), task_ctx).await?;
- #[rustfmt::skip]
- let expected = [
- "+----+",
- "| c1 |",
- "+----+",
- "| 0 |",
- "| 1 |",
- "| 2 |",
- "| 3 |",
- "| 4 |",
- "| 5 |",
- "| 6 |",
- "| 7 |",
- "| 8 |",
- "+----+",];
- assert_batches_eq!(expected, &batches);
+ assert_snapshot!(batches_to_string(&batches), @r#"
+ +----+
+ | c1 |
+ +----+
+ | 0 |
+ | 1 |
+ | 2 |
+ | 3 |
+ | 4 |
+ | 5 |
+ | 6 |
+ | 7 |
+ | 8 |
+ +----+
+ "#);
Ok(())
}
}
diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
index ca06a029e8..b987dff364 100644
--- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
+++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
@@ -412,6 +412,7 @@ mod tests {
};
use arrow::compute::SortOptions;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+ use datafusion_common::test_util::batches_to_string;
use datafusion_common::{assert_batches_eq, assert_contains, DataFusionError};
use datafusion_common_runtime::SpawnedTask;
use datafusion_execution::config::SessionConfig;
@@ -423,6 +424,7 @@ mod tests {
use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
use futures::{FutureExt, Stream, StreamExt};
+ use insta::assert_snapshot;
use tokio::time::timeout;
// The number in the function is highly related to the memory limit we are testing
@@ -992,25 +994,22 @@ mod tests {
let collected = collect(merge, task_ctx).await.unwrap();
assert_eq!(collected.len(), 1);
- assert_batches_eq!(
- &[
- "+---+---+-------------------------------+",
- "| a | b | c |",
- "+---+---+-------------------------------+",
- "| 1 | | 1970-01-01T00:00:00.000000008 |",
- "| 1 | | 1970-01-01T00:00:00.000000008 |",
- "| 2 | a | |",
- "| 7 | b | 1970-01-01T00:00:00.000000006 |",
- "| 2 | b | |",
- "| 9 | d | |",
- "| 3 | e | 1970-01-01T00:00:00.000000004 |",
- "| 3 | g | 1970-01-01T00:00:00.000000005 |",
- "| 4 | h | |",
- "| 5 | i | 1970-01-01T00:00:00.000000004 |",
- "+---+---+-------------------------------+",
- ],
- collected.as_slice()
- );
+ assert_snapshot!(batches_to_string(collected.as_slice()), @r#"
+ +---+---+-------------------------------+
+ | a | b | c |
+ +---+---+-------------------------------+
+ | 1 | | 1970-01-01T00:00:00.000000008 |
+ | 1 | | 1970-01-01T00:00:00.000000008 |
+ | 2 | a | |
+ | 7 | b | 1970-01-01T00:00:00.000000006 |
+ | 2 | b | |
+ | 9 | d | |
+ | 3 | e | 1970-01-01T00:00:00.000000004 |
+ | 3 | g | 1970-01-01T00:00:00.000000005 |
+ | 4 | h | |
+ | 5 | i | 1970-01-01T00:00:00.000000004 |
+ +---+---+-------------------------------+
+ "#);
}
#[tokio::test]
@@ -1035,17 +1034,14 @@ mod tests {
let collected = collect(merge, task_ctx).await.unwrap();
assert_eq!(collected.len(), 1);
- assert_batches_eq!(
- &[
- "+---+---+",
- "| a | b |",
- "+---+---+",
- "| 1 | a |",
- "| 2 | b |",
- "+---+---+",
- ],
- collected.as_slice()
- );
+ assert_snapshot!(batches_to_string(collected.as_slice()), @r#"
+ +---+---+
+ | a | b |
+ +---+---+
+ | 1 | a |
+ | 2 | b |
+ +---+---+
+ "#);
}
#[tokio::test]
@@ -1069,20 +1065,17 @@ mod tests {
let collected = collect(merge, task_ctx).await.unwrap();
assert_eq!(collected.len(), 1);
- assert_batches_eq!(
- &[
- "+---+---+",
- "| a | b |",
- "+---+---+",
- "| 1 | a |",
- "| 2 | b |",
- "| 7 | c |",
- "| 9 | d |",
- "| 3 | e |",
- "+---+---+",
- ],
- collected.as_slice()
- );
+ assert_snapshot!(batches_to_string(collected.as_slice()), @r#"
+ +---+---+
+ | a | b |
+ +---+---+
+ | 1 | a |
+ | 2 | b |
+ | 7 | c |
+ | 9 | d |
+ | 3 | e |
+ +---+---+
+ "#);
}
#[tokio::test]
@@ -1179,17 +1172,16 @@ mod tests {
let collected = collect(Arc::clone(&merge) as Arc<dyn ExecutionPlan>, task_ctx)
.await
.unwrap();
- let expected = [
- "+----+---+",
- "| a | b |",
- "+----+---+",
- "| 1 | a |",
- "| 10 | b |",
- "| 2 | c |",
- "| 20 | d |",
- "+----+---+",
- ];
- assert_batches_eq!(expected, collected.as_slice());
+ assert_snapshot!(batches_to_string(collected.as_slice()), @r#"
+ +----+---+
+ | a | b |
+ +----+---+
+ | 1 | a |
+ | 10 | b |
+ | 2 | c |
+ | 20 | d |
+ +----+---+
+ "#);
// Now, validate metrics
let metrics = merge.metrics().unwrap();
@@ -1293,35 +1285,32 @@ mod tests {
// Expect the data to be sorted first by "batch_number" (because
// that was the order it was fed in, even though only "value"
// is in the sort key)
- assert_batches_eq!(
- &[
- "+--------------+-------+",
- "| batch_number | value |",
- "+--------------+-------+",
- "| 0 | A |",
- "| 1 | A |",
- "| 2 | A |",
- "| 3 | A |",
- "| 4 | A |",
- "| 5 | A |",
- "| 6 | A |",
- "| 7 | A |",
- "| 8 | A |",
- "| 9 | A |",
- "| 0 | B |",
- "| 1 | B |",
- "| 2 | B |",
- "| 3 | B |",
- "| 4 | B |",
- "| 5 | B |",
- "| 6 | B |",
- "| 7 | B |",
- "| 8 | B |",
- "| 9 | B |",
- "+--------------+-------+",
- ],
- collected.as_slice()
- );
+ assert_snapshot!(batches_to_string(collected.as_slice()), @r#"
+ +--------------+-------+
+ | batch_number | value |
+ +--------------+-------+
+ | 0 | A |
+ | 1 | A |
+ | 2 | A |
+ | 3 | A |
+ | 4 | A |
+ | 5 | A |
+ | 6 | A |
+ | 7 | A |
+ | 8 | A |
+ | 9 | A |
+ | 0 | B |
+ | 1 | B |
+ | 2 | B |
+ | 3 | B |
+ | 4 | B |
+ | 5 | B |
+ | 6 | B |
+ | 7 | B |
+ | 8 | B |
+ | 9 | B |
+ +--------------+-------+
+ "#);
}
/// It returns pending for the 2nd partition until the 3rd partition is
polled. The 1st
diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
index f9f4b78686..92138bf6a7 100644
--- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
+++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
@@ -1222,9 +1222,8 @@ mod tests {
};
use arrow::compute::SortOptions;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
- use datafusion_common::{
- assert_batches_eq, exec_datafusion_err, Result, ScalarValue,
- };
+ use datafusion_common::test_util::batches_to_string;
+ use datafusion_common::{exec_datafusion_err, Result, ScalarValue};
use datafusion_execution::config::SessionConfig;
use datafusion_execution::{
RecordBatchStream, SendableRecordBatchStream, TaskContext,
@@ -1241,6 +1240,7 @@ mod tests {
use futures::future::Shared;
use futures::{pin_mut, ready, FutureExt, Stream, StreamExt};
+ use insta::assert_snapshot;
use itertools::Itertools;
use tokio::time::timeout;
@@ -1664,22 +1664,21 @@ mod tests {
"\n**Optimized Plan Mismatch\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
);
- let expected = [
- "+---+------+---------------+---------------+",
- "| a | last | nth_value(-1) | nth_value(-2) |",
- "+---+------+---------------+---------------+",
- "| 1 | 1 | 1 | |",
- "| 2 | 2 | 2 | 1 |",
- "| 3 | 3 | 3 | 2 |",
- "| 1 | 1 | 1 | 3 |",
- "| 2 | 2 | 2 | 1 |",
- "| 3 | 3 | 3 | 2 |",
- "| 1 | 1 | 1 | 3 |",
- "| 2 | 2 | 2 | 1 |",
- "| 3 | 3 | 3 | 2 |",
- "+---+------+---------------+---------------+",
- ];
- assert_batches_eq!(expected, &batches);
+ assert_snapshot!(batches_to_string(&batches), @r#"
+ +---+------+---------------+---------------+
+ | a | last | nth_value(-1) | nth_value(-2) |
+ +---+------+---------------+---------------+
+ | 1 | 1 | 1 | |
+ | 2 | 2 | 2 | 1 |
+ | 3 | 3 | 3 | 2 |
+ | 1 | 1 | 1 | 3 |
+ | 2 | 2 | 2 | 1 |
+ | 3 | 3 | 3 | 2 |
+ | 1 | 1 | 1 | 3 |
+ | 2 | 2 | 2 | 1 |
+ | 3 | 3 | 3 | 2 |
+ +---+------+---------------+---------------+
+ "#);
Ok(())
}
@@ -1792,21 +1791,20 @@ mod tests {
let task_ctx = task_context();
let batches = collect_with_timeout(plan, task_ctx, timeout_duration).await?;
- let expected = [
- "+----+------+-------+",
- "| sn | hash | col_2 |",
- "+----+------+-------+",
- "| 0 | 2 | 2 |",
- "| 1 | 2 | 2 |",
- "| 2 | 2 | 2 |",
- "| 3 | 2 | 1 |",
- "| 4 | 1 | 2 |",
- "| 5 | 1 | 2 |",
- "| 6 | 1 | 2 |",
- "| 7 | 1 | 1 |",
- "+----+------+-------+",
- ];
- assert_batches_eq!(expected, &batches);
+ assert_snapshot!(batches_to_string(&batches), @r#"
+ +----+------+-------+
+ | sn | hash | col_2 |
+ +----+------+-------+
+ | 0 | 2 | 2 |
+ | 1 | 2 | 2 |
+ | 2 | 2 | 2 |
+ | 3 | 2 | 1 |
+ | 4 | 1 | 2 |
+ | 5 | 1 | 2 |
+ | 6 | 1 | 2 |
+ | 7 | 1 | 1 |
+ +----+------+-------+
+ "#);
Ok(())
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]