This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 682da846b5 feat: Push limit into hash join (#20228)
682da846b5 is described below
commit 682da846b5998f0607b80d867411c92f8506735c
Author: Jonathan Chen <[email protected]>
AuthorDate: Fri Feb 13 00:20:25 2026 -0600
feat: Push limit into hash join (#20228)
## Which issue does this PR close?
<!--
We generally require a GitHub issue to be filed for all bug fixes and
enhancements and this helps us generate change logs for our releases.
You can link an issue to this PR using the GitHub syntax. For example
`Closes #123` indicates that this PR will close issue #123.
-->
- Part of #18295.
## Rationale for this change
<!--
Why are you proposing this change? If this is already explained clearly
in the issue then this section is not needed.
Explaining clearly why changes are proposed helps reviewers understand
your changes and offer better suggestions for fixes.
-->
## What changes are included in this PR?
Push limit down into hash join using limit pushdown optimizer. Use limit
pushdown optimizer to pass the limit value to Hash Join exec using
`with_fetch` and passing the `fetch` value to `LimitedBatch Coalescer`
to emit the batch once the limit is hit.
<!--
There is no need to duplicate the description in the issue here but it
is sometimes worth providing a summary of the individual changes in this
PR.
-->
## Are these changes tested?
SLT tests + unit tests
---------
Co-authored-by: Yongting You <[email protected]>
---
.../tests/physical_optimizer/limit_pushdown.rs | 167 ++++++++++++-
.../physical-plan/src/joins/hash_join/exec.rs | 108 +++++++--
.../physical-plan/src/joins/hash_join/stream.rs | 43 ++--
.../test_files/join_disable_repartition_joins.slt | 4 +-
.../test_files/join_limit_pushdown.slt | 269 +++++++++++++++++++++
datafusion/sqllogictest/test_files/joins.slt | 38 ++-
6 files changed, 563 insertions(+), 66 deletions(-)
diff --git a/datafusion/core/tests/physical_optimizer/limit_pushdown.rs
b/datafusion/core/tests/physical_optimizer/limit_pushdown.rs
index 0c41fc8e9e..b8c4d6d6f0 100644
--- a/datafusion/core/tests/physical_optimizer/limit_pushdown.rs
+++ b/datafusion/core/tests/physical_optimizer/limit_pushdown.rs
@@ -18,8 +18,8 @@
use std::sync::Arc;
use crate::physical_optimizer::test_utils::{
- coalesce_partitions_exec, global_limit_exec, local_limit_exec, sort_exec,
- sort_preserving_merge_exec, stream_exec,
+ coalesce_partitions_exec, global_limit_exec, hash_join_exec,
local_limit_exec,
+ sort_exec, sort_preserving_merge_exec, stream_exec,
};
use arrow::compute::SortOptions;
@@ -29,6 +29,7 @@ use datafusion_common::error::Result;
use datafusion_expr::{JoinType, Operator};
use datafusion_physical_expr::Partitioning;
use datafusion_physical_expr::expressions::{BinaryExpr, col, lit};
+use datafusion_physical_expr_common::physical_expr::PhysicalExprRef;
use datafusion_physical_expr_common::sort_expr::{LexOrdering,
PhysicalSortExpr};
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_optimizer::limit_pushdown::LimitPushdown;
@@ -161,6 +162,168 @@ fn
transforms_streaming_table_exec_into_fetching_version_and_keeps_the_global_li
Ok(())
}
+fn join_on_columns(
+ left_col: &str,
+ right_col: &str,
+) -> Vec<(PhysicalExprRef, PhysicalExprRef)> {
+ vec![(
+ Arc::new(datafusion_physical_expr::expressions::Column::new(
+ left_col, 0,
+ )) as _,
+ Arc::new(datafusion_physical_expr::expressions::Column::new(
+ right_col, 0,
+ )) as _,
+ )]
+}
+
+#[test]
+fn absorbs_limit_into_hash_join_inner() -> Result<()> {
+ // HashJoinExec with Inner join should absorb limit via with_fetch
+ let schema = create_schema();
+ let left = empty_exec(Arc::clone(&schema));
+ let right = empty_exec(Arc::clone(&schema));
+ let on = join_on_columns("c1", "c1");
+ let hash_join = hash_join_exec(left, right, on, None, &JoinType::Inner)?;
+ let global_limit = global_limit_exec(hash_join, 0, Some(5));
+
+ let initial = format_plan(&global_limit);
+ insta::assert_snapshot!(
+ initial,
+ @r"
+ GlobalLimitExec: skip=0, fetch=5
+ HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c1@0, c1@0)]
+ EmptyExec
+ EmptyExec
+ "
+ );
+
+ let after_optimize =
+ LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?;
+ let optimized = format_plan(&after_optimize);
+ // The limit should be absorbed by the hash join (not pushed to children)
+ insta::assert_snapshot!(
+ optimized,
+ @r"
+ HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c1@0, c1@0)], fetch=5
+ EmptyExec
+ EmptyExec
+ "
+ );
+
+ Ok(())
+}
+
+#[test]
+fn absorbs_limit_into_hash_join_right() -> Result<()> {
+ // HashJoinExec with Right join should absorb limit via with_fetch
+ let schema = create_schema();
+ let left = empty_exec(Arc::clone(&schema));
+ let right = empty_exec(Arc::clone(&schema));
+ let on = join_on_columns("c1", "c1");
+ let hash_join = hash_join_exec(left, right, on, None, &JoinType::Right)?;
+ let global_limit = global_limit_exec(hash_join, 0, Some(10));
+
+ let initial = format_plan(&global_limit);
+ insta::assert_snapshot!(
+ initial,
+ @r"
+ GlobalLimitExec: skip=0, fetch=10
+ HashJoinExec: mode=Partitioned, join_type=Right, on=[(c1@0, c1@0)]
+ EmptyExec
+ EmptyExec
+ "
+ );
+
+ let after_optimize =
+ LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?;
+ let optimized = format_plan(&after_optimize);
+ // The limit should be absorbed by the hash join
+ insta::assert_snapshot!(
+ optimized,
+ @r"
+ HashJoinExec: mode=Partitioned, join_type=Right, on=[(c1@0, c1@0)],
fetch=10
+ EmptyExec
+ EmptyExec
+ "
+ );
+
+ Ok(())
+}
+
+#[test]
+fn absorbs_limit_into_hash_join_left() -> Result<()> {
+ // during probing, then unmatched rows at the end, stopping when limit is
reached
+ let schema = create_schema();
+ let left = empty_exec(Arc::clone(&schema));
+ let right = empty_exec(Arc::clone(&schema));
+ let on = join_on_columns("c1", "c1");
+ let hash_join = hash_join_exec(left, right, on, None, &JoinType::Left)?;
+ let global_limit = global_limit_exec(hash_join, 0, Some(5));
+
+ let initial = format_plan(&global_limit);
+ insta::assert_snapshot!(
+ initial,
+ @r"
+ GlobalLimitExec: skip=0, fetch=5
+ HashJoinExec: mode=Partitioned, join_type=Left, on=[(c1@0, c1@0)]
+ EmptyExec
+ EmptyExec
+ "
+ );
+
+ let after_optimize =
+ LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?;
+ let optimized = format_plan(&after_optimize);
+ // Left join now absorbs the limit
+ insta::assert_snapshot!(
+ optimized,
+ @r"
+ HashJoinExec: mode=Partitioned, join_type=Left, on=[(c1@0, c1@0)], fetch=5
+ EmptyExec
+ EmptyExec
+ "
+ );
+
+ Ok(())
+}
+
+#[test]
+fn absorbs_limit_with_skip_into_hash_join() -> Result<()> {
+ let schema = create_schema();
+ let left = empty_exec(Arc::clone(&schema));
+ let right = empty_exec(Arc::clone(&schema));
+ let on = join_on_columns("c1", "c1");
+ let hash_join = hash_join_exec(left, right, on, None, &JoinType::Inner)?;
+ let global_limit = global_limit_exec(hash_join, 3, Some(5));
+
+ let initial = format_plan(&global_limit);
+ insta::assert_snapshot!(
+ initial,
+ @r"
+ GlobalLimitExec: skip=3, fetch=5
+ HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c1@0, c1@0)]
+ EmptyExec
+ EmptyExec
+ "
+ );
+
+ let after_optimize =
+ LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?;
+ let optimized = format_plan(&after_optimize);
+ // With skip, GlobalLimit is kept but fetch (skip + limit = 8) is absorbed
by the join
+ insta::assert_snapshot!(
+ optimized,
+ @r"
+ GlobalLimitExec: skip=3, fetch=5
+ HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c1@0, c1@0)],
fetch=8
+ EmptyExec
+ EmptyExec
+ "
+ );
+
+ Ok(())
+}
+
#[test]
fn pushes_global_limit_exec_through_projection_exec() -> Result<()> {
let schema = create_schema();
diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs
b/datafusion/physical-plan/src/joins/hash_join/exec.rs
index 77d736e938..eb2e841791 100644
--- a/datafusion/physical-plan/src/joins/hash_join/exec.rs
+++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs
@@ -258,6 +258,11 @@ pub struct HashJoinExecBuilder {
partition_mode: PartitionMode,
null_equality: NullEquality,
null_aware: bool,
+ /// Maximum number of rows to return
+ ///
+ /// If the operator produces `< fetch` rows, it returns all available rows.
+ /// If it produces `>= fetch` rows, it returns exactly `fetch` rows and
stops early.
+ fetch: Option<usize>,
}
impl HashJoinExecBuilder {
@@ -278,6 +283,7 @@ impl HashJoinExecBuilder {
join_type,
null_equality: NullEquality::NullEqualsNothing,
null_aware: false,
+ fetch: None,
}
}
@@ -316,6 +322,12 @@ impl HashJoinExecBuilder {
self
}
+ /// Set fetch limit.
+ pub fn with_fetch(mut self, fetch: Option<usize>) -> Self {
+ self.fetch = fetch;
+ self
+ }
+
/// Build resulting execution plan.
pub fn build(self) -> Result<HashJoinExec> {
let Self {
@@ -328,6 +340,7 @@ impl HashJoinExecBuilder {
partition_mode,
null_equality,
null_aware,
+ fetch,
} = self;
let left_schema = left.schema();
@@ -393,6 +406,7 @@ impl HashJoinExecBuilder {
null_aware,
cache,
dynamic_filter: None,
+ fetch,
})
}
}
@@ -409,6 +423,7 @@ impl From<&HashJoinExec> for HashJoinExecBuilder {
partition_mode: exec.mode,
null_equality: exec.null_equality,
null_aware: exec.null_aware,
+ fetch: exec.fetch,
}
}
}
@@ -646,6 +661,8 @@ pub struct HashJoinExec {
/// Set when dynamic filter pushdown is detected in
handle_child_pushdown_result.
/// HashJoinExec also needs to keep a shared bounds accumulator for
coordinating updates.
dynamic_filter: Option<HashJoinExecDynamicFilter>,
+ /// Maximum number of rows to return
+ fetch: Option<usize>,
}
#[derive(Clone)]
@@ -930,25 +947,27 @@ impl HashJoinExec {
) -> Result<Arc<dyn ExecutionPlan>> {
let left = self.left();
let right = self.right();
- let new_join = HashJoinExec::try_new(
+ let new_join = HashJoinExecBuilder::new(
Arc::clone(right),
Arc::clone(left),
self.on()
.iter()
.map(|(l, r)| (Arc::clone(r), Arc::clone(l)))
.collect(),
- self.filter().map(JoinFilter::swap),
- &self.join_type().swap(),
- swap_join_projection(
- left.schema().fields().len(),
- right.schema().fields().len(),
- self.projection.as_deref(),
- self.join_type(),
- ),
- partition_mode,
- self.null_equality(),
- self.null_aware,
- )?;
+ self.join_type().swap(),
+ )
+ .with_filter(self.filter().map(JoinFilter::swap))
+ .with_projection(swap_join_projection(
+ left.schema().fields().len(),
+ right.schema().fields().len(),
+ self.projection.as_deref(),
+ self.join_type(),
+ ))
+ .with_partition_mode(partition_mode)
+ .with_null_equality(self.null_equality())
+ .with_null_aware(self.null_aware)
+ .with_fetch(self.fetch)
+ .build()?;
// In case of anti / semi joins or if there is embedded projection in
HashJoinExec, output column order is preserved, no need to add projection again
if matches!(
self.join_type(),
@@ -999,6 +1018,9 @@ impl DisplayAs for HashJoinExec {
} else {
""
};
+ let display_fetch = self
+ .fetch
+ .map_or_else(String::new, |f| format!(", fetch={f}"));
let on = self
.on
.iter()
@@ -1007,13 +1029,14 @@ impl DisplayAs for HashJoinExec {
.join(", ");
write!(
f,
- "HashJoinExec: mode={:?}, join_type={:?}, on=[{}]{}{}{}",
+ "HashJoinExec: mode={:?}, join_type={:?}, on=[{}]{}{}{}{}",
self.mode,
self.join_type,
on,
display_filter,
display_projections,
display_null_equality,
+ display_fetch,
)
}
DisplayFormatType::TreeRender => {
@@ -1040,6 +1063,10 @@ impl DisplayAs for HashJoinExec {
writeln!(f, "filter={filter}")?;
}
+ if let Some(fetch) = self.fetch {
+ writeln!(f, "fetch={fetch}")?;
+ }
+
Ok(())
}
}
@@ -1142,6 +1169,7 @@ impl ExecutionPlan for HashJoinExec {
)?,
// Keep the dynamic filter, bounds accumulator will be reset
dynamic_filter: self.dynamic_filter.clone(),
+ fetch: self.fetch,
}))
}
@@ -1165,6 +1193,7 @@ impl ExecutionPlan for HashJoinExec {
cache: self.cache.clone(),
// Reset dynamic filter and bounds accumulator to initial state
dynamic_filter: None,
+ fetch: self.fetch,
}))
}
@@ -1329,6 +1358,7 @@ impl ExecutionPlan for HashJoinExec {
build_accumulator,
self.mode,
self.null_aware,
+ self.fetch,
)))
}
@@ -1351,7 +1381,9 @@ impl ExecutionPlan for HashJoinExec {
&self.join_schema,
)?;
// Project statistics if there is a projection
- Ok(stats.project(self.projection.as_ref()))
+ let stats = stats.project(self.projection.as_ref());
+ // Apply fetch limit to statistics
+ stats.with_fetch(self.fetch, 0, 1)
}
/// Tries to push `projection` down through `hash_join`. If possible,
performs the
@@ -1380,18 +1412,22 @@ impl ExecutionPlan for HashJoinExec {
&schema,
self.filter(),
)? {
- Ok(Some(Arc::new(HashJoinExec::try_new(
- Arc::new(projected_left_child),
- Arc::new(projected_right_child),
- join_on,
- join_filter,
- self.join_type(),
+ Ok(Some(Arc::new(
+ HashJoinExecBuilder::new(
+ Arc::new(projected_left_child),
+ Arc::new(projected_right_child),
+ join_on,
+ *self.join_type(),
+ )
+ .with_filter(join_filter)
// Returned early if projection is not None
- None,
- *self.partition_mode(),
- self.null_equality,
- self.null_aware,
- )?)))
+ .with_projection(None)
+ .with_partition_mode(*self.partition_mode())
+ .with_null_equality(self.null_equality)
+ .with_null_aware(self.null_aware)
+ .with_fetch(self.fetch)
+ .build()?,
+ )))
} else {
try_embed_projection(projection, self)
}
@@ -1489,12 +1525,32 @@ impl ExecutionPlan for HashJoinExec {
filter: dynamic_filter,
build_accumulator: OnceLock::new(),
}),
+ fetch: self.fetch,
});
result = result.with_updated_node(new_node as Arc<dyn
ExecutionPlan>);
}
}
Ok(result)
}
+
+ fn supports_limit_pushdown(&self) -> bool {
+ // Hash join execution plan does not support pushing limit down
through to children
+ // because the children don't know about the join condition and can't
+ // determine how many rows to produce
+ false
+ }
+
+ fn fetch(&self) -> Option<usize> {
+ self.fetch
+ }
+
+ fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn
ExecutionPlan>> {
+ HashJoinExecBuilder::from(self)
+ .with_fetch(limit)
+ .build()
+ .ok()
+ .map(|exec| Arc::new(exec) as _)
+ }
}
/// Accumulator for collecting min/max bounds from build-side data during hash
join.
diff --git a/datafusion/physical-plan/src/joins/hash_join/stream.rs
b/datafusion/physical-plan/src/joins/hash_join/stream.rs
index 54e620f99d..8af26c1b8a 100644
--- a/datafusion/physical-plan/src/joins/hash_join/stream.rs
+++ b/datafusion/physical-plan/src/joins/hash_join/stream.rs
@@ -24,6 +24,7 @@ use std::sync::Arc;
use std::sync::atomic::Ordering;
use std::task::Poll;
+use crate::coalesce::{LimitedBatchCoalescer, PushBatchStatus};
use crate::joins::Map;
use crate::joins::MapOffset;
use crate::joins::PartitionMode;
@@ -46,7 +47,6 @@ use crate::{
};
use arrow::array::{Array, ArrayRef, UInt32Array, UInt64Array};
-use arrow::compute::BatchCoalescer;
use arrow::datatypes::{Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use datafusion_common::{
@@ -221,10 +221,9 @@ pub(super) struct HashJoinStream {
build_waiter: Option<OnceFut<()>>,
/// Partitioning mode to use
mode: PartitionMode,
- /// Output buffer for coalescing small batches into larger ones.
- /// Uses `BatchCoalescer` from arrow to efficiently combine batches.
- /// When batches are already close to target size, they bypass coalescing.
- output_buffer: Box<BatchCoalescer>,
+ /// Output buffer for coalescing small batches into larger ones with
optional fetch limit.
+ /// Uses `LimitedBatchCoalescer` to efficiently combine batches and absorb
limit with 'fetch'
+ output_buffer: LimitedBatchCoalescer,
/// Whether this is a null-aware anti join
null_aware: bool,
}
@@ -375,14 +374,11 @@ impl HashJoinStream {
build_accumulator: Option<Arc<SharedBuildAccumulator>>,
mode: PartitionMode,
null_aware: bool,
+ fetch: Option<usize>,
) -> Self {
- // Create output buffer with coalescing.
- // Use biggest_coalesce_batch_size to bypass coalescing for batches
- // that are already close to target size (within 50%).
- let output_buffer = Box::new(
- BatchCoalescer::new(Arc::clone(&schema), batch_size)
- .with_biggest_coalesce_batch_size(Some(batch_size / 2)),
- );
+ // Create output buffer with coalescing and optional fetch limit.
+ let output_buffer =
+ LimitedBatchCoalescer::new(Arc::clone(&schema), batch_size, fetch);
Self {
partition,
@@ -425,6 +421,11 @@ impl HashJoinStream {
.record_poll(Poll::Ready(Some(Ok(batch))));
}
+ // Check if the coalescer has finished (limit reached and flushed)
+ if self.output_buffer.is_finished() {
+ return Poll::Ready(None);
+ }
+
return match self.state {
HashJoinStreamState::WaitBuildSide => {
handle_state!(ready!(self.collect_build_side(cx)))
@@ -443,7 +444,7 @@ impl HashJoinStream {
}
HashJoinStreamState::Completed if
!self.output_buffer.is_empty() => {
// Flush any remaining buffered data
- self.output_buffer.finish_buffered_batch()?;
+ self.output_buffer.finish()?;
// Continue loop to emit the flushed batch
continue;
}
@@ -782,10 +783,17 @@ impl HashJoinStream {
join_side,
)?;
- self.output_buffer.push_batch(batch)?;
+ let push_status = self.output_buffer.push_batch(batch)?;
timer.done();
+ // If limit reached, finish and move to Completed state
+ if push_status == PushBatchStatus::LimitReached {
+ self.output_buffer.finish()?;
+ self.state = HashJoinStreamState::Completed;
+ return Ok(StatefulStreamResult::Continue);
+ }
+
if next_offset.is_none() {
self.state = HashJoinStreamState::FetchProbeBatch;
} else {
@@ -892,7 +900,12 @@ impl HashJoinStream {
&self.column_indices,
JoinSide::Left,
)?;
- self.output_buffer.push_batch(batch)?;
+ let push_status = self.output_buffer.push_batch(batch)?;
+
+ // If limit reached, finish the coalescer
+ if push_status == PushBatchStatus::LimitReached {
+ self.output_buffer.finish()?;
+ }
}
Ok(StatefulStreamResult::Continue)
diff --git
a/datafusion/sqllogictest/test_files/join_disable_repartition_joins.slt
b/datafusion/sqllogictest/test_files/join_disable_repartition_joins.slt
index 1b25a01d62..59f3d8285a 100644
--- a/datafusion/sqllogictest/test_files/join_disable_repartition_joins.slt
+++ b/datafusion/sqllogictest/test_files/join_disable_repartition_joins.slt
@@ -55,7 +55,7 @@ logical_plan
07)--------TableScan: annotated_data projection=[a, c]
physical_plan
01)SortPreservingMergeExec: [a@0 ASC NULLS LAST], fetch=5
-02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(c@0, c@1)],
projection=[a@1]
+02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(c@0, c@1)],
projection=[a@1], fetch=5
03)----DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c],
file_type=csv, has_header=true
04)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1,
maintains_sort_order=true
05)------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c],
output_ordering=[a@0 ASC NULLS LAST], file_type=csv, has_header=true
@@ -96,7 +96,7 @@ logical_plan
physical_plan
01)SortPreservingMergeExec: [a2@0 ASC NULLS LAST, b@1 ASC NULLS LAST], fetch=10
02)--ProjectionExec: expr=[a@0 as a2, b@1 as b]
-03)----HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(d@1, d@3),
(c@0, c@2)], projection=[a@0, b@1]
+03)----HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(d@1, d@3),
(c@0, c@2)], projection=[a@0, b@1], fetch=10
04)------CoalescePartitionsExec
05)--------FilterExec: d@1 = 3
06)----------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1
diff --git a/datafusion/sqllogictest/test_files/join_limit_pushdown.slt
b/datafusion/sqllogictest/test_files/join_limit_pushdown.slt
new file mode 100644
index 0000000000..6bb23c1b4c
--- /dev/null
+++ b/datafusion/sqllogictest/test_files/join_limit_pushdown.slt
@@ -0,0 +1,269 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Tests for limit pushdown into joins
+
+# need to use a single partition for deterministic results
+statement ok
+set datafusion.execution.target_partitions = 1;
+
+statement ok
+set datafusion.explain.logical_plan_only = false;
+
+statement ok
+set datafusion.optimizer.prefer_hash_join = true;
+
+# Create test tables
+statement ok
+CREATE TABLE t1 (a INT, b VARCHAR) AS VALUES
+ (1, 'one'),
+ (2, 'two'),
+ (3, 'three'),
+ (4, 'four'),
+ (5, 'five');
+
+statement ok
+CREATE TABLE t2 (x INT, y VARCHAR) AS VALUES
+ (1, 'alpha'),
+ (2, 'beta'),
+ (3, 'gamma'),
+ (6, 'delta'),
+ (7, 'epsilon');
+
+query TT
+EXPLAIN SELECT t1.a, t2.x FROM t1 INNER JOIN t2 ON t1.a = t2.x LIMIT 2;
+----
+logical_plan
+01)Limit: skip=0, fetch=2
+02)--Inner Join: t1.a = t2.x
+03)----TableScan: t1 projection=[a]
+04)----TableScan: t2 projection=[x]
+physical_plan
+01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, x@0)], fetch=2
+02)--DataSourceExec: partitions=1, partition_sizes=[1]
+03)--DataSourceExec: partitions=1, partition_sizes=[1]
+
+query II
+SELECT t1.a, t2.x FROM t1 INNER JOIN t2 ON t1.a = t2.x LIMIT 2;
+----
+1 1
+2 2
+
+# Right join is converted to Left join with projection - fetch pushdown is
supported
+query TT
+EXPLAIN SELECT t1.a, t2.x FROM t1 RIGHT JOIN t2 ON t1.a = t2.x LIMIT 3;
+----
+logical_plan
+01)Limit: skip=0, fetch=3
+02)--Right Join: t1.a = t2.x
+03)----TableScan: t1 projection=[a]
+04)----Limit: skip=0, fetch=3
+05)------TableScan: t2 projection=[x], fetch=3
+physical_plan
+01)ProjectionExec: expr=[a@1 as a, x@0 as x]
+02)--HashJoinExec: mode=CollectLeft, join_type=Left, on=[(x@0, a@0)], fetch=3
+03)----DataSourceExec: partitions=1, partition_sizes=[1], fetch=3
+04)----DataSourceExec: partitions=1, partition_sizes=[1]
+
+query II
+SELECT t1.a, t2.x FROM t1 RIGHT JOIN t2 ON t1.a = t2.x LIMIT 3;
+----
+1 1
+2 2
+3 3
+
+# Left join supports fetch pushdown
+query TT
+EXPLAIN SELECT t1.a, t2.x FROM t1 LEFT JOIN t2 ON t1.a = t2.x LIMIT 3;
+----
+logical_plan
+01)Limit: skip=0, fetch=3
+02)--Left Join: t1.a = t2.x
+03)----Limit: skip=0, fetch=3
+04)------TableScan: t1 projection=[a], fetch=3
+05)----TableScan: t2 projection=[x]
+physical_plan
+01)HashJoinExec: mode=CollectLeft, join_type=Left, on=[(a@0, x@0)], fetch=3
+02)--DataSourceExec: partitions=1, partition_sizes=[1], fetch=3
+03)--DataSourceExec: partitions=1, partition_sizes=[1]
+
+query II
+SELECT t1.a, t2.x FROM t1 LEFT JOIN t2 ON t1.a = t2.x LIMIT 3;
+----
+1 1
+2 2
+3 3
+
+
+# Full join supports fetch pushdown
+query TT
+EXPLAIN SELECT t1.a, t2.x FROM t1 FULL OUTER JOIN t2 ON t1.a = t2.x LIMIT 4;
+----
+logical_plan
+01)Limit: skip=0, fetch=4
+02)--Full Join: t1.a = t2.x
+03)----TableScan: t1 projection=[a]
+04)----TableScan: t2 projection=[x]
+physical_plan
+01)HashJoinExec: mode=CollectLeft, join_type=Full, on=[(a@0, x@0)], fetch=4
+02)--DataSourceExec: partitions=1, partition_sizes=[1]
+03)--DataSourceExec: partitions=1, partition_sizes=[1]
+
+# Note: FULL OUTER JOIN order is not deterministic, so we just check count
+query I
+SELECT COUNT(*) FROM (SELECT t1.a, t2.x FROM t1 FULL OUTER JOIN t2 ON t1.a =
t2.x LIMIT 4);
+----
+4
+
+# EXISTS becomes left semi join - fetch pushdown is supported
+query TT
+EXPLAIN SELECT t2.x FROM t2 WHERE EXISTS (SELECT 1 FROM t1 WHERE t1.a = t2.x)
LIMIT 2;
+----
+logical_plan
+01)Limit: skip=0, fetch=2
+02)--LeftSemi Join: t2.x = __correlated_sq_1.a
+03)----TableScan: t2 projection=[x]
+04)----SubqueryAlias: __correlated_sq_1
+05)------TableScan: t1 projection=[a]
+physical_plan
+01)HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(x@0, a@0)], fetch=2
+02)--DataSourceExec: partitions=1, partition_sizes=[1]
+03)--DataSourceExec: partitions=1, partition_sizes=[1]
+
+query I
+SELECT t2.x FROM t2 WHERE EXISTS (SELECT 1 FROM t1 WHERE t1.a = t2.x) LIMIT 2;
+----
+1
+2
+
+# NOT EXISTS becomes LeftAnti - fetch pushdown is supported
+query TT
+EXPLAIN SELECT t2.x FROM t2 WHERE NOT EXISTS (SELECT 1 FROM t1 WHERE t1.a =
t2.x) LIMIT 1;
+----
+logical_plan
+01)Limit: skip=0, fetch=1
+02)--LeftAnti Join: t2.x = __correlated_sq_1.a
+03)----TableScan: t2 projection=[x]
+04)----SubqueryAlias: __correlated_sq_1
+05)------TableScan: t1 projection=[a]
+physical_plan
+01)HashJoinExec: mode=CollectLeft, join_type=LeftAnti, on=[(x@0, a@0)], fetch=1
+02)--DataSourceExec: partitions=1, partition_sizes=[1]
+03)--DataSourceExec: partitions=1, partition_sizes=[1]
+
+query I
+SELECT t2.x FROM t2 WHERE NOT EXISTS (SELECT 1 FROM t1 WHERE t1.a = t2.x)
LIMIT 1;
+----
+6
+
+# Inner join should push
+query TT
+EXPLAIN SELECT t1.a, t2.x FROM t1 INNER JOIN t2 ON t1.a = t2.x LIMIT 1 OFFSET
1;
+----
+logical_plan
+01)Limit: skip=1, fetch=1
+02)--Inner Join: t1.a = t2.x
+03)----TableScan: t1 projection=[a]
+04)----TableScan: t2 projection=[x]
+physical_plan
+01)GlobalLimitExec: skip=1, fetch=1
+02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, x@0)], fetch=2
+03)----DataSourceExec: partitions=1, partition_sizes=[1]
+04)----DataSourceExec: partitions=1, partition_sizes=[1]
+
+query II
+SELECT t1.a, t2.x FROM t1 INNER JOIN t2 ON t1.a = t2.x LIMIT 1 OFFSET 1;
+----
+2 2
+
+query TT
+EXPLAIN SELECT t1.a, t2.x FROM t1 INNER JOIN t2 ON t1.a = t2.x LIMIT 0;
+----
+logical_plan EmptyRelation: rows=0
+physical_plan EmptyExec
+
+query II
+SELECT t1.a, t2.x FROM t1 INNER JOIN t2 ON t1.a = t2.x LIMIT 0;
+----
+
+statement ok
+CREATE TABLE t3 (p INT, q VARCHAR) AS VALUES
+ (1, 'foo'),
+ (2, 'bar'),
+ (3, 'baz');
+
+query TT
+EXPLAIN SELECT t1.a, t2.x, t3.p
+FROM t1
+INNER JOIN t2 ON t1.a = t2.x
+INNER JOIN t3 ON t2.x = t3.p
+LIMIT 2;
+----
+logical_plan
+01)Limit: skip=0, fetch=2
+02)--Inner Join: t2.x = t3.p
+03)----Inner Join: t1.a = t2.x
+04)------TableScan: t1 projection=[a]
+05)------TableScan: t2 projection=[x]
+06)----TableScan: t3 projection=[p]
+physical_plan
+01)ProjectionExec: expr=[a@1 as a, x@2 as x, p@0 as p]
+02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(p@0, x@1)], fetch=2
+03)----DataSourceExec: partitions=1, partition_sizes=[1]
+04)----HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, x@0)]
+05)------DataSourceExec: partitions=1, partition_sizes=[1]
+06)------DataSourceExec: partitions=1, partition_sizes=[1]
+
+query III
+SELECT t1.a, t2.x, t3.p
+FROM t1
+INNER JOIN t2 ON t1.a = t2.x
+INNER JOIN t3 ON t2.x = t3.p
+LIMIT 2;
+----
+1 1 1
+2 2 2
+
+# Try larger limit
+query TT
+EXPLAIN SELECT t1.a, t2.x FROM t1 INNER JOIN t2 ON t1.a = t2.x LIMIT 100;
+----
+logical_plan
+01)Limit: skip=0, fetch=100
+02)--Inner Join: t1.a = t2.x
+03)----TableScan: t1 projection=[a]
+04)----TableScan: t2 projection=[x]
+physical_plan
+01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, x@0)], fetch=100
+02)--DataSourceExec: partitions=1, partition_sizes=[1]
+03)--DataSourceExec: partitions=1, partition_sizes=[1]
+
+query II
+SELECT t1.a, t2.x FROM t1 INNER JOIN t2 ON t1.a = t2.x LIMIT 100;
+----
+1 1
+2 2
+3 3
+
+statement ok
+DROP TABLE t1;
+
+statement ok
+DROP TABLE t2;
+
+statement ok
+DROP TABLE t3;
diff --git a/datafusion/sqllogictest/test_files/joins.slt
b/datafusion/sqllogictest/test_files/joins.slt
index 3edc721afc..2fb544a638 100644
--- a/datafusion/sqllogictest/test_files/joins.slt
+++ b/datafusion/sqllogictest/test_files/joins.slt
@@ -4161,10 +4161,9 @@ logical_plan
03)----TableScan: t0 projection=[c1, c2]
04)----TableScan: t1 projection=[c1, c2, c3]
physical_plan
-01)GlobalLimitExec: skip=0, fetch=2
-02)--HashJoinExec: mode=CollectLeft, join_type=Full, on=[(c1@0, c1@0)]
-03)----DataSourceExec: partitions=1, partition_sizes=[2]
-04)----DataSourceExec: partitions=1, partition_sizes=[2]
+01)HashJoinExec: mode=CollectLeft, join_type=Full, on=[(c1@0, c1@0)], fetch=2
+02)--DataSourceExec: partitions=1, partition_sizes=[2]
+03)--DataSourceExec: partitions=1, partition_sizes=[2]
## Test join.on.is_empty() && join.filter.is_some() -> single filter now a PWMJ
query TT
@@ -4191,10 +4190,9 @@ logical_plan
03)----TableScan: t0 projection=[c1, c2]
04)----TableScan: t1 projection=[c1, c2, c3]
physical_plan
-01)GlobalLimitExec: skip=0, fetch=2
-02)--HashJoinExec: mode=CollectLeft, join_type=Full, on=[(c1@0, c1@0)],
filter=c2@0 >= c2@1
-03)----DataSourceExec: partitions=1, partition_sizes=[2]
-04)----DataSourceExec: partitions=1, partition_sizes=[2]
+01)HashJoinExec: mode=CollectLeft, join_type=Full, on=[(c1@0, c1@0)],
filter=c2@0 >= c2@1, fetch=2
+02)--DataSourceExec: partitions=1, partition_sizes=[2]
+03)--DataSourceExec: partitions=1, partition_sizes=[2]
## Add more test cases for join limit pushdown
statement ok
@@ -4245,6 +4243,7 @@ select * from t1 LEFT JOIN t2 ON t1.a = t2.b LIMIT 2;
1 1
# can only push down to t1 (preserved side)
+# limit pushdown supported for left join - both to join and probe side
query TT
explain select * from t1 LEFT JOIN t2 ON t1.a = t2.b LIMIT 2;
----
@@ -4255,10 +4254,9 @@ logical_plan
04)------TableScan: t1 projection=[a], fetch=2
05)----TableScan: t2 projection=[b]
physical_plan
-01)GlobalLimitExec: skip=0, fetch=2
-02)--HashJoinExec: mode=CollectLeft, join_type=Left, on=[(a@0, b@0)]
-03)----DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/joins/t1.csv]]},
projection=[a], limit=2, file_type=csv, has_header=true
-04)----DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/joins/t2.csv]]},
projection=[b], file_type=csv, has_header=true
+01)HashJoinExec: mode=CollectLeft, join_type=Left, on=[(a@0, b@0)], fetch=2
+02)--DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/joins/t1.csv]]},
projection=[a], limit=2, file_type=csv, has_header=true
+03)--DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/joins/t2.csv]]},
projection=[b], file_type=csv, has_header=true
######
## RIGHT JOIN w/ LIMIT
@@ -4289,10 +4287,9 @@ logical_plan
04)----Limit: skip=0, fetch=2
05)------TableScan: t2 projection=[b], fetch=2
physical_plan
-01)GlobalLimitExec: skip=0, fetch=2
-02)--HashJoinExec: mode=CollectLeft, join_type=Right, on=[(a@0, b@0)]
-03)----DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/joins/t1.csv]]},
projection=[a], file_type=csv, has_header=true
-04)----DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/joins/t2.csv]]},
projection=[b], limit=2, file_type=csv, has_header=true
+01)HashJoinExec: mode=CollectLeft, join_type=Right, on=[(a@0, b@0)], fetch=2
+02)--DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/joins/t1.csv]]},
projection=[a], file_type=csv, has_header=true
+03)--DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/joins/t2.csv]]},
projection=[b], limit=2, file_type=csv, has_header=true
######
## FULL JOIN w/ LIMIT
@@ -4316,7 +4313,7 @@ select * from t1 FULL JOIN t2 ON t1.a = t2.b LIMIT 2;
4 4
-# can't push limit for full outer join
+# full outer join supports fetch pushdown
query TT
explain select * from t1 FULL JOIN t2 ON t1.a = t2.b LIMIT 2;
----
@@ -4326,10 +4323,9 @@ logical_plan
03)----TableScan: t1 projection=[a]
04)----TableScan: t2 projection=[b]
physical_plan
-01)GlobalLimitExec: skip=0, fetch=2
-02)--HashJoinExec: mode=CollectLeft, join_type=Full, on=[(a@0, b@0)]
-03)----DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/joins/t1.csv]]},
projection=[a], file_type=csv, has_header=true
-04)----DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/joins/t2.csv]]},
projection=[b], file_type=csv, has_header=true
+01)HashJoinExec: mode=CollectLeft, join_type=Full, on=[(a@0, b@0)], fetch=2
+02)--DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/joins/t1.csv]]},
projection=[a], file_type=csv, has_header=true
+03)--DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/joins/t2.csv]]},
projection=[b], file_type=csv, has_header=true
statement ok
drop table t1;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]