avamingli opened a new pull request, #1635:
URL: https://github.com/apache/cloudberry/pull/1635
Parallel Hash Join has been in PostgreSQL for a while, but `FULL` and `RIGHT` outer joins were deliberately left out of the initial implementation: the per-batch barrier protocol had a deadlock risk that nobody had solved yet. Upstream finally solved it by adding a dedicated scan phase (`PHJ_BATCH_SCAN`). After all workers finish probing a batch, one elected worker walks the hash table looking for unmatched inner rows and emits them, while the rest move on to other batches or finish early. It is a clean solution, and the upstream fix is compact. We are in the process of cherry-picking upstream PostgreSQL into CBDB, and this PR brings in the three upstream commits behind this fix as a foundation.
## Performance
Benchmark on a 3-segment cluster with `parallel_workers = 2` per segment (6 workers total), 6 million rows per table, and 50% row overlap between the two sides.
| Query | Parallel | Serial | Speedup |
|-------|----------|--------|---------|
| `FULL JOIN` | 4040 ms | 6347 ms | **1.57×** |
| `RIGHT JOIN` | 3039 ms | 5568 ms | **1.83×** |
### Example plans (3 segments, `parallel_workers = 2`)
#### PARALLEL FULL JOIN
```sql
-- FULL JOIN: result locus is HashedOJ with Parallel Workers: 2
EXPLAIN(costs off, locus)
SELECT count(*) FROM t1 FULL JOIN t2 USING (id);
Finalize Aggregate
  Locus: Entry
  ->  Gather Motion 6:1 (slice1; segments: 6)
        ->  Partial Aggregate
              Locus: HashedOJ
              Parallel Workers: 2
              ->  Parallel Hash Full Join
                    Locus: HashedOJ
                    Parallel Workers: 2
                    Hash Cond: (t1.id = t2.id)
                    ->  Parallel Seq Scan on t1
                          Locus: HashedWorkers
                    ->  Parallel Hash
                          ->  Parallel Seq Scan on t2
                                Locus: HashedWorkers
```
#### PARALLEL RIGHT JOIN
```sql
-- RIGHT JOIN: when t1 is larger the planner hashes the smaller t2
-- and probes with t1; result locus HashedWorkers
EXPLAIN(costs off, locus)
SELECT count(*) FROM t1 RIGHT JOIN t2 USING (id);
Finalize Aggregate
  Locus: Entry
  ->  Gather Motion 6:1 (slice1; segments: 6)
        ->  Partial Aggregate
              Locus: HashedWorkers
              Parallel Workers: 2
              ->  Parallel Hash Right Join
                    Locus: HashedWorkers
                    Parallel Workers: 2
                    Hash Cond: (t1.id = t2.id)
                    ->  Parallel Seq Scan on t1
                          Locus: HashedWorkers
                    ->  Parallel Hash
                          ->  Parallel Seq Scan on t2
                                Locus: HashedWorkers
```
On top of the upstream work, CBDB's distributed execution model needed its own adaptations. The tricky part is the result of a parallel full outer join: in a distributed cluster, the NULL-extended rows produced for unmatched tuples can land on any segment, so the result cannot carry a plain `Hashed` locus; it needs `HashedOJ`. Before this PR, the distributed planner simply excluded `FULL` and `RIGHT` joins from parallel paths, so every such query fell back to serial execution on each segment. The planner now accepts both join types, assigns the correct `HashedOJ` locus to a full join's output, and, importantly, carries the `parallel_workers` count over to that locus. Without that last detail, any aggregate or further join above the full join would see a locus with zero workers and silently go serial, wiping out the benefit.
There is also a crash fix bundled here. A parallel `NOT IN` subquery join (`LASJ_NOTIN`) has a special fast path: as soon as an inner row with a NULL join key is found, the whole join is known to return no rows, so the worker exits early. The problem is that this early exit happened before the worker had attached to the probing barrier for the current batch. Later, during shutdown, the code still tried to arrive-and-detach from that barrier, which had zero participants, and an assertion failed. The fix is straightforward: skip the barrier call when the early-exit condition was triggered.
### Type of Change
- [ ] Bug fix (non-breaking change)
- [ ] New feature (non-breaking change)
- [ ] Breaking change (fix or feature with breaking changes)
- [ ] Documentation update
### Test Plan
- [ ] Unit tests added/updated
- [ ] Integration tests added/updated
- [ ] Passed `make installcheck`
- [ ] Passed `make -C src/test installcheck-cbdb-parallel`
### Checklist
- [ ] Followed [contribution guide](https://cloudberry.apache.org/contribute/code)
- [ ] Added/updated documentation
- [ ] Reviewed code for security implications
- [ ] Requested review from [cloudberry committers](https://github.com/orgs/apache/teams/cloudberry-committers)