GitHub user zheniasigayev added a comment to the discussion: Best practices for 
memory-efficient deduplication of pre-sorted Parquet files

Addressing Question 2:

It's not possible to simply remove the `first_value()` aggregate from the above 
query: `col_7` and `col_8` would then be selected without appearing in the 
`GROUP BY`, and planning fails:
```
Error during planning: Column in SELECT must be in GROUP BY or an aggregate 
function: While expanding wildcard, column "example.col_7" must appear in the 
GROUP BY clause or must be part of an aggregate function, currently only 
"example.col_1, example.col_2, example.col_3, example.col_4, example.col_5, 
example.col_6" appears in the SELECT clause satisfies this requirement
```

Instead, I removed `col_7` and `col_8` (the columns that the `first_value()` 
aggregate was applied to) from the query entirely. This is the resulting query:

```sql
CREATE EXTERNAL TABLE example (
    col_1 VARCHAR(50) NOT NULL,
    col_2 BIGINT NOT NULL,
    col_3 VARCHAR(50),
    col_4 VARCHAR(50),
    col_5 VARCHAR(50),
    col_6 VARCHAR(100) NOT NULL,
    col_7 VARCHAR(50),
    col_8 DOUBLE
) 
WITH ORDER (col_1 ASC, col_2 ASC) 
STORED AS PARQUET 
LOCATION '/tmp/redacted/*.parquet';

COPY (
    SELECT 
        col_1,
        col_2,
        col_3,
        col_4,
        col_5,
        col_6
    FROM 
        example 
    GROUP BY 
        col_1, col_2, col_3, col_4, col_5, col_6
    ORDER BY 
        col_1 ASC, col_2 ASC
) 
TO '/tmp/result_part2.parquet' 
STORED AS PARQUET 
OPTIONS (compression 'zstd(1)');
```

The resulting `EXPLAIN` output:

```
+---------------+-------------------------------+
| plan_type     | plan                          |
+---------------+-------------------------------+
| physical_plan | ┌───────────────────────────┐ |
|               | │        DataSinkExec       │ |
|               | └─────────────┬─────────────┘ |
|               | ┌─────────────┴─────────────┐ |
|               | │  SortPreservingMergeExec  │ |
|               | │    --------------------   │ |
|               | │   col_1 ASC NULLS LAST,   │ |
|               | │    col_2 ASC NULLS LAST   │ |
|               | └─────────────┬─────────────┘ |
|               | ┌─────────────┴─────────────┐ |
|               | │          SortExec         │ |
|               | │    --------------------   │ |
|               | │  col_1@0 ASC NULLS LAST,  │ |
|               | │   col_2@1 ASC NULLS LAST  │ |
|               | └─────────────┬─────────────┘ |
|               | ┌─────────────┴─────────────┐ |
|               | │       AggregateExec       │ |
|               | │    --------------------   │ |
|               | │         group_by:         │ |
|               | │ col_1, col_2, col_3, col_4│ |
|               | │       , col_5, col_6      │ |
|               | │                           │ |
|               | │           mode:           │ |
|               | │      FinalPartitioned     │ |
|               | └─────────────┬─────────────┘ |
|               | ┌─────────────┴─────────────┐ |
|               | │    CoalesceBatchesExec    │ |
|               | │    --------------------   │ |
|               | │     target_batch_size:    │ |
|               | │            8192           │ |
|               | └─────────────┬─────────────┘ |
|               | ┌─────────────┴─────────────┐ |
|               | │      RepartitionExec      │ |
|               | │    --------------------   │ |
|               | │ partition_count(in->out): │ |
|               | │          10 -> 10         │ |
|               | │                           │ |
|               | │    partitioning_scheme:   │ |
|               | │  Hash([col_1@0, col_2@1,  │ |
|               | │      col_3@2, col_4@3,    │ |
|               | │     col_5@4, col_6@5],    │ |
|               | │             10)           │ |
|               | └─────────────┬─────────────┘ |
|               | ┌─────────────┴─────────────┐ |
|               | │       AggregateExec       │ |
|               | │    --------------------   │ |
|               | │         group_by:         │ |
|               | │ col_1, col_2, col_3, col_4│ |
|               | │       , col_5, col_6      │ |
|               | │                           │ |
|               | │       mode: Partial       │ |
|               | └─────────────┬─────────────┘ |
|               | ┌─────────────┴─────────────┐ |
|               | │       DataSourceExec      │ |
|               | │    --------------------   │ |
|               | │         files: 24         │ |
|               | │      format: parquet      │ |
|               | └───────────────────────────┘ |
|               |                               |
+---------------+-------------------------------+
```

Executing the query results in:

```
Not enough memory to continue external sort. Consider increasing the memory 
limit, or decreasing sort_spill_reservation_bytes
caused by
Resources exhausted: Additional allocation failed with top memory consumers 
(across reservations) as:
  ExternalSorter[6]#32(can spill: true) consumed 2.7 GB,
  ExternalSorter[1]#17(can spill: true) consumed 2.7 GB,
  GroupedHashAggregateStream[2] ()#19(can spill: true) consumed 307.0 MB,
  GroupedHashAggregateStream[4] ()#25(can spill: true) consumed 306.2 MB,
  GroupedHashAggregateStream[3] ()#22(can spill: true) consumed 305.2 MB,
  GroupedHashAggregateStream[8] ()#37(can spill: true) consumed 305.1 MB,
  GroupedHashAggregateStream[7] ()#34(can spill: true) consumed 304.6 MB,
  GroupedHashAggregateStream[9] ()#40(can spill: true) consumed 304.5 MB,
  GroupedHashAggregateStream[0] ()#13(can spill: true) consumed 304.4 MB,
  GroupedHashAggregateStream[5] ()#28(can spill: true) consumed 28.3 MB,
  GroupedHashAggregateStream[1] ()#16(can spill: true) consumed 28.3 MB,
  GroupedHashAggregateStream[6] ()#31(can spill: true) consumed 28.3 MB,
  ExternalSorterMerge[5]#30(can spill: false) consumed 10.0 MB,
  ExternalSorter[3]#23(can spill: true) consumed 0.0 B,
  RepartitionExec[2]#43(can spill: false) consumed 0.0 B,
  SortPreservingMergeExec[0]#2(can spill: false) consumed 0.0 B,
  ExternalSorterMerge[4]#27(can spill: false) consumed 0.0 B,
  ExternalSorterMerge[0]#15(can spill: false) consumed 0.0 B,
  ExternalSorterMerge[1]#18(can spill: false) consumed 0.0 B,
  ExternalSorterMerge[2]#21(can spill: false) consumed 0.0 B,
  RepartitionExec[3]#44(can spill: false) consumed 0.0 B,
  ExternalSorterMerge[8]#39(can spill: false) consumed 0.0 B,
  ExternalSorter[2]#20(can spill: true) consumed 0.0 B,
  ExternalSorter[8]#38(can spill: true) consumed 0.0 B,
  ExternalSorter[0]#14(can spill: true) consumed 0.0 B.
Error: Failed to allocate additional 552.9 MB for ExternalSorter[5] with 0.0 B 
already allocated for this reservation - 412.8 MB remain available for the 
total pool
```
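
The error text itself names two knobs: the memory limit and 
`sort_spill_reservation_bytes`. As a rough sketch only (not something I have 
verified against this dataset), these can be adjusted per session before 
re-running the `COPY`. The values below are illustrative assumptions, and 
lowering `target_partitions` is my own addition, not something the error 
suggests; the idea is that fewer partitions means fewer `ExternalSorter` and 
`GroupedHashAggregateStream` instances competing for the same pool.

```sql
-- Illustrative values only; assumptions, not tuned or tested for this workload.

-- Reserve less memory per sort for the in-memory merge phase,
-- as the error message suggests.
SET datafusion.execution.sort_spill_reservation_bytes = 1048576;

-- Assumption: run fewer partitions concurrently so fewer sorters and
-- hash aggregate streams share the memory pool (the plan above uses 10).
SET datafusion.execution.target_partitions = 4;
```

The memory limit itself is set outside the query, for example via the 
`--memory-limit` option if `datafusion-cli` is being used.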




GitHub link: 
https://github.com/apache/datafusion/discussions/16776#discussioncomment-13780240
