This is an automated email from the ASF dual-hosted git repository.
berkay pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 295ffb41f5 test: Add plan execution during tests for bounded source
(#14013)
295ffb41f5 is described below
commit 295ffb41f562f2c58b22ce36928df3f85f7fe09a
Author: Aleksey Kirilishin <[email protected]>
AuthorDate: Fri Jan 10 13:19:23 2025 +0300
test: Add plan execution during tests for bounded source (#14013)
---
.../replace_with_order_preserving_variants.rs | 251 +++++++++++++--------
1 file changed, 151 insertions(+), 100 deletions(-)
diff --git
a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
index 96b2454fa3..9f5afc7abc 100644
---
a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
+++
b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
@@ -285,9 +285,7 @@ pub(crate) fn replace_with_order_preserving_variants(
mod tests {
use super::*;
- use
crate::datasource::file_format::file_compression_type::FileCompressionType;
- use crate::datasource::listing::PartitionedFile;
- use crate::datasource::physical_plan::{CsvExec, FileScanConfig};
+ use crate::execution::TaskContext;
use crate::physical_optimizer::test_utils::check_integrity;
use crate::physical_plan::coalesce_batches::CoalesceBatchesExec;
use crate::physical_plan::filter::FilterExec;
@@ -296,18 +294,24 @@ mod tests {
use crate::physical_plan::{
displayable, get_plan_string, ExecutionPlan, Partitioning,
};
- use crate::prelude::SessionConfig;
+ use crate::prelude::{SessionConfig, SessionContext};
use crate::test::TestStreamPartition;
+ use arrow::array::{ArrayRef, Int32Array};
use arrow::compute::SortOptions;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+ use arrow::record_batch::RecordBatch;
use datafusion_common::tree_node::{TransformedResult, TreeNode};
use datafusion_common::Result;
- use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_expr::{JoinType, Operator};
use datafusion_physical_expr::expressions::{self, col, Column};
use datafusion_physical_expr::PhysicalSortExpr;
+ use datafusion_physical_plan::collect;
+ use datafusion_physical_plan::memory::MemoryExec;
use datafusion_physical_plan::streaming::StreamingTableExec;
+ use object_store::memory::InMemory;
+ use object_store::ObjectStore;
+ use url::Url;
use rstest::rstest;
@@ -328,20 +332,24 @@ mod tests {
/// * `$PLAN`: The plan to optimize.
/// * `$SOURCE_UNBOUNDED`: Whether the given plan contains an unbounded
source.
macro_rules! assert_optimized_in_all_boundedness_situations {
- ($EXPECTED_UNBOUNDED_PLAN_LINES: expr, $EXPECTED_BOUNDED_PLAN_LINES:
expr, $EXPECTED_UNBOUNDED_OPTIMIZED_PLAN_LINES: expr,
$EXPECTED_BOUNDED_OPTIMIZED_PLAN_LINES: expr,
$EXPECTED_BOUNDED_PREFER_SORT_ON_OPTIMIZED_PLAN_LINES: expr, $PLAN: expr,
$SOURCE_UNBOUNDED: expr) => {
+ ($EXPECTED_UNBOUNDED_PLAN_LINES: expr, $EXPECTED_BOUNDED_PLAN_LINES:
expr, $EXPECTED_UNBOUNDED_OPTIMIZED_PLAN_LINES: expr,
$EXPECTED_BOUNDED_OPTIMIZED_PLAN_LINES: expr,
$EXPECTED_BOUNDED_PREFER_SORT_ON_OPTIMIZED_PLAN_LINES: expr, $PLAN: expr,
$SOURCE_UNBOUNDED: expr, $PREFER_EXISTING_SORT: expr) => {
if $SOURCE_UNBOUNDED {
assert_optimized_prefer_sort_on_off!(
$EXPECTED_UNBOUNDED_PLAN_LINES,
$EXPECTED_UNBOUNDED_OPTIMIZED_PLAN_LINES,
$EXPECTED_UNBOUNDED_OPTIMIZED_PLAN_LINES,
- $PLAN
+ $PLAN,
+ $PREFER_EXISTING_SORT,
+ $SOURCE_UNBOUNDED
);
} else {
assert_optimized_prefer_sort_on_off!(
$EXPECTED_BOUNDED_PLAN_LINES,
$EXPECTED_BOUNDED_OPTIMIZED_PLAN_LINES,
$EXPECTED_BOUNDED_PREFER_SORT_ON_OPTIMIZED_PLAN_LINES,
- $PLAN
+ $PLAN,
+ $PREFER_EXISTING_SORT,
+ $SOURCE_UNBOUNDED
);
}
};
@@ -359,19 +367,24 @@ mod tests {
/// the flag `prefer_existing_sort` is `true`.
/// * `$PLAN`: The plan to optimize.
macro_rules! assert_optimized_prefer_sort_on_off {
- ($EXPECTED_PLAN_LINES: expr, $EXPECTED_OPTIMIZED_PLAN_LINES: expr,
$EXPECTED_PREFER_SORT_ON_OPTIMIZED_PLAN_LINES: expr, $PLAN: expr) => {
- assert_optimized!(
- $EXPECTED_PLAN_LINES,
- $EXPECTED_OPTIMIZED_PLAN_LINES,
- $PLAN.clone(),
- false
- );
- assert_optimized!(
- $EXPECTED_PLAN_LINES,
- $EXPECTED_PREFER_SORT_ON_OPTIMIZED_PLAN_LINES,
- $PLAN,
- true
- );
+ ($EXPECTED_PLAN_LINES: expr, $EXPECTED_OPTIMIZED_PLAN_LINES: expr,
$EXPECTED_PREFER_SORT_ON_OPTIMIZED_PLAN_LINES: expr, $PLAN: expr,
$PREFER_EXISTING_SORT: expr, $SOURCE_UNBOUNDED: expr) => {
+ if $PREFER_EXISTING_SORT {
+ assert_optimized!(
+ $EXPECTED_PLAN_LINES,
+ $EXPECTED_PREFER_SORT_ON_OPTIMIZED_PLAN_LINES,
+ $PLAN,
+ $PREFER_EXISTING_SORT,
+ $SOURCE_UNBOUNDED
+ );
+ } else {
+ assert_optimized!(
+ $EXPECTED_PLAN_LINES,
+ $EXPECTED_OPTIMIZED_PLAN_LINES,
+ $PLAN,
+ $PREFER_EXISTING_SORT,
+ $SOURCE_UNBOUNDED
+ );
+ }
};
}
@@ -385,7 +398,7 @@ mod tests {
/// * `$PLAN`: The plan to optimize.
/// * `$PREFER_EXISTING_SORT`: Value of the `prefer_existing_sort` flag.
macro_rules! assert_optimized {
- ($EXPECTED_PLAN_LINES: expr, $EXPECTED_OPTIMIZED_PLAN_LINES: expr,
$PLAN: expr, $PREFER_EXISTING_SORT: expr) => {
+ ($EXPECTED_PLAN_LINES: expr, $EXPECTED_OPTIMIZED_PLAN_LINES: expr,
$PLAN: expr, $PREFER_EXISTING_SORT: expr, $SOURCE_UNBOUNDED: expr) => {
let physical_plan = $PLAN;
let formatted =
displayable(physical_plan.as_ref()).indent(true).to_string();
let actual: Vec<&str> = formatted.trim().lines().collect();
@@ -412,6 +425,19 @@ mod tests {
expected_optimized_lines, actual,
"\n**Optimized Plan
Mismatch\n\nexpected:\n\n{expected_optimized_lines:#?}\nactual:\n\n{actual:#?}\n\n"
);
+
+ if !$SOURCE_UNBOUNDED {
+ let ctx = SessionContext::new();
+ let object_store = InMemory::new();
+ object_store.put(&object_store::path::Path::from("file_path"),
bytes::Bytes::from("").into()).await?;
+ ctx.register_object_store(&Url::parse("test://").unwrap(),
Arc::new(object_store));
+ let task_ctx = Arc::new(TaskContext::from(&ctx));
+ let res = collect(optimized_physical_plan, task_ctx).await;
+ assert!(
+ res.is_ok(),
+ "Some errors occurred while executing the optimized
physical plan: {:?}", res.unwrap_err()
+ );
+ }
};
}
@@ -420,13 +446,14 @@ mod tests {
// Searches for a simple sort and a repartition just after it, the second
repartition with 1 input partition should not be affected
async fn test_replace_multiple_input_repartition_1(
#[values(false, true)] source_unbounded: bool,
+ #[values(false, true)] prefer_existing_sort: bool,
) -> Result<()> {
let schema = create_test_schema()?;
let sort_exprs = vec![sort_expr("a", &schema)];
let source = if source_unbounded {
stream_exec_ordered(&schema, sort_exprs)
} else {
- csv_exec_sorted(&schema, sort_exprs)
+ memory_exec_sorted(&schema, sort_exprs)
};
let repartition =
repartition_exec_hash(repartition_exec_round_robin(source));
let sort = sort_exec(vec![sort_expr("a", &schema)], repartition, true);
@@ -447,7 +474,7 @@ mod tests {
" SortExec: expr=[a@0 ASC NULLS LAST],
preserve_partitioning=[true]",
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+ " MemoryExec: partitions=1, partition_sizes=[1],
output_ordering=a@0 ASC NULLS LAST",
];
// Expected unbounded result (same for with and without flag)
@@ -464,13 +491,13 @@ mod tests {
" SortExec: expr=[a@0 ASC NULLS LAST],
preserve_partitioning=[true]",
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+ " MemoryExec: partitions=1, partition_sizes=[1],
output_ordering=a@0 ASC NULLS LAST",
];
let expected_optimized_bounded_sort_preserve = [
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+ " MemoryExec: partitions=1, partition_sizes=[1],
output_ordering=a@0 ASC NULLS LAST",
];
assert_optimized_in_all_boundedness_situations!(
expected_input_unbounded,
@@ -479,7 +506,8 @@ mod tests {
expected_optimized_bounded,
expected_optimized_bounded_sort_preserve,
physical_plan,
- source_unbounded
+ source_unbounded,
+ prefer_existing_sort
);
Ok(())
}
@@ -488,13 +516,14 @@ mod tests {
#[tokio::test]
async fn test_with_inter_children_change_only(
#[values(false, true)] source_unbounded: bool,
+ #[values(false, true)] prefer_existing_sort: bool,
) -> Result<()> {
let schema = create_test_schema()?;
let sort_exprs = vec![sort_expr_default("a", &schema)];
let source = if source_unbounded {
stream_exec_ordered(&schema, sort_exprs)
} else {
- csv_exec_sorted(&schema, sort_exprs)
+ memory_exec_sorted(&schema, sort_exprs)
};
let repartition_rr = repartition_exec_round_robin(source);
let repartition_hash = repartition_exec_hash(repartition_rr);
@@ -538,7 +567,7 @@ mod tests {
" CoalescePartitionsExec",
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC], has_header=true",
+ " MemoryExec: partitions=1, partition_sizes=[1],
output_ordering=a@0 ASC",
];
// Expected unbounded result (same for with and without flag)
@@ -564,7 +593,7 @@ mod tests {
" CoalescePartitionsExec",
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC], has_header=true",
+ " MemoryExec: partitions=1, partition_sizes=[1],
output_ordering=a@0 ASC",
];
let expected_optimized_bounded_sort_preserve = [
"SortPreservingMergeExec: [a@0 ASC]",
@@ -574,7 +603,7 @@ mod tests {
" SortPreservingMergeExec: [a@0 ASC]",
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC], has_header=true",
+ " MemoryExec: partitions=1, partition_sizes=[1],
output_ordering=a@0 ASC",
];
assert_optimized_in_all_boundedness_situations!(
expected_input_unbounded,
@@ -583,7 +612,8 @@ mod tests {
expected_optimized_bounded,
expected_optimized_bounded_sort_preserve,
physical_plan,
- source_unbounded
+ source_unbounded,
+ prefer_existing_sort
);
Ok(())
}
@@ -592,13 +622,14 @@ mod tests {
#[tokio::test]
async fn test_replace_multiple_input_repartition_2(
#[values(false, true)] source_unbounded: bool,
+ #[values(false, true)] prefer_existing_sort: bool,
) -> Result<()> {
let schema = create_test_schema()?;
let sort_exprs = vec![sort_expr("a", &schema)];
let source = if source_unbounded {
stream_exec_ordered(&schema, sort_exprs)
} else {
- csv_exec_sorted(&schema, sort_exprs)
+ memory_exec_sorted(&schema, sort_exprs)
};
let repartition_rr = repartition_exec_round_robin(source);
let filter = filter_exec(repartition_rr);
@@ -623,7 +654,7 @@ mod tests {
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
" FilterExec: c@1 > 3",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+ " MemoryExec: partitions=1, partition_sizes=[1],
output_ordering=a@0 ASC NULLS LAST",
];
// Expected unbounded result (same for with and without flag)
@@ -642,14 +673,14 @@ mod tests {
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
" FilterExec: c@1 > 3",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+ " MemoryExec: partitions=1, partition_sizes=[1],
output_ordering=a@0 ASC NULLS LAST",
];
let expected_optimized_bounded_sort_preserve = [
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST",
" FilterExec: c@1 > 3",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+ " MemoryExec: partitions=1, partition_sizes=[1],
output_ordering=a@0 ASC NULLS LAST",
];
assert_optimized_in_all_boundedness_situations!(
expected_input_unbounded,
@@ -658,7 +689,8 @@ mod tests {
expected_optimized_bounded,
expected_optimized_bounded_sort_preserve,
physical_plan,
- source_unbounded
+ source_unbounded,
+ prefer_existing_sort
);
Ok(())
}
@@ -667,13 +699,14 @@ mod tests {
#[tokio::test]
async fn test_replace_multiple_input_repartition_with_extra_steps(
#[values(false, true)] source_unbounded: bool,
+ #[values(false, true)] prefer_existing_sort: bool,
) -> Result<()> {
let schema = create_test_schema()?;
let sort_exprs = vec![sort_expr("a", &schema)];
let source = if source_unbounded {
stream_exec_ordered(&schema, sort_exprs)
} else {
- csv_exec_sorted(&schema, sort_exprs)
+ memory_exec_sorted(&schema, sort_exprs)
};
let repartition_rr = repartition_exec_round_robin(source);
let repartition_hash = repartition_exec_hash(repartition_rr);
@@ -701,7 +734,7 @@ mod tests {
" FilterExec: c@1 > 3",
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+ " MemoryExec: partitions=1, partition_sizes=[1],
output_ordering=a@0 ASC NULLS LAST",
];
// Expected unbounded result (same for with and without flag)
@@ -722,7 +755,7 @@ mod tests {
" FilterExec: c@1 > 3",
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+ " MemoryExec: partitions=1, partition_sizes=[1],
output_ordering=a@0 ASC NULLS LAST",
];
let expected_optimized_bounded_sort_preserve = [
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
@@ -730,7 +763,7 @@ mod tests {
" FilterExec: c@1 > 3",
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+ " MemoryExec: partitions=1, partition_sizes=[1],
output_ordering=a@0 ASC NULLS LAST",
];
assert_optimized_in_all_boundedness_situations!(
expected_input_unbounded,
@@ -739,7 +772,8 @@ mod tests {
expected_optimized_bounded,
expected_optimized_bounded_sort_preserve,
physical_plan,
- source_unbounded
+ source_unbounded,
+ prefer_existing_sort
);
Ok(())
}
@@ -748,13 +782,14 @@ mod tests {
#[tokio::test]
async fn test_replace_multiple_input_repartition_with_extra_steps_2(
#[values(false, true)] source_unbounded: bool,
+ #[values(false, true)] prefer_existing_sort: bool,
) -> Result<()> {
let schema = create_test_schema()?;
let sort_exprs = vec![sort_expr("a", &schema)];
let source = if source_unbounded {
stream_exec_ordered(&schema, sort_exprs)
} else {
- csv_exec_sorted(&schema, sort_exprs)
+ memory_exec_sorted(&schema, sort_exprs)
};
let repartition_rr = repartition_exec_round_robin(source);
let coalesce_batches_exec_1 = coalesce_batches_exec(repartition_rr);
@@ -786,7 +821,7 @@ mod tests {
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
" CoalesceBatchesExec: target_batch_size=8192",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+ " MemoryExec: partitions=1, partition_sizes=[1],
output_ordering=a@0 ASC NULLS LAST",
];
// Expected unbounded result (same for with and without flag)
@@ -809,7 +844,7 @@ mod tests {
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
" CoalesceBatchesExec: target_batch_size=8192",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+ " MemoryExec: partitions=1, partition_sizes=[1],
output_ordering=a@0 ASC NULLS LAST",
];
let expected_optimized_bounded_sort_preserve = [
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
@@ -818,7 +853,7 @@ mod tests {
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST",
" CoalesceBatchesExec: target_batch_size=8192",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+ " MemoryExec: partitions=1, partition_sizes=[1],
output_ordering=a@0 ASC NULLS LAST",
];
assert_optimized_in_all_boundedness_situations!(
expected_input_unbounded,
@@ -827,7 +862,8 @@ mod tests {
expected_optimized_bounded,
expected_optimized_bounded_sort_preserve,
physical_plan,
- source_unbounded
+ source_unbounded,
+ prefer_existing_sort
);
Ok(())
}
@@ -836,13 +872,14 @@ mod tests {
#[tokio::test]
async fn test_not_replacing_when_no_need_to_preserve_sorting(
#[values(false, true)] source_unbounded: bool,
+ #[values(false, true)] prefer_existing_sort: bool,
) -> Result<()> {
let schema = create_test_schema()?;
let sort_exprs = vec![sort_expr("a", &schema)];
let source = if source_unbounded {
stream_exec_ordered(&schema, sort_exprs)
} else {
- csv_exec_sorted(&schema, sort_exprs)
+ memory_exec_sorted(&schema, sort_exprs)
};
let repartition_rr = repartition_exec_round_robin(source);
let repartition_hash = repartition_exec_hash(repartition_rr);
@@ -867,7 +904,7 @@ mod tests {
" FilterExec: c@1 > 3",
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+ " MemoryExec: partitions=1, partition_sizes=[1],
output_ordering=a@0 ASC NULLS LAST",
];
// Expected unbounded result (same for with and without flag)
@@ -887,7 +924,7 @@ mod tests {
" FilterExec: c@1 > 3",
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+ " MemoryExec: partitions=1, partition_sizes=[1],
output_ordering=a@0 ASC NULLS LAST",
];
let expected_optimized_bounded_sort_preserve =
expected_optimized_bounded;
@@ -898,7 +935,8 @@ mod tests {
expected_optimized_bounded,
expected_optimized_bounded_sort_preserve,
physical_plan,
- source_unbounded
+ source_unbounded,
+ prefer_existing_sort
);
Ok(())
}
@@ -907,13 +945,14 @@ mod tests {
#[tokio::test]
async fn test_with_multiple_replacable_repartitions(
#[values(false, true)] source_unbounded: bool,
+ #[values(false, true)] prefer_existing_sort: bool,
) -> Result<()> {
let schema = create_test_schema()?;
let sort_exprs = vec![sort_expr("a", &schema)];
let source = if source_unbounded {
stream_exec_ordered(&schema, sort_exprs)
} else {
- csv_exec_sorted(&schema, sort_exprs)
+ memory_exec_sorted(&schema, sort_exprs)
};
let repartition_rr = repartition_exec_round_robin(source);
let repartition_hash = repartition_exec_hash(repartition_rr);
@@ -944,7 +983,7 @@ mod tests {
" FilterExec: c@1 > 3",
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+ " MemoryExec: partitions=1, partition_sizes=[1],
output_ordering=a@0 ASC NULLS LAST",
];
// Expected unbounded result (same for with and without flag)
@@ -967,7 +1006,7 @@ mod tests {
" FilterExec: c@1 > 3",
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+ " MemoryExec: partitions=1, partition_sizes=[1],
output_ordering=a@0 ASC NULLS LAST",
];
let expected_optimized_bounded_sort_preserve = [
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
@@ -976,7 +1015,7 @@ mod tests {
" FilterExec: c@1 > 3",
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+ " MemoryExec: partitions=1, partition_sizes=[1],
output_ordering=a@0 ASC NULLS LAST",
];
assert_optimized_in_all_boundedness_situations!(
expected_input_unbounded,
@@ -985,7 +1024,8 @@ mod tests {
expected_optimized_bounded,
expected_optimized_bounded_sort_preserve,
physical_plan,
- source_unbounded
+ source_unbounded,
+ prefer_existing_sort
);
Ok(())
}
@@ -994,13 +1034,14 @@ mod tests {
#[tokio::test]
async fn test_not_replace_with_different_orderings(
#[values(false, true)] source_unbounded: bool,
+ #[values(false, true)] prefer_existing_sort: bool,
) -> Result<()> {
let schema = create_test_schema()?;
let sort_exprs = vec![sort_expr("a", &schema)];
let source = if source_unbounded {
stream_exec_ordered(&schema, sort_exprs)
} else {
- csv_exec_sorted(&schema, sort_exprs)
+ memory_exec_sorted(&schema, sort_exprs)
};
let repartition_rr = repartition_exec_round_robin(source);
let repartition_hash = repartition_exec_hash(repartition_rr);
@@ -1028,7 +1069,7 @@ mod tests {
" SortExec: expr=[c@1 ASC], preserve_partitioning=[true]",
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+ " MemoryExec: partitions=1, partition_sizes=[1],
output_ordering=a@0 ASC NULLS LAST",
];
// Expected unbounded result (same for with and without flag)
@@ -1046,7 +1087,7 @@ mod tests {
" SortExec: expr=[c@1 ASC], preserve_partitioning=[true]",
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+ " MemoryExec: partitions=1, partition_sizes=[1],
output_ordering=a@0 ASC NULLS LAST",
];
let expected_optimized_bounded_sort_preserve =
expected_optimized_bounded;
@@ -1057,7 +1098,8 @@ mod tests {
expected_optimized_bounded,
expected_optimized_bounded_sort_preserve,
physical_plan,
- source_unbounded
+ source_unbounded,
+ prefer_existing_sort
);
Ok(())
}
@@ -1066,13 +1108,14 @@ mod tests {
#[tokio::test]
async fn test_with_lost_ordering(
#[values(false, true)] source_unbounded: bool,
+ #[values(false, true)] prefer_existing_sort: bool,
) -> Result<()> {
let schema = create_test_schema()?;
let sort_exprs = vec![sort_expr("a", &schema)];
let source = if source_unbounded {
stream_exec_ordered(&schema, sort_exprs)
} else {
- csv_exec_sorted(&schema, sort_exprs)
+ memory_exec_sorted(&schema, sort_exprs)
};
let repartition_rr = repartition_exec_round_robin(source);
let repartition_hash = repartition_exec_hash(repartition_rr);
@@ -1093,7 +1136,7 @@ mod tests {
" CoalescePartitionsExec",
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+ " MemoryExec: partitions=1, partition_sizes=[1],
output_ordering=a@0 ASC NULLS LAST",
];
// Expected unbounded result (same for with and without flag)
@@ -1110,13 +1153,13 @@ mod tests {
" CoalescePartitionsExec",
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+ " MemoryExec: partitions=1, partition_sizes=[1],
output_ordering=a@0 ASC NULLS LAST",
];
let expected_optimized_bounded_sort_preserve = [
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+ " MemoryExec: partitions=1, partition_sizes=[1],
output_ordering=a@0 ASC NULLS LAST",
];
assert_optimized_in_all_boundedness_situations!(
expected_input_unbounded,
@@ -1125,7 +1168,8 @@ mod tests {
expected_optimized_bounded,
expected_optimized_bounded_sort_preserve,
physical_plan,
- source_unbounded
+ source_unbounded,
+ prefer_existing_sort
);
Ok(())
}
@@ -1134,13 +1178,14 @@ mod tests {
#[tokio::test]
async fn test_with_lost_and_kept_ordering(
#[values(false, true)] source_unbounded: bool,
+ #[values(false, true)] prefer_existing_sort: bool,
) -> Result<()> {
let schema = create_test_schema()?;
let sort_exprs = vec![sort_expr("a", &schema)];
let source = if source_unbounded {
stream_exec_ordered(&schema, sort_exprs)
} else {
- csv_exec_sorted(&schema, sort_exprs)
+ memory_exec_sorted(&schema, sort_exprs)
};
let repartition_rr = repartition_exec_round_robin(source);
let repartition_hash = repartition_exec_hash(repartition_rr);
@@ -1184,7 +1229,7 @@ mod tests {
" CoalescePartitionsExec",
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+ " MemoryExec: partitions=1, partition_sizes=[1],
output_ordering=a@0 ASC NULLS LAST",
];
// Expected unbounded result (same for with and without flag)
@@ -1211,7 +1256,7 @@ mod tests {
" CoalescePartitionsExec",
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+ " MemoryExec: partitions=1, partition_sizes=[1],
output_ordering=a@0 ASC NULLS LAST",
];
let expected_optimized_bounded_sort_preserve = [
"SortPreservingMergeExec: [c@1 ASC]",
@@ -1222,7 +1267,7 @@ mod tests {
" CoalescePartitionsExec",
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+ " MemoryExec: partitions=1, partition_sizes=[1],
output_ordering=a@0 ASC NULLS LAST",
];
assert_optimized_in_all_boundedness_situations!(
expected_input_unbounded,
@@ -1231,7 +1276,8 @@ mod tests {
expected_optimized_bounded,
expected_optimized_bounded_sort_preserve,
physical_plan,
- source_unbounded
+ source_unbounded,
+ prefer_existing_sort
);
Ok(())
}
@@ -1240,6 +1286,7 @@ mod tests {
#[tokio::test]
async fn test_with_multiple_child_trees(
#[values(false, true)] source_unbounded: bool,
+ #[values(false, true)] prefer_existing_sort: bool,
) -> Result<()> {
let schema = create_test_schema()?;
@@ -1247,7 +1294,7 @@ mod tests {
let left_source = if source_unbounded {
stream_exec_ordered(&schema, left_sort_exprs)
} else {
- csv_exec_sorted(&schema, left_sort_exprs)
+ memory_exec_sorted(&schema, left_sort_exprs)
};
let left_repartition_rr = repartition_exec_round_robin(left_source);
let left_repartition_hash = repartition_exec_hash(left_repartition_rr);
@@ -1258,7 +1305,7 @@ mod tests {
let right_source = if source_unbounded {
stream_exec_ordered(&schema, right_sort_exprs)
} else {
- csv_exec_sorted(&schema, right_sort_exprs)
+ memory_exec_sorted(&schema, right_sort_exprs)
};
let right_repartition_rr = repartition_exec_round_robin(right_source);
let right_repartition_hash =
repartition_exec_hash(right_repartition_rr);
@@ -1299,11 +1346,11 @@ mod tests {
" CoalesceBatchesExec: target_batch_size=4096",
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+ " MemoryExec: partitions=1, partition_sizes=[1],
output_ordering=a@0 ASC NULLS LAST",
" CoalesceBatchesExec: target_batch_size=4096",
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+ " MemoryExec: partitions=1, partition_sizes=[1],
output_ordering=a@0 ASC NULLS LAST",
];
// Expected unbounded result (same for with and without flag)
@@ -1330,11 +1377,11 @@ mod tests {
" CoalesceBatchesExec: target_batch_size=4096",
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+ " MemoryExec: partitions=1, partition_sizes=[1],
output_ordering=a@0 ASC NULLS LAST",
" CoalesceBatchesExec: target_batch_size=4096",
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+ " MemoryExec: partitions=1, partition_sizes=[1],
output_ordering=a@0 ASC NULLS LAST",
];
let expected_optimized_bounded_sort_preserve =
expected_optimized_bounded;
@@ -1345,7 +1392,8 @@ mod tests {
expected_optimized_bounded,
expected_optimized_bounded_sort_preserve,
physical_plan,
- source_unbounded
+ source_unbounded,
+ prefer_existing_sort
);
Ok(())
}
@@ -1492,33 +1540,36 @@ mod tests {
)
}
- // creates a csv exec source for the test purposes
- // projection and has_header parameters are given static due to testing
needs
- fn csv_exec_sorted(
+ // creates a memory exec source for the test purposes
+ // projection parameter is given static due to testing needs
+ fn memory_exec_sorted(
schema: &SchemaRef,
sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
) -> Arc<dyn ExecutionPlan> {
- let sort_exprs = sort_exprs.into_iter().collect();
- let projection: Vec<usize> = vec![0, 2, 3];
+ pub fn make_partition(schema: &SchemaRef, sz: i32) -> RecordBatch {
+ let values = (0..sz).collect::<Vec<_>>();
+ let arr = Arc::new(Int32Array::from(values));
+ let arr = arr as ArrayRef;
- Arc::new(
- CsvExec::builder(
- FileScanConfig::new(
- ObjectStoreUrl::parse("test:///").unwrap(),
- schema.clone(),
- )
- .with_file(PartitionedFile::new("file_path".to_string(), 100))
- .with_projection(Some(projection))
- .with_output_ordering(vec![sort_exprs]),
+ RecordBatch::try_new(
+ schema.clone(),
+ vec![arr.clone(), arr.clone(), arr.clone(), arr],
)
- .with_has_header(true)
- .with_delimeter(0)
- .with_quote(b'"')
- .with_escape(None)
- .with_comment(None)
- .with_newlines_in_values(false)
- .with_file_compression_type(FileCompressionType::UNCOMPRESSED)
- .build(),
- )
+ .unwrap()
+ }
+
+ let rows = 5;
+ let partitions = 1;
+ let sort_exprs = sort_exprs.into_iter().collect();
+ Arc::new({
+ let data: Vec<Vec<_>> = (0..partitions)
+ .map(|_| vec![make_partition(schema, rows)])
+ .collect();
+ let projection: Vec<usize> = vec![0, 2, 3];
+ MemoryExec::try_new(&data, schema.clone(), Some(projection))
+ .unwrap()
+ .try_with_sort_information(vec![sort_exprs])
+ .unwrap()
+ })
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]