avamingli opened a new pull request, #1213:
URL: https://github.com/apache/cloudberry/pull/1213
This commit improves how CBDB handles UNION operations, specifically the interaction of Parallel Append with Motion nodes inside subqueries.

We disable Parallel Append for UNION operations to prevent incorrect results caused by workers competing for subnodes: that competition could let a subplan be considered finished prematurely, which previously led to data loss in scenarios involving Motion Senders.

To keep parallel processing for UNION, we introduce a Parallel-oblivious Append approach instead: multiple workers operate independently without sharing state, eliminating the coordination issues of Parallel-aware Append.

Together these changes improve the reliability and correctness of UNION operations while maintaining overall performance, letting CBDB support parallel processing in a safer manner.
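For context, a plan like the one below can be reproduced roughly as follows. This is only a sketch under assumptions: the real definition of `t_distinct_0` is not part of the PR text, and `enable_parallel` / `max_parallel_workers_per_gather` are assumed to be the session GUCs that enable CBDB parallel plans on segments; the exact settings used by the author are not shown here.

```sql
-- Hypothetical setup; t_distinct_0's real definition is not given in the PR.
create table t_distinct_0(a int, b int);
insert into t_distinct_0 select i % 100, i % 200 from generate_series(1, 100000) i;
analyze t_distinct_0;

-- Assumed GUCs for enabling parallel plans on segments (2 workers per segment).
set enable_parallel = on;
set max_parallel_workers_per_gather = 2;

explain (costs off)
select distinct a from t_distinct_0 union select distinct b from t_distinct_0;
```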
```sql
select distinct a from t_distinct_0 union select distinct b from t_distinct_0;
                                  QUERY PLAN
----------------------------------------------------------------------
 Gather Motion 6:1 (slice1; segments: 6)
   ->  HashAggregate
         Group Key: t_distinct_0.a
         ->  Redistribute Motion 6:6 (slice2; segments: 6)
               Hash Key: t_distinct_0.a
               Hash Module: 3
               ->  Append
                     ->  GroupAggregate
                           Group Key: t_distinct_0.a
                           ->  Sort
                                 Sort Key: t_distinct_0.a
                                 ->  Redistribute Motion 6:6 (slice3; segments: 6)
                                       Hash Key: t_distinct_0.a
                                       Hash Module: 3
                                       ->  Streaming HashAggregate
                                             Group Key: t_distinct_0.a
                                             ->  Parallel Seq Scan on t_distinct_0
                     ->  GroupAggregate
                           Group Key: t_distinct_0_1.b
                           ->  Sort
                                 Sort Key: t_distinct_0_1.b
                                 ->  Redistribute Motion 6:6 (slice4; segments: 6)
                                       Hash Key: t_distinct_0_1.b
                                       Hash Module: 3
                                       ->  Streaming HashAggregate
                                             Group Key: t_distinct_0_1.b
                                             ->  Parallel Seq Scan on t_distinct_0 t_distinct_0_1
```
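Note that the Append above is the Parallel-oblivious form: each worker runs its own copy of the plain `Append` over `Parallel Seq Scan` children, rather than a `Parallel Append` that hands subplans out to competing workers.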
### Performance
See case[0] below for the table setup.
#### no-parallel (3031.346 ms)
```sql
                                                                          QUERY PLAN
------------------------------------------------------------------------------------------------------------------------------------------------------------
 Gather Motion 3:1 (slice1; segments: 3) (actual time=3024.080..3032.081 rows=19999 loops=1)
   ->  HashAggregate (actual time=3024.080..3024.080 rows=6722 loops=1)
         Group Key: ao1.b, (count(ao1.a))
         Extra Text: (seg0) hash table(s): 1; chain length 4.0 avg, 23 max; using 6722 of 8192 buckets; total 0 expansions.
         ->  Redistribute Motion 3:3 (slice2; segments: 3) (actual time=3008.080..3016.080 rows=6723 loops=1)
               Hash Key: ao1.b, (count(ao1.a))
               ->  Append (actual time=3004.080..3016.080 rows=6768 loops=1)
                     ->  Finalize HashAggregate (actual time=3004.080..3008.080 rows=3384 loops=1)
                           Group Key: ao1.b
                           Extra Text: (seg0) hash table(s): 1; chain length 2.3 avg, 6 max; using 3368 of 8192 buckets; total 1 expansions.
                           ->  Redistribute Motion 3:3 (slice3; segments: 3) (actual time=2880.077..2996.080 rows=3384 loops=1)
                                 Hash Key: ao1.b
                                 ->  Partial HashAggregate (actual time=3000.080..3004.080 rows=3385 loops=1)
                                       Group Key: ao1.b
                                       Extra Text: (seg0) hash table(s): 1; chain length 2.1 avg, 4 max; using 3368 of 16384 buckets; total 0 expansions.
                                       ->  Seq Scan on ao1 (actual time=0.000..1292.034 rows=3466240 loops=1)
                     ->  Finalize HashAggregate (actual time=8.000..8.000 rows=3384 loops=1)
                           Group Key: ao2.b
                           Extra Text: (seg0) hash table(s): 1; chain length 4.4 avg, 16 max; using 3368 of 4096 buckets; total 0 expansions.
                           ->  Redistribute Motion 3:3 (slice4; segments: 3) (actual time=0.000..4.000 rows=3384 loops=1)
                                 Hash Key: ao2.b
                                 ->  Partial HashAggregate (actual time=3004.080..3004.080 rows=3385 loops=1)
                                       Group Key: ao2.b
                                       Extra Text: (seg0) hash table(s): 1; chain length 2.1 avg, 4 max; using 3368 of 16384 buckets; total 0 expansions.
                                       ->  Seq Scan on ao2 (actual time=0.000..1340.036 rows=3466240 loops=1)
 Planning Time: 1.192 ms
   (slice0)  Executor memory: 1267K bytes.
 * (slice1)  Executor memory: 434K bytes avg x 3x(0) workers, 436K bytes max (seg0). Work_mem: 721K bytes max, 721K bytes wanted.
 * (slice2)  Executor memory: 598K bytes avg x 3x(0) workers, 666K bytes max (seg0). Work_mem: 721K bytes max, 721K bytes wanted.
 * (slice3)  Executor memory: 878K bytes avg x 3x(0) workers, 880K bytes max (seg1). Work_mem: 913K bytes max, 913K bytes wanted.
 * (slice4)  Executor memory: 878K bytes avg x 3x(0) workers, 880K bytes max (seg1). Work_mem: 913K bytes max, 913K bytes wanted.
 Memory used: 128000kB
 Memory wanted: 5260kB
 Optimizer: Postgres query optimizer
 Execution Time: 3031.346 ms
(40 rows)
```
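The 4-parallel run below presumably differs only in session settings; the 12:12 Motions over 3 segments correspond to a parallel degree of 4 per segment. A hedged sketch, assuming the same GUCs as above (the actual settings are not shown in the PR text):

```sql
-- Assumed settings for the "4-parallel" run; not confirmed by the PR text.
set enable_parallel = on;
set max_parallel_workers_per_gather = 4;
```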
#### 4-parallel UNION (1226.660 ms)
```sql
 Gather Motion 12:1 (slice1; segments: 12) (actual time=1180.031..1188.031 rows=19999 loops=1)
   ->  HashAggregate (actual time=1168.031..1168.031 rows=1670 loops=1)
         Group Key: ao1.b, (count(ao1.a))
         Extra Text: (seg0) hash table(s): 1; chain length 2.1 avg, 3 max; using 1647 of 8192 buckets; total 0 expansions.
         ->  Redistribute Motion 12:12 (slice2; segments: 12) (actual time=1164.031..1164.031 rows=1670 loops=1)
               Hash Key: ao1.b, (count(ao1.a))
               Hash Module: 3
               ->  Append (actual time=1148.030..1164.031 rows=1620 loops=1)
                     ->  Finalize HashAggregate (actual time=1148.030..1148.030 rows=810 loops=1)
                           Group Key: ao1.b
                           Extra Text: (seg0) hash table(s): 1; chain length 2.1 avg, 4 max; using 855 of 4096 buckets; total 0 expansions.
                           ->  Redistribute Motion 12:12 (slice3; segments: 12) (actual time=932.025..1140.030 rows=3240 loops=1)
                                 Hash Key: ao1.b
                                 Hash Module: 3
                                 ->  Partial HashAggregate (actual time=1140.030..1140.030 rows=3385 loops=1)
                                       Group Key: ao1.b
                                       Extra Text: (seg0) hash table(s): 1; chain length 2.1 avg, 4 max; using 3368 of 16384 buckets; total 0 expansions.
                                       ->  Parallel Seq Scan on ao1 (actual time=4.000..484.013 rows=900000 loops=1)
                     ->  Finalize HashAggregate (actual time=8.000..8.000 rows=810 loops=1)
                           Group Key: ao2.b
                           Extra Text: (seg0) hash table(s): 1; chain length 2.1 avg, 4 max; using 855 of 4096 buckets; total 0 expansions.
                           ->  Redistribute Motion 12:12 (slice4; segments: 12) (actual time=0.000..8.000 rows=3240 loops=1)
                                 Hash Key: ao2.b
                                 Hash Module: 3
                                 ->  Partial HashAggregate (actual time=1064.028..1068.028 rows=3385 loops=1)
                                       Group Key: ao2.b
                                       Extra Text: (seg0) hash table(s): 1; chain length 2.1 avg, 4 max; using 3368 of 16384 buckets; total 0 expansions.
                                       ->  Parallel Seq Scan on ao2 (actual time=0.000..524.014 rows=848832 loops=1)
 Planning Time: 1.097 ms
   (slice0)  Executor memory: 1273K bytes.
 * (slice1)  Executor memory: 315K bytes avg x 12x(0) workers, 317K bytes max (seg0). Work_mem: 337K bytes max, 337K bytes wanted.
 * (slice2)  Executor memory: 371K bytes avg x 12x(0) workers, 374K bytes max (seg0). Work_mem: 241K bytes max, 241K bytes wanted.
 * (slice3)  Executor memory: 918K bytes avg x 12x(0) workers, 920K bytes max (seg1). Work_mem: 913K bytes max, 913K bytes wanted.
 * (slice4)  Executor memory: 918K bytes avg x 12x(0) workers, 920K bytes max (seg1). Work_mem: 913K bytes max, 913K bytes wanted.
 Memory used: 128000kB
 Memory wanted: 5260kB
 Optimizer: Postgres query optimizer
 Execution Time: 1226.660 ms
(43 rows)
```
#### case[0]
```sql
create table ao1(a int, b int) using ao_column;
create table ao2(a int, b int) using ao_column;
insert into ao1 select i, i+1 from generate_series(1, 10000) i;
insert into ao1 select * from ao1;
insert into ao1 select * from ao1;
insert into ao1 select * from ao1;
insert into ao1 select * from ao1;
insert into ao1 select * from ao1;
insert into ao1 select * from ao1;
insert into ao1 select * from ao1;
insert into ao1 select * from ao1;
insert into ao1 select * from ao1;
insert into ao1 select * from ao1;
insert into ao1 select * from ao1;
analyze ao1;
insert into ao2 select * from ao1;
analyze ao2;
```
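The timed query itself is not included in the PR text. Judging from the plan shape (an Append of per-table Partial/Finalize aggregates, de-duplicated on `b, count(a)`), it was presumably of the form below; this is only a hedged reconstruction, not confirmed by the PR.

```sql
-- Hypothetical reconstruction of the benchmarked query from the plan shape.
explain (analyze, costs off)
select b, count(a) from ao1 group by b
union
select b, count(a) from ao2 group by b;
```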
Authored-by: Zhang Mingli [email protected]
<!-- Thank you for your contribution to Apache Cloudberry (Incubating)! -->
Fixes #ISSUE_Number
### What does this PR do?
<!-- Brief overview of the changes, including any major features or fixes -->
### Type of Change
- [ ] Bug fix (non-breaking change)
- [ ] New feature (non-breaking change)
- [ ] Breaking change (fix or feature with breaking changes)
- [ ] Documentation update
### Breaking Changes
<!-- Remove if not applicable. If yes, explain impact and migration path -->
### Test Plan
<!-- How did you test these changes? -->
- [ ] Unit tests added/updated
- [ ] Integration tests added/updated
- [ ] Passed `make installcheck`
- [ ] Passed `make -C src/test installcheck-cbdb-parallel`
### Impact
<!-- Remove sections that don't apply -->
**Performance:**
<!-- Any performance implications? -->
**User-facing changes:**
<!-- Any changes visible to users? -->
**Dependencies:**
<!-- New dependencies or version changes? -->
### Checklist
- [ ] Followed [contribution
guide](https://cloudberry.apache.org/contribute/code)
- [ ] Added/updated documentation
- [ ] Reviewed code for security implications
- [ ] Requested review from [cloudberry
committers](https://github.com/orgs/apache/teams/cloudberry-committers)
### Additional Context
<!-- Any other information that would help reviewers? Remove if none -->
### CI Skip Instructions
<!--
To skip CI builds, add the appropriate CI skip identifier to your PR title.
The identifier must:
- Be in square brackets []
- Include the word "ci" and either "skip" or "no"
- Only use for documentation-only changes or when absolutely necessary
-->
---
<!-- Join our community:
- Mailing list:
[[email protected]](https://lists.apache.org/[email protected])
(subscribe: [email protected])
- Discussions: https://github.com/apache/cloudberry/discussions -->