nathanb9 opened a new pull request, #22791:
URL: https://github.com/apache/datafusion/pull/22791
## Which issue does this PR close?
Related to #22641 (same user-visible symptom, **different code path**; see
below). Not a duplicate.
## Rationale for this change
`NestedLoopJoinExec` can return **incorrect, non-deterministic** results for
`LEFT`/`RIGHT`/`FULL` (and other left-final-emission) joins when the probe side
has **more than one partition**, i.e. essentially any non-equi join under the
default multi-partition config. Some left rows are emitted **twice**: once
correctly and once as a spurious `NULL`-padded "unmatched" row.
### Root cause
The probe streams share a `probe_threads_counter`. Each stream is supposed to
decrement it exactly once (when its right input is exhausted); the stream that
drives the counter to `0` is the one that emits the unmatched-left rows,
*after* all partitions have finished probing.
But in `handle_emit_left_unmatched`, after `process_left_unmatched` returns
`Ok(false)`, `maybe_flush_ready_batch()` can `return` early with a ready batch
**before the state advances to `Done`**. The stream is still in
`EmitLeftUnmatched`, so the next poll re-enters `process_left_unmatched` with
`left_emit_idx == 0`, and the gate
`self.left_emit_idx == 0 && !left_data.report_probe_completed()`
**decrements the shared counter a second time**. The counter then hits `0`
*before* every partition has finished probing, so a partition emits
unmatched-left rows early, including left rows that match in a
not-yet-drained partition. (In traces the counter visibly underflows
`0 → usize::MAX`.) Because it depends on partition scheduling, the result is
non-deterministic.
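The double decrement can be modelled in isolation. The following is a minimal sketch, not the DataFusion source: `SharedLeft`, `remaining`, and `simulate_double_report` are hypothetical names, and the body of `report_probe_completed` is an assumption (an atomic decrement that returns `true` only for the call that takes the counter from `1` to `0`).

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical stand-in for the left-side state shared by all probe streams.
struct SharedLeft {
    probe_threads_counter: AtomicUsize,
}

impl SharedLeft {
    fn new(probe_partitions: usize) -> Self {
        Self {
            probe_threads_counter: AtomicUsize::new(probe_partitions),
        }
    }

    /// Assumed behaviour: decrement, and report `true` only for the call
    /// that moves the counter from 1 to 0 ("I am the last prober").
    fn report_probe_completed(&self) -> bool {
        self.probe_threads_counter.fetch_sub(1, Ordering::Relaxed) == 1
    }

    fn remaining(&self) -> usize {
        self.probe_threads_counter.load(Ordering::Relaxed)
    }
}

/// Two probe partitions; partition 0 reports completion twice because it
/// re-enters the emit step, then partition 1 reports for real.
fn simulate_double_report() -> (bool, bool, usize) {
    let left = SharedLeft::new(2);
    let _p0_first = left.report_probe_completed(); // 2 -> 1, not last
    let p0_again = left.report_probe_completed(); // 1 -> 0, wrongly "last"
    let p1_real = left.report_probe_completed(); // 0 wraps to usize::MAX
    (p0_again, p1_real, left.remaining())
}
```

In this model partition 0 is told it is the last prober while partition 1 is still running, and the genuine final report wraps the counter, which matches the `0 → usize::MAX` underflow described above.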
This is **distinct from #22641 / #22671**, which address the *memory-limited
spill fallback* path. This bug reproduces with unbounded memory and with the
disk manager disabled (spill path unreachable), i.e. it is in the normal
single-pass path introduced by the NLJ rewrite (#16996).
## What changes are included in this PR?
Decrement the shared `probe_threads_counter` **exactly once per probe stream**
by guarding it with a new `probe_completed_reported: bool` on
`NestedLoopJoinStream` (reset per chunk in the memory-limited path). One file,
+39/−5.
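The guard can be sketched as follows. This is an illustration under stated assumptions, not the patch itself: `ProbeStream`, `report_probe_completed_once`, `is_last_prober`, and `simulate_guarded` are hypothetical names, and caching the first outcome for repeated calls is a design choice of this sketch; the description above only states that a `probe_completed_reported: bool` guards the decrement.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical, stripped-down probe stream holding the shared counter.
struct ProbeStream {
    probe_threads_counter: Arc<AtomicUsize>,
    /// Set once this stream has decremented the shared counter.
    probe_completed_reported: bool,
    /// Cached outcome of the single decrement (assumption of this sketch).
    is_last_prober: bool,
}

impl ProbeStream {
    fn new(counter: Arc<AtomicUsize>) -> Self {
        Self {
            probe_threads_counter: counter,
            probe_completed_reported: false,
            is_last_prober: false,
        }
    }

    /// Decrements the shared counter at most once per stream, so calling
    /// it again after an early return is harmless.
    fn report_probe_completed_once(&mut self) -> bool {
        if !self.probe_completed_reported {
            self.probe_completed_reported = true;
            self.is_last_prober =
                self.probe_threads_counter.fetch_sub(1, Ordering::Relaxed) == 1;
        }
        self.is_last_prober
    }
}

/// Stream `a` reports twice (simulating re-entry), stream `b` once.
fn simulate_guarded() -> (bool, bool, bool, usize) {
    let counter = Arc::new(AtomicUsize::new(2));
    let mut a = ProbeStream::new(Arc::clone(&counter));
    let mut b = ProbeStream::new(Arc::clone(&counter));
    let a1 = a.report_probe_completed_once();
    let a2 = a.report_probe_completed_once();
    let b1 = b.report_probe_completed_once();
    (a1, a2, b1, counter.load(Ordering::Relaxed))
}
```

With the guard in place, only stream `b` observes the counter reaching `0`, and the counter ends at `0` rather than wrapping.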
## Are these changes tested?
- All existing `nested_loop_join` unit tests pass (42/42).
- Verified end-to-end with a standalone harness that runs the failing query in
  a loop on a multi-threaded runtime (the bug is a scheduling race, so a single
  run often passes even when broken; a tight loop exposes it reliably):
| run | before fix | after fix |
|---|---|---|
| 60× LEFT join, `target_partitions=4`, `batch_size=4` | `{6:42, 7:7, 8:4, 9:7}` | **`{6:60}`** |
| sweep of `target_partitions × batch_size` | wrong at every `tp ≥ 2` | **always correct** |
> A deterministic unit test was attempted but the race does not reliably
> trigger under the test harness's runtime, so it is intentionally omitted
> rather than shipping a test that passes on the buggy code. Reviewers wanting
> to reproduce can use the snippet below in a loop.
### Reproduction (datafusion-cli)
The result must always be `6`. Because it is a scheduling race, it is
intermittent per invocation; loop it to observe divergence on an unpatched
build:
```sql
set datafusion.execution.target_partitions = 4;
set datafusion.execution.batch_size = 4;
create table t1(d date, b boolean, i int, v bigint) as values
(date '1982-10-10', false, null, 4),
(date '2064-11-25', true, null, -72),
(date '2064-10-06', true, -95, -41),
(date '2013-01-20', false, 98, null),
(date '2011-04-30', false, 42, -91);
create table t2(s1 varchar, s2 varchar, w bigint) as values
('C.:~$wZ.bY@U|lN$fip1>N mZ', 'gUP0', 49),
('Dw8l4N(*Z<s#,Z', 'tS', 58),
('<Y-xI/8zG:a47tBp#vo%ah', 'I=Ieh', 83),
('Z]^6fijx3$', 'm0vc', 3),
('Id*B', '[3<_', -76);
-- Deterministic correct answer (verify with target_partitions = 1): 6 rows.
select count(*) as left_join_rows
from t1 left join t2 on (t1.v < t2.w and t2.s2 <= 'a');
```
- **Before:** intermittently returns `7`, `8`, or `9` (extra NULL-padded rows).
- **After:** always returns `6`.
## Are there any user-facing changes?
Bug fix only; no API changes. Outer / semi / anti / mark nested-loop joins with
multiple probe partitions now return correct, deterministic results.