This is an automated email from the ASF dual-hosted git repository.

laszlog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit fa570e8ea74ece419fb68aa9d123d3433f439d12
Author: Noemi Pap-Takacs <[email protected]>
AuthorDate: Thu Jan 9 16:21:01 2025 +0100

    IMPALA-13655: UPDATE redundantly accumulates memory in HDFS WRITER
    
    When IcebergUpdateImpl created the table sink it didn't set
    'inputIsClustered' to true. Therefore HdfsTableSink expected
    random input and kept the output writers open for every partition,
    which resulted in high memory consumption and potentially an
    OOM error when the number of partitions is high.
    
    Since we actually sort the rows before the sink, we can set
    'inputIsClustered' to true, which means HdfsTableSink can write
    files one by one, because whenever it gets a row that belongs
    to a new partition it knows that it can close the current output
    writer, and open a new one.
    
    Testing:
     - e2e regression test
    
    Change-Id: I9bad335cc946364fc612e8aaf90858eaabd7c4af
    Reviewed-on: http://gerrit.cloudera.org:8080/22325
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 .../apache/impala/analysis/IcebergUpdateImpl.java  |  2 +-
 .../QueryTest/iceberg-update-partitions.test       | 22 ++++++++++++++++++++++
 2 files changed, 23 insertions(+), 1 deletion(-)

diff --git a/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java 
b/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java
index 093f624e5..d04eb2452 100644
--- a/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java
+++ b/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java
@@ -177,7 +177,7 @@ public class IcebergUpdateImpl extends IcebergModifyImpl {
 
     TableSink insertSink = TableSink.create(modifyStmt_.table_, 
TableSink.Op.INSERT,
         insertPartitionKeyExprs_, insertResultExprs_, Collections.emptyList(), 
false,
-        false, new Pair<>(sortColumns_, sortingOrder_), -1, null,
+        true, new Pair<>(sortColumns_, sortingOrder_), -1, null,
         modifyStmt_.maxTableSinks_);
     TableSink deleteSink = new IcebergBufferedDeleteSink(
         icePosDelTable_, deletePartitionKeyExprs_, deleteResultExprs_, 
deleteTableId_);
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/iceberg-update-partitions.test
 
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-update-partitions.test
index 5cd376882..857e17628 100644
--- 
a/testdata/workloads/functional-query/queries/QueryTest/iceberg-update-partitions.test
+++ 
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-update-partitions.test
@@ -481,3 +481,25 @@ update ice_alltypes_part_v2 set p_date = 
add_months(p_date, i);
 ---- TYPES
 INT, BOOLEAN, INT, BIGINT, FLOAT, DOUBLE, DECIMAL, DATE, STRING
 ====
+---- QUERY
+# Regression test for IMPALA-13655.
+# Updating a table with many partitions (with low mem_limit).
+# Then we also check the number of files to verify that
+# the inputs of the writers are actually sorted(/clustered).
+# The table contains 401 files before the update (1 per partition),
+# then the update writes a delete and a data file to each partition.
+CREATE TABLE ice_tpch_many_parts
+PARTITIONED BY SPEC(truncate(500, l_partkey))
+STORED BY ICEBERG
+TBLPROPERTIES ('format-version'='2')
+AS SELECT * FROM tpch_parquet.lineitem
+WHERE l_linenumber=1;
+
+SET MEM_LIMIT=470m;
+UPDATE ice_tpch_many_parts SET l_orderkey=l_orderkey+1;
+SELECT count(*) FROM $DATABASE.ice_tpch_many_parts.`files`;
+---- RESULTS
+1203
+---- TYPES
+BIGINT
+====
\ No newline at end of file

Reply via email to