This is an automated email from the ASF dual-hosted git repository.
yjshen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new a08ce40050 Update `physical_plan` tests to not use SessionContext (#7243)
a08ce40050 is described below
commit a08ce400502792e91c5e34fe8df1cd479e56a31b
Author: Andrew Lamb <[email protected]>
AuthorDate: Wed Aug 9 16:17:58 2023 -0500
Update `physical_plan` tests to not use SessionContext (#7243)
* Update `physical_plan` tests to not use SessionContext
* fix
---
.../core/src/physical_plan/aggregates/mod.rs | 28 +--
datafusion/core/src/physical_plan/analyze.rs | 4 +-
.../core/src/physical_plan/coalesce_batches.rs | 39 +---
.../core/src/physical_plan/coalesce_partitions.rs | 10 +-
datafusion/core/src/physical_plan/empty.rs | 13 +-
datafusion/core/src/physical_plan/filter.rs | 4 +-
.../core/src/physical_plan/joins/cross_join.rs | 9 +-
.../core/src/physical_plan/joins/hash_join.rs | 102 ++++-----
.../src/physical_plan/joins/nested_loop_join.rs | 35 +---
.../src/physical_plan/joins/sort_merge_join.rs | 26 ++-
.../src/physical_plan/joins/symmetric_hash_join.rs | 233 +++------------------
datafusion/core/src/physical_plan/limit.rs | 7 +-
datafusion/core/src/physical_plan/projection.rs | 7 +-
.../core/src/physical_plan/repartition/mod.rs | 41 ++--
datafusion/core/src/physical_plan/sorts/sort.rs | 41 ++--
.../physical_plan/sorts/sort_preserving_merge.rs | 46 ++--
datafusion/core/src/physical_plan/stream.rs | 16 +-
datafusion/core/src/physical_plan/union.rs | 4 +-
datafusion/core/src/physical_plan/windows/mod.rs | 11 +-
datafusion/core/tests/sql/joins.rs | 161 ++++++++++++++
.../tests/sqllogictests/test_files/options.slt | 84 ++++++++
datafusion/execution/src/task.rs | 44 +++-
22 files changed, 467 insertions(+), 498 deletions(-)
diff --git a/datafusion/core/src/physical_plan/aggregates/mod.rs
b/datafusion/core/src/physical_plan/aggregates/mod.rs
index 473d4eb131..be3b868499 100644
--- a/datafusion/core/src/physical_plan/aggregates/mod.rs
+++ b/datafusion/core/src/physical_plan/aggregates/mod.rs
@@ -1215,7 +1215,6 @@ fn evaluate_group_by(
#[cfg(test)]
mod tests {
use super::*;
- use crate::execution::context::SessionConfig;
use crate::physical_plan::aggregates::GroupByOrderMode::{
FullyOrdered, PartiallyOrdered,
};
@@ -1231,7 +1230,6 @@ mod tests {
DisplayAs, ExecutionPlan, Partitioning, RecordBatchStream,
SendableRecordBatchStream, Statistics,
};
- use crate::prelude::SessionContext;
use crate::test::exec::{assert_strong_count_converges_to_zero,
BlockingExec};
use crate::test::{assert_is_pending, csv_exec_sorted};
use crate::{assert_batches_eq, assert_batches_sorted_eq,
physical_plan::common};
@@ -1449,8 +1447,7 @@ mod tests {
DataType::Int64,
))];
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let partial_aggregate = Arc::new(AggregateExec::try_new(
AggregateMode::Partial,
@@ -1556,8 +1553,7 @@ mod tests {
DataType::Float64,
))];
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let partial_aggregate = Arc::new(AggregateExec::try_new(
AggregateMode::Partial,
@@ -1779,14 +1775,11 @@ mod tests {
Arc::new(TestYieldingExec { yield_first: true });
let input_schema = input.schema();
- let session_ctx = SessionContext::with_config_rt(
- SessionConfig::default(),
- Arc::new(
- RuntimeEnv::new(RuntimeConfig::default().with_memory_limit(1, 1.0))
- .unwrap(),
- ),
+ let runtime = Arc::new(
+ RuntimeEnv::new(RuntimeConfig::default().with_memory_limit(1, 1.0)).unwrap(),
);
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = TaskContext::default().with_runtime(runtime);
+ let task_ctx = Arc::new(task_ctx);
let groups_none = PhysicalGroupBy::default();
let groups_some = PhysicalGroupBy {
@@ -1864,8 +1857,7 @@ mod tests {
#[tokio::test]
async fn test_drop_cancel_without_groups() -> Result<()> {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let schema =
Arc::new(Schema::new(vec![Field::new("a", DataType::Float32,
true)]));
@@ -1901,8 +1893,7 @@ mod tests {
#[tokio::test]
async fn test_drop_cancel_with_groups() -> Result<()> {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Float32, true),
Field::new("b", DataType::Float32, true),
@@ -1970,8 +1961,7 @@ mod tests {
use_coalesce_batches: bool,
is_first_acc: bool,
) -> Result<()> {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let (schema, data) = some_data_v2();
let partition1 = data[0].clone();
diff --git a/datafusion/core/src/physical_plan/analyze.rs
b/datafusion/core/src/physical_plan/analyze.rs
index bba1c37e6d..d8c87d2f51 100644
--- a/datafusion/core/src/physical_plan/analyze.rs
+++ b/datafusion/core/src/physical_plan/analyze.rs
@@ -229,7 +229,6 @@ mod tests {
use arrow::datatypes::{DataType, Field, Schema};
use futures::FutureExt;
- use crate::prelude::SessionContext;
use crate::{
physical_plan::collect,
test::{
@@ -242,8 +241,7 @@ mod tests {
#[tokio::test]
async fn test_drop_cancel() -> Result<()> {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let schema =
Arc::new(Schema::new(vec![Field::new("a", DataType::Float32,
true)]));
diff --git a/datafusion/core/src/physical_plan/coalesce_batches.rs
b/datafusion/core/src/physical_plan/coalesce_batches.rs
index 994f75ce4c..71ad39658f 100644
--- a/datafusion/core/src/physical_plan/coalesce_batches.rs
+++ b/datafusion/core/src/physical_plan/coalesce_batches.rs
@@ -305,46 +305,10 @@ pub fn concat_batches(
#[cfg(test)]
mod tests {
use super::*;
- use crate::config::ConfigOptions;
- use crate::datasource::MemTable;
- use crate::physical_plan::filter::FilterExec;
use crate::physical_plan::{memory::MemoryExec,
repartition::RepartitionExec};
- use crate::prelude::SessionContext;
use crate::test::create_vec_batches;
use arrow::datatypes::{DataType, Field, Schema};
- #[tokio::test]
- async fn test_custom_batch_size() -> Result<()> {
- let mut config = ConfigOptions::new();
- config.execution.batch_size = 1234;
-
- let ctx = SessionContext::with_config(config.into());
- let plan = create_physical_plan(ctx).await?;
- let coalesce =
plan.as_any().downcast_ref::<CoalesceBatchesExec>().unwrap();
- assert_eq!(1234, coalesce.target_batch_size);
- Ok(())
- }
-
- #[tokio::test]
- async fn test_disable_coalesce() -> Result<()> {
- let mut config = ConfigOptions::new();
- config.execution.coalesce_batches = false;
-
- let ctx = SessionContext::with_config(config.into());
- let plan = create_physical_plan(ctx).await?;
- let _filter = plan.as_any().downcast_ref::<FilterExec>().unwrap();
- Ok(())
- }
-
- async fn create_physical_plan(ctx: SessionContext) -> Result<Arc<dyn
ExecutionPlan>> {
- let schema = test_schema();
- let partition = create_vec_batches(&schema, 10);
- let table = MemTable::try_new(schema, vec![partition])?;
- ctx.register_table("a", Arc::new(table))?;
- let dataframe = ctx.sql("SELECT * FROM a WHERE c0 < 1").await?;
- dataframe.create_physical_plan().await
- }
-
#[tokio::test(flavor = "multi_thread")]
async fn test_concat_batches() -> Result<()> {
let schema = test_schema();
@@ -385,10 +349,9 @@ mod tests {
// execute and collect results
let output_partition_count =
exec.output_partitioning().partition_count();
let mut output_partitions = Vec::with_capacity(output_partition_count);
- let session_ctx = SessionContext::new();
for i in 0..output_partition_count {
// execute this *output* partition and collect all batches
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let mut stream = exec.execute(i, task_ctx.clone())?;
let mut batches = vec![];
while let Some(result) = stream.next().await {
diff --git a/datafusion/core/src/physical_plan/coalesce_partitions.rs
b/datafusion/core/src/physical_plan/coalesce_partitions.rs
index bc48b5f5e1..14e8aada6c 100644
--- a/datafusion/core/src/physical_plan/coalesce_partitions.rs
+++ b/datafusion/core/src/physical_plan/coalesce_partitions.rs
@@ -176,7 +176,6 @@ mod tests {
use super::*;
use crate::physical_plan::{collect, common};
- use crate::prelude::SessionContext;
use crate::test::exec::{
assert_strong_count_converges_to_zero, BlockingExec, PanicExec,
};
@@ -184,8 +183,7 @@ mod tests {
#[tokio::test]
async fn merge() -> Result<()> {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let num_partitions = 4;
let csv = test::scan_partitioned_csv(num_partitions)?;
@@ -212,8 +210,7 @@ mod tests {
#[tokio::test]
async fn test_drop_cancel() -> Result<()> {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let schema =
Arc::new(Schema::new(vec![Field::new("a", DataType::Float32,
true)]));
@@ -235,8 +232,7 @@ mod tests {
#[tokio::test]
#[should_panic(expected = "PanickingStream did panic")]
async fn test_panic() {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let schema =
Arc::new(Schema::new(vec![Field::new("a", DataType::Float32,
true)]));
diff --git a/datafusion/core/src/physical_plan/empty.rs
b/datafusion/core/src/physical_plan/empty.rs
index 17bfa0af3a..7da4e80c12 100644
--- a/datafusion/core/src/physical_plan/empty.rs
+++ b/datafusion/core/src/physical_plan/empty.rs
@@ -174,13 +174,11 @@ impl ExecutionPlan for EmptyExec {
mod tests {
use super::*;
use crate::physical_plan::with_new_children_if_necessary;
- use crate::prelude::SessionContext;
use crate::{physical_plan::common, test_util};
#[tokio::test]
async fn empty() -> Result<()> {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let schema = test_util::aggr_test_schema();
let empty = EmptyExec::new(false, schema.clone());
@@ -217,8 +215,7 @@ mod tests {
#[tokio::test]
async fn invalid_execute() -> Result<()> {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let schema = test_util::aggr_test_schema();
let empty = EmptyExec::new(false, schema);
@@ -230,8 +227,7 @@ mod tests {
#[tokio::test]
async fn produce_one_row() -> Result<()> {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let schema = test_util::aggr_test_schema();
let empty = EmptyExec::new(true, schema);
@@ -246,8 +242,7 @@ mod tests {
#[tokio::test]
async fn produce_one_row_multiple_partition() -> Result<()> {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let schema = test_util::aggr_test_schema();
let partitions = 3;
let empty = EmptyExec::new(true, schema).with_partitions(partitions);
diff --git a/datafusion/core/src/physical_plan/filter.rs
b/datafusion/core/src/physical_plan/filter.rs
index e8c181a34b..f9fc4fb4b6 100644
--- a/datafusion/core/src/physical_plan/filter.rs
+++ b/datafusion/core/src/physical_plan/filter.rs
@@ -381,7 +381,6 @@ mod tests {
use crate::physical_plan::expressions::*;
use crate::physical_plan::ExecutionPlan;
use crate::physical_plan::{collect, with_new_children_if_necessary};
- use crate::prelude::SessionContext;
use crate::test;
use crate::test::exec::StatisticsExec;
use crate::test_util;
@@ -395,8 +394,7 @@ mod tests {
#[tokio::test]
async fn simple_predicate() -> Result<()> {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let schema = test_util::aggr_test_schema();
let partitions = 4;
diff --git a/datafusion/core/src/physical_plan/joins/cross_join.rs
b/datafusion/core/src/physical_plan/joins/cross_join.rs
index eaee9892d0..6d74a069b6 100644
--- a/datafusion/core/src/physical_plan/joins/cross_join.rs
+++ b/datafusion/core/src/physical_plan/joins/cross_join.rs
@@ -460,7 +460,6 @@ mod tests {
use crate::assert_batches_sorted_eq;
use crate::common::assert_contains;
use crate::physical_plan::common;
- use crate::prelude::{SessionConfig, SessionContext};
use crate::test::{build_table_scan_i32, columns};
use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv};
@@ -617,8 +616,7 @@ mod tests {
#[tokio::test]
async fn test_join() -> Result<()> {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let left = build_table_scan_i32(
("a1", &vec![1, 2, 3]),
@@ -656,9 +654,8 @@ mod tests {
async fn test_overallocation() -> Result<()> {
let runtime_config = RuntimeConfig::new().with_memory_limit(100, 1.0);
let runtime = Arc::new(RuntimeEnv::new(runtime_config)?);
- let session_ctx =
- SessionContext::with_config_rt(SessionConfig::default(), runtime);
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = TaskContext::default().with_runtime(runtime);
+ let task_ctx = Arc::new(task_ctx);
let left = build_table_scan_i32(
("a1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs
b/datafusion/core/src/physical_plan/joins/hash_join.rs
index 2108893ccb..3522656142 100644
--- a/datafusion/core/src/physical_plan/joins/hash_join.rs
+++ b/datafusion/core/src/physical_plan/joins/hash_join.rs
@@ -1402,7 +1402,6 @@ mod tests {
use crate::execution::context::SessionConfig;
use crate::physical_expr::expressions::BinaryExpr;
- use crate::prelude::SessionContext;
use crate::{
assert_batches_sorted_eq,
common::assert_contains,
@@ -1540,8 +1539,7 @@ mod tests {
#[tokio::test]
async fn join_inner_one() -> Result<()> {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let left = build_table(
("a1", &vec![1, 2, 3]),
("b1", &vec![4, 5, 5]), // this has a repetition
@@ -1586,8 +1584,7 @@ mod tests {
#[tokio::test]
async fn partitioned_join_inner_one() -> Result<()> {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let left = build_table(
("a1", &vec![1, 2, 3]),
("b1", &vec![4, 5, 5]), // this has a repetition
@@ -1631,8 +1628,7 @@ mod tests {
#[tokio::test]
async fn join_inner_one_no_shared_column_names() -> Result<()> {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let left = build_table(
("a1", &vec![1, 2, 3]),
("b1", &vec![4, 5, 5]), // this has a repetition
@@ -1670,8 +1666,7 @@ mod tests {
#[tokio::test]
async fn join_inner_two() -> Result<()> {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let left = build_table(
("a1", &vec![1, 2, 2]),
("b2", &vec![1, 2, 2]),
@@ -1718,8 +1713,7 @@ mod tests {
/// Test where the left has 2 parts, the right with 1 part => 1 part
#[tokio::test]
async fn join_inner_one_two_parts_left() -> Result<()> {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let batch1 = build_table_i32(
("a1", &vec![1, 2]),
("b2", &vec![1, 2]),
@@ -1773,8 +1767,7 @@ mod tests {
/// Test where the left has 1 part, the right has 2 parts => 2 parts
#[tokio::test]
async fn join_inner_one_two_parts_right() -> Result<()> {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let left = build_table(
("a1", &vec![1, 2, 3]),
("b1", &vec![4, 5, 5]), // this has a repetition
@@ -1849,8 +1842,7 @@ mod tests {
#[tokio::test]
async fn join_left_multi_batch() {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let left = build_table(
("a1", &vec![1, 2, 3]),
("b1", &vec![4, 5, 7]), // 7 does not exist on the right
@@ -1891,8 +1883,7 @@ mod tests {
#[tokio::test]
async fn join_full_multi_batch() {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let left = build_table(
("a1", &vec![1, 2, 3]),
("b1", &vec![4, 5, 7]), // 7 does not exist on the right
@@ -1936,8 +1927,7 @@ mod tests {
#[tokio::test]
async fn join_left_empty_right() {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let left = build_table(
("a1", &vec![1, 2, 3]),
("b1", &vec![4, 5, 7]),
@@ -1973,8 +1963,7 @@ mod tests {
#[tokio::test]
async fn join_full_empty_right() {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let left = build_table(
("a1", &vec![1, 2, 3]),
("b1", &vec![4, 5, 7]),
@@ -2010,8 +1999,7 @@ mod tests {
#[tokio::test]
async fn join_left_one() -> Result<()> {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let left = build_table(
("a1", &vec![1, 2, 3]),
("b1", &vec![4, 5, 7]), // 7 does not exist on the right
@@ -2054,8 +2042,7 @@ mod tests {
#[tokio::test]
async fn partitioned_join_left_one() -> Result<()> {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let left = build_table(
("a1", &vec![1, 2, 3]),
("b1", &vec![4, 5, 7]), // 7 does not exist on the right
@@ -2118,8 +2105,7 @@ mod tests {
#[tokio::test]
async fn join_left_semi() -> Result<()> {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let left = build_semi_anti_left_table();
let right = build_semi_anti_right_table();
// left_table left semi join right_table on left_table.b1 =
right_table.b2
@@ -2153,8 +2139,7 @@ mod tests {
#[tokio::test]
async fn join_left_semi_with_filter() -> Result<()> {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let left = build_semi_anti_left_table();
let right = build_semi_anti_right_table();
@@ -2240,8 +2225,7 @@ mod tests {
#[tokio::test]
async fn join_right_semi() -> Result<()> {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let left = build_semi_anti_left_table();
let right = build_semi_anti_right_table();
@@ -2275,8 +2259,7 @@ mod tests {
#[tokio::test]
async fn join_right_semi_with_filter() -> Result<()> {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let left = build_semi_anti_left_table();
let right = build_semi_anti_right_table();
@@ -2361,8 +2344,7 @@ mod tests {
#[tokio::test]
async fn join_left_anti() -> Result<()> {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let left = build_semi_anti_left_table();
let right = build_semi_anti_right_table();
// left_table left anti join right_table on left_table.b1 =
right_table.b2
@@ -2395,8 +2377,7 @@ mod tests {
#[tokio::test]
async fn join_left_anti_with_filter() -> Result<()> {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let left = build_semi_anti_left_table();
let right = build_semi_anti_right_table();
// left_table left anti join right_table on left_table.b1 =
right_table.b2 and right_table.a2!=8
@@ -2489,8 +2470,7 @@ mod tests {
#[tokio::test]
async fn join_right_anti() -> Result<()> {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let left = build_semi_anti_left_table();
let right = build_semi_anti_right_table();
let on = vec![(
@@ -2521,8 +2501,7 @@ mod tests {
#[tokio::test]
async fn join_right_anti_with_filter() -> Result<()> {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let left = build_semi_anti_left_table();
let right = build_semi_anti_right_table();
// left_table right anti join right_table on left_table.b1 =
right_table.b2 and left_table.a1!=13
@@ -2618,8 +2597,7 @@ mod tests {
#[tokio::test]
async fn join_right_one() -> Result<()> {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let left = build_table(
("a1", &vec![1, 2, 3]),
("b1", &vec![4, 5, 7]),
@@ -2657,8 +2635,7 @@ mod tests {
#[tokio::test]
async fn partitioned_join_right_one() -> Result<()> {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let left = build_table(
("a1", &vec![1, 2, 3]),
("b1", &vec![4, 5, 7]),
@@ -2697,8 +2674,7 @@ mod tests {
#[tokio::test]
async fn join_full_one() -> Result<()> {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let left = build_table(
("a1", &vec![1, 2, 3]),
("b1", &vec![4, 5, 7]), // 7 does not exist on the right
@@ -2800,8 +2776,7 @@ mod tests {
#[tokio::test]
async fn join_with_duplicated_column_names() -> Result<()> {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let left = build_table(
("a", &vec![1, 2, 3]),
("b", &vec![4, 5, 7]),
@@ -2865,8 +2840,7 @@ mod tests {
#[tokio::test]
async fn join_inner_with_filter() -> Result<()> {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let left = build_table(
("a", &vec![0, 1, 2, 2]),
("b", &vec![4, 5, 7, 8]),
@@ -2906,8 +2880,7 @@ mod tests {
#[tokio::test]
async fn join_left_with_filter() -> Result<()> {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let left = build_table(
("a", &vec![0, 1, 2, 2]),
("b", &vec![4, 5, 7, 8]),
@@ -2950,8 +2923,7 @@ mod tests {
#[tokio::test]
async fn join_right_with_filter() -> Result<()> {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let left = build_table(
("a", &vec![0, 1, 2, 2]),
("b", &vec![4, 5, 7, 8]),
@@ -2993,8 +2965,7 @@ mod tests {
#[tokio::test]
async fn join_full_with_filter() -> Result<()> {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let left = build_table(
("a", &vec![0, 1, 2, 2]),
("b", &vec![4, 5, 7, 8]),
@@ -3062,8 +3033,7 @@ mod tests {
let join = join(left, right, on, &JoinType::Inner, false)?;
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let stream = join.execute(0, task_ctx)?;
let batches = common::collect(stream).await?;
@@ -3122,8 +3092,7 @@ mod tests {
false,
)
.unwrap();
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let stream = join.execute(0, task_ctx).unwrap();
@@ -3170,9 +3139,8 @@ mod tests {
for join_type in join_types {
let runtime_config = RuntimeConfig::new().with_memory_limit(100, 1.0);
let runtime = Arc::new(RuntimeEnv::new(runtime_config)?);
- let session_ctx =
- SessionContext::with_config_rt(SessionConfig::default(), runtime);
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = TaskContext::default().with_runtime(runtime);
+ let task_ctx = Arc::new(task_ctx);
let join = join(left.clone(), right.clone(), on.clone(),
&join_type, false)?;
@@ -3241,8 +3209,10 @@ mod tests {
let runtime_config = RuntimeConfig::new().with_memory_limit(100, 1.0);
let runtime = Arc::new(RuntimeEnv::new(runtime_config)?);
let session_config = SessionConfig::default().with_batch_size(50);
- let session_ctx = SessionContext::with_config_rt(session_config, runtime);
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = TaskContext::default()
+ .with_session_config(session_config)
+ .with_runtime(runtime);
+ let task_ctx = Arc::new(task_ctx);
let join = HashJoinExec::try_new(
left.clone(),
diff --git a/datafusion/core/src/physical_plan/joins/nested_loop_join.rs
b/datafusion/core/src/physical_plan/joins/nested_loop_join.rs
index cad3b4743b..60fdf452cf 100644
--- a/datafusion/core/src/physical_plan/joins/nested_loop_join.rs
+++ b/datafusion/core/src/physical_plan/joins/nested_loop_join.rs
@@ -744,10 +744,7 @@ mod tests {
use crate::{
assert_batches_sorted_eq,
common::assert_contains,
- execution::{
- context::SessionConfig,
- runtime_env::{RuntimeConfig, RuntimeEnv},
- },
+ execution::runtime_env::{RuntimeConfig, RuntimeEnv},
physical_plan::{
common, expressions::Column, memory::MemoryExec,
repartition::RepartitionExec,
},
@@ -757,7 +754,6 @@ mod tests {
use datafusion_expr::Operator;
use crate::physical_plan::joins::utils::JoinSide;
- use crate::prelude::SessionContext;
use datafusion_common::ScalarValue;
use datafusion_physical_expr::expressions::Literal;
use datafusion_physical_expr::PhysicalExpr;
@@ -884,8 +880,7 @@ mod tests {
#[tokio::test]
async fn join_inner_with_filter() -> Result<()> {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let left = build_left_table();
let right = build_right_table();
let filter = prepare_join_filter();
@@ -913,8 +908,7 @@ mod tests {
#[tokio::test]
async fn join_left_with_filter() -> Result<()> {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let left = build_left_table();
let right = build_right_table();
@@ -945,8 +939,7 @@ mod tests {
#[tokio::test]
async fn join_right_with_filter() -> Result<()> {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let left = build_left_table();
let right = build_right_table();
@@ -977,8 +970,7 @@ mod tests {
#[tokio::test]
async fn join_full_with_filter() -> Result<()> {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let left = build_left_table();
let right = build_right_table();
@@ -1011,8 +1003,7 @@ mod tests {
#[tokio::test]
async fn join_left_semi_with_filter() -> Result<()> {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let left = build_left_table();
let right = build_right_table();
@@ -1041,8 +1032,7 @@ mod tests {
#[tokio::test]
async fn join_left_anti_with_filter() -> Result<()> {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let left = build_left_table();
let right = build_right_table();
@@ -1072,8 +1062,7 @@ mod tests {
#[tokio::test]
async fn join_right_semi_with_filter() -> Result<()> {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let left = build_left_table();
let right = build_right_table();
@@ -1102,8 +1091,7 @@ mod tests {
#[tokio::test]
async fn join_right_anti_with_filter() -> Result<()> {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let left = build_left_table();
let right = build_right_table();
@@ -1159,9 +1147,8 @@ mod tests {
for join_type in join_types {
let runtime_config = RuntimeConfig::new().with_memory_limit(100, 1.0);
let runtime = Arc::new(RuntimeEnv::new(runtime_config)?);
- let session_ctx =
- SessionContext::with_config_rt(SessionConfig::default(),
runtime);
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = TaskContext::default().with_runtime(runtime);
+ let task_ctx = Arc::new(task_ctx);
let err = multi_partitioned_join_collect(
left.clone(),
diff --git a/datafusion/core/src/physical_plan/joins/sort_merge_join.rs
b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs
index b3721eb4d6..902b022857 100644
--- a/datafusion/core/src/physical_plan/joins/sort_merge_join.rs
+++ b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs
@@ -1389,6 +1389,8 @@ mod tests {
use arrow::compute::SortOptions;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
+ use datafusion_execution::config::SessionConfig;
+ use datafusion_execution::TaskContext;
use crate::common::assert_contains;
use crate::physical_plan::expressions::Column;
@@ -1396,7 +1398,6 @@ mod tests {
use crate::physical_plan::joins::SortMergeJoinExec;
use crate::physical_plan::memory::MemoryExec;
use crate::physical_plan::{common, ExecutionPlan};
- use crate::prelude::{SessionConfig, SessionContext};
use crate::test::{build_table_i32, columns};
use crate::{assert_batches_eq, assert_batches_sorted_eq};
use datafusion_common::JoinType;
@@ -1537,8 +1538,7 @@ mod tests {
sort_options: Vec<SortOptions>,
null_equals_null: bool,
) -> Result<(Vec<String>, Vec<RecordBatch>)> {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let join = join_with_options(
left,
right,
@@ -1560,9 +1560,9 @@ mod tests {
on: JoinOn,
join_type: JoinType,
) -> Result<(Vec<String>, Vec<RecordBatch>)> {
- let session_ctx =
- SessionContext::with_config(SessionConfig::new().with_batch_size(2));
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = TaskContext::default()
+ .with_session_config(SessionConfig::new().with_batch_size(2));
+ let task_ctx = Arc::new(task_ctx);
let join = join(left, right, on, join_type)?;
let columns = columns(&join.schema());
@@ -2321,8 +2321,12 @@ mod tests {
let runtime_config = RuntimeConfig::new().with_memory_limit(100, 1.0);
let runtime = Arc::new(RuntimeEnv::new(runtime_config)?);
let session_config = SessionConfig::default().with_batch_size(50);
- let session_ctx = SessionContext::with_config_rt(session_config,
runtime);
- let task_ctx = session_ctx.task_ctx();
+
+ let task_ctx = TaskContext::default()
+ .with_session_config(session_config)
+ .with_runtime(runtime);
+ let task_ctx = Arc::new(task_ctx);
+
let join = join_with_options(
left.clone(),
right.clone(),
@@ -2397,8 +2401,10 @@ mod tests {
let runtime_config = RuntimeConfig::new().with_memory_limit(100, 1.0);
let runtime = Arc::new(RuntimeEnv::new(runtime_config)?);
let session_config = SessionConfig::default().with_batch_size(50);
- let session_ctx = SessionContext::with_config_rt(session_config,
runtime);
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = TaskContext::default()
+ .with_session_config(session_config)
+ .with_runtime(runtime);
+ let task_ctx = Arc::new(task_ctx);
let join = join_with_options(
left.clone(),
right.clone(),
diff --git a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
index dc8bcc2edb..efe7ce503b 100644
--- a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
+++ b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
@@ -1428,21 +1428,17 @@ impl SymmetricHashJoinStream {
#[cfg(test)]
mod tests {
- use std::fs::File;
-
+ use super::*;
use arrow::compute::SortOptions;
use arrow::datatypes::{DataType, Field, IntervalUnit, Schema, TimeUnit};
+ use datafusion_execution::config::SessionConfig;
use rstest::*;
- use tempfile::TempDir;
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::{binary, col, Column};
use
datafusion_physical_expr::intervals::test_utils::gen_conjunctive_numerical_expr;
- use crate::physical_plan::displayable;
use
crate::physical_plan::joins::hash_join_utils::tests::complicated_filter;
- use crate::prelude::{CsvReadOptions, SessionConfig, SessionContext};
- use crate::test_util::register_unbounded_file_with_ordering;
use crate::physical_plan::joins::test_utils::{
build_sides_record_batches, compare_batches, create_memory_table,
@@ -1451,9 +1447,6 @@ mod tests {
partitioned_sym_join_with_filter,
};
use datafusion_common::ScalarValue;
- use std::iter::Iterator;
-
- use super::*;
const TABLE_SIZE: i32 = 100;
@@ -1506,8 +1499,7 @@ mod tests {
cardinality: (i32, i32),
) -> Result<()> {
// a + b > c + 10 AND a + b < c + 100
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let (left_batch, right_batch) =
build_sides_record_batches(TABLE_SIZE, cardinality)?;
let left_schema = &left_batch.schema();
@@ -1587,8 +1579,7 @@ mod tests {
cardinality: (i32, i32),
#[values(0, 1, 2, 3, 4, 5, 6, 7)] case_expr: usize,
) -> Result<()> {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let (left_batch, right_batch) =
build_sides_record_batches(TABLE_SIZE, cardinality)?;
let left_schema = &left_batch.schema();
@@ -1662,8 +1653,7 @@ mod tests {
cardinality: (i32, i32),
#[values(0, 1, 2, 3, 4, 5, 6)] case_expr: usize,
) -> Result<()> {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let (left_batch, right_batch) =
build_sides_record_batches(TABLE_SIZE, cardinality)?;
let left_schema = &left_batch.schema();
@@ -1715,8 +1705,7 @@ mod tests {
)]
join_type: JoinType,
) -> Result<()> {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let (left_batch, right_batch) = build_sides_record_batches(TABLE_SIZE,
(11, 21))?;
let left_schema = &left_batch.schema();
let right_schema = &right_batch.schema();
@@ -1753,8 +1742,7 @@ mod tests {
cardinality: (i32, i32),
#[values(0, 1, 2, 3, 4, 5, 6)] case_expr: usize,
) -> Result<()> {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let (left_batch, right_batch) =
build_sides_record_batches(TABLE_SIZE, cardinality)?;
let left_schema = &left_batch.schema();
@@ -1811,173 +1799,14 @@ mod tests {
Ok(())
}
- #[tokio::test]
- async fn join_change_in_planner() -> Result<()> {
- let config = SessionConfig::new().with_target_partitions(8);
- let ctx = SessionContext::with_config(config);
- let tmp_dir = TempDir::new().unwrap();
- let left_file_path = tmp_dir.path().join("left.csv");
- File::create(left_file_path.clone()).unwrap();
- // Create schema
- let schema = Arc::new(Schema::new(vec![
- Field::new("a1", DataType::UInt32, false),
- Field::new("a2", DataType::UInt32, false),
- ]));
- // Specify the ordering:
- let file_sort_order = vec![[datafusion_expr::col("a1")]
- .into_iter()
- .map(|e| {
- let ascending = true;
- let nulls_first = false;
- e.sort(ascending, nulls_first)
- })
- .collect::<Vec<_>>()];
- register_unbounded_file_with_ordering(
- &ctx,
- schema.clone(),
- &left_file_path,
- "left",
- file_sort_order.clone(),
- true,
- )
- .await?;
- let right_file_path = tmp_dir.path().join("right.csv");
- File::create(right_file_path.clone()).unwrap();
- register_unbounded_file_with_ordering(
- &ctx,
- schema,
- &right_file_path,
- "right",
- file_sort_order,
- true,
- )
- .await?;
- let sql = "SELECT t1.a1, t1.a2, t2.a1, t2.a2 FROM left as t1 FULL JOIN
right as t2 ON t1.a2 = t2.a2 AND t1.a1 > t2.a1 + 3 AND t1.a1 < t2.a1 + 10";
- let dataframe = ctx.sql(sql).await?;
- let physical_plan = dataframe.create_physical_plan().await?;
- let formatted =
displayable(physical_plan.as_ref()).indent(true).to_string();
- let expected = {
- [
- "SymmetricHashJoinExec: mode=Partitioned, join_type=Full,
on=[(a2@1, a2@1)], filter=CAST(a1@0 AS Int64) > CAST(a1@1 AS Int64) + 3 AND
CAST(a1@0 AS Int64) < CAST(a1@1 AS Int64) + 10",
- " CoalesceBatchesExec: target_batch_size=8192",
- " RepartitionExec: partitioning=Hash([a2@1], 8),
input_partitions=1",
- // " CsvExec: file_groups={1 group: [[tempdir/left.csv]]},
projection=[a1, a2], has_header=false",
- " CoalesceBatchesExec: target_batch_size=8192",
- " RepartitionExec: partitioning=Hash([a2@1], 8),
input_partitions=1",
- // " CsvExec: file_groups={1 group: [[tempdir/right.csv]]},
projection=[a1, a2], has_header=false"
- ]
- };
- let mut actual: Vec<&str> = formatted.trim().lines().collect();
- // Remove CSV lines
- actual.remove(3);
- actual.remove(5);
-
- assert_eq!(
- expected,
- actual[..],
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
- Ok(())
- }
-
- #[tokio::test]
- async fn join_change_in_planner_without_sort() -> Result<()> {
- let config = SessionConfig::new().with_target_partitions(8);
- let ctx = SessionContext::with_config(config);
- let tmp_dir = TempDir::new()?;
- let left_file_path = tmp_dir.path().join("left.csv");
- File::create(left_file_path.clone())?;
- let schema = Arc::new(Schema::new(vec![
- Field::new("a1", DataType::UInt32, false),
- Field::new("a2", DataType::UInt32, false),
- ]));
- ctx.register_csv(
- "left",
- left_file_path.as_os_str().to_str().unwrap(),
- CsvReadOptions::new().schema(&schema).mark_infinite(true),
- )
- .await?;
- let right_file_path = tmp_dir.path().join("right.csv");
- File::create(right_file_path.clone())?;
- ctx.register_csv(
- "right",
- right_file_path.as_os_str().to_str().unwrap(),
- CsvReadOptions::new().schema(&schema).mark_infinite(true),
- )
- .await?;
- let sql = "SELECT t1.a1, t1.a2, t2.a1, t2.a2 FROM left as t1 FULL JOIN
right as t2 ON t1.a2 = t2.a2 AND t1.a1 > t2.a1 + 3 AND t1.a1 < t2.a1 + 10";
- let dataframe = ctx.sql(sql).await?;
- let physical_plan = dataframe.create_physical_plan().await?;
- let formatted =
displayable(physical_plan.as_ref()).indent(true).to_string();
- let expected = {
- [
- "SymmetricHashJoinExec: mode=Partitioned, join_type=Full,
on=[(a2@1, a2@1)], filter=CAST(a1@0 AS Int64) > CAST(a1@1 AS Int64) + 3 AND
CAST(a1@0 AS Int64) < CAST(a1@1 AS Int64) + 10",
- " CoalesceBatchesExec: target_batch_size=8192",
- " RepartitionExec: partitioning=Hash([a2@1], 8),
input_partitions=1",
- // " CsvExec: file_groups={1 group: [[tempdir/left.csv]]},
projection=[a1, a2], has_header=false",
- " CoalesceBatchesExec: target_batch_size=8192",
- " RepartitionExec: partitioning=Hash([a2@1], 8),
input_partitions=1",
- // " CsvExec: file_groups={1 group: [[tempdir/right.csv]]},
projection=[a1, a2], has_header=false"
- ]
- };
- let mut actual: Vec<&str> = formatted.trim().lines().collect();
- // Remove CSV lines
- actual.remove(3);
- actual.remove(5);
-
- assert_eq!(
- expected,
- actual[..],
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
- Ok(())
- }
-
- #[tokio::test]
- async fn join_change_in_planner_without_sort_not_allowed() -> Result<()> {
- let config = SessionConfig::new()
- .with_target_partitions(8)
- .with_allow_symmetric_joins_without_pruning(false);
- let ctx = SessionContext::with_config(config);
- let tmp_dir = TempDir::new()?;
- let left_file_path = tmp_dir.path().join("left.csv");
- File::create(left_file_path.clone())?;
- let schema = Arc::new(Schema::new(vec![
- Field::new("a1", DataType::UInt32, false),
- Field::new("a2", DataType::UInt32, false),
- ]));
- ctx.register_csv(
- "left",
- left_file_path.as_os_str().to_str().unwrap(),
- CsvReadOptions::new().schema(&schema).mark_infinite(true),
- )
- .await?;
- let right_file_path = tmp_dir.path().join("right.csv");
- File::create(right_file_path.clone())?;
- ctx.register_csv(
- "right",
- right_file_path.as_os_str().to_str().unwrap(),
- CsvReadOptions::new().schema(&schema).mark_infinite(true),
- )
- .await?;
- let df = ctx.sql("SELECT t1.a1, t1.a2, t2.a1, t2.a2 FROM left as t1
FULL JOIN right as t2 ON t1.a2 = t2.a2 AND t1.a1 > t2.a1 + 3 AND t1.a1 < t2.a1
+ 10").await?;
- match df.create_physical_plan().await {
- Ok(_) => panic!("Expecting error."),
- Err(e) => {
- assert_eq!(e.to_string(), "PipelineChecker\ncaused by\nError
during planning: Join operation cannot operate on a non-prunable stream without
enabling the 'allow_symmetric_joins_without_pruning' configuration flag")
- }
- }
- Ok(())
- }
-
#[tokio::test(flavor = "multi_thread")]
async fn build_null_columns_first() -> Result<()> {
let join_type = JoinType::Full;
let cardinality = (10, 11);
let case_expr = 1;
- let config = SessionConfig::new().with_repartition_joins(false);
- let session_ctx = SessionContext::with_config(config);
- let task_ctx = session_ctx.task_ctx();
+ let session_config =
SessionConfig::new().with_repartition_joins(false);
+ let task_ctx =
TaskContext::default().with_session_config(session_config);
+ let task_ctx = Arc::new(task_ctx);
let (left_batch, right_batch) =
build_sides_record_batches(TABLE_SIZE, cardinality)?;
let left_schema = &left_batch.schema();
@@ -2038,9 +1867,9 @@ mod tests {
let join_type = JoinType::Full;
let cardinality = (10, 11);
let case_expr = 1;
- let config = SessionConfig::new().with_repartition_joins(false);
- let session_ctx = SessionContext::with_config(config);
- let task_ctx = session_ctx.task_ctx();
+ let session_config =
SessionConfig::new().with_repartition_joins(false);
+ let task_ctx =
TaskContext::default().with_session_config(session_config);
+ let task_ctx = Arc::new(task_ctx);
let (left_batch, right_batch) =
build_sides_record_batches(TABLE_SIZE, cardinality)?;
let left_schema = &left_batch.schema();
@@ -2102,9 +1931,9 @@ mod tests {
let join_type = JoinType::Full;
let cardinality = (10, 11);
let case_expr = 1;
- let config = SessionConfig::new().with_repartition_joins(false);
- let session_ctx = SessionContext::with_config(config);
- let task_ctx = session_ctx.task_ctx();
+ let session_config =
SessionConfig::new().with_repartition_joins(false);
+ let task_ctx =
TaskContext::default().with_session_config(session_config);
+ let task_ctx = Arc::new(task_ctx);
let (left_batch, right_batch) =
build_sides_record_batches(TABLE_SIZE, cardinality)?;
let left_schema = &left_batch.schema();
@@ -2167,9 +1996,9 @@ mod tests {
let join_type = JoinType::Full;
// a + b > c + 10 AND a + b < c + 100
- let config = SessionConfig::new().with_repartition_joins(false);
- let session_ctx = SessionContext::with_config(config);
- let task_ctx = session_ctx.task_ctx();
+ let session_config =
SessionConfig::new().with_repartition_joins(false);
+ let task_ctx =
TaskContext::default().with_session_config(session_config);
+ let task_ctx = Arc::new(task_ctx);
let (left_batch, right_batch) =
build_sides_record_batches(TABLE_SIZE, cardinality)?;
let left_schema = &left_batch.schema();
@@ -2241,9 +2070,9 @@ mod tests {
let join_type = case.0;
let should_be_empty = case.1;
let random_state = RandomState::with_seeds(0, 0, 0, 0);
- let config = SessionConfig::new().with_repartition_joins(false);
- let session_ctx = SessionContext::with_config(config);
- let task_ctx = session_ctx.task_ctx();
+ let session_config =
SessionConfig::new().with_repartition_joins(false);
+ let task_ctx =
TaskContext::default().with_session_config(session_config);
+ let task_ctx = Arc::new(task_ctx);
// Ensure there will be matching rows
let (left_batch, right_batch) = build_sides_record_batches(20, (1,
1))?;
let left_schema = left_batch.schema();
@@ -2369,9 +2198,9 @@ mod tests {
cardinality: (i32, i32),
#[values(0, 1)] case_expr: usize,
) -> Result<()> {
- let config = SessionConfig::new().with_repartition_joins(false);
- let session_ctx = SessionContext::with_config(config);
- let task_ctx = session_ctx.task_ctx();
+ let session_config =
SessionConfig::new().with_repartition_joins(false);
+ let task_ctx =
TaskContext::default().with_session_config(session_config);
+ let task_ctx = Arc::new(task_ctx);
let (left_batch, right_batch) =
build_sides_record_batches(TABLE_SIZE, cardinality)?;
let left_schema = &left_batch.schema();
@@ -2453,9 +2282,9 @@ mod tests {
)]
cardinality: (i32, i32),
) -> Result<()> {
- let config = SessionConfig::new().with_repartition_joins(false);
- let session_ctx = SessionContext::with_config(config);
- let task_ctx = session_ctx.task_ctx();
+ let session_config =
SessionConfig::new().with_repartition_joins(false);
+ let task_ctx =
TaskContext::default().with_session_config(session_config);
+ let task_ctx = Arc::new(task_ctx);
let (left_batch, right_batch) =
build_sides_record_batches(TABLE_SIZE, cardinality)?;
let left_schema = &left_batch.schema();
@@ -2532,9 +2361,9 @@ mod tests {
cardinality: (i32, i32),
#[values(0, 1, 2, 3, 4, 5, 6, 7)] case_expr: usize,
) -> Result<()> {
- let config = SessionConfig::new().with_repartition_joins(false);
- let session_ctx = SessionContext::with_config(config);
- let task_ctx = session_ctx.task_ctx();
+ let session_config =
SessionConfig::new().with_repartition_joins(false);
+ let task_ctx =
TaskContext::default().with_session_config(session_config);
+ let task_ctx = Arc::new(task_ctx);
let (left_batch, right_batch) =
build_sides_record_batches(TABLE_SIZE, cardinality)?;
let left_schema = &left_batch.schema();
diff --git a/datafusion/core/src/physical_plan/limit.rs
b/datafusion/core/src/physical_plan/limit.rs
index 93f6cd7c2c..c5d5fbcfd1 100644
--- a/datafusion/core/src/physical_plan/limit.rs
+++ b/datafusion/core/src/physical_plan/limit.rs
@@ -532,13 +532,11 @@ mod tests {
use super::*;
use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use crate::physical_plan::common;
- use crate::prelude::SessionContext;
use crate::test;
#[tokio::test]
async fn limit() -> Result<()> {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let num_partitions = 4;
let csv = test::scan_partitioned_csv(num_partitions)?;
@@ -654,8 +652,7 @@ mod tests {
// test cases for "skip"
async fn skip_and_fetch(skip: usize, fetch: Option<usize>) ->
Result<usize> {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let num_partitions = 4;
let csv = test::scan_partitioned_csv(num_partitions)?;
diff --git a/datafusion/core/src/physical_plan/projection.rs
b/datafusion/core/src/physical_plan/projection.rs
index 5c4b661143..ca4a3e54f5 100644
--- a/datafusion/core/src/physical_plan/projection.rs
+++ b/datafusion/core/src/physical_plan/projection.rs
@@ -427,7 +427,6 @@ mod tests {
use super::*;
use crate::physical_plan::common::collect;
use crate::physical_plan::expressions::{self, col};
- use crate::prelude::SessionContext;
use crate::test::{self};
use crate::test_util;
use datafusion_common::ScalarValue;
@@ -448,8 +447,7 @@ mod tests {
#[tokio::test]
async fn project_first_column() -> Result<()> {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let schema = test_util::aggr_test_schema();
let partitions = 4;
@@ -519,8 +517,7 @@ mod tests {
#[tokio::test]
async fn project_no_column() -> Result<()> {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let csv = test::scan_partitioned_csv(1)?;
let expected = collect(csv.execute(0,
task_ctx.clone())?).await.unwrap();
diff --git a/datafusion/core/src/physical_plan/repartition/mod.rs
b/datafusion/core/src/physical_plan/repartition/mod.rs
index 3f83e186ea..78d78df0af 100644
--- a/datafusion/core/src/physical_plan/repartition/mod.rs
+++ b/datafusion/core/src/physical_plan/repartition/mod.rs
@@ -883,8 +883,6 @@ impl RecordBatchStream for PerPartitionStream {
#[cfg(test)]
mod tests {
use super::*;
- use crate::execution::context::SessionConfig;
- use crate::prelude::SessionContext;
use crate::test::create_vec_batches;
use crate::{
assert_batches_sorted_eq,
@@ -998,8 +996,7 @@ mod tests {
input_partitions: Vec<Vec<RecordBatch>>,
partitioning: Partitioning,
) -> Result<Vec<Vec<RecordBatch>>> {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
// create physical plan
let exec = MemoryExec::try_new(&input_partitions, schema.clone(),
None)?;
let exec = RepartitionExec::try_new(Arc::new(exec), partitioning)?;
@@ -1046,8 +1043,7 @@ mod tests {
#[tokio::test]
async fn unsupported_partitioning() {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
// have to send at least one batch through to provoke error
let batch = RecordBatch::try_from_iter(vec![(
"my_awesome_field",
@@ -1081,8 +1077,7 @@ mod tests {
// This generates an error on a call to execute. The error
// should be returned and no results produced.
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let input = ErrorExec::new();
let partitioning = Partitioning::RoundRobinBatch(1);
let exec = RepartitionExec::try_new(Arc::new(input),
partitioning).unwrap();
@@ -1104,8 +1099,7 @@ mod tests {
#[tokio::test]
async fn repartition_with_error_in_stream() {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let batch = RecordBatch::try_from_iter(vec![(
"my_awesome_field",
Arc::new(StringArray::from(vec!["foo", "bar"])) as ArrayRef,
@@ -1138,8 +1132,7 @@ mod tests {
#[tokio::test]
async fn repartition_with_delayed_stream() {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let batch1 = RecordBatch::try_from_iter(vec![(
"my_awesome_field",
Arc::new(StringArray::from(vec!["foo", "bar"])) as ArrayRef,
@@ -1184,8 +1177,7 @@ mod tests {
#[tokio::test]
async fn robin_repartition_with_dropping_output_stream() {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let partitioning = Partitioning::RoundRobinBatch(2);
// The barrier exec waits to be pinged
// requires the input to wait at least once)
@@ -1228,8 +1220,7 @@ mod tests {
    // with different compilers, we will compare the same execution with
    // and without dropping the output stream.
async fn hash_repartition_with_dropping_output_stream() {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let partitioning = Partitioning::Hash(
vec![Arc::new(crate::physical_plan::expressions::Column::new(
"my_awesome_field",
@@ -1324,8 +1315,7 @@ mod tests {
#[tokio::test]
async fn test_drop_cancel() -> Result<()> {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let schema =
Arc::new(Schema::new(vec![Field::new("a", DataType::Float32,
true)]));
@@ -1348,8 +1338,7 @@ mod tests {
#[tokio::test]
async fn hash_repartition_avoid_empty_batch() -> Result<()> {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let batch = RecordBatch::try_from_iter(vec![(
"a",
Arc::new(StringArray::from(vec!["foo"])) as ArrayRef,
@@ -1385,14 +1374,12 @@ mod tests {
let partitioning = Partitioning::RoundRobinBatch(4);
        // set up context
- let session_ctx = SessionContext::with_config_rt(
- SessionConfig::default(),
- Arc::new(
- RuntimeEnv::new(RuntimeConfig::default().with_memory_limit(1,
1.0))
- .unwrap(),
- ),
+ let runtime = Arc::new(
+ RuntimeEnv::new(RuntimeConfig::default().with_memory_limit(1,
1.0)).unwrap(),
);
- let task_ctx = session_ctx.task_ctx();
+
+ let task_ctx = TaskContext::default().with_runtime(runtime);
+ let task_ctx = Arc::new(task_ctx);
// create physical plan
let exec = MemoryExec::try_new(&input_partitions, schema.clone(),
None)?;
diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs
b/datafusion/core/src/physical_plan/sorts/sort.rs
index 1813fd6a15..3bedbd17e8 100644
--- a/datafusion/core/src/physical_plan/sorts/sort.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort.rs
@@ -884,12 +884,10 @@ impl ExecutionPlan for SortExec {
#[cfg(test)]
mod tests {
use super::*;
- use crate::execution::context::SessionConfig;
use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use crate::physical_plan::collect;
use crate::physical_plan::expressions::col;
use crate::physical_plan::memory::MemoryExec;
- use crate::prelude::SessionContext;
use crate::test;
use crate::test::assert_is_pending;
use crate::test::exec::{assert_strong_count_converges_to_zero,
BlockingExec};
@@ -897,14 +895,14 @@ mod tests {
use arrow::compute::SortOptions;
use arrow::datatypes::*;
use datafusion_common::cast::{as_primitive_array, as_string_array};
+ use datafusion_execution::config::SessionConfig;
use datafusion_execution::runtime_env::RuntimeConfig;
use futures::FutureExt;
use std::collections::HashMap;
#[tokio::test]
async fn test_in_mem_sort() -> Result<()> {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let partitions = 4;
let csv = test::scan_partitioned_csv(partitions)?;
let schema = csv.schema();
@@ -930,7 +928,7 @@ mod tests {
Arc::new(CoalescePartitionsExec::new(csv)),
));
- let result = collect(sort_exec, task_ctx).await?;
+ let result = collect(sort_exec, task_ctx.clone()).await?;
assert_eq!(result.len(), 1);
@@ -949,7 +947,7 @@ mod tests {
assert_eq!(c7.value(c7.len() - 1), 254,);
assert_eq!(
- session_ctx.runtime_env().memory_pool.reserved(),
+ task_ctx.runtime_env().memory_pool.reserved(),
0,
"The sort should have returned all memory used back to the memory
manager"
);
@@ -968,7 +966,11 @@ mod tests {
let rt_config = RuntimeConfig::new()
.with_memory_limit(sort_spill_reservation_bytes + 12288, 1.0);
let runtime = Arc::new(RuntimeEnv::new(rt_config)?);
- let session_ctx = SessionContext::with_config_rt(session_config,
runtime);
+ let task_ctx = Arc::new(
+ TaskContext::default()
+ .with_session_config(session_config)
+ .with_runtime(runtime),
+ );
let partitions = 4;
let csv = test::scan_partitioned_csv(partitions)?;
@@ -995,8 +997,7 @@ mod tests {
Arc::new(CoalescePartitionsExec::new(csv)),
));
- let task_ctx = session_ctx.task_ctx();
- let result = collect(sort_exec.clone(), task_ctx).await?;
+ let result = collect(sort_exec.clone(), task_ctx.clone()).await?;
assert_eq!(result.len(), 1);
@@ -1023,7 +1024,7 @@ mod tests {
assert_eq!(c7.value(c7.len() - 1), 254,);
assert_eq!(
- session_ctx.runtime_env().memory_pool.reserved(),
+ task_ctx.runtime_env().memory_pool.reserved(),
0,
"The sort should have returned all memory used back to the memory
manager"
);
@@ -1059,7 +1060,11 @@ mod tests {
1.0,
);
let runtime = Arc::new(RuntimeEnv::new(rt_config)?);
- let session_ctx = SessionContext::with_config_rt(session_config,
runtime);
+ let task_ctx = Arc::new(
+ TaskContext::default()
+ .with_runtime(runtime)
+ .with_session_config(session_config),
+ );
let csv = test::scan_partitioned_csv(partitions)?;
let schema = csv.schema();
@@ -1088,7 +1093,6 @@ mod tests {
.with_fetch(fetch),
);
- let task_ctx = session_ctx.task_ctx();
let result = collect(sort_exec.clone(), task_ctx).await?;
assert_eq!(result.len(), 1);
@@ -1101,8 +1105,7 @@ mod tests {
#[tokio::test]
async fn test_sort_metadata() -> Result<()> {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let field_metadata: HashMap<String, String> =
vec![("foo".to_string(), "bar".to_string())]
.into_iter()
@@ -1151,8 +1154,7 @@ mod tests {
#[tokio::test]
async fn test_lex_sort_by_float() -> Result<()> {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Float32, true),
Field::new("b", DataType::Float64, true),
@@ -1257,8 +1259,7 @@ mod tests {
#[tokio::test]
async fn test_drop_cancel() -> Result<()> {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let schema =
Arc::new(Schema::new(vec![Field::new("a", DataType::Float32,
true)]));
@@ -1272,7 +1273,7 @@ mod tests {
blocking_exec,
));
- let fut = collect(sort_exec, task_ctx);
+ let fut = collect(sort_exec, task_ctx.clone());
let mut fut = fut.boxed();
assert_is_pending(&mut fut);
@@ -1280,7 +1281,7 @@ mod tests {
assert_strong_count_converges_to_zero(refs).await;
assert_eq!(
- session_ctx.runtime_env().memory_pool.reserved(),
+ task_ctx.runtime_env().memory_pool.reserved(),
0,
"The sort should have returned all memory used back to the memory
manager"
);
diff --git a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
index 6b978b5ee7..bc0ac678f0 100644
--- a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
@@ -273,6 +273,7 @@ mod tests {
use arrow::compute::SortOptions;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
+ use datafusion_execution::config::SessionConfig;
use futures::{FutureExt, StreamExt};
use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
@@ -282,7 +283,6 @@ mod tests {
use crate::physical_plan::sorts::sort::SortExec;
use crate::physical_plan::stream::RecordBatchReceiverStream;
use crate::physical_plan::{collect, common};
- use crate::prelude::{SessionConfig, SessionContext};
use crate::test::exec::{assert_strong_count_converges_to_zero,
BlockingExec};
use crate::test::{self, assert_is_pending};
use crate::{assert_batches_eq, test_util};
@@ -292,8 +292,7 @@ mod tests {
#[tokio::test]
async fn test_merge_interleave() {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 7, 9, 3]));
let b: ArrayRef = Arc::new(StringArray::from_iter(vec![
Some("a"),
@@ -341,8 +340,7 @@ mod tests {
#[tokio::test]
async fn test_merge_some_overlap() {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 7, 9, 3]));
let b: ArrayRef = Arc::new(StringArray::from_iter(vec![
Some("a"),
@@ -390,8 +388,7 @@ mod tests {
#[tokio::test]
async fn test_merge_no_overlap() {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 7, 9, 3]));
let b: ArrayRef = Arc::new(StringArray::from_iter(vec![
Some("a"),
@@ -439,8 +436,7 @@ mod tests {
#[tokio::test]
async fn test_merge_three_partitions() {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 7, 9, 3]));
let b: ArrayRef = Arc::new(StringArray::from_iter(vec![
Some("a"),
@@ -561,8 +557,7 @@ mod tests {
#[tokio::test]
async fn test_partition_sort() {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let partitions = 4;
let csv = test::scan_partitioned_csv(partitions).unwrap();
let schema = csv.schema();
@@ -644,8 +639,7 @@ mod tests {
#[tokio::test]
async fn test_partition_sort_streaming_input() {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let schema = test_util::aggr_test_schema();
let sort = vec![
// uint8
@@ -705,17 +699,18 @@ mod tests {
},
];
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ // Test streaming with default batch size
+ let task_ctx = Arc::new(TaskContext::default());
let input =
sorted_partitioned_input(sort.clone(), &[10, 5, 13],
task_ctx.clone()).await;
let basic = basic_sort(input.clone(), sort.clone(), task_ctx).await;
- let session_ctx_bs_23 =
-
SessionContext::with_config(SessionConfig::new().with_batch_size(23));
+ // batch size of 23
+ let task_ctx = TaskContext::default()
+ .with_session_config(SessionConfig::new().with_batch_size(23));
+ let task_ctx = Arc::new(task_ctx);
let merge = Arc::new(SortPreservingMergeExec::new(sort, input));
- let task_ctx = session_ctx_bs_23.task_ctx();
let merged = collect(merge, task_ctx).await.unwrap();
assert_eq!(merged.len(), 14);
@@ -735,8 +730,7 @@ mod tests {
#[tokio::test]
async fn test_nulls() {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 7, 9, 3]));
let b: ArrayRef = Arc::new(StringArray::from_iter(vec![
None,
@@ -817,8 +811,7 @@ mod tests {
#[tokio::test]
async fn test_async() {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let schema = test_util::aggr_test_schema();
let sort = vec![PhysicalSortExpr {
expr: col("c12", &schema).unwrap(),
@@ -885,8 +878,7 @@ mod tests {
#[tokio::test]
async fn test_merge_metrics() {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2]));
let b: ArrayRef = Arc::new(StringArray::from_iter(vec![Some("a"),
Some("c")]));
let b1 = RecordBatch::try_from_iter(vec![("a", a), ("b", b)]).unwrap();
@@ -942,8 +934,7 @@ mod tests {
#[tokio::test]
async fn test_drop_cancel() -> Result<()> {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let schema =
Arc::new(Schema::new(vec![Field::new("a", DataType::Float32,
true)]));
@@ -969,8 +960,7 @@ mod tests {
#[tokio::test]
async fn test_stable_sort() {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
// Create record batches like:
// batch_number |value
diff --git a/datafusion/core/src/physical_plan/stream.rs
b/datafusion/core/src/physical_plan/stream.rs
index 2b916b7ee2..2e9d40cfe5 100644
--- a/datafusion/core/src/physical_plan/stream.rs
+++ b/datafusion/core/src/physical_plan/stream.rs
@@ -370,11 +370,8 @@ mod test {
use super::*;
use arrow_schema::{DataType, Field, Schema};
- use crate::{
- execution::context::SessionContext,
- test::exec::{
- assert_strong_count_converges_to_zero, BlockingExec, MockExec,
PanicExec,
- },
+ use crate::test::exec::{
+ assert_strong_count_converges_to_zero, BlockingExec, MockExec,
PanicExec,
};
fn schema() -> SchemaRef {
@@ -413,8 +410,7 @@ mod test {
#[tokio::test]
async fn record_batch_receiver_stream_drop_cancel() {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let schema = schema();
// Make an input that never proceeds
@@ -439,8 +435,7 @@ mod test {
/// `RecordBatchReceiverStream` stops early and does not drive
/// other streams to completion.
async fn record_batch_receiver_stream_error_does_not_drive_completion() {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let schema = schema();
// make an input that will error twice
@@ -471,8 +466,7 @@ mod test {
///
/// panic's if more than max_batches is seen,
async fn consume(input: PanicExec, max_batches: usize) {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let input = Arc::new(input);
let num_partitions = input.output_partitioning().partition_count();
diff --git a/datafusion/core/src/physical_plan/union.rs
b/datafusion/core/src/physical_plan/union.rs
index e1f8072085..e29c96da09 100644
--- a/datafusion/core/src/physical_plan/union.rs
+++ b/datafusion/core/src/physical_plan/union.rs
@@ -608,14 +608,12 @@ mod tests {
use super::*;
use crate::test;
- use crate::prelude::SessionContext;
use crate::{physical_plan::collect, scalar::ScalarValue};
use arrow::record_batch::RecordBatch;
#[tokio::test]
async fn test_union_partitions() -> Result<()> {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
// Create csv's with different partitioning
let csv = test::scan_partitioned_csv(4)?;
diff --git a/datafusion/core/src/physical_plan/windows/mod.rs
b/datafusion/core/src/physical_plan/windows/mod.rs
index 7d9d70f477..cba7df772e 100644
--- a/datafusion/core/src/physical_plan/windows/mod.rs
+++ b/datafusion/core/src/physical_plan/windows/mod.rs
@@ -362,7 +362,6 @@ mod tests {
use crate::physical_plan::aggregates::AggregateFunction;
use crate::physical_plan::expressions::col;
use crate::physical_plan::{collect, ExecutionPlan};
- use crate::prelude::SessionContext;
use crate::test::exec::{assert_strong_count_converges_to_zero,
BlockingExec};
use crate::test::{self, assert_is_pending, csv_exec_sorted};
use arrow::array::*;
@@ -370,6 +369,7 @@ mod tests {
use arrow::datatypes::{DataType, Field, SchemaRef};
use arrow::record_batch::RecordBatch;
use datafusion_common::cast::as_primitive_array;
+ use datafusion_execution::TaskContext;
use datafusion_expr::{create_udaf, Accumulator, Volatility};
use futures::FutureExt;
@@ -546,8 +546,7 @@ mod tests {
Arc::new(vec![DataType::Int64]),
);
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let (input, schema) = create_test_schema(1)?;
let window_exec = Arc::new(WindowAggExec::try_new(
@@ -579,8 +578,7 @@ mod tests {
#[tokio::test]
async fn window_function() -> Result<()> {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let (input, schema) = create_test_schema(1)?;
let window_exec = Arc::new(WindowAggExec::try_new(
@@ -643,8 +641,7 @@ mod tests {
#[tokio::test]
async fn test_drop_cancel() -> Result<()> {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
+ let task_ctx = Arc::new(TaskContext::default());
let schema =
Arc::new(Schema::new(vec![Field::new("a", DataType::Float32,
true)]));
diff --git a/datafusion/core/tests/sql/joins.rs
b/datafusion/core/tests/sql/joins.rs
index 116a3c1a79..e555a28f41 100644
--- a/datafusion/core/tests/sql/joins.rs
+++ b/datafusion/core/tests/sql/joins.rs
@@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.
+use datafusion::test_util::register_unbounded_file_with_ordering;
+
use super::*;
#[tokio::test]
@@ -75,3 +77,162 @@ async fn null_aware_left_anti_join() -> Result<()> {
Ok(())
}
+
+#[tokio::test]
+async fn join_change_in_planner() -> Result<()> {
+ let config = SessionConfig::new().with_target_partitions(8);
+ let ctx = SessionContext::with_config(config);
+ let tmp_dir = TempDir::new().unwrap();
+ let left_file_path = tmp_dir.path().join("left.csv");
+ File::create(left_file_path.clone()).unwrap();
+ // Create schema
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("a1", DataType::UInt32, false),
+ Field::new("a2", DataType::UInt32, false),
+ ]));
+ // Specify the ordering:
+ let file_sort_order = vec![[datafusion_expr::col("a1")]
+ .into_iter()
+ .map(|e| {
+ let ascending = true;
+ let nulls_first = false;
+ e.sort(ascending, nulls_first)
+ })
+ .collect::<Vec<_>>()];
+ register_unbounded_file_with_ordering(
+ &ctx,
+ schema.clone(),
+ &left_file_path,
+ "left",
+ file_sort_order.clone(),
+ true,
+ )
+ .await?;
+ let right_file_path = tmp_dir.path().join("right.csv");
+ File::create(right_file_path.clone()).unwrap();
+ register_unbounded_file_with_ordering(
+ &ctx,
+ schema,
+ &right_file_path,
+ "right",
+ file_sort_order,
+ true,
+ )
+ .await?;
+ let sql = "SELECT t1.a1, t1.a2, t2.a1, t2.a2 FROM left as t1 FULL JOIN
right as t2 ON t1.a2 = t2.a2 AND t1.a1 > t2.a1 + 3 AND t1.a1 < t2.a1 + 10";
+ let dataframe = ctx.sql(sql).await?;
+ let physical_plan = dataframe.create_physical_plan().await?;
+ let formatted =
displayable(physical_plan.as_ref()).indent(true).to_string();
+ let expected = {
+ [
+ "SymmetricHashJoinExec: mode=Partitioned, join_type=Full,
on=[(a2@1, a2@1)], filter=CAST(a1@0 AS Int64) > CAST(a1@1 AS Int64) + 3 AND
CAST(a1@0 AS Int64) < CAST(a1@1 AS Int64) + 10",
+ " CoalesceBatchesExec: target_batch_size=8192",
+ " RepartitionExec: partitioning=Hash([a2@1], 8),
input_partitions=1",
+ // " CsvExec: file_groups={1 group: [[tempdir/left.csv]]},
projection=[a1, a2], has_header=false",
+ " CoalesceBatchesExec: target_batch_size=8192",
+ " RepartitionExec: partitioning=Hash([a2@1], 8),
input_partitions=1",
+ // " CsvExec: file_groups={1 group: [[tempdir/right.csv]]},
projection=[a1, a2], has_header=false"
+ ]
+ };
+ let mut actual: Vec<&str> = formatted.trim().lines().collect();
+ // Remove CSV lines
+ actual.remove(3);
+ actual.remove(5);
+
+ assert_eq!(
+ expected,
+ actual[..],
+ "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+ );
+ Ok(())
+}
+
+#[tokio::test]
+async fn join_change_in_planner_without_sort() -> Result<()> {
+ let config = SessionConfig::new().with_target_partitions(8);
+ let ctx = SessionContext::with_config(config);
+ let tmp_dir = TempDir::new()?;
+ let left_file_path = tmp_dir.path().join("left.csv");
+ File::create(left_file_path.clone())?;
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("a1", DataType::UInt32, false),
+ Field::new("a2", DataType::UInt32, false),
+ ]));
+ ctx.register_csv(
+ "left",
+ left_file_path.as_os_str().to_str().unwrap(),
+ CsvReadOptions::new().schema(&schema).mark_infinite(true),
+ )
+ .await?;
+ let right_file_path = tmp_dir.path().join("right.csv");
+ File::create(right_file_path.clone())?;
+ ctx.register_csv(
+ "right",
+ right_file_path.as_os_str().to_str().unwrap(),
+ CsvReadOptions::new().schema(&schema).mark_infinite(true),
+ )
+ .await?;
+ let sql = "SELECT t1.a1, t1.a2, t2.a1, t2.a2 FROM left as t1 FULL JOIN
right as t2 ON t1.a2 = t2.a2 AND t1.a1 > t2.a1 + 3 AND t1.a1 < t2.a1 + 10";
+ let dataframe = ctx.sql(sql).await?;
+ let physical_plan = dataframe.create_physical_plan().await?;
+ let formatted =
displayable(physical_plan.as_ref()).indent(true).to_string();
+ let expected = {
+ [
+ "SymmetricHashJoinExec: mode=Partitioned, join_type=Full,
on=[(a2@1, a2@1)], filter=CAST(a1@0 AS Int64) > CAST(a1@1 AS Int64) + 3 AND
CAST(a1@0 AS Int64) < CAST(a1@1 AS Int64) + 10",
+ " CoalesceBatchesExec: target_batch_size=8192",
+ " RepartitionExec: partitioning=Hash([a2@1], 8),
input_partitions=1",
+ // " CsvExec: file_groups={1 group: [[tempdir/left.csv]]},
projection=[a1, a2], has_header=false",
+ " CoalesceBatchesExec: target_batch_size=8192",
+ " RepartitionExec: partitioning=Hash([a2@1], 8),
input_partitions=1",
+ // " CsvExec: file_groups={1 group: [[tempdir/right.csv]]},
projection=[a1, a2], has_header=false"
+ ]
+ };
+ let mut actual: Vec<&str> = formatted.trim().lines().collect();
+ // Remove CSV lines
+ actual.remove(3);
+ actual.remove(5);
+
+ assert_eq!(
+ expected,
+ actual[..],
+ "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+ );
+ Ok(())
+}
+
+#[tokio::test]
+async fn join_change_in_planner_without_sort_not_allowed() -> Result<()> {
+ let config = SessionConfig::new()
+ .with_target_partitions(8)
+ .with_allow_symmetric_joins_without_pruning(false);
+ let ctx = SessionContext::with_config(config);
+ let tmp_dir = TempDir::new()?;
+ let left_file_path = tmp_dir.path().join("left.csv");
+ File::create(left_file_path.clone())?;
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("a1", DataType::UInt32, false),
+ Field::new("a2", DataType::UInt32, false),
+ ]));
+ ctx.register_csv(
+ "left",
+ left_file_path.as_os_str().to_str().unwrap(),
+ CsvReadOptions::new().schema(&schema).mark_infinite(true),
+ )
+ .await?;
+ let right_file_path = tmp_dir.path().join("right.csv");
+ File::create(right_file_path.clone())?;
+ ctx.register_csv(
+ "right",
+ right_file_path.as_os_str().to_str().unwrap(),
+ CsvReadOptions::new().schema(&schema).mark_infinite(true),
+ )
+ .await?;
+ let df = ctx.sql("SELECT t1.a1, t1.a2, t2.a1, t2.a2 FROM left as t1 FULL
JOIN right as t2 ON t1.a2 = t2.a2 AND t1.a1 > t2.a1 + 3 AND t1.a1 < t2.a1 +
10").await?;
+ match df.create_physical_plan().await {
+ Ok(_) => panic!("Expecting error."),
+ Err(e) => {
+ assert_eq!(e.to_string(), "PipelineChecker\ncaused by\nError
during planning: Join operation cannot operate on a non-prunable stream without
enabling the 'allow_symmetric_joins_without_pruning' configuration flag")
+ }
+ }
+ Ok(())
+}
diff --git a/datafusion/core/tests/sqllogictests/test_files/options.slt
b/datafusion/core/tests/sqllogictests/test_files/options.slt
new file mode 100644
index 0000000000..1f4cc9ab0c
--- /dev/null
+++ b/datafusion/core/tests/sqllogictests/test_files/options.slt
@@ -0,0 +1,84 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+#######
+## Tests for config options
+#######
+
+
+statement ok
+create table a(c0 int) as values (1), (2);
+
+# Expect coalesce and default batch size
+query TT
+explain SELECT * FROM a WHERE c0 < 1;
+----
+logical_plan
+Filter: a.c0 < Int32(1)
+--TableScan: a projection=[c0]
+physical_plan
+CoalesceBatchesExec: target_batch_size=8192
+--FilterExec: c0@0 < 1
+----MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+
+##
+# test_disable_coalesce
+##
+
+statement ok
+set datafusion.execution.coalesce_batches = false
+
+# expect no coalesce
+query TT
+explain SELECT * FROM a WHERE c0 < 1;
+----
+logical_plan
+Filter: a.c0 < Int32(1)
+--TableScan: a projection=[c0]
+physical_plan
+FilterExec: c0@0 < 1
+--MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+
+statement ok
+set datafusion.execution.coalesce_batches = true
+
+
+##
+# test_custom_batch_size
+##
+
+statement ok
+set datafusion.execution.batch_size = 1234;
+
+# expect batch size to be 1234
+query TT
+explain SELECT * FROM a WHERE c0 < 1;
+----
+logical_plan
+Filter: a.c0 < Int32(1)
+--TableScan: a projection=[c0]
+physical_plan
+CoalesceBatchesExec: target_batch_size=1234
+--FilterExec: c0@0 < 1
+----MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+
+
+statement ok
+set datafusion.execution.batch_size = 8192;
+
+statement ok
+drop table a
diff --git a/datafusion/execution/src/task.rs b/datafusion/execution/src/task.rs
index 62e2fa8da3..72d804d7bb 100644
--- a/datafusion/execution/src/task.rs
+++ b/datafusion/execution/src/task.rs
@@ -27,16 +27,20 @@ use datafusion_common::{
use datafusion_expr::{AggregateUDF, ScalarUDF, WindowUDF};
use crate::{
- config::SessionConfig, memory_pool::MemoryPool, registry::FunctionRegistry,
- runtime_env::RuntimeEnv,
+ config::SessionConfig,
+ memory_pool::MemoryPool,
+ registry::FunctionRegistry,
+ runtime_env::{RuntimeConfig, RuntimeEnv},
};
/// Task Execution Context
///
-/// A [`TaskContext`] has represents the state available during a single
query's
-/// execution.
+/// A [`TaskContext`] contains the state available during a single
+/// query's execution. Please see [`SessionContext`] for a user level
+/// multi-query API.
///
-/// # Task Context
+/// [`SessionContext`]:
https://docs.rs/datafusion/latest/datafusion/execution/context/struct.SessionContext.html
+#[derive(Debug)]
pub struct TaskContext {
/// Session Id
session_id: String,
@@ -54,6 +58,24 @@ pub struct TaskContext {
runtime: Arc<RuntimeEnv>,
}
+impl Default for TaskContext {
+ fn default() -> Self {
+        let runtime = RuntimeEnv::new(RuntimeConfig::new())
+            .expect("default runtime created successfully");
+
+ // Create a default task context, mostly useful for testing
+ Self {
+ session_id: "DEFAULT".to_string(),
+ task_id: None,
+ session_config: SessionConfig::new(),
+ scalar_functions: HashMap::new(),
+ aggregate_functions: HashMap::new(),
+ window_functions: HashMap::new(),
+ runtime: Arc::new(runtime),
+ }
+ }
+}
+
impl TaskContext {
/// Create a new [`TaskContext`] instance.
///
@@ -137,6 +159,18 @@ impl TaskContext {
pub fn runtime_env(&self) -> Arc<RuntimeEnv> {
self.runtime.clone()
}
+
+ /// Update the [`ConfigOptions`]
+ pub fn with_session_config(mut self, session_config: SessionConfig) ->
Self {
+ self.session_config = session_config;
+ self
+ }
+
+ /// Update the [`RuntimeEnv`]
+ pub fn with_runtime(mut self, runtime: Arc<RuntimeEnv>) -> Self {
+ self.runtime = runtime;
+ self
+ }
}
impl FunctionRegistry for TaskContext {