This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new ab09536280 fix: split "union" and "interleave" (#6045)
ab09536280 is described below
commit ab09536280a00444c1d2882b4e1ef579cab992dd
Author: Marco Neumann <[email protected]>
AuthorDate: Fri Apr 21 13:15:34 2023 +0200
fix: split "union" and "interleave" (#6045)
* fix: split "union" and "interleave"
Silently interleaving partitions is confusing and causes bugs.
Fixes #5970.
* refactor: add interleave during dist enforcement
This avoids a top-level repartition in the following query:
```sql
SELECT c1, c9 FROM aggregate_test_100
UNION ALL
SELECT c1, c3 FROM aggregate_test_100
ORDER BY c9 DESC LIMIT 5
```
* fix: docs
---
.../src/physical_optimizer/dist_enforcement.rs | 68 +++++
datafusion/core/src/physical_plan/union.rs | 314 +++++++++++++++------
datafusion/core/tests/sql/mod.rs | 6 +-
datafusion/core/tests/sql/order.rs | 77 +++++
datafusion/core/tests/sql/union.rs | 40 +++
5 files changed, 424 insertions(+), 81 deletions(-)
diff --git a/datafusion/core/src/physical_optimizer/dist_enforcement.rs b/datafusion/core/src/physical_optimizer/dist_enforcement.rs
index affe432830..f74a7f2e93 100644
--- a/datafusion/core/src/physical_optimizer/dist_enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/dist_enforcement.rs
@@ -29,6 +29,7 @@ use crate::physical_plan::joins::{
use crate::physical_plan::projection::ProjectionExec;
use crate::physical_plan::repartition::RepartitionExec;
use crate::physical_plan::sorts::sort::SortOptions;
+use crate::physical_plan::union::{can_interleave, InterleaveExec, UnionExec};
use crate::physical_plan::windows::WindowAggExec;
use crate::physical_plan::Partitioning;
use crate::physical_plan::{with_new_children_if_necessary, Distribution, ExecutionPlan};
@@ -825,6 +826,35 @@ fn ensure_distribution(
return Ok(Transformed::No(plan));
}
+ // special case for UnionExec: We want to "bubble up" hash-partitioned data. So instead of:
+ //
+ // Agg:
+ //   Repartition (hash):
+ //     Union:
+ //     - Agg:
+ //         Repartition (hash):
+ //           Data
+ //     - Agg:
+ //         Repartition (hash):
+ //           Data
+ //
+ // We can use:
+ //
+ // Agg:
+ //   Interleave:
+ //   - Agg:
+ //       Repartition (hash):
+ //         Data
+ //   - Agg:
+ //       Repartition (hash):
+ //         Data
+ if let Some(union_exec) = plan.as_any().downcast_ref::<UnionExec>() {
+ if can_interleave(union_exec.inputs()) {
+ let plan = InterleaveExec::try_new(union_exec.inputs().clone())?;
+ return Ok(Transformed::Yes(Arc::new(plan)));
+ }
+ }
+
let required_input_distributions = plan.required_input_distribution();
let children: Vec<Arc<dyn ExecutionPlan>> = plan.children();
assert_eq!(children.len(), required_input_distributions.len());
@@ -2134,4 +2164,42 @@ mod tests {
assert_optimized!(expected, exec);
Ok(())
}
+
+ #[test]
+ fn union_to_interleave() -> Result<()> {
+ // group by (a as a1)
+ let left = aggregate_exec_with_alias(
+ parquet_exec(),
+ vec![("a".to_string(), "a1".to_string())],
+ );
+ // group by (a as a1), same alias as `left` so both union inputs share a schema
+ let right = aggregate_exec_with_alias(
+ parquet_exec(),
+ vec![("a".to_string(), "a1".to_string())],
+ );
+
+ // Union
+ let plan = Arc::new(UnionExec::new(vec![left, right]));
+
+ // final agg
+ let plan =
+ aggregate_exec_with_alias(plan, vec![("a1".to_string(), "a2".to_string())]);
+
+ // Only two RepartitionExecs added, no final RepartitionExec required
+ let expected = &[
+ "AggregateExec: mode=FinalPartitioned, gby=[a2@0 as a2], aggr=[]",
+ "AggregateExec: mode=Partial, gby=[a1@0 as a2], aggr=[]",
+ "InterleaveExec",
+ "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[]",
+ "RepartitionExec: partitioning=Hash([Column { name: \"a1\", index: 0 }], 10), input_partitions=1",
+ "AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]",
+ "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+ "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[]",
+ "RepartitionExec: partitioning=Hash([Column { name: \"a1\", index: 0 }], 10), input_partitions=1",
+ "AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]",
+ "ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[a, b, c, d, e]",
+ ];
+ assert_optimized!(expected, plan);
+ Ok(())
+ }
}
diff --git a/datafusion/core/src/physical_plan/union.rs b/datafusion/core/src/physical_plan/union.rs
index 3d4522272a..0448813240 100644
--- a/datafusion/core/src/physical_plan/union.rs
+++ b/datafusion/core/src/physical_plan/union.rs
@@ -94,8 +94,6 @@ pub struct UnionExec {
metrics: ExecutionPlanMetricsSet,
/// Schema of Union
schema: SchemaRef,
- /// Partition aware Union
- partition_aware: bool,
}
impl UnionExec {
@@ -133,45 +131,12 @@ impl UnionExec {
/// Create a new UnionExec
pub fn new(inputs: Vec<Arc<dyn ExecutionPlan>>) -> Self {
- let fields: Vec<Field> = (0..inputs[0].schema().fields().len())
- .map(|i| {
- inputs
- .iter()
- .filter_map(|input| {
- if input.schema().fields().len() > i {
- Some(input.schema().field(i).clone())
- } else {
- None
- }
- })
- .find_or_first(|f| f.is_nullable())
- .unwrap()
- })
- .collect();
-
- let schema = Arc::new(Schema::new_with_metadata(
- fields,
- inputs[0].schema().metadata().clone(),
- ));
-
- // If all the input partitions have the same Hash partition spec with the first_input_partition
- // The UnionExec is partition aware.
- //
- // It might be too strict here in the case that the input partition specs are compatible but not exactly the same.
- // For example one input partition has the partition spec Hash('a','b','c') and
- // other has the partition spec Hash('a'), It is safe to derive the out partition with the spec Hash('a','b','c').
- let first_input_partition = inputs[0].output_partitioning();
- let partition_aware = matches!(first_input_partition, Partitioning::Hash(_, _))
- && inputs
- .iter()
- .map(|plan| plan.output_partitioning())
- .all(|partition| partition == first_input_partition);
+ let schema = union_schema(&inputs);
UnionExec {
inputs,
metrics: ExecutionPlanMetricsSet::new(),
schema,
- partition_aware,
}
}
@@ -204,28 +169,20 @@ impl ExecutionPlan for UnionExec {
/// Output of the union is the combination of all output partitions of the inputs
fn output_partitioning(&self) -> Partitioning {
- if self.partition_aware {
- self.inputs[0].output_partitioning()
- } else {
- // Output the combination of all output partitions of the inputs if the Union is not partition aware
- let num_partitions = self
- .inputs
- .iter()
- .map(|plan| plan.output_partitioning().partition_count())
- .sum();
+ // Output the combination of all output partitions of the inputs
+ let num_partitions = self
+ .inputs
+ .iter()
+ .map(|plan| plan.output_partitioning().partition_count())
+ .sum();
- Partitioning::UnknownPartitioning(num_partitions)
- }
+ Partitioning::UnknownPartitioning(num_partitions)
}
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
- // If the Union is partition aware, there is no output ordering.
- // Otherwise, the output ordering is the "meet" of its input orderings.
+ // The output ordering is the "meet" of its input orderings.
// The meet is the finest ordering that satisfied by all the input
// orderings, see https://en.wikipedia.org/wiki/Join_and_meet.
- if self.partition_aware {
- return None;
- }
get_meet_of_orderings(&self.inputs)
}
@@ -273,34 +230,15 @@ impl ExecutionPlan for UnionExec {
let elapsed_compute = baseline_metrics.elapsed_compute().clone();
let _timer = elapsed_compute.timer(); // record on drop
- if self.partition_aware {
- let mut input_stream_vec = vec![];
- for input in self.inputs.iter() {
- if partition < input.output_partitioning().partition_count() {
- input_stream_vec.push(input.execute(partition, context.clone())?);
- } else {
- // Do not find a partition to execute
- break;
- }
- }
- if input_stream_vec.len() == self.inputs.len() {
- let stream = Box::pin(CombinedRecordBatchStream::new(
- self.schema(),
- input_stream_vec,
- ));
+ // find partition to execute
+ for input in self.inputs.iter() {
+ // Calculate whether partition belongs to the current partition
+ if partition < input.output_partitioning().partition_count() {
+ let stream = input.execute(partition, context)?;
+ debug!("Found a Union partition to execute");
return Ok(Box::pin(ObservedStream::new(stream, baseline_metrics)));
- }
- } else {
- // find partition to execute
- for input in self.inputs.iter() {
- // Calculate whether partition belongs to the current partition
- if partition < input.output_partitioning().partition_count() {
- let stream = input.execute(partition, context)?;
- debug!("Found a Union partition to execute");
- return Ok(Box::pin(ObservedStream::new(stream, baseline_metrics)));
- } else {
- partition -= input.output_partitioning().partition_count();
- }
+ } else {
+ partition -= input.output_partitioning().partition_count();
}
}
@@ -340,8 +278,224 @@ impl ExecutionPlan for UnionExec {
}
}
+/// Combines multiple input streams by interleaving them.
+///
+/// This only works if all inputs have the same hash-partitioning.
+///
+/// # Data Flow
+/// ```text
+/// +---------+
+/// |         |---+
+/// | Input 1 |   |
+/// |         |-------------+
+/// +---------+   |         |
+///               |         |         +---------+
+///               +------------------>|         |
+///                 +---------------->| Combine |-->
+///                 | +-------------->|         |
+///                 | |     |         +---------+
+/// +---------+     | |     |
+/// |         |-----+ |     |
+/// | Input 2 |       |     |
+/// |         |---------------+
+/// +---------+       |     | |
+///                   |     | |       +---------+
+///                   |     +-------->|         |
+///                   |       +------>| Combine |-->
+///                   |         +---->|         |
+///                   |         |     +---------+
+/// +---------+       |         |
+/// |         |-------+         |
+/// | Input 3 |                 |
+/// |         |-----------------+
+/// +---------+
+/// ```
+#[derive(Debug)]
+pub struct InterleaveExec {
+ /// Input execution plans
+ inputs: Vec<Arc<dyn ExecutionPlan>>,
+ /// Execution metrics
+ metrics: ExecutionPlanMetricsSet,
+ /// Schema of Interleave
+ schema: SchemaRef,
+}
+
+impl InterleaveExec {
+ /// Create a new InterleaveExec; errors if `inputs` cannot be interleaved (see `can_interleave`)
+ pub fn try_new(inputs: Vec<Arc<dyn ExecutionPlan>>) -> Result<Self> {
+ // validate first: `union_schema` indexes `inputs[0]` and would panic on empty `inputs`
+ if !can_interleave(&inputs) {
+ return Err(DataFusionError::Internal(String::from(
+ "Not all InterleaveExec children have a consistent hash partitioning",
+ )));
+ }
+ let schema = union_schema(&inputs);
+
+ Ok(InterleaveExec {
+ inputs,
+ metrics: ExecutionPlanMetricsSet::new(),
+ schema,
+ })
+ }
+
+ /// Get inputs of the execution plan
+ pub fn inputs(&self) -> &Vec<Arc<dyn ExecutionPlan>> {
+ &self.inputs
+ }
+}
+
+impl ExecutionPlan for InterleaveExec {
+ /// Return a reference to Any that can be used for downcasting
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn schema(&self) -> SchemaRef {
+ self.schema.clone()
+ }
+
+ /// Specifies whether this plan generates an infinite stream of records.
+ /// If the plan does not support pipelining, but its input(s) are
+ /// infinite, returns an error to indicate this.
+ fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
+ Ok(children.iter().any(|x| *x))
+ }
+
+ fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+ self.inputs.clone()
+ }
+
+ /// All inputs must have the same partitioning. The output partitioning of InterleaveExec is the same as the inputs
+ /// (NOT combined). E.g. if there are 10 inputs where each is `Hash(3)`-partitioned, InterleaveExec is also
+ /// `Hash(3)`-partitioned.
+ fn output_partitioning(&self) -> Partitioning {
+ self.inputs[0].output_partitioning()
+ }
+
+ fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
+ None
+ }
+
+ fn maintains_input_order(&self) -> Vec<bool> {
+ vec![false; self.inputs().len()]
+ }
+
+ fn with_new_children(
+ self: Arc<Self>,
+ children: Vec<Arc<dyn ExecutionPlan>>,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ Ok(Arc::new(InterleaveExec::try_new(children)?))
+ }
+
+ fn execute(
+ &self,
+ partition: usize,
+ context: Arc<TaskContext>,
+ ) -> Result<SendableRecordBatchStream> {
+ debug!("Start InterleaveExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
+ let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
+ // record the tiny amount of work done in this function so
+ // elapsed_compute is reported as non zero
+ let elapsed_compute = baseline_metrics.elapsed_compute().clone();
+ let _timer = elapsed_compute.timer(); // record on drop
+
+ let mut input_stream_vec = vec![];
+ for input in self.inputs.iter() {
+ if partition < input.output_partitioning().partition_count() {
+ input_stream_vec.push(input.execute(partition, context.clone())?);
+ } else {
+ // Do not find a partition to execute
+ break;
+ }
+ }
+ if input_stream_vec.len() == self.inputs.len() {
+ let stream = Box::pin(CombinedRecordBatchStream::new(
+ self.schema(),
+ input_stream_vec,
+ ));
+ return Ok(Box::pin(ObservedStream::new(stream, baseline_metrics)));
+ }
+
+ warn!("Error in InterleaveExec: Partition {} not found", partition);
+
+ Err(crate::error::DataFusionError::Execution(format!(
+ "Partition {partition} not found in InterleaveExec"
+ )))
+ }
+
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default => {
+ write!(f, "InterleaveExec")
+ }
+ }
+ }
+
+ fn metrics(&self) -> Option<MetricsSet> {
+ Some(self.metrics.clone_inner())
+ }
+
+ fn statistics(&self) -> Statistics {
+ self.inputs
+ .iter()
+ .map(|ep| ep.statistics())
+ .reduce(stats_union)
+ .unwrap_or_default()
+ }
+
+ fn benefits_from_input_partitioning(&self) -> bool {
+ false
+ }
+}
+
+/// Returns `true` if `inputs` is non-empty and every input has exactly the same
+/// `Partitioning::Hash` spec as the first one, i.e. they can be fed to `InterleaveExec`.
+///
+/// Exact equality may be stricter than needed for some "compatible" specs.
+/// NOTE(review): do not relax this for e.g. Hash('a') vs Hash('a','b','c') without
+/// checking: the two hash different columns, so equal keys may land in different partitions.
+pub fn can_interleave(inputs: &[Arc<dyn ExecutionPlan>]) -> bool {
+ if inputs.is_empty() {
+ return false;
+ }
+
+ let first_input_partition = inputs[0].output_partitioning();
+ matches!(first_input_partition, Partitioning::Hash(_, _))
+ && inputs
+ .iter()
+ .map(|plan| plan.output_partitioning())
+ .all(|partition| partition == first_input_partition)
+}
+
+fn union_schema(inputs: &[Arc<dyn ExecutionPlan>]) -> SchemaRef {
+ let fields: Vec<Field> = (0..inputs[0].schema().fields().len())
+ .map(|i| {
+ inputs
+ .iter()
+ .filter_map(|input| {
+ if input.schema().fields().len() > i {
+ Some(input.schema().field(i).clone())
+ } else {
+ None
+ }
+ })
+ .find_or_first(|f| f.is_nullable())
+ .unwrap()
+ })
+ .collect();
+
+ Arc::new(Schema::new_with_metadata(
+ fields,
+ inputs[0].schema().metadata().clone(),
+ ))
+}
+
/// CombinedRecordBatchStream can be used to combine a Vec of SendableRecordBatchStreams into one
-pub struct CombinedRecordBatchStream {
+struct CombinedRecordBatchStream {
/// Schema wrapped by Arc
schema: SchemaRef,
/// Stream entries
diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs
index 192e2f65f8..8c7f50c9a6 100644
--- a/datafusion/core/tests/sql/mod.rs
+++ b/datafusion/core/tests/sql/mod.rs
@@ -751,7 +751,11 @@ fn create_sort_merge_join_datatype_context() -> Result<SessionContext> {
}
fn create_union_context() -> Result<SessionContext> {
- let ctx = SessionContext::new();
+ let ctx = SessionContext::with_config(
+ SessionConfig::new()
+ .with_target_partitions(4)
+ .with_batch_size(4096),
+ );
let t1_schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int32, true),
Field::new("name", DataType::UInt8, true),
diff --git a/datafusion/core/tests/sql/order.rs b/datafusion/core/tests/sql/order.rs
index da7663cb8b..5691a7cfd6 100644
--- a/datafusion/core/tests/sql/order.rs
+++ b/datafusion/core/tests/sql/order.rs
@@ -175,3 +175,80 @@ async fn sort_with_duplicate_sort_exprs() -> Result<()> {
Ok(())
}
+
+/// Minimal test case for https://github.com/apache/arrow-datafusion/issues/5970
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn test_issue5970_mini() -> Result<()> {
+ let config = SessionConfig::new()
+ .with_target_partitions(2)
+ .with_repartition_sorts(true);
+ let ctx = SessionContext::with_config(config);
+ let sql = "
+WITH
+ m0(t) AS (
+ VALUES (0), (1), (2)),
+ m1(t) AS (
+ VALUES (0), (1)),
+ u AS (
+ SELECT 0 as m, t FROM m0 GROUP BY 1, 2),
+ v AS (
+ SELECT 1 as m, t FROM m1 GROUP BY 1, 2)
+SELECT * FROM u
+UNION ALL
+SELECT * FROM v
+ORDER BY 1, 2;
+ ";
+
+ // check phys. plan
+ let dataframe = ctx.sql(sql).await.unwrap();
+ let plan = dataframe.into_optimized_plan().unwrap();
+ let plan = ctx.state().create_physical_plan(&plan).await.unwrap();
+ let expected = vec![
+ "SortPreservingMergeExec: [m@0 ASC NULLS LAST,t@1 ASC NULLS LAST]",
+ "  SortExec: expr=[m@0 ASC NULLS LAST,t@1 ASC NULLS LAST]",
+ "    InterleaveExec",
+ "      ProjectionExec: expr=[Int64(0)@0 as m, t@1 as t]",
+ "        AggregateExec: mode=FinalPartitioned, gby=[Int64(0)@0 as Int64(0), t@1 as t], aggr=[]",
+ "          CoalesceBatchesExec: target_batch_size=8192",
+ "            RepartitionExec: partitioning=Hash([Column { name: \"Int64(0)\", index: 0 }, Column { name: \"t\", index: 1 }], 2), input_partitions=2",
+ "              AggregateExec: mode=Partial, gby=[0 as Int64(0), t@0 as t], aggr=[]",
+ "                RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
+ "                  ProjectionExec: expr=[column1@0 as t]",
+ "                    ValuesExec",
+ "      ProjectionExec: expr=[Int64(1)@0 as m, t@1 as t]",
+ "        AggregateExec: mode=FinalPartitioned, gby=[Int64(1)@0 as Int64(1), t@1 as t], aggr=[]",
+ "          CoalesceBatchesExec: target_batch_size=8192",
+ "            RepartitionExec: partitioning=Hash([Column { name: \"Int64(1)\", index: 0 }, Column { name: \"t\", index: 1 }], 2), input_partitions=2",
+ "              AggregateExec: mode=Partial, gby=[1 as Int64(1), t@0 as t], aggr=[]",
+ "                RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
+ "                  ProjectionExec: expr=[column1@0 as t]",
+ "                    ValuesExec",
+ ];
+ let formatted = displayable(plan.as_ref()).indent().to_string();
+ let actual: Vec<&str> = formatted.trim().lines().collect();
+ assert_eq!(
+ expected, actual,
+ "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+ );
+
+ // sometimes it "just works"
+ for i in 0..10 {
+ println!("run: {i}");
+ let actual = execute_to_batches(&ctx, sql).await;
+
+ // in https://github.com/apache/arrow-datafusion/issues/5970 the order of the output was sometimes not right
+ let expected = vec![
+ "+---+---+",
+ "| m | t |",
+ "+---+---+",
+ "| 0 | 0 |",
+ "| 0 | 1 |",
+ "| 0 | 2 |",
+ "| 1 | 0 |",
+ "| 1 | 1 |",
+ "+---+---+",
+ ];
+ assert_batches_eq!(expected, &actual);
+ }
+ Ok(())
+}
diff --git a/datafusion/core/tests/sql/union.rs b/datafusion/core/tests/sql/union.rs
index d56a0a39ff..cca9c23e67 100644
--- a/datafusion/core/tests/sql/union.rs
+++ b/datafusion/core/tests/sql/union.rs
@@ -140,3 +140,43 @@ async fn test_union_upcast_types() -> Result<()> {
Ok(())
}
+
+#[tokio::test]
+async fn union_with_hash_aggregate() -> Result<()> {
+ let ctx = create_union_context()?;
+ let sql = "select count(*) from (
+ select distinct name from t1
+ union all
+ select distinct name from t2
+ ) group by name";
+
+ let dataframe = ctx.sql(sql).await.unwrap();
+ let plan = dataframe.into_optimized_plan().unwrap();
+ let plan = ctx.state().create_physical_plan(&plan).await.unwrap();
+ let formatted = displayable(plan.as_ref()).indent().to_string();
+ let actual: Vec<&str> = formatted.trim().lines().collect();
+
+ let expected = vec![
+ "ProjectionExec: expr=[COUNT(UInt8(1))@1 as COUNT(UInt8(1))]",
+ "  AggregateExec: mode=Single, gby=[name@0 as name], aggr=[COUNT(UInt8(1))]",
+ "    InterleaveExec",
+ "      AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[]",
+ "        CoalesceBatchesExec: target_batch_size=4096",
+ "          RepartitionExec: partitioning=Hash([Column { name: \"name\", index: 0 }], 4), input_partitions=4",
+ "            RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1",
+ "              AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[]",
+ "                MemoryExec: partitions=1, partition_sizes=[1]",
+ "      AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[]",
+ "        CoalesceBatchesExec: target_batch_size=4096",
+ "          RepartitionExec: partitioning=Hash([Column { name: \"name\", index: 0 }], 4), input_partitions=4",
+ "            RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1",
+ "              AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[]",
+ "                MemoryExec: partitions=1, partition_sizes=[1]",
+ ];
+
+ assert_eq!(
+ expected, actual,
+ "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+ );
+ Ok(())
+}