This is an automated email from the ASF dual-hosted git repository. laszlog pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit fa570e8ea74ece419fb68aa9d123d3433f439d12 Author: Noemi Pap-Takacs <[email protected]> AuthorDate: Thu Jan 9 16:21:01 2025 +0100 IMPALA-13655: UPDATE redundantly accumulates memory in HDFS WRITER When IcebergUpdateImpl created the table sink it didn't set 'inputIsClustered' to true. Therefore HdfsTableSink expected random input and kept the output writers open for every partition, which resulted in high memory consumption and potentially an OOM error when the number of partitions is high. Since we actually sort the rows before the sink we can set 'inputIsClustered' to true, which means HdfsTableSink can write files one by one, because whenever it gets a row that belongs to a new partition it knows that it can close the current output writer, and open a new one. Testing: - e2e regression test Change-Id: I9bad335cc946364fc612e8aaf90858eaabd7c4af Reviewed-on: http://gerrit.cloudera.org:8080/22325 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- .../apache/impala/analysis/IcebergUpdateImpl.java | 2 +- .../QueryTest/iceberg-update-partitions.test | 22 ++++++++++++++++++++++ 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java b/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java index 093f624e5..d04eb2452 100644 --- a/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java +++ b/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java @@ -177,7 +177,7 @@ public class IcebergUpdateImpl extends IcebergModifyImpl { TableSink insertSink = TableSink.create(modifyStmt_.table_, TableSink.Op.INSERT, insertPartitionKeyExprs_, insertResultExprs_, Collections.emptyList(), false, - false, new Pair<>(sortColumns_, sortingOrder_), -1, null, + true, new Pair<>(sortColumns_, sortingOrder_), -1, null, modifyStmt_.maxTableSinks_); TableSink deleteSink = new IcebergBufferedDeleteSink( icePosDelTable_, 
deletePartitionKeyExprs_, deleteResultExprs_, deleteTableId_); diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-update-partitions.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-update-partitions.test index 5cd376882..857e17628 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-update-partitions.test +++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-update-partitions.test @@ -481,3 +481,25 @@ update ice_alltypes_part_v2 set p_date = add_months(p_date, i); ---- TYPES INT, BOOLEAN, INT, BIGINT, FLOAT, DOUBLE, DECIMAL, DATE, STRING ==== +---- QUERY +# Regression test for IMPALA-13655. +# Updating a table with many partitions (with low mem_limit). +# Then we also check the number of files to verify that +# the inputs of the writers are actually sorted(/clustered). +# The table contains 401 files before the update (1 per partition), +# then the update writes a delete and a data file to each partition. +CREATE TABLE ice_tpch_many_parts +PARTITIONED BY SPEC(truncate(500, l_partkey)) +STORED BY ICEBERG +TBLPROPERTIES ('format-version'='2') +AS SELECT * FROM tpch_parquet.lineitem +WHERE l_linenumber=1; + +SET MEM_LIMIT=470m; +UPDATE ice_tpch_many_parts SET l_orderkey=l_orderkey+1; +SELECT count(*) FROM $DATABASE.ice_tpch_many_parts.`files`; +---- RESULTS +1203 +---- TYPES +BIGINT +==== \ No newline at end of file
