GitHub user zheniasigayev added a comment to the discussion: Best practices for
memory-efficient deduplication of pre-sorted Parquet files
Addressing Question 2:
It's not possible to simply remove the `first_value()` aggregate from the above query: `col_7` and `col_8` are not part of the `GROUP BY`, so selecting them without an aggregate fails at planning time with:
```
Error during planning: Column in SELECT must be in GROUP BY or an aggregate
function: While expanding wildcard, column "example.col_7" must appear in the
GROUP BY clause or must be part of an aggregate function, currently only
"example.col_1, example.col_2, example.col_3, example.col_4, example.col_5,
example.col_6" appears in the SELECT clause satisfies this requirement
```
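To make that restriction concrete: the planner only accepts the extra columns if they are wrapped in an aggregate (or added to the `GROUP BY`). A minimal sketch of the two accepted shapes, with the group keys shortened to two columns for brevity:
```sql
-- Shape 1: keep the extra columns, but wrap each one in an aggregate.
SELECT col_1, col_2, first_value(col_7), first_value(col_8)
FROM example
GROUP BY col_1, col_2;

-- Shape 2: drop the extra columns from the SELECT list entirely.
SELECT col_1, col_2
FROM example
GROUP BY col_1, col_2;
```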
Instead, I dropped `col_7` and `col_8` (the columns to which the `first_value()` aggregate was applied) from the query entirely. This is the resulting query:
```sql
CREATE EXTERNAL TABLE example (
    col_1 VARCHAR(50) NOT NULL,
    col_2 BIGINT NOT NULL,
    col_3 VARCHAR(50),
    col_4 VARCHAR(50),
    col_5 VARCHAR(50),
    col_6 VARCHAR(100) NOT NULL,
    col_7 VARCHAR(50),
    col_8 DOUBLE
)
WITH ORDER (col_1 ASC, col_2 ASC)
STORED AS PARQUET
LOCATION '/tmp/redacted/*.parquet';

COPY (
    SELECT
        col_1, col_2, col_3, col_4, col_5, col_6
    FROM
        example
    GROUP BY
        col_1, col_2, col_3, col_4, col_5, col_6
    ORDER BY
        col_1 ASC, col_2 ASC
)
TO '/tmp/result_part2.parquet'
STORED AS PARQUET
OPTIONS (compression 'zstd(1)');
```
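Since every column in the `SELECT` list is also a group key, the same deduplication can be spelled with `SELECT DISTINCT`; DataFusion's planner normally rewrites a `DISTINCT` over all selected columns into exactly this kind of `GROUP BY`, so the sketch below should plan equivalently (same table and output path as above, shown only as an alternative spelling):
```sql
-- Alternative spelling of the same deduplication: DISTINCT over the six
-- columns is typically rewritten by the planner into a GROUP BY on all of them.
COPY (
    SELECT DISTINCT
        col_1, col_2, col_3, col_4, col_5, col_6
    FROM
        example
    ORDER BY
        col_1 ASC, col_2 ASC
)
TO '/tmp/result_part2.parquet'
STORED AS PARQUET
OPTIONS (compression 'zstd(1)');
```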
The resulting `EXPLAIN` output:
```
+---------------+-------------------------------+
| plan_type | plan |
+---------------+-------------------------------+
| physical_plan | ┌───────────────────────────┐ |
| | │ DataSinkExec │ |
| | └─────────────┬─────────────┘ |
| | ┌─────────────┴─────────────┐ |
| | │ SortPreservingMergeExec │ |
| | │ -------------------- │ |
| | │ col_1 ASC NULLS LAST, │ |
| | │ col_2 ASC NULLS LAST │ |
| | └─────────────┬─────────────┘ |
| | ┌─────────────┴─────────────┐ |
| | │ SortExec │ |
| | │ -------------------- │ |
| | │ col_1@0 ASC NULLS LAST, │ |
| | │ col_2@1 ASC NULLS LAST │ |
| | └─────────────┬─────────────┘ |
| | ┌─────────────┴─────────────┐ |
| | │ AggregateExec │ |
| | │ -------------------- │ |
| | │ group_by: │ |
| | │ col_1, col_2, col_3, col_4│ |
| | │ , col_5, col_6 │ |
| | │ │ |
| | │ mode: │ |
| | │ FinalPartitioned │ |
| | └─────────────┬─────────────┘ |
| | ┌─────────────┴─────────────┐ |
| | │ CoalesceBatchesExec │ |
| | │ -------------------- │ |
| | │ target_batch_size: │ |
| | │ 8192 │ |
| | └─────────────┬─────────────┘ |
| | ┌─────────────┴─────────────┐ |
| | │ RepartitionExec │ |
| | │ -------------------- │ |
| | │ partition_count(in->out): │ |
| | │ 10 -> 10 │ |
| | │ │ |
| | │ partitioning_scheme: │ |
| | │ Hash([col_1@0, col_2@1, │ |
| | │ col_3@2, col_4@3, │ |
| | │ col_5@4, col_6@5], │ |
| | │ 10) │ |
| | └─────────────┬─────────────┘ |
| | ┌─────────────┴─────────────┐ |
| | │ AggregateExec │ |
| | │ -------------------- │ |
| | │ group_by: │ |
| | │ col_1, col_2, col_3, col_4│ |
| | │ , col_5, col_6 │ |
| | │ │ |
| | │ mode: Partial │ |
| | └─────────────┬─────────────┘ |
| | ┌─────────────┴─────────────┐ |
| | │ DataSourceExec │ |
| | │ -------------------- │ |
| | │ files: 24 │ |
| | │ format: parquet │ |
| | └───────────────────────────┘ |
| | |
+---------------+-------------------------------+
```
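(A possible follow-up: the inner query can also be run under `EXPLAIN ANALYZE`, which executes it and reports per-operator runtime metrics, including spill activity for the sort and aggregate operators, rather than just the plan shape. A minimal sketch:)
```sql
-- Sketch: EXPLAIN ANALYZE runs the query and prints per-operator metrics
-- (output rows, elapsed compute, spill counts) instead of returning rows.
EXPLAIN ANALYZE
SELECT col_1, col_2, col_3, col_4, col_5, col_6
FROM example
GROUP BY col_1, col_2, col_3, col_4, col_5, col_6
ORDER BY col_1 ASC, col_2 ASC;
```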
Executing the query then fails with:
```
Not enough memory to continue external sort. Consider increasing the memory
limit, or decreasing sort_spill_reservation_bytes
caused by
Resources exhausted: Additional allocation failed with top memory consumers
(across reservations) as:
ExternalSorter[6]#32(can spill: true) consumed 2.7 GB,
ExternalSorter[1]#17(can spill: true) consumed 2.7 GB,
GroupedHashAggregateStream[2] ()#19(can spill: true) consumed 307.0 MB,
GroupedHashAggregateStream[4] ()#25(can spill: true) consumed 306.2 MB,
GroupedHashAggregateStream[3] ()#22(can spill: true) consumed 305.2 MB,
GroupedHashAggregateStream[8] ()#37(can spill: true) consumed 305.1 MB,
GroupedHashAggregateStream[7] ()#34(can spill: true) consumed 304.6 MB,
GroupedHashAggregateStream[9] ()#40(can spill: true) consumed 304.5 MB,
GroupedHashAggregateStream[0] ()#13(can spill: true) consumed 304.4 MB,
GroupedHashAggregateStream[5] ()#28(can spill: true) consumed 28.3 MB,
GroupedHashAggregateStream[1] ()#16(can spill: true) consumed 28.3 MB,
GroupedHashAggregateStream[6] ()#31(can spill: true) consumed 28.3 MB,
ExternalSorterMerge[5]#30(can spill: false) consumed 10.0 MB,
ExternalSorter[3]#23(can spill: true) consumed 0.0 B,
RepartitionExec[2]#43(can spill: false) consumed 0.0 B,
SortPreservingMergeExec[0]#2(can spill: false) consumed 0.0 B,
ExternalSorterMerge[4]#27(can spill: false) consumed 0.0 B,
ExternalSorterMerge[0]#15(can spill: false) consumed 0.0 B,
ExternalSorterMerge[1]#18(can spill: false) consumed 0.0 B,
ExternalSorterMerge[2]#21(can spill: false) consumed 0.0 B,
RepartitionExec[3]#44(can spill: false) consumed 0.0 B,
ExternalSorterMerge[8]#39(can spill: false) consumed 0.0 B,
ExternalSorter[2]#20(can spill: true) consumed 0.0 B,
ExternalSorter[8]#38(can spill: true) consumed 0.0 B,
ExternalSorter[0]#14(can spill: true) consumed 0.0 B.
Error: Failed to allocate additional 552.9 MB for ExternalSorter[5] with 0.0 B
already allocated for this reservation - 412.8 MB remain available for the
total pool
```
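The error message itself points at two knobs; below is a sketch of adjusting them per session, together with the partition count (since the plan above fans out into 10 concurrent sorters and aggregate streams). The option names come from DataFusion's configuration settings; the values are purely illustrative:
```sql
-- Illustrative values only; tune these against the actual memory limit.

-- Reserve less memory up front for each in-progress external sort
-- (the sort_spill_reservation_bytes mentioned in the error message).
SET datafusion.execution.sort_spill_reservation_bytes = 2097152;

-- Fewer target partitions means fewer ExternalSorter / GroupedHashAggregateStream
-- instances holding memory at the same time (the plan above shows 10).
SET datafusion.execution.target_partitions = 4;
```
The size of the memory pool itself is configured outside SQL (for example via datafusion-cli's `--memory-limit` option, if that is how the query is being run).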
GitHub link:
https://github.com/apache/datafusion/discussions/16776#discussioncomment-13780240