GitHub user zheniasigayev added a comment to the discussion: Best practices for memory-efficient deduplication of pre-sorted Parquet files
Addressing Question 2: it's not possible to simply drop the `first_value()` aggregate from the query above, because `col_7` and `col_8` do not appear in the `GROUP BY`:

```
Error during planning: Column in SELECT must be in GROUP BY or an aggregate function: While expanding wildcard, column "example.col_7" must appear in the GROUP BY clause or must be part of an aggregate function, currently only "example.col_1, example.col_2, example.col_3, example.col_4, example.col_5, example.col_6" appears in the SELECT clause satisfies this requirement
```

Instead, I removed `col_7` and `col_8` (the columns the `first_value()` aggregate is applied to). This is the resulting query:

```sql
CREATE EXTERNAL TABLE example (
    col_1 VARCHAR(50) NOT NULL,
    col_2 BIGINT NOT NULL,
    col_3 VARCHAR(50),
    col_4 VARCHAR(50),
    col_5 VARCHAR(50),
    col_6 VARCHAR(100) NOT NULL,
    col_7 VARCHAR(50),
    col_8 DOUBLE
)
WITH ORDER (col_1 ASC, col_2 ASC)
STORED AS PARQUET
LOCATION '/tmp/redacted/*.parquet';

COPY (
    SELECT col_1, col_2, col_3, col_4, col_5, col_6
    FROM example
    GROUP BY col_1, col_2, col_3, col_4, col_5, col_6
    ORDER BY col_1 ASC, col_2 ASC
)
TO '/tmp/result_part2.parquet'
STORED AS PARQUET
OPTIONS (compression 'zstd(1)');
```

The resulting `EXPLAIN` output (`plan_type = physical_plan`):

```
┌───────────────────────────┐
│       DataSinkExec        │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│  SortPreservingMergeExec  │
│    --------------------   │
│   col_1 ASC NULLS LAST,   │
│    col_2 ASC NULLS LAST   │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│         SortExec          │
│    --------------------   │
│  col_1@0 ASC NULLS LAST,  │
│   col_2@1 ASC NULLS LAST  │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│       AggregateExec       │
│    --------------------   │
│         group_by:         │
│ col_1, col_2, col_3, col_4│
│      , col_5, col_6       │
│                           │
│           mode:           │
│     FinalPartitioned      │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│    CoalesceBatchesExec    │
│    --------------------   │
│    target_batch_size:     │
│           8192            │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│      RepartitionExec      │
│    --------------------   │
│ partition_count(in->out): │
│         10 -> 10          │
│                           │
│    partitioning_scheme:   │
│  Hash([col_1@0, col_2@1,  │
│     col_3@2, col_4@3,     │
│    col_5@4, col_6@5],     │
│            10)            │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│       AggregateExec       │
│    --------------------   │
│         group_by:         │
│ col_1, col_2, col_3, col_4│
│      , col_5, col_6       │
│                           │
│       mode: Partial       │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│      DataSourceExec       │
│    --------------------   │
│         files: 24         │
│      format: parquet      │
└───────────────────────────┘
```

Executing the query results in:

```
Not enough memory to continue external sort.
Consider increasing the memory limit, or decreasing sort_spill_reservation_bytes
caused by
Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
  ExternalSorter[6]#32(can spill: true) consumed 2.7 GB,
  ExternalSorter[1]#17(can spill: true) consumed 2.7 GB,
  GroupedHashAggregateStream[2] ()#19(can spill: true) consumed 307.0 MB,
  GroupedHashAggregateStream[4] ()#25(can spill: true) consumed 306.2 MB,
  GroupedHashAggregateStream[3] ()#22(can spill: true) consumed 305.2 MB,
  GroupedHashAggregateStream[8] ()#37(can spill: true) consumed 305.1 MB,
  GroupedHashAggregateStream[7] ()#34(can spill: true) consumed 304.6 MB,
  GroupedHashAggregateStream[9] ()#40(can spill: true) consumed 304.5 MB,
  GroupedHashAggregateStream[0] ()#13(can spill: true) consumed 304.4 MB,
  GroupedHashAggregateStream[5] ()#28(can spill: true) consumed 28.3 MB,
  GroupedHashAggregateStream[1] ()#16(can spill: true) consumed 28.3 MB,
  GroupedHashAggregateStream[6] ()#31(can spill: true) consumed 28.3 MB,
  ExternalSorterMerge[5]#30(can spill: false) consumed 10.0 MB,
  ExternalSorter[3]#23(can spill: true) consumed 0.0 B,
  RepartitionExec[2]#43(can spill: false) consumed 0.0 B,
  SortPreservingMergeExec[0]#2(can spill: false) consumed 0.0 B,
  ExternalSorterMerge[4]#27(can spill: false) consumed 0.0 B,
  ExternalSorterMerge[0]#15(can spill: false) consumed 0.0 B,
  ExternalSorterMerge[1]#18(can spill: false) consumed 0.0 B,
  ExternalSorterMerge[2]#21(can spill: false) consumed 0.0 B,
  RepartitionExec[3]#44(can spill: false) consumed 0.0 B,
  ExternalSorterMerge[8]#39(can spill: false) consumed 0.0 B,
  ExternalSorter[2]#20(can spill: true) consumed 0.0 B,
  ExternalSorter[8]#38(can spill: true) consumed 0.0 B,
  ExternalSorter[0]#14(can spill: true) consumed 0.0 B.
Error: Failed to allocate additional 552.9 MB for ExternalSorter[5] with 0.0 B already allocated for this reservation - 412.8 MB remain available for the total pool
```
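The error text itself points at the two knobs to try next: a larger overall memory pool, or a smaller `datafusion.execution.sort_spill_reservation_bytes`. Below is a minimal sketch of both, assuming the query is being run through datafusion-cli; flag spelling, defaults, and whether these values help at all may differ by version, so treat it as illustrative rather than verified:

```sql
-- Sketch only (datafusion-cli assumed; values are illustrative, not tuned).
--
-- 1) Start the CLI with a larger memory pool for the session, e.g.:
--      datafusion-cli --memory-limit 8g
--
-- 2) Inspect and shrink the per-sort reservation named in the error
--    before re-running the COPY above (1048576 bytes = 1 MB):
SHOW datafusion.execution.sort_spill_reservation_bytes;
SET datafusion.execution.sort_spill_reservation_bytes = 1048576;
```

Judging by the consumer list, the two `ExternalSorter` reservations at 2.7 GB each dominate, so raising the pool size is probably the first thing worth trying.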
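Separately, a note on the planning error at the top: if `col_7` and `col_8` needed to stay in the output without `first_value()`, the other option within plain aggregation semantics is to group on all eight columns (or, equivalently, `SELECT DISTINCT *`). That only deduplicates correctly if duplicate rows always agree on `col_7` and `col_8`, which is an assumption rather than something established above; the output path is a placeholder. Untested sketch:

```sql
COPY (
    SELECT col_1, col_2, col_3, col_4, col_5, col_6, col_7, col_8
    FROM example
    -- Grouping on all eight columns keeps col_7/col_8 without first_value(),
    -- but it only deduplicates correctly if duplicates never differ in them.
    GROUP BY col_1, col_2, col_3, col_4, col_5, col_6, col_7, col_8
    ORDER BY col_1 ASC, col_2 ASC
)
TO '/tmp/result_all_cols.parquet'  -- placeholder path
STORED AS PARQUET
OPTIONS (compression 'zstd(1)');
```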
GitHub link: https://github.com/apache/datafusion/discussions/16776#discussioncomment-13780240