stuhood commented on code in PR #21983:
URL: https://github.com/apache/datafusion/pull/21983#discussion_r3290142604


##########
datafusion/physical-plan/src/joins/join_accelerator.rs:
##########
@@ -0,0 +1,453 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Join accelerator interfaces used by [`crate::joins::NestedLoopJoinExec`], see
+//! comments in [`JoinAccelerator`] for details.
+
+use std::fmt::Debug;
+use std::sync::Arc;
+
+use arrow::compute::concat_batches;
+use arrow::datatypes::SchemaRef;
+use arrow::record_batch::RecordBatch;
+use datafusion_common::JoinSide;
+use datafusion_expr::JoinType;
+
+use super::join_filter::JoinFilter;
+use datafusion_common::{Result, internal_datafusion_err, internal_err};
+
+/// Shared reference to a selected join accelerator.
+pub(crate) type JoinAcceleratorRef = Arc<dyn JoinAccelerator>;
+
+/// Planning-time specification used to select a join accelerator.
+#[derive(Debug, Clone)]
+pub(crate) struct JoinAcceleratorSpec {
+    #[expect(dead_code)]
+    // Kept for accelerator selection once non-fallback implementations are enabled.
+    join_type: JoinType,
+    build_side: JoinSide,
+    left_schema: SchemaRef,
+    right_schema: SchemaRef,
+    #[expect(dead_code)]
+    // Kept for accelerator selection once non-fallback implementations are enabled.
+    filter: Option<JoinFilter>,
+}
+
+impl JoinAcceleratorSpec {
+    pub(crate) fn new(
+        join_type: JoinType,
+        build_side: JoinSide,
+        left_schema: SchemaRef,
+        right_schema: SchemaRef,
+        filter: Option<JoinFilter>,
+    ) -> Self {
+        Self {
+            join_type,
+            build_side,
+            left_schema,
+            right_schema,
+            filter,
+        }
+    }
+
+    pub(crate) fn build_schema(&self) -> &SchemaRef {
+        match self.build_side {
+            JoinSide::Left => &self.left_schema,
+            JoinSide::Right => &self.right_schema,
+            JoinSide::None => unreachable!("Join accelerator build side cannot be None"),
+        }
+    }
+}
+
+/// Selects a planning-time join accelerator.
+#[derive(Debug, Default)]
+pub(crate) struct JoinAcceleratorBuilder;
+
+impl JoinAcceleratorBuilder {
+    /// Select the accelerator for a nested loop join.
+    ///
+    /// This always succeeds because NLJ has a naive cartesian fallback.
+    pub(crate) fn try_new(spec: JoinAcceleratorSpec) -> Result<JoinAcceleratorRef> {
+        Ok(Arc::new(FallbackNestedLoopJoinAccelerator::new(spec)))
+    }
+}
+
+/// Nested loop join accelerator selected at planning time.
+///
+/// The accelerator is used by joins that first buffer one input, then probe the
+/// other input one batch at a time. It provides two extension points for reducing
+/// probe work:
+///
+/// - Runtime indexing on the build side. After all build batches are buffered,
+///   the accelerator can create an index-like representation that finds the
+///   candidate build rows for each probe row more cheaply than a cartesian scan.
+/// - Dynamic filtering on the probe side. Build-side statistics can produce a
+///   probe-side filter that eliminates rows which cannot match any build row.
+///
+/// Accelerators should only handle an accelerated predicate: a conjunct from a
+/// conjunction join condition (e.g. `cond1` from `cond1 AND cond2 AND cond3`),
+/// and:
+///
+/// - It should emit candidate pairs efficiently, using the accelerated predicate
+///   to avoid unnecessary cartesian comparisons, for example by building an index
+///   on the build side before probing.
+/// - It must not discard any pair that could satisfy the full join filter. The
+///   accelerator may emit a superset of the final matches.
+///
+/// Residual join filter evaluation and outer, semi, anti, and mark join match
+/// tracking are handled by the outer nested-loop join driver. See the examples
+/// below for term definitions.
+///
+/// # Example: PiecewiseMergeJoin
+///
+/// For workload
+///
+/// ```sql
+/// select *
+/// from generate_series(1000) as t1(v1)
+/// join generate_series(1000000) as t2(v1)
+/// on (t1.v1 > t2.v1) AND ((t1.v1 + t2.v1) % 2 = 0)
+/// ```
+///
+/// `(t1.v1 > t2.v1)` is the accelerated predicate because it can be evaluated
+/// efficiently with a specific algorithm; see below for details.
+/// `((t1.v1 + t2.v1) % 2 = 0)` is the residual predicate and is evaluated by
+/// the outer nested-loop join driver after candidate pairs are produced.
+///
+/// ## (TODO) Dynamic filter
+///
+/// After buffering `t1` as the build side, the accelerator knows
+/// `max(t1.v1) = 1000` for `generate_series(1000)`. For an inner join, probe
+/// rows with `t2.v1 >= 1000` cannot satisfy `t1.v1 > t2.v1` and can be filtered
+/// out.
+/// This reduces the candidate search space from roughly `1K x 1M` pairs to
+/// roughly `1K x 1K` pairs before residual filter evaluation.
+///
+/// ## Runtime index
+///
+/// The accelerator can also sort the buffered `t1` rows by `v1`. For each
+/// incoming `t2` batch, it sorts the probe rows by `v1` and scans the two sorted
+/// runs once. For a probe row where `t2.v1 = 10`, the matching build rows are the
+/// suffix of sorted `t1` rows with `t1.v1 > 10`; for a later probe row where
+/// `t2.v1 = 20`, the scan cursor only moves forward. This avoids checking every
+/// build row for every probe row while still producing all pairs that satisfy the
+/// accelerated predicate. The residual predicate is then applied to those pairs
+/// by the join driver.
+/// This step further reduces the operation count from `1K x 1K` to roughly
+/// `1K x log(1K) + 1K + 1K` (sort and linear scans).
+///
+/// # Control flow
+/// The following pseudo-code demonstrates the high-level join control flow and
+/// how functions in this trait are used.
+///
+/// ```text
+/// for build_batch in build_input {
+///     accelerator.add_build_batch(build_batch)?;
+/// }
+/// accelerator.finish()?;
+///
+/// for probe_batch in probe_input {
+///     let (probe_batch, mut prober) =
+///         accelerator.init_prober(probe_batch, batch_size)?;
+///
+///     while let Some(candidates) = prober.probe()? {
+///         // The join driver consumes candidates with the prepared probe_batch,
+///         // applies the residual filter, and records join matches.
+///     }
+/// }
+/// ```
+///
+/// # Implementation Plan
+/// This trait is intended to become public. For now, keep it private while
+/// internal experiments stabilize the API.
+pub(crate) trait JoinAccelerator: Debug + Send + Sync {
+    #[cfg_attr(not(test), expect(dead_code))]
+    // Will be used in explain output when accelerator selection is visible.
+    fn name(&self) -> &'static str;
+
+    /// Return `true` only if this accelerator supports the nested-loop join
+    /// spilling execution path. See [`super::NestedLoopJoinExec`] for details.
+    fn support_spilling(&self) -> bool {
+        false
+    }
+
+    /// Create a fresh mutable accelerator for one execution.
+    fn clone_accelerator(&self) -> Box<dyn JoinAccelerator>;
+
+    /// Add one build-side input batch to the buffer, and optionally build a
+    /// runtime index.
+    fn add_build_batch(&mut self, batch: RecordBatch) -> Result<()>;
+
+    /// Signal the end of build-side input and prepare for probing.
+    fn finish(&mut self) -> Result<()>;

Review Comment:
   Should both of these be async, in order to allow spilling of larger indexes?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to