This is an automated email from the ASF dual-hosted git repository.

yjshen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new a08ce40050 Update `physical_plan` tests to not use SessionContext (#7243)
a08ce40050 is described below

commit a08ce400502792e91c5e34fe8df1cd479e56a31b
Author: Andrew Lamb <[email protected]>
AuthorDate: Wed Aug 9 16:17:58 2023 -0500

    Update `physical_plan` tests to not use SessionContext (#7243)
    
    * Update `physical_plan` tests to not use SessionContext
    
    * fix
---
 .../core/src/physical_plan/aggregates/mod.rs       |  28 +--
 datafusion/core/src/physical_plan/analyze.rs       |   4 +-
 .../core/src/physical_plan/coalesce_batches.rs     |  39 +---
 .../core/src/physical_plan/coalesce_partitions.rs  |  10 +-
 datafusion/core/src/physical_plan/empty.rs         |  13 +-
 datafusion/core/src/physical_plan/filter.rs        |   4 +-
 .../core/src/physical_plan/joins/cross_join.rs     |   9 +-
 .../core/src/physical_plan/joins/hash_join.rs      | 102 ++++-----
 .../src/physical_plan/joins/nested_loop_join.rs    |  35 +---
 .../src/physical_plan/joins/sort_merge_join.rs     |  26 ++-
 .../src/physical_plan/joins/symmetric_hash_join.rs | 233 +++------------------
 datafusion/core/src/physical_plan/limit.rs         |   7 +-
 datafusion/core/src/physical_plan/projection.rs    |   7 +-
 .../core/src/physical_plan/repartition/mod.rs      |  41 ++--
 datafusion/core/src/physical_plan/sorts/sort.rs    |  41 ++--
 .../physical_plan/sorts/sort_preserving_merge.rs   |  46 ++--
 datafusion/core/src/physical_plan/stream.rs        |  16 +-
 datafusion/core/src/physical_plan/union.rs         |   4 +-
 datafusion/core/src/physical_plan/windows/mod.rs   |  11 +-
 datafusion/core/tests/sql/joins.rs                 | 161 ++++++++++++++
 .../tests/sqllogictests/test_files/options.slt     |  84 ++++++++
 datafusion/execution/src/task.rs                   |  44 +++-
 22 files changed, 467 insertions(+), 498 deletions(-)

diff --git a/datafusion/core/src/physical_plan/aggregates/mod.rs b/datafusion/core/src/physical_plan/aggregates/mod.rs
index 473d4eb131..be3b868499 100644
--- a/datafusion/core/src/physical_plan/aggregates/mod.rs
+++ b/datafusion/core/src/physical_plan/aggregates/mod.rs
@@ -1215,7 +1215,6 @@ fn evaluate_group_by(
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::execution::context::SessionConfig;
     use crate::physical_plan::aggregates::GroupByOrderMode::{
         FullyOrdered, PartiallyOrdered,
     };
@@ -1231,7 +1230,6 @@ mod tests {
         DisplayAs, ExecutionPlan, Partitioning, RecordBatchStream,
         SendableRecordBatchStream, Statistics,
     };
-    use crate::prelude::SessionContext;
     use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec};
     use crate::test::{assert_is_pending, csv_exec_sorted};
     use crate::{assert_batches_eq, assert_batches_sorted_eq, physical_plan::common};
@@ -1449,8 +1447,7 @@ mod tests {
             DataType::Int64,
         ))];
 
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
 
         let partial_aggregate = Arc::new(AggregateExec::try_new(
             AggregateMode::Partial,
@@ -1556,8 +1553,7 @@ mod tests {
             DataType::Float64,
         ))];
 
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
 
         let partial_aggregate = Arc::new(AggregateExec::try_new(
             AggregateMode::Partial,
@@ -1779,14 +1775,11 @@ mod tests {
             Arc::new(TestYieldingExec { yield_first: true });
         let input_schema = input.schema();
 
-        let session_ctx = SessionContext::with_config_rt(
-            SessionConfig::default(),
-            Arc::new(
-                RuntimeEnv::new(RuntimeConfig::default().with_memory_limit(1, 1.0))
-                    .unwrap(),
-            ),
+        let runtime = Arc::new(
+            RuntimeEnv::new(RuntimeConfig::default().with_memory_limit(1, 1.0)).unwrap(),
         );
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = TaskContext::default().with_runtime(runtime);
+        let task_ctx = Arc::new(task_ctx);
 
         let groups_none = PhysicalGroupBy::default();
         let groups_some = PhysicalGroupBy {
@@ -1864,8 +1857,7 @@ mod tests {
 
     #[tokio::test]
     async fn test_drop_cancel_without_groups() -> Result<()> {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let schema =
             Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, true)]));
 
@@ -1901,8 +1893,7 @@ mod tests {
 
     #[tokio::test]
     async fn test_drop_cancel_with_groups() -> Result<()> {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let schema = Arc::new(Schema::new(vec![
             Field::new("a", DataType::Float32, true),
             Field::new("b", DataType::Float32, true),
@@ -1970,8 +1961,7 @@ mod tests {
         use_coalesce_batches: bool,
         is_first_acc: bool,
     ) -> Result<()> {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
 
         let (schema, data) = some_data_v2();
         let partition1 = data[0].clone();
diff --git a/datafusion/core/src/physical_plan/analyze.rs b/datafusion/core/src/physical_plan/analyze.rs
index bba1c37e6d..d8c87d2f51 100644
--- a/datafusion/core/src/physical_plan/analyze.rs
+++ b/datafusion/core/src/physical_plan/analyze.rs
@@ -229,7 +229,6 @@ mod tests {
     use arrow::datatypes::{DataType, Field, Schema};
     use futures::FutureExt;
 
-    use crate::prelude::SessionContext;
     use crate::{
         physical_plan::collect,
         test::{
@@ -242,8 +241,7 @@ mod tests {
 
     #[tokio::test]
     async fn test_drop_cancel() -> Result<()> {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let schema =
             Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, true)]));
 
diff --git a/datafusion/core/src/physical_plan/coalesce_batches.rs b/datafusion/core/src/physical_plan/coalesce_batches.rs
index 994f75ce4c..71ad39658f 100644
--- a/datafusion/core/src/physical_plan/coalesce_batches.rs
+++ b/datafusion/core/src/physical_plan/coalesce_batches.rs
@@ -305,46 +305,10 @@ pub fn concat_batches(
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::config::ConfigOptions;
-    use crate::datasource::MemTable;
-    use crate::physical_plan::filter::FilterExec;
     use crate::physical_plan::{memory::MemoryExec, repartition::RepartitionExec};
-    use crate::prelude::SessionContext;
     use crate::test::create_vec_batches;
     use arrow::datatypes::{DataType, Field, Schema};
 
-    #[tokio::test]
-    async fn test_custom_batch_size() -> Result<()> {
-        let mut config = ConfigOptions::new();
-        config.execution.batch_size = 1234;
-
-        let ctx = SessionContext::with_config(config.into());
-        let plan = create_physical_plan(ctx).await?;
-        let coalesce = plan.as_any().downcast_ref::<CoalesceBatchesExec>().unwrap();
-        assert_eq!(1234, coalesce.target_batch_size);
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn test_disable_coalesce() -> Result<()> {
-        let mut config = ConfigOptions::new();
-        config.execution.coalesce_batches = false;
-
-        let ctx = SessionContext::with_config(config.into());
-        let plan = create_physical_plan(ctx).await?;
-        let _filter = plan.as_any().downcast_ref::<FilterExec>().unwrap();
-        Ok(())
-    }
-
-    async fn create_physical_plan(ctx: SessionContext) -> Result<Arc<dyn ExecutionPlan>> {
-        let schema = test_schema();
-        let partition = create_vec_batches(&schema, 10);
-        let table = MemTable::try_new(schema, vec![partition])?;
-        ctx.register_table("a", Arc::new(table))?;
-        let dataframe = ctx.sql("SELECT * FROM a WHERE c0 < 1").await?;
-        dataframe.create_physical_plan().await
-    }
-
     #[tokio::test(flavor = "multi_thread")]
     async fn test_concat_batches() -> Result<()> {
         let schema = test_schema();
@@ -385,10 +349,9 @@ mod tests {
         // execute and collect results
         let output_partition_count = exec.output_partitioning().partition_count();
         let mut output_partitions = Vec::with_capacity(output_partition_count);
-        let session_ctx = SessionContext::new();
         for i in 0..output_partition_count {
             // execute this *output* partition and collect all batches
-            let task_ctx = session_ctx.task_ctx();
+            let task_ctx = Arc::new(TaskContext::default());
             let mut stream = exec.execute(i, task_ctx.clone())?;
             let mut batches = vec![];
             while let Some(result) = stream.next().await {
diff --git a/datafusion/core/src/physical_plan/coalesce_partitions.rs b/datafusion/core/src/physical_plan/coalesce_partitions.rs
index bc48b5f5e1..14e8aada6c 100644
--- a/datafusion/core/src/physical_plan/coalesce_partitions.rs
+++ b/datafusion/core/src/physical_plan/coalesce_partitions.rs
@@ -176,7 +176,6 @@ mod tests {
 
     use super::*;
     use crate::physical_plan::{collect, common};
-    use crate::prelude::SessionContext;
     use crate::test::exec::{
         assert_strong_count_converges_to_zero, BlockingExec, PanicExec,
     };
@@ -184,8 +183,7 @@ mod tests {
 
     #[tokio::test]
     async fn merge() -> Result<()> {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
 
         let num_partitions = 4;
         let csv = test::scan_partitioned_csv(num_partitions)?;
@@ -212,8 +210,7 @@ mod tests {
 
     #[tokio::test]
     async fn test_drop_cancel() -> Result<()> {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let schema =
             Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, true)]));
 
@@ -235,8 +232,7 @@ mod tests {
     #[tokio::test]
     #[should_panic(expected = "PanickingStream did panic")]
     async fn test_panic() {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let schema =
             Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, true)]));
 
diff --git a/datafusion/core/src/physical_plan/empty.rs b/datafusion/core/src/physical_plan/empty.rs
index 17bfa0af3a..7da4e80c12 100644
--- a/datafusion/core/src/physical_plan/empty.rs
+++ b/datafusion/core/src/physical_plan/empty.rs
@@ -174,13 +174,11 @@ impl ExecutionPlan for EmptyExec {
 mod tests {
     use super::*;
     use crate::physical_plan::with_new_children_if_necessary;
-    use crate::prelude::SessionContext;
     use crate::{physical_plan::common, test_util};
 
     #[tokio::test]
     async fn empty() -> Result<()> {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let schema = test_util::aggr_test_schema();
 
         let empty = EmptyExec::new(false, schema.clone());
@@ -217,8 +215,7 @@ mod tests {
 
     #[tokio::test]
     async fn invalid_execute() -> Result<()> {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let schema = test_util::aggr_test_schema();
         let empty = EmptyExec::new(false, schema);
 
@@ -230,8 +227,7 @@ mod tests {
 
     #[tokio::test]
     async fn produce_one_row() -> Result<()> {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let schema = test_util::aggr_test_schema();
         let empty = EmptyExec::new(true, schema);
 
@@ -246,8 +242,7 @@ mod tests {
 
     #[tokio::test]
     async fn produce_one_row_multiple_partition() -> Result<()> {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let schema = test_util::aggr_test_schema();
         let partitions = 3;
         let empty = EmptyExec::new(true, schema).with_partitions(partitions);
diff --git a/datafusion/core/src/physical_plan/filter.rs b/datafusion/core/src/physical_plan/filter.rs
index e8c181a34b..f9fc4fb4b6 100644
--- a/datafusion/core/src/physical_plan/filter.rs
+++ b/datafusion/core/src/physical_plan/filter.rs
@@ -381,7 +381,6 @@ mod tests {
     use crate::physical_plan::expressions::*;
     use crate::physical_plan::ExecutionPlan;
     use crate::physical_plan::{collect, with_new_children_if_necessary};
-    use crate::prelude::SessionContext;
     use crate::test;
     use crate::test::exec::StatisticsExec;
     use crate::test_util;
@@ -395,8 +394,7 @@ mod tests {
 
     #[tokio::test]
     async fn simple_predicate() -> Result<()> {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let schema = test_util::aggr_test_schema();
 
         let partitions = 4;
diff --git a/datafusion/core/src/physical_plan/joins/cross_join.rs b/datafusion/core/src/physical_plan/joins/cross_join.rs
index eaee9892d0..6d74a069b6 100644
--- a/datafusion/core/src/physical_plan/joins/cross_join.rs
+++ b/datafusion/core/src/physical_plan/joins/cross_join.rs
@@ -460,7 +460,6 @@ mod tests {
     use crate::assert_batches_sorted_eq;
     use crate::common::assert_contains;
     use crate::physical_plan::common;
-    use crate::prelude::{SessionConfig, SessionContext};
     use crate::test::{build_table_scan_i32, columns};
     use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv};
 
@@ -617,8 +616,7 @@ mod tests {
 
     #[tokio::test]
     async fn test_join() -> Result<()> {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
 
         let left = build_table_scan_i32(
             ("a1", &vec![1, 2, 3]),
@@ -656,9 +654,8 @@ mod tests {
     async fn test_overallocation() -> Result<()> {
         let runtime_config = RuntimeConfig::new().with_memory_limit(100, 1.0);
         let runtime = Arc::new(RuntimeEnv::new(runtime_config)?);
-        let session_ctx =
-            SessionContext::with_config_rt(SessionConfig::default(), runtime);
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = TaskContext::default().with_runtime(runtime);
+        let task_ctx = Arc::new(task_ctx);
 
         let left = build_table_scan_i32(
             ("a1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs b/datafusion/core/src/physical_plan/joins/hash_join.rs
index 2108893ccb..3522656142 100644
--- a/datafusion/core/src/physical_plan/joins/hash_join.rs
+++ b/datafusion/core/src/physical_plan/joins/hash_join.rs
@@ -1402,7 +1402,6 @@ mod tests {
 
     use crate::execution::context::SessionConfig;
     use crate::physical_expr::expressions::BinaryExpr;
-    use crate::prelude::SessionContext;
     use crate::{
         assert_batches_sorted_eq,
         common::assert_contains,
@@ -1540,8 +1539,7 @@ mod tests {
 
     #[tokio::test]
     async fn join_inner_one() -> Result<()> {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let left = build_table(
             ("a1", &vec![1, 2, 3]),
             ("b1", &vec![4, 5, 5]), // this has a repetition
@@ -1586,8 +1584,7 @@ mod tests {
 
     #[tokio::test]
     async fn partitioned_join_inner_one() -> Result<()> {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let left = build_table(
             ("a1", &vec![1, 2, 3]),
             ("b1", &vec![4, 5, 5]), // this has a repetition
@@ -1631,8 +1628,7 @@ mod tests {
 
     #[tokio::test]
     async fn join_inner_one_no_shared_column_names() -> Result<()> {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let left = build_table(
             ("a1", &vec![1, 2, 3]),
             ("b1", &vec![4, 5, 5]), // this has a repetition
@@ -1670,8 +1666,7 @@ mod tests {
 
     #[tokio::test]
     async fn join_inner_two() -> Result<()> {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let left = build_table(
             ("a1", &vec![1, 2, 2]),
             ("b2", &vec![1, 2, 2]),
@@ -1718,8 +1713,7 @@ mod tests {
     /// Test where the left has 2 parts, the right with 1 part => 1 part
     #[tokio::test]
     async fn join_inner_one_two_parts_left() -> Result<()> {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let batch1 = build_table_i32(
             ("a1", &vec![1, 2]),
             ("b2", &vec![1, 2]),
@@ -1773,8 +1767,7 @@ mod tests {
     /// Test where the left has 1 part, the right has 2 parts => 2 parts
     #[tokio::test]
     async fn join_inner_one_two_parts_right() -> Result<()> {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let left = build_table(
             ("a1", &vec![1, 2, 3]),
             ("b1", &vec![4, 5, 5]), // this has a repetition
@@ -1849,8 +1842,7 @@ mod tests {
 
     #[tokio::test]
     async fn join_left_multi_batch() {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let left = build_table(
             ("a1", &vec![1, 2, 3]),
             ("b1", &vec![4, 5, 7]), // 7 does not exist on the right
@@ -1891,8 +1883,7 @@ mod tests {
 
     #[tokio::test]
     async fn join_full_multi_batch() {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let left = build_table(
             ("a1", &vec![1, 2, 3]),
             ("b1", &vec![4, 5, 7]), // 7 does not exist on the right
@@ -1936,8 +1927,7 @@ mod tests {
 
     #[tokio::test]
     async fn join_left_empty_right() {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let left = build_table(
             ("a1", &vec![1, 2, 3]),
             ("b1", &vec![4, 5, 7]),
@@ -1973,8 +1963,7 @@ mod tests {
 
     #[tokio::test]
     async fn join_full_empty_right() {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let left = build_table(
             ("a1", &vec![1, 2, 3]),
             ("b1", &vec![4, 5, 7]),
@@ -2010,8 +1999,7 @@ mod tests {
 
     #[tokio::test]
     async fn join_left_one() -> Result<()> {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let left = build_table(
             ("a1", &vec![1, 2, 3]),
             ("b1", &vec![4, 5, 7]), // 7 does not exist on the right
@@ -2054,8 +2042,7 @@ mod tests {
 
     #[tokio::test]
     async fn partitioned_join_left_one() -> Result<()> {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let left = build_table(
             ("a1", &vec![1, 2, 3]),
             ("b1", &vec![4, 5, 7]), // 7 does not exist on the right
@@ -2118,8 +2105,7 @@ mod tests {
 
     #[tokio::test]
     async fn join_left_semi() -> Result<()> {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let left = build_semi_anti_left_table();
         let right = build_semi_anti_right_table();
         // left_table left semi join right_table on left_table.b1 = right_table.b2
@@ -2153,8 +2139,7 @@ mod tests {
 
     #[tokio::test]
     async fn join_left_semi_with_filter() -> Result<()> {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let left = build_semi_anti_left_table();
         let right = build_semi_anti_right_table();
 
@@ -2240,8 +2225,7 @@ mod tests {
 
     #[tokio::test]
     async fn join_right_semi() -> Result<()> {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let left = build_semi_anti_left_table();
         let right = build_semi_anti_right_table();
 
@@ -2275,8 +2259,7 @@ mod tests {
 
     #[tokio::test]
     async fn join_right_semi_with_filter() -> Result<()> {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let left = build_semi_anti_left_table();
         let right = build_semi_anti_right_table();
 
@@ -2361,8 +2344,7 @@ mod tests {
 
     #[tokio::test]
     async fn join_left_anti() -> Result<()> {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let left = build_semi_anti_left_table();
         let right = build_semi_anti_right_table();
         // left_table left anti join right_table on left_table.b1 = right_table.b2
@@ -2395,8 +2377,7 @@ mod tests {
 
     #[tokio::test]
     async fn join_left_anti_with_filter() -> Result<()> {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let left = build_semi_anti_left_table();
         let right = build_semi_anti_right_table();
         // left_table left anti join right_table on left_table.b1 = right_table.b2 and right_table.a2!=8
@@ -2489,8 +2470,7 @@ mod tests {
 
     #[tokio::test]
     async fn join_right_anti() -> Result<()> {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let left = build_semi_anti_left_table();
         let right = build_semi_anti_right_table();
         let on = vec![(
@@ -2521,8 +2501,7 @@ mod tests {
 
     #[tokio::test]
     async fn join_right_anti_with_filter() -> Result<()> {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let left = build_semi_anti_left_table();
         let right = build_semi_anti_right_table();
         // left_table right anti join right_table on left_table.b1 = right_table.b2 and left_table.a1!=13
@@ -2618,8 +2597,7 @@ mod tests {
 
     #[tokio::test]
     async fn join_right_one() -> Result<()> {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let left = build_table(
             ("a1", &vec![1, 2, 3]),
             ("b1", &vec![4, 5, 7]),
@@ -2657,8 +2635,7 @@ mod tests {
 
     #[tokio::test]
     async fn partitioned_join_right_one() -> Result<()> {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let left = build_table(
             ("a1", &vec![1, 2, 3]),
             ("b1", &vec![4, 5, 7]),
@@ -2697,8 +2674,7 @@ mod tests {
 
     #[tokio::test]
     async fn join_full_one() -> Result<()> {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let left = build_table(
             ("a1", &vec![1, 2, 3]),
             ("b1", &vec![4, 5, 7]), // 7 does not exist on the right
@@ -2800,8 +2776,7 @@ mod tests {
 
     #[tokio::test]
     async fn join_with_duplicated_column_names() -> Result<()> {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let left = build_table(
             ("a", &vec![1, 2, 3]),
             ("b", &vec![4, 5, 7]),
@@ -2865,8 +2840,7 @@ mod tests {
 
     #[tokio::test]
     async fn join_inner_with_filter() -> Result<()> {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let left = build_table(
             ("a", &vec![0, 1, 2, 2]),
             ("b", &vec![4, 5, 7, 8]),
@@ -2906,8 +2880,7 @@ mod tests {
 
     #[tokio::test]
     async fn join_left_with_filter() -> Result<()> {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let left = build_table(
             ("a", &vec![0, 1, 2, 2]),
             ("b", &vec![4, 5, 7, 8]),
@@ -2950,8 +2923,7 @@ mod tests {
 
     #[tokio::test]
     async fn join_right_with_filter() -> Result<()> {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let left = build_table(
             ("a", &vec![0, 1, 2, 2]),
             ("b", &vec![4, 5, 7, 8]),
@@ -2993,8 +2965,7 @@ mod tests {
 
     #[tokio::test]
     async fn join_full_with_filter() -> Result<()> {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let left = build_table(
             ("a", &vec![0, 1, 2, 2]),
             ("b", &vec![4, 5, 7, 8]),
@@ -3062,8 +3033,7 @@ mod tests {
 
         let join = join(left, right, on, &JoinType::Inner, false)?;
 
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let stream = join.execute(0, task_ctx)?;
         let batches = common::collect(stream).await?;
 
@@ -3122,8 +3092,7 @@ mod tests {
                 false,
             )
             .unwrap();
-            let session_ctx = SessionContext::new();
-            let task_ctx = session_ctx.task_ctx();
+            let task_ctx = Arc::new(TaskContext::default());
 
             let stream = join.execute(0, task_ctx).unwrap();
 
@@ -3170,9 +3139,8 @@ mod tests {
         for join_type in join_types {
             let runtime_config = RuntimeConfig::new().with_memory_limit(100, 1.0);
             let runtime = Arc::new(RuntimeEnv::new(runtime_config)?);
-            let session_ctx =
-                SessionContext::with_config_rt(SessionConfig::default(), runtime);
-            let task_ctx = session_ctx.task_ctx();
+            let task_ctx = TaskContext::default().with_runtime(runtime);
+            let task_ctx = Arc::new(task_ctx);
 
             let join = join(left.clone(), right.clone(), on.clone(), &join_type, false)?;
 
@@ -3241,8 +3209,10 @@ mod tests {
             let runtime_config = RuntimeConfig::new().with_memory_limit(100, 1.0);
             let runtime = Arc::new(RuntimeEnv::new(runtime_config)?);
             let session_config = SessionConfig::default().with_batch_size(50);
-            let session_ctx = SessionContext::with_config_rt(session_config, runtime);
-            let task_ctx = session_ctx.task_ctx();
+            let task_ctx = TaskContext::default()
+                .with_session_config(session_config)
+                .with_runtime(runtime);
+            let task_ctx = Arc::new(task_ctx);
 
             let join = HashJoinExec::try_new(
                 left.clone(),
diff --git a/datafusion/core/src/physical_plan/joins/nested_loop_join.rs b/datafusion/core/src/physical_plan/joins/nested_loop_join.rs
index cad3b4743b..60fdf452cf 100644
--- a/datafusion/core/src/physical_plan/joins/nested_loop_join.rs
+++ b/datafusion/core/src/physical_plan/joins/nested_loop_join.rs
@@ -744,10 +744,7 @@ mod tests {
     use crate::{
         assert_batches_sorted_eq,
         common::assert_contains,
-        execution::{
-            context::SessionConfig,
-            runtime_env::{RuntimeConfig, RuntimeEnv},
-        },
+        execution::runtime_env::{RuntimeConfig, RuntimeEnv},
         physical_plan::{
             common, expressions::Column, memory::MemoryExec, repartition::RepartitionExec,
         },
@@ -757,7 +754,6 @@ mod tests {
     use datafusion_expr::Operator;
 
     use crate::physical_plan::joins::utils::JoinSide;
-    use crate::prelude::SessionContext;
     use datafusion_common::ScalarValue;
     use datafusion_physical_expr::expressions::Literal;
     use datafusion_physical_expr::PhysicalExpr;
@@ -884,8 +880,7 @@ mod tests {
 
     #[tokio::test]
     async fn join_inner_with_filter() -> Result<()> {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let left = build_left_table();
         let right = build_right_table();
         let filter = prepare_join_filter();
@@ -913,8 +908,7 @@ mod tests {
 
     #[tokio::test]
     async fn join_left_with_filter() -> Result<()> {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let left = build_left_table();
         let right = build_right_table();
 
@@ -945,8 +939,7 @@ mod tests {
 
     #[tokio::test]
     async fn join_right_with_filter() -> Result<()> {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let left = build_left_table();
         let right = build_right_table();
 
@@ -977,8 +970,7 @@ mod tests {
 
     #[tokio::test]
     async fn join_full_with_filter() -> Result<()> {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let left = build_left_table();
         let right = build_right_table();
 
@@ -1011,8 +1003,7 @@ mod tests {
 
     #[tokio::test]
     async fn join_left_semi_with_filter() -> Result<()> {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let left = build_left_table();
         let right = build_right_table();
 
@@ -1041,8 +1032,7 @@ mod tests {
 
     #[tokio::test]
     async fn join_left_anti_with_filter() -> Result<()> {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let left = build_left_table();
         let right = build_right_table();
 
@@ -1072,8 +1062,7 @@ mod tests {
 
     #[tokio::test]
     async fn join_right_semi_with_filter() -> Result<()> {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let left = build_left_table();
         let right = build_right_table();
 
@@ -1102,8 +1091,7 @@ mod tests {
 
     #[tokio::test]
     async fn join_right_anti_with_filter() -> Result<()> {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let left = build_left_table();
         let right = build_right_table();
 
@@ -1159,9 +1147,8 @@ mod tests {
         for join_type in join_types {
             let runtime_config = RuntimeConfig::new().with_memory_limit(100, 1.0);
             let runtime = Arc::new(RuntimeEnv::new(runtime_config)?);
-            let session_ctx =
-                SessionContext::with_config_rt(SessionConfig::default(), runtime);
-            let task_ctx = session_ctx.task_ctx();
+            let task_ctx = TaskContext::default().with_runtime(runtime);
+            let task_ctx = Arc::new(task_ctx);
 
             let err = multi_partitioned_join_collect(
                 left.clone(),
diff --git a/datafusion/core/src/physical_plan/joins/sort_merge_join.rs b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs
index b3721eb4d6..902b022857 100644
--- a/datafusion/core/src/physical_plan/joins/sort_merge_join.rs
+++ b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs
@@ -1389,6 +1389,8 @@ mod tests {
     use arrow::compute::SortOptions;
     use arrow::datatypes::{DataType, Field, Schema};
     use arrow::record_batch::RecordBatch;
+    use datafusion_execution::config::SessionConfig;
+    use datafusion_execution::TaskContext;
 
     use crate::common::assert_contains;
     use crate::physical_plan::expressions::Column;
@@ -1396,7 +1398,6 @@ mod tests {
     use crate::physical_plan::joins::SortMergeJoinExec;
     use crate::physical_plan::memory::MemoryExec;
     use crate::physical_plan::{common, ExecutionPlan};
-    use crate::prelude::{SessionConfig, SessionContext};
     use crate::test::{build_table_i32, columns};
     use crate::{assert_batches_eq, assert_batches_sorted_eq};
     use datafusion_common::JoinType;
@@ -1537,8 +1538,7 @@ mod tests {
         sort_options: Vec<SortOptions>,
         null_equals_null: bool,
     ) -> Result<(Vec<String>, Vec<RecordBatch>)> {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let join = join_with_options(
             left,
             right,
@@ -1560,9 +1560,9 @@ mod tests {
         on: JoinOn,
         join_type: JoinType,
     ) -> Result<(Vec<String>, Vec<RecordBatch>)> {
-        let session_ctx =
-            SessionContext::with_config(SessionConfig::new().with_batch_size(2));
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = TaskContext::default()
+            .with_session_config(SessionConfig::new().with_batch_size(2));
+        let task_ctx = Arc::new(task_ctx);
         let join = join(left, right, on, join_type)?;
         let columns = columns(&join.schema());
 
@@ -2321,8 +2321,12 @@ mod tests {
             let runtime_config = RuntimeConfig::new().with_memory_limit(100, 1.0);
             let runtime = Arc::new(RuntimeEnv::new(runtime_config)?);
             let session_config = SessionConfig::default().with_batch_size(50);
-            let session_ctx = SessionContext::with_config_rt(session_config, runtime);
-            let task_ctx = session_ctx.task_ctx();
+
+            let task_ctx = TaskContext::default()
+                .with_session_config(session_config)
+                .with_runtime(runtime);
+            let task_ctx = Arc::new(task_ctx);
+
             let join = join_with_options(
                 left.clone(),
                 right.clone(),
@@ -2397,8 +2401,10 @@ mod tests {
             let runtime_config = RuntimeConfig::new().with_memory_limit(100, 1.0);
             let runtime = Arc::new(RuntimeEnv::new(runtime_config)?);
             let session_config = SessionConfig::default().with_batch_size(50);
-            let session_ctx = SessionContext::with_config_rt(session_config, runtime);
-            let task_ctx = session_ctx.task_ctx();
+            let task_ctx = TaskContext::default()
+                .with_session_config(session_config)
+                .with_runtime(runtime);
+            let task_ctx = Arc::new(task_ctx);
             let join = join_with_options(
                 left.clone(),
                 right.clone(),
diff --git a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
index dc8bcc2edb..efe7ce503b 100644
--- a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
+++ b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
@@ -1428,21 +1428,17 @@ impl SymmetricHashJoinStream {
 
 #[cfg(test)]
 mod tests {
-    use std::fs::File;
-
+    use super::*;
     use arrow::compute::SortOptions;
     use arrow::datatypes::{DataType, Field, IntervalUnit, Schema, TimeUnit};
+    use datafusion_execution::config::SessionConfig;
     use rstest::*;
-    use tempfile::TempDir;
 
     use datafusion_expr::Operator;
     use datafusion_physical_expr::expressions::{binary, col, Column};
     use datafusion_physical_expr::intervals::test_utils::gen_conjunctive_numerical_expr;
 
-    use crate::physical_plan::displayable;
     use crate::physical_plan::joins::hash_join_utils::tests::complicated_filter;
-    use crate::prelude::{CsvReadOptions, SessionConfig, SessionContext};
-    use crate::test_util::register_unbounded_file_with_ordering;
 
     use crate::physical_plan::joins::test_utils::{
         build_sides_record_batches, compare_batches, create_memory_table,
@@ -1451,9 +1447,6 @@ mod tests {
         partitioned_sym_join_with_filter,
     };
     use datafusion_common::ScalarValue;
-    use std::iter::Iterator;
-
-    use super::*;
 
     const TABLE_SIZE: i32 = 100;
 
@@ -1506,8 +1499,7 @@ mod tests {
         cardinality: (i32, i32),
     ) -> Result<()> {
         // a + b > c + 10 AND a + b < c + 100
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let (left_batch, right_batch) =
             build_sides_record_batches(TABLE_SIZE, cardinality)?;
         let left_schema = &left_batch.schema();
@@ -1587,8 +1579,7 @@ mod tests {
         cardinality: (i32, i32),
         #[values(0, 1, 2, 3, 4, 5, 6, 7)] case_expr: usize,
     ) -> Result<()> {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let (left_batch, right_batch) =
             build_sides_record_batches(TABLE_SIZE, cardinality)?;
         let left_schema = &left_batch.schema();
@@ -1662,8 +1653,7 @@ mod tests {
         cardinality: (i32, i32),
         #[values(0, 1, 2, 3, 4, 5, 6)] case_expr: usize,
     ) -> Result<()> {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let (left_batch, right_batch) =
             build_sides_record_batches(TABLE_SIZE, cardinality)?;
         let left_schema = &left_batch.schema();
@@ -1715,8 +1705,7 @@ mod tests {
         )]
         join_type: JoinType,
     ) -> Result<()> {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let (left_batch, right_batch) = build_sides_record_batches(TABLE_SIZE, (11, 21))?;
         let left_schema = &left_batch.schema();
         let right_schema = &right_batch.schema();
@@ -1753,8 +1742,7 @@ mod tests {
         cardinality: (i32, i32),
         #[values(0, 1, 2, 3, 4, 5, 6)] case_expr: usize,
     ) -> Result<()> {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let (left_batch, right_batch) =
             build_sides_record_batches(TABLE_SIZE, cardinality)?;
         let left_schema = &left_batch.schema();
@@ -1811,173 +1799,14 @@ mod tests {
         Ok(())
     }
 
-    #[tokio::test]
-    async fn join_change_in_planner() -> Result<()> {
-        let config = SessionConfig::new().with_target_partitions(8);
-        let ctx = SessionContext::with_config(config);
-        let tmp_dir = TempDir::new().unwrap();
-        let left_file_path = tmp_dir.path().join("left.csv");
-        File::create(left_file_path.clone()).unwrap();
-        // Create schema
-        let schema = Arc::new(Schema::new(vec![
-            Field::new("a1", DataType::UInt32, false),
-            Field::new("a2", DataType::UInt32, false),
-        ]));
-        // Specify the ordering:
-        let file_sort_order = vec![[datafusion_expr::col("a1")]
-            .into_iter()
-            .map(|e| {
-                let ascending = true;
-                let nulls_first = false;
-                e.sort(ascending, nulls_first)
-            })
-            .collect::<Vec<_>>()];
-        register_unbounded_file_with_ordering(
-            &ctx,
-            schema.clone(),
-            &left_file_path,
-            "left",
-            file_sort_order.clone(),
-            true,
-        )
-        .await?;
-        let right_file_path = tmp_dir.path().join("right.csv");
-        File::create(right_file_path.clone()).unwrap();
-        register_unbounded_file_with_ordering(
-            &ctx,
-            schema,
-            &right_file_path,
-            "right",
-            file_sort_order,
-            true,
-        )
-        .await?;
-        let sql = "SELECT t1.a1, t1.a2, t2.a1, t2.a2 FROM left as t1 FULL JOIN right as t2 ON t1.a2 = t2.a2 AND t1.a1 > t2.a1 + 3 AND t1.a1 < t2.a1 + 10";
-        let dataframe = ctx.sql(sql).await?;
-        let physical_plan = dataframe.create_physical_plan().await?;
-        let formatted = displayable(physical_plan.as_ref()).indent(true).to_string();
-        let expected = {
-            [
-                "SymmetricHashJoinExec: mode=Partitioned, join_type=Full, on=[(a2@1, a2@1)], filter=CAST(a1@0 AS Int64) > CAST(a1@1 AS Int64) + 3 AND CAST(a1@0 AS Int64) < CAST(a1@1 AS Int64) + 10",
-                "  CoalesceBatchesExec: target_batch_size=8192",
-                "    RepartitionExec: partitioning=Hash([a2@1], 8), input_partitions=1",
-                // "   CsvExec: file_groups={1 group: [[tempdir/left.csv]]}, projection=[a1, a2], has_header=false",
-                "  CoalesceBatchesExec: target_batch_size=8192",
-                "    RepartitionExec: partitioning=Hash([a2@1], 8), input_partitions=1",
-                // "   CsvExec: file_groups={1 group: [[tempdir/right.csv]]}, projection=[a1, a2], has_header=false"
-            ]
-        };
-        let mut actual: Vec<&str> = formatted.trim().lines().collect();
-        // Remove CSV lines
-        actual.remove(3);
-        actual.remove(5);
-
-        assert_eq!(
-            expected,
-            actual[..],
-            "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-        );
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn join_change_in_planner_without_sort() -> Result<()> {
-        let config = SessionConfig::new().with_target_partitions(8);
-        let ctx = SessionContext::with_config(config);
-        let tmp_dir = TempDir::new()?;
-        let left_file_path = tmp_dir.path().join("left.csv");
-        File::create(left_file_path.clone())?;
-        let schema = Arc::new(Schema::new(vec![
-            Field::new("a1", DataType::UInt32, false),
-            Field::new("a2", DataType::UInt32, false),
-        ]));
-        ctx.register_csv(
-            "left",
-            left_file_path.as_os_str().to_str().unwrap(),
-            CsvReadOptions::new().schema(&schema).mark_infinite(true),
-        )
-        .await?;
-        let right_file_path = tmp_dir.path().join("right.csv");
-        File::create(right_file_path.clone())?;
-        ctx.register_csv(
-            "right",
-            right_file_path.as_os_str().to_str().unwrap(),
-            CsvReadOptions::new().schema(&schema).mark_infinite(true),
-        )
-        .await?;
-        let sql = "SELECT t1.a1, t1.a2, t2.a1, t2.a2 FROM left as t1 FULL JOIN right as t2 ON t1.a2 = t2.a2 AND t1.a1 > t2.a1 + 3 AND t1.a1 < t2.a1 + 10";
-        let dataframe = ctx.sql(sql).await?;
-        let physical_plan = dataframe.create_physical_plan().await?;
-        let formatted = displayable(physical_plan.as_ref()).indent(true).to_string();
-        let expected = {
-            [
-                "SymmetricHashJoinExec: mode=Partitioned, join_type=Full, on=[(a2@1, a2@1)], filter=CAST(a1@0 AS Int64) > CAST(a1@1 AS Int64) + 3 AND CAST(a1@0 AS Int64) < CAST(a1@1 AS Int64) + 10",
-                "  CoalesceBatchesExec: target_batch_size=8192",
-                "    RepartitionExec: partitioning=Hash([a2@1], 8), input_partitions=1",
-                // "   CsvExec: file_groups={1 group: [[tempdir/left.csv]]}, projection=[a1, a2], has_header=false",
-                "  CoalesceBatchesExec: target_batch_size=8192",
-                "    RepartitionExec: partitioning=Hash([a2@1], 8), input_partitions=1",
-                // "   CsvExec: file_groups={1 group: [[tempdir/right.csv]]}, projection=[a1, a2], has_header=false"
-            ]
-        };
-        let mut actual: Vec<&str> = formatted.trim().lines().collect();
-        // Remove CSV lines
-        actual.remove(3);
-        actual.remove(5);
-
-        assert_eq!(
-            expected,
-            actual[..],
-            "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-        );
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn join_change_in_planner_without_sort_not_allowed() -> Result<()> {
-        let config = SessionConfig::new()
-            .with_target_partitions(8)
-            .with_allow_symmetric_joins_without_pruning(false);
-        let ctx = SessionContext::with_config(config);
-        let tmp_dir = TempDir::new()?;
-        let left_file_path = tmp_dir.path().join("left.csv");
-        File::create(left_file_path.clone())?;
-        let schema = Arc::new(Schema::new(vec![
-            Field::new("a1", DataType::UInt32, false),
-            Field::new("a2", DataType::UInt32, false),
-        ]));
-        ctx.register_csv(
-            "left",
-            left_file_path.as_os_str().to_str().unwrap(),
-            CsvReadOptions::new().schema(&schema).mark_infinite(true),
-        )
-        .await?;
-        let right_file_path = tmp_dir.path().join("right.csv");
-        File::create(right_file_path.clone())?;
-        ctx.register_csv(
-            "right",
-            right_file_path.as_os_str().to_str().unwrap(),
-            CsvReadOptions::new().schema(&schema).mark_infinite(true),
-        )
-        .await?;
-        let df = ctx.sql("SELECT t1.a1, t1.a2, t2.a1, t2.a2 FROM left as t1 FULL JOIN right as t2 ON t1.a2 = t2.a2 AND t1.a1 > t2.a1 + 3 AND t1.a1 < t2.a1 + 10").await?;
-        match df.create_physical_plan().await {
-            Ok(_) => panic!("Expecting error."),
-            Err(e) => {
-                assert_eq!(e.to_string(), "PipelineChecker\ncaused by\nError during planning: Join operation cannot operate on a non-prunable stream without enabling the 'allow_symmetric_joins_without_pruning' configuration flag")
-            }
-        }
-        Ok(())
-    }
-
     #[tokio::test(flavor = "multi_thread")]
     async fn build_null_columns_first() -> Result<()> {
         let join_type = JoinType::Full;
         let cardinality = (10, 11);
         let case_expr = 1;
-        let config = SessionConfig::new().with_repartition_joins(false);
-        let session_ctx = SessionContext::with_config(config);
-        let task_ctx = session_ctx.task_ctx();
+        let session_config = SessionConfig::new().with_repartition_joins(false);
+        let task_ctx = TaskContext::default().with_session_config(session_config);
+        let task_ctx = Arc::new(task_ctx);
         let (left_batch, right_batch) =
             build_sides_record_batches(TABLE_SIZE, cardinality)?;
         let left_schema = &left_batch.schema();
@@ -2038,9 +1867,9 @@ mod tests {
         let join_type = JoinType::Full;
         let cardinality = (10, 11);
         let case_expr = 1;
-        let config = SessionConfig::new().with_repartition_joins(false);
-        let session_ctx = SessionContext::with_config(config);
-        let task_ctx = session_ctx.task_ctx();
+        let session_config = SessionConfig::new().with_repartition_joins(false);
+        let task_ctx = TaskContext::default().with_session_config(session_config);
+        let task_ctx = Arc::new(task_ctx);
         let (left_batch, right_batch) =
             build_sides_record_batches(TABLE_SIZE, cardinality)?;
         let left_schema = &left_batch.schema();
@@ -2102,9 +1931,9 @@ mod tests {
         let join_type = JoinType::Full;
         let cardinality = (10, 11);
         let case_expr = 1;
-        let config = SessionConfig::new().with_repartition_joins(false);
-        let session_ctx = SessionContext::with_config(config);
-        let task_ctx = session_ctx.task_ctx();
+        let session_config = SessionConfig::new().with_repartition_joins(false);
+        let task_ctx = TaskContext::default().with_session_config(session_config);
+        let task_ctx = Arc::new(task_ctx);
         let (left_batch, right_batch) =
             build_sides_record_batches(TABLE_SIZE, cardinality)?;
         let left_schema = &left_batch.schema();
@@ -2167,9 +1996,9 @@ mod tests {
         let join_type = JoinType::Full;
 
         // a + b > c + 10 AND a + b < c + 100
-        let config = SessionConfig::new().with_repartition_joins(false);
-        let session_ctx = SessionContext::with_config(config);
-        let task_ctx = session_ctx.task_ctx();
+        let session_config = SessionConfig::new().with_repartition_joins(false);
+        let task_ctx = TaskContext::default().with_session_config(session_config);
+        let task_ctx = Arc::new(task_ctx);
         let (left_batch, right_batch) =
             build_sides_record_batches(TABLE_SIZE, cardinality)?;
         let left_schema = &left_batch.schema();
@@ -2241,9 +2070,9 @@ mod tests {
         let join_type = case.0;
         let should_be_empty = case.1;
         let random_state = RandomState::with_seeds(0, 0, 0, 0);
-        let config = SessionConfig::new().with_repartition_joins(false);
-        let session_ctx = SessionContext::with_config(config);
-        let task_ctx = session_ctx.task_ctx();
+        let session_config = SessionConfig::new().with_repartition_joins(false);
+        let task_ctx = TaskContext::default().with_session_config(session_config);
+        let task_ctx = Arc::new(task_ctx);
         // Ensure there will be matching rows
         let (left_batch, right_batch) = build_sides_record_batches(20, (1, 1))?;
         let left_schema = left_batch.schema();
@@ -2369,9 +2198,9 @@ mod tests {
         cardinality: (i32, i32),
         #[values(0, 1)] case_expr: usize,
     ) -> Result<()> {
-        let config = SessionConfig::new().with_repartition_joins(false);
-        let session_ctx = SessionContext::with_config(config);
-        let task_ctx = session_ctx.task_ctx();
+        let session_config = SessionConfig::new().with_repartition_joins(false);
+        let task_ctx = TaskContext::default().with_session_config(session_config);
+        let task_ctx = Arc::new(task_ctx);
         let (left_batch, right_batch) =
             build_sides_record_batches(TABLE_SIZE, cardinality)?;
         let left_schema = &left_batch.schema();
@@ -2453,9 +2282,9 @@ mod tests {
         )]
         cardinality: (i32, i32),
     ) -> Result<()> {
-        let config = SessionConfig::new().with_repartition_joins(false);
-        let session_ctx = SessionContext::with_config(config);
-        let task_ctx = session_ctx.task_ctx();
+        let session_config = SessionConfig::new().with_repartition_joins(false);
+        let task_ctx = TaskContext::default().with_session_config(session_config);
+        let task_ctx = Arc::new(task_ctx);
         let (left_batch, right_batch) =
             build_sides_record_batches(TABLE_SIZE, cardinality)?;
         let left_schema = &left_batch.schema();
@@ -2532,9 +2361,9 @@ mod tests {
         cardinality: (i32, i32),
         #[values(0, 1, 2, 3, 4, 5, 6, 7)] case_expr: usize,
     ) -> Result<()> {
-        let config = SessionConfig::new().with_repartition_joins(false);
-        let session_ctx = SessionContext::with_config(config);
-        let task_ctx = session_ctx.task_ctx();
+        let session_config = SessionConfig::new().with_repartition_joins(false);
+        let task_ctx = TaskContext::default().with_session_config(session_config);
+        let task_ctx = Arc::new(task_ctx);
         let (left_batch, right_batch) =
             build_sides_record_batches(TABLE_SIZE, cardinality)?;
         let left_schema = &left_batch.schema();
diff --git a/datafusion/core/src/physical_plan/limit.rs b/datafusion/core/src/physical_plan/limit.rs
index 93f6cd7c2c..c5d5fbcfd1 100644
--- a/datafusion/core/src/physical_plan/limit.rs
+++ b/datafusion/core/src/physical_plan/limit.rs
@@ -532,13 +532,11 @@ mod tests {
     use super::*;
     use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
     use crate::physical_plan::common;
-    use crate::prelude::SessionContext;
     use crate::test;
 
     #[tokio::test]
     async fn limit() -> Result<()> {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
 
         let num_partitions = 4;
         let csv = test::scan_partitioned_csv(num_partitions)?;
@@ -654,8 +652,7 @@ mod tests {
 
     // test cases for "skip"
     async fn skip_and_fetch(skip: usize, fetch: Option<usize>) -> Result<usize> {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
 
         let num_partitions = 4;
         let csv = test::scan_partitioned_csv(num_partitions)?;
diff --git a/datafusion/core/src/physical_plan/projection.rs b/datafusion/core/src/physical_plan/projection.rs
index 5c4b661143..ca4a3e54f5 100644
--- a/datafusion/core/src/physical_plan/projection.rs
+++ b/datafusion/core/src/physical_plan/projection.rs
@@ -427,7 +427,6 @@ mod tests {
     use super::*;
     use crate::physical_plan::common::collect;
     use crate::physical_plan::expressions::{self, col};
-    use crate::prelude::SessionContext;
     use crate::test::{self};
     use crate::test_util;
     use datafusion_common::ScalarValue;
@@ -448,8 +447,7 @@ mod tests {
 
     #[tokio::test]
     async fn project_first_column() -> Result<()> {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let schema = test_util::aggr_test_schema();
 
         let partitions = 4;
@@ -519,8 +517,7 @@ mod tests {
 
     #[tokio::test]
     async fn project_no_column() -> Result<()> {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
 
         let csv = test::scan_partitioned_csv(1)?;
         let expected = collect(csv.execute(0, task_ctx.clone())?).await.unwrap();
diff --git a/datafusion/core/src/physical_plan/repartition/mod.rs b/datafusion/core/src/physical_plan/repartition/mod.rs
index 3f83e186ea..78d78df0af 100644
--- a/datafusion/core/src/physical_plan/repartition/mod.rs
+++ b/datafusion/core/src/physical_plan/repartition/mod.rs
@@ -883,8 +883,6 @@ impl RecordBatchStream for PerPartitionStream {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::execution::context::SessionConfig;
-    use crate::prelude::SessionContext;
     use crate::test::create_vec_batches;
     use crate::{
         assert_batches_sorted_eq,
@@ -998,8 +996,7 @@ mod tests {
         input_partitions: Vec<Vec<RecordBatch>>,
         partitioning: Partitioning,
     ) -> Result<Vec<Vec<RecordBatch>>> {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         // create physical plan
         let exec = MemoryExec::try_new(&input_partitions, schema.clone(), None)?;
         let exec = RepartitionExec::try_new(Arc::new(exec), partitioning)?;
@@ -1046,8 +1043,7 @@ mod tests {
 
     #[tokio::test]
     async fn unsupported_partitioning() {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         // have to send at least one batch through to provoke error
         let batch = RecordBatch::try_from_iter(vec![(
             "my_awesome_field",
@@ -1081,8 +1077,7 @@ mod tests {
         // This generates an error on a call to execute. The error
         // should be returned and no results produced.
 
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let input = ErrorExec::new();
         let partitioning = Partitioning::RoundRobinBatch(1);
         let exec = RepartitionExec::try_new(Arc::new(input), partitioning).unwrap();
@@ -1104,8 +1099,7 @@ mod tests {
 
     #[tokio::test]
     async fn repartition_with_error_in_stream() {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let batch = RecordBatch::try_from_iter(vec![(
             "my_awesome_field",
             Arc::new(StringArray::from(vec!["foo", "bar"])) as ArrayRef,
@@ -1138,8 +1132,7 @@ mod tests {
 
     #[tokio::test]
     async fn repartition_with_delayed_stream() {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let batch1 = RecordBatch::try_from_iter(vec![(
             "my_awesome_field",
             Arc::new(StringArray::from(vec!["foo", "bar"])) as ArrayRef,
@@ -1184,8 +1177,7 @@ mod tests {
 
     #[tokio::test]
     async fn robin_repartition_with_dropping_output_stream() {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let partitioning = Partitioning::RoundRobinBatch(2);
         // The barrier exec waits to be pinged
         // requires the input to wait at least once)
@@ -1228,8 +1220,7 @@ mod tests {
     // wiht different compilers, we will compare the same execution with
     // and without droping the output stream.
     async fn hash_repartition_with_dropping_output_stream() {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let partitioning = Partitioning::Hash(
             vec![Arc::new(crate::physical_plan::expressions::Column::new(
                 "my_awesome_field",
@@ -1324,8 +1315,7 @@ mod tests {
 
     #[tokio::test]
     async fn test_drop_cancel() -> Result<()> {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let schema =
             Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, true)]));
 
@@ -1348,8 +1338,7 @@ mod tests {
 
     #[tokio::test]
     async fn hash_repartition_avoid_empty_batch() -> Result<()> {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let batch = RecordBatch::try_from_iter(vec![(
             "a",
             Arc::new(StringArray::from(vec!["foo"])) as ArrayRef,
@@ -1385,14 +1374,12 @@ mod tests {
         let partitioning = Partitioning::RoundRobinBatch(4);
 
         // setup up context
-        let session_ctx = SessionContext::with_config_rt(
-            SessionConfig::default(),
-            Arc::new(
-                RuntimeEnv::new(RuntimeConfig::default().with_memory_limit(1, 1.0))
-                    .unwrap(),
-            ),
+        let runtime = Arc::new(
+            RuntimeEnv::new(RuntimeConfig::default().with_memory_limit(1, 1.0)).unwrap(),
         );
-        let task_ctx = session_ctx.task_ctx();
+
+        let task_ctx = TaskContext::default().with_runtime(runtime);
+        let task_ctx = Arc::new(task_ctx);
 
         // create physical plan
         let exec = MemoryExec::try_new(&input_partitions, schema.clone(), None)?;
diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs
index 1813fd6a15..3bedbd17e8 100644
--- a/datafusion/core/src/physical_plan/sorts/sort.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort.rs
@@ -884,12 +884,10 @@ impl ExecutionPlan for SortExec {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::execution::context::SessionConfig;
     use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
     use crate::physical_plan::collect;
     use crate::physical_plan::expressions::col;
     use crate::physical_plan::memory::MemoryExec;
-    use crate::prelude::SessionContext;
     use crate::test;
     use crate::test::assert_is_pending;
     use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec};
@@ -897,14 +895,14 @@ mod tests {
     use arrow::compute::SortOptions;
     use arrow::datatypes::*;
     use datafusion_common::cast::{as_primitive_array, as_string_array};
+    use datafusion_execution::config::SessionConfig;
     use datafusion_execution::runtime_env::RuntimeConfig;
     use futures::FutureExt;
     use std::collections::HashMap;
 
     #[tokio::test]
     async fn test_in_mem_sort() -> Result<()> {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let partitions = 4;
         let csv = test::scan_partitioned_csv(partitions)?;
         let schema = csv.schema();
@@ -930,7 +928,7 @@ mod tests {
             Arc::new(CoalescePartitionsExec::new(csv)),
         ));
 
-        let result = collect(sort_exec, task_ctx).await?;
+        let result = collect(sort_exec, task_ctx.clone()).await?;
 
         assert_eq!(result.len(), 1);
 
@@ -949,7 +947,7 @@ mod tests {
         assert_eq!(c7.value(c7.len() - 1), 254,);
 
         assert_eq!(
-            session_ctx.runtime_env().memory_pool.reserved(),
+            task_ctx.runtime_env().memory_pool.reserved(),
             0,
             "The sort should have returned all memory used back to the memory manager"
         );
@@ -968,7 +966,11 @@ mod tests {
         let rt_config = RuntimeConfig::new()
             .with_memory_limit(sort_spill_reservation_bytes + 12288, 1.0);
         let runtime = Arc::new(RuntimeEnv::new(rt_config)?);
-        let session_ctx = SessionContext::with_config_rt(session_config, 
runtime);
+        let task_ctx = Arc::new(
+            TaskContext::default()
+                .with_session_config(session_config)
+                .with_runtime(runtime),
+        );
 
         let partitions = 4;
         let csv = test::scan_partitioned_csv(partitions)?;
@@ -995,8 +997,7 @@ mod tests {
             Arc::new(CoalescePartitionsExec::new(csv)),
         ));
 
-        let task_ctx = session_ctx.task_ctx();
-        let result = collect(sort_exec.clone(), task_ctx).await?;
+        let result = collect(sort_exec.clone(), task_ctx.clone()).await?;
 
         assert_eq!(result.len(), 1);
 
@@ -1023,7 +1024,7 @@ mod tests {
         assert_eq!(c7.value(c7.len() - 1), 254,);
 
         assert_eq!(
-            session_ctx.runtime_env().memory_pool.reserved(),
+            task_ctx.runtime_env().memory_pool.reserved(),
             0,
             "The sort should have returned all memory used back to the memory 
manager"
         );
@@ -1059,7 +1060,11 @@ mod tests {
                 1.0,
             );
             let runtime = Arc::new(RuntimeEnv::new(rt_config)?);
-            let session_ctx = SessionContext::with_config_rt(session_config, 
runtime);
+            let task_ctx = Arc::new(
+                TaskContext::default()
+                    .with_runtime(runtime)
+                    .with_session_config(session_config),
+            );
 
             let csv = test::scan_partitioned_csv(partitions)?;
             let schema = csv.schema();
@@ -1088,7 +1093,6 @@ mod tests {
                 .with_fetch(fetch),
             );
 
-            let task_ctx = session_ctx.task_ctx();
             let result = collect(sort_exec.clone(), task_ctx).await?;
             assert_eq!(result.len(), 1);
 
@@ -1101,8 +1105,7 @@ mod tests {
 
     #[tokio::test]
     async fn test_sort_metadata() -> Result<()> {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let field_metadata: HashMap<String, String> =
             vec![("foo".to_string(), "bar".to_string())]
                 .into_iter()
@@ -1151,8 +1154,7 @@ mod tests {
 
     #[tokio::test]
     async fn test_lex_sort_by_float() -> Result<()> {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let schema = Arc::new(Schema::new(vec![
             Field::new("a", DataType::Float32, true),
             Field::new("b", DataType::Float64, true),
@@ -1257,8 +1259,7 @@ mod tests {
 
     #[tokio::test]
     async fn test_drop_cancel() -> Result<()> {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let schema =
             Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, 
true)]));
 
@@ -1272,7 +1273,7 @@ mod tests {
             blocking_exec,
         ));
 
-        let fut = collect(sort_exec, task_ctx);
+        let fut = collect(sort_exec, task_ctx.clone());
         let mut fut = fut.boxed();
 
         assert_is_pending(&mut fut);
@@ -1280,7 +1281,7 @@ mod tests {
         assert_strong_count_converges_to_zero(refs).await;
 
         assert_eq!(
-            session_ctx.runtime_env().memory_pool.reserved(),
+            task_ctx.runtime_env().memory_pool.reserved(),
             0,
             "The sort should have returned all memory used back to the memory 
manager"
         );
diff --git a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs 
b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
index 6b978b5ee7..bc0ac678f0 100644
--- a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
@@ -273,6 +273,7 @@ mod tests {
     use arrow::compute::SortOptions;
     use arrow::datatypes::{DataType, Field, Schema};
     use arrow::record_batch::RecordBatch;
+    use datafusion_execution::config::SessionConfig;
     use futures::{FutureExt, StreamExt};
 
     use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
@@ -282,7 +283,6 @@ mod tests {
     use crate::physical_plan::sorts::sort::SortExec;
     use crate::physical_plan::stream::RecordBatchReceiverStream;
     use crate::physical_plan::{collect, common};
-    use crate::prelude::{SessionConfig, SessionContext};
     use crate::test::exec::{assert_strong_count_converges_to_zero, 
BlockingExec};
     use crate::test::{self, assert_is_pending};
     use crate::{assert_batches_eq, test_util};
@@ -292,8 +292,7 @@ mod tests {
 
     #[tokio::test]
     async fn test_merge_interleave() {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 7, 9, 3]));
         let b: ArrayRef = Arc::new(StringArray::from_iter(vec![
             Some("a"),
@@ -341,8 +340,7 @@ mod tests {
 
     #[tokio::test]
     async fn test_merge_some_overlap() {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 7, 9, 3]));
         let b: ArrayRef = Arc::new(StringArray::from_iter(vec![
             Some("a"),
@@ -390,8 +388,7 @@ mod tests {
 
     #[tokio::test]
     async fn test_merge_no_overlap() {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 7, 9, 3]));
         let b: ArrayRef = Arc::new(StringArray::from_iter(vec![
             Some("a"),
@@ -439,8 +436,7 @@ mod tests {
 
     #[tokio::test]
     async fn test_merge_three_partitions() {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 7, 9, 3]));
         let b: ArrayRef = Arc::new(StringArray::from_iter(vec![
             Some("a"),
@@ -561,8 +557,7 @@ mod tests {
 
     #[tokio::test]
     async fn test_partition_sort() {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let partitions = 4;
         let csv = test::scan_partitioned_csv(partitions).unwrap();
         let schema = csv.schema();
@@ -644,8 +639,7 @@ mod tests {
 
     #[tokio::test]
     async fn test_partition_sort_streaming_input() {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let schema = test_util::aggr_test_schema();
         let sort = vec![
             // uint8
@@ -705,17 +699,18 @@ mod tests {
             },
         ];
 
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        // Test streaming with default batch size
+        let task_ctx = Arc::new(TaskContext::default());
         let input =
             sorted_partitioned_input(sort.clone(), &[10, 5, 13], 
task_ctx.clone()).await;
         let basic = basic_sort(input.clone(), sort.clone(), task_ctx).await;
 
-        let session_ctx_bs_23 =
-            
SessionContext::with_config(SessionConfig::new().with_batch_size(23));
+        // batch size of 23
+        let task_ctx = TaskContext::default()
+            .with_session_config(SessionConfig::new().with_batch_size(23));
+        let task_ctx = Arc::new(task_ctx);
 
         let merge = Arc::new(SortPreservingMergeExec::new(sort, input));
-        let task_ctx = session_ctx_bs_23.task_ctx();
         let merged = collect(merge, task_ctx).await.unwrap();
 
         assert_eq!(merged.len(), 14);
@@ -735,8 +730,7 @@ mod tests {
 
     #[tokio::test]
     async fn test_nulls() {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 7, 9, 3]));
         let b: ArrayRef = Arc::new(StringArray::from_iter(vec![
             None,
@@ -817,8 +811,7 @@ mod tests {
 
     #[tokio::test]
     async fn test_async() {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let schema = test_util::aggr_test_schema();
         let sort = vec![PhysicalSortExpr {
             expr: col("c12", &schema).unwrap(),
@@ -885,8 +878,7 @@ mod tests {
 
     #[tokio::test]
     async fn test_merge_metrics() {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2]));
         let b: ArrayRef = Arc::new(StringArray::from_iter(vec![Some("a"), 
Some("c")]));
         let b1 = RecordBatch::try_from_iter(vec![("a", a), ("b", b)]).unwrap();
@@ -942,8 +934,7 @@ mod tests {
 
     #[tokio::test]
     async fn test_drop_cancel() -> Result<()> {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let schema =
             Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, 
true)]));
 
@@ -969,8 +960,7 @@ mod tests {
 
     #[tokio::test]
     async fn test_stable_sort() {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
 
         // Create record batches like:
         // batch_number |value
diff --git a/datafusion/core/src/physical_plan/stream.rs 
b/datafusion/core/src/physical_plan/stream.rs
index 2b916b7ee2..2e9d40cfe5 100644
--- a/datafusion/core/src/physical_plan/stream.rs
+++ b/datafusion/core/src/physical_plan/stream.rs
@@ -370,11 +370,8 @@ mod test {
     use super::*;
     use arrow_schema::{DataType, Field, Schema};
 
-    use crate::{
-        execution::context::SessionContext,
-        test::exec::{
-            assert_strong_count_converges_to_zero, BlockingExec, MockExec, 
PanicExec,
-        },
+    use crate::test::exec::{
+        assert_strong_count_converges_to_zero, BlockingExec, MockExec, 
PanicExec,
     };
 
     fn schema() -> SchemaRef {
@@ -413,8 +410,7 @@ mod test {
 
     #[tokio::test]
     async fn record_batch_receiver_stream_drop_cancel() {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let schema = schema();
 
         // Make an input that never proceeds
@@ -439,8 +435,7 @@ mod test {
     /// `RecordBatchReceiverStream` stops early and does not drive
     /// other streams to completion.
     async fn record_batch_receiver_stream_error_does_not_drive_completion() {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let schema = schema();
 
         // make an input that will error twice
@@ -471,8 +466,7 @@ mod test {
     ///
     /// panic's if more than max_batches is seen,
     async fn consume(input: PanicExec, max_batches: usize) {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
 
         let input = Arc::new(input);
         let num_partitions = input.output_partitioning().partition_count();
diff --git a/datafusion/core/src/physical_plan/union.rs 
b/datafusion/core/src/physical_plan/union.rs
index e1f8072085..e29c96da09 100644
--- a/datafusion/core/src/physical_plan/union.rs
+++ b/datafusion/core/src/physical_plan/union.rs
@@ -608,14 +608,12 @@ mod tests {
     use super::*;
     use crate::test;
 
-    use crate::prelude::SessionContext;
     use crate::{physical_plan::collect, scalar::ScalarValue};
     use arrow::record_batch::RecordBatch;
 
     #[tokio::test]
     async fn test_union_partitions() -> Result<()> {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
 
         // Create csv's with different partitioning
         let csv = test::scan_partitioned_csv(4)?;
diff --git a/datafusion/core/src/physical_plan/windows/mod.rs 
b/datafusion/core/src/physical_plan/windows/mod.rs
index 7d9d70f477..cba7df772e 100644
--- a/datafusion/core/src/physical_plan/windows/mod.rs
+++ b/datafusion/core/src/physical_plan/windows/mod.rs
@@ -362,7 +362,6 @@ mod tests {
     use crate::physical_plan::aggregates::AggregateFunction;
     use crate::physical_plan::expressions::col;
     use crate::physical_plan::{collect, ExecutionPlan};
-    use crate::prelude::SessionContext;
     use crate::test::exec::{assert_strong_count_converges_to_zero, 
BlockingExec};
     use crate::test::{self, assert_is_pending, csv_exec_sorted};
     use arrow::array::*;
@@ -370,6 +369,7 @@ mod tests {
     use arrow::datatypes::{DataType, Field, SchemaRef};
     use arrow::record_batch::RecordBatch;
     use datafusion_common::cast::as_primitive_array;
+    use datafusion_execution::TaskContext;
     use datafusion_expr::{create_udaf, Accumulator, Volatility};
     use futures::FutureExt;
 
@@ -546,8 +546,7 @@ mod tests {
             Arc::new(vec![DataType::Int64]),
         );
 
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let (input, schema) = create_test_schema(1)?;
 
         let window_exec = Arc::new(WindowAggExec::try_new(
@@ -579,8 +578,7 @@ mod tests {
 
     #[tokio::test]
     async fn window_function() -> Result<()> {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let (input, schema) = create_test_schema(1)?;
 
         let window_exec = Arc::new(WindowAggExec::try_new(
@@ -643,8 +641,7 @@ mod tests {
 
     #[tokio::test]
     async fn test_drop_cancel() -> Result<()> {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
+        let task_ctx = Arc::new(TaskContext::default());
         let schema =
             Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, 
true)]));
 
diff --git a/datafusion/core/tests/sql/joins.rs 
b/datafusion/core/tests/sql/joins.rs
index 116a3c1a79..e555a28f41 100644
--- a/datafusion/core/tests/sql/joins.rs
+++ b/datafusion/core/tests/sql/joins.rs
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use datafusion::test_util::register_unbounded_file_with_ordering;
+
 use super::*;
 
 #[tokio::test]
@@ -75,3 +77,162 @@ async fn null_aware_left_anti_join() -> Result<()> {
 
     Ok(())
 }
+
+#[tokio::test]
+async fn join_change_in_planner() -> Result<()> {
+    let config = SessionConfig::new().with_target_partitions(8);
+    let ctx = SessionContext::with_config(config);
+    let tmp_dir = TempDir::new().unwrap();
+    let left_file_path = tmp_dir.path().join("left.csv");
+    File::create(left_file_path.clone()).unwrap();
+    // Create schema
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("a1", DataType::UInt32, false),
+        Field::new("a2", DataType::UInt32, false),
+    ]));
+    // Specify the ordering:
+    let file_sort_order = vec![[datafusion_expr::col("a1")]
+        .into_iter()
+        .map(|e| {
+            let ascending = true;
+            let nulls_first = false;
+            e.sort(ascending, nulls_first)
+        })
+        .collect::<Vec<_>>()];
+    register_unbounded_file_with_ordering(
+        &ctx,
+        schema.clone(),
+        &left_file_path,
+        "left",
+        file_sort_order.clone(),
+        true,
+    )
+    .await?;
+    let right_file_path = tmp_dir.path().join("right.csv");
+    File::create(right_file_path.clone()).unwrap();
+    register_unbounded_file_with_ordering(
+        &ctx,
+        schema,
+        &right_file_path,
+        "right",
+        file_sort_order,
+        true,
+    )
+    .await?;
+    let sql = "SELECT t1.a1, t1.a2, t2.a1, t2.a2 FROM left as t1 FULL JOIN 
right as t2 ON t1.a2 = t2.a2 AND t1.a1 > t2.a1 + 3 AND t1.a1 < t2.a1 + 10";
+    let dataframe = ctx.sql(sql).await?;
+    let physical_plan = dataframe.create_physical_plan().await?;
+    let formatted = 
displayable(physical_plan.as_ref()).indent(true).to_string();
+    let expected = {
+        [
+            "SymmetricHashJoinExec: mode=Partitioned, join_type=Full, 
on=[(a2@1, a2@1)], filter=CAST(a1@0 AS Int64) > CAST(a1@1 AS Int64) + 3 AND 
CAST(a1@0 AS Int64) < CAST(a1@1 AS Int64) + 10",
+            "  CoalesceBatchesExec: target_batch_size=8192",
+            "    RepartitionExec: partitioning=Hash([a2@1], 8), 
input_partitions=1",
+            // "   CsvExec: file_groups={1 group: [[tempdir/left.csv]]}, 
projection=[a1, a2], has_header=false",
+            "  CoalesceBatchesExec: target_batch_size=8192",
+            "    RepartitionExec: partitioning=Hash([a2@1], 8), 
input_partitions=1",
+            // "   CsvExec: file_groups={1 group: [[tempdir/right.csv]]}, 
projection=[a1, a2], has_header=false"
+        ]
+    };
+    let mut actual: Vec<&str> = formatted.trim().lines().collect();
+    // Remove CSV lines
+    actual.remove(3);
+    actual.remove(5);
+
+    assert_eq!(
+        expected,
+        actual[..],
+        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+    );
+    Ok(())
+}
+
+#[tokio::test]
+async fn join_change_in_planner_without_sort() -> Result<()> {
+    let config = SessionConfig::new().with_target_partitions(8);
+    let ctx = SessionContext::with_config(config);
+    let tmp_dir = TempDir::new()?;
+    let left_file_path = tmp_dir.path().join("left.csv");
+    File::create(left_file_path.clone())?;
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("a1", DataType::UInt32, false),
+        Field::new("a2", DataType::UInt32, false),
+    ]));
+    ctx.register_csv(
+        "left",
+        left_file_path.as_os_str().to_str().unwrap(),
+        CsvReadOptions::new().schema(&schema).mark_infinite(true),
+    )
+    .await?;
+    let right_file_path = tmp_dir.path().join("right.csv");
+    File::create(right_file_path.clone())?;
+    ctx.register_csv(
+        "right",
+        right_file_path.as_os_str().to_str().unwrap(),
+        CsvReadOptions::new().schema(&schema).mark_infinite(true),
+    )
+    .await?;
+    let sql = "SELECT t1.a1, t1.a2, t2.a1, t2.a2 FROM left as t1 FULL JOIN 
right as t2 ON t1.a2 = t2.a2 AND t1.a1 > t2.a1 + 3 AND t1.a1 < t2.a1 + 10";
+    let dataframe = ctx.sql(sql).await?;
+    let physical_plan = dataframe.create_physical_plan().await?;
+    let formatted = 
displayable(physical_plan.as_ref()).indent(true).to_string();
+    let expected = {
+        [
+            "SymmetricHashJoinExec: mode=Partitioned, join_type=Full, 
on=[(a2@1, a2@1)], filter=CAST(a1@0 AS Int64) > CAST(a1@1 AS Int64) + 3 AND 
CAST(a1@0 AS Int64) < CAST(a1@1 AS Int64) + 10",
+            "  CoalesceBatchesExec: target_batch_size=8192",
+            "    RepartitionExec: partitioning=Hash([a2@1], 8), 
input_partitions=1",
+            // "   CsvExec: file_groups={1 group: [[tempdir/left.csv]]}, 
projection=[a1, a2], has_header=false",
+            "  CoalesceBatchesExec: target_batch_size=8192",
+            "    RepartitionExec: partitioning=Hash([a2@1], 8), 
input_partitions=1",
+            // "   CsvExec: file_groups={1 group: [[tempdir/right.csv]]}, 
projection=[a1, a2], has_header=false"
+        ]
+    };
+    let mut actual: Vec<&str> = formatted.trim().lines().collect();
+    // Remove CSV lines
+    actual.remove(3);
+    actual.remove(5);
+
+    assert_eq!(
+        expected,
+        actual[..],
+        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+    );
+    Ok(())
+}
+
+#[tokio::test]
+async fn join_change_in_planner_without_sort_not_allowed() -> Result<()> {
+    let config = SessionConfig::new()
+        .with_target_partitions(8)
+        .with_allow_symmetric_joins_without_pruning(false);
+    let ctx = SessionContext::with_config(config);
+    let tmp_dir = TempDir::new()?;
+    let left_file_path = tmp_dir.path().join("left.csv");
+    File::create(left_file_path.clone())?;
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("a1", DataType::UInt32, false),
+        Field::new("a2", DataType::UInt32, false),
+    ]));
+    ctx.register_csv(
+        "left",
+        left_file_path.as_os_str().to_str().unwrap(),
+        CsvReadOptions::new().schema(&schema).mark_infinite(true),
+    )
+    .await?;
+    let right_file_path = tmp_dir.path().join("right.csv");
+    File::create(right_file_path.clone())?;
+    ctx.register_csv(
+        "right",
+        right_file_path.as_os_str().to_str().unwrap(),
+        CsvReadOptions::new().schema(&schema).mark_infinite(true),
+    )
+    .await?;
+    let df = ctx.sql("SELECT t1.a1, t1.a2, t2.a1, t2.a2 FROM left as t1 FULL 
JOIN right as t2 ON t1.a2 = t2.a2 AND t1.a1 > t2.a1 + 3 AND t1.a1 < t2.a1 + 
10").await?;
+    match df.create_physical_plan().await {
+        Ok(_) => panic!("Expecting error."),
+        Err(e) => {
+            assert_eq!(e.to_string(), "PipelineChecker\ncaused by\nError 
during planning: Join operation cannot operate on a non-prunable stream without 
enabling the 'allow_symmetric_joins_without_pruning' configuration flag")
+        }
+    }
+    Ok(())
+}
diff --git a/datafusion/core/tests/sqllogictests/test_files/options.slt 
b/datafusion/core/tests/sqllogictests/test_files/options.slt
new file mode 100644
index 0000000000..1f4cc9ab0c
--- /dev/null
+++ b/datafusion/core/tests/sqllogictests/test_files/options.slt
@@ -0,0 +1,84 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+
+#   http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+#######
+## Tests for config options
+#######
+
+
+statement ok
+create table a(c0 int) as values (1), (2);
+
+# Expect coalesce and default batch size
+query TT
+explain SELECT * FROM a WHERE c0 < 1;
+----
+logical_plan
+Filter: a.c0 < Int32(1)
+--TableScan: a projection=[c0]
+physical_plan
+CoalesceBatchesExec: target_batch_size=8192
+--FilterExec: c0@0 < 1
+----MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+
+##
+# test_disable_coalesce
+##
+
+statement ok
+set datafusion.execution.coalesce_batches = false
+
+# expect no coalesce
+query TT
+explain SELECT * FROM a WHERE c0 < 1;
+----
+logical_plan
+Filter: a.c0 < Int32(1)
+--TableScan: a projection=[c0]
+physical_plan
+FilterExec: c0@0 < 1
+--MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+
+statement ok
+set datafusion.execution.coalesce_batches = true
+
+
+##
+# test_custom_batch_size
+##
+
+statement ok
+set datafusion.execution.batch_size = 1234;
+
+# expect batch size to be 1234
+query TT
+explain SELECT * FROM a WHERE c0 < 1;
+----
+logical_plan
+Filter: a.c0 < Int32(1)
+--TableScan: a projection=[c0]
+physical_plan
+CoalesceBatchesExec: target_batch_size=1234
+--FilterExec: c0@0 < 1
+----MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+
+
+statement ok
+set datafusion.execution.batch_size = 8192;
+
+statement ok
+drop table a
diff --git a/datafusion/execution/src/task.rs b/datafusion/execution/src/task.rs
index 62e2fa8da3..72d804d7bb 100644
--- a/datafusion/execution/src/task.rs
+++ b/datafusion/execution/src/task.rs
@@ -27,16 +27,20 @@ use datafusion_common::{
 use datafusion_expr::{AggregateUDF, ScalarUDF, WindowUDF};
 
 use crate::{
-    config::SessionConfig, memory_pool::MemoryPool, registry::FunctionRegistry,
-    runtime_env::RuntimeEnv,
+    config::SessionConfig,
+    memory_pool::MemoryPool,
+    registry::FunctionRegistry,
+    runtime_env::{RuntimeConfig, RuntimeEnv},
 };
 
 /// Task Execution Context
 ///
-/// A [`TaskContext`] has represents the state available during a single 
query's
-/// execution.
+/// A [`TaskContext`] contains the state available during a single
+/// query's execution. Please see [`SessionContext`] for a user level
+/// multi-query API.
 ///
-/// # Task Context
+/// [`SessionContext`]: 
https://docs.rs/datafusion/latest/datafusion/execution/context/struct.SessionContext.html
+#[derive(Debug)]
 pub struct TaskContext {
     /// Session Id
     session_id: String,
@@ -54,6 +58,24 @@ pub struct TaskContext {
     runtime: Arc<RuntimeEnv>,
 }
 
+impl Default for TaskContext {
+    fn default() -> Self {
+        let runtime = RuntimeEnv::new(RuntimeConfig::new())
+            .expect("default runtime created successfully");
+
+        // Create a default task context, mostly useful for testing
+        Self {
+            session_id: "DEFAULT".to_string(),
+            task_id: None,
+            session_config: SessionConfig::new(),
+            scalar_functions: HashMap::new(),
+            aggregate_functions: HashMap::new(),
+            window_functions: HashMap::new(),
+            runtime: Arc::new(runtime),
+        }
+    }
+}
+
 impl TaskContext {
     /// Create a new [`TaskContext`] instance.
     ///
@@ -137,6 +159,18 @@ impl TaskContext {
     pub fn runtime_env(&self) -> Arc<RuntimeEnv> {
         self.runtime.clone()
     }
+
+    /// Update the [`SessionConfig`]
+    pub fn with_session_config(mut self, session_config: SessionConfig) -> 
Self {
+        self.session_config = session_config;
+        self
+    }
+
+    /// Update the [`RuntimeEnv`]
+    pub fn with_runtime(mut self, runtime: Arc<RuntimeEnv>) -> Self {
+        self.runtime = runtime;
+        self
+    }
 }
 
 impl FunctionRegistry for TaskContext {


Reply via email to