This is an automated email from the ASF dual-hosted git repository.
joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new 55d7498b2 IMPALA-13656: MERGE redundantly accumulates memory in HDFS
WRITER
55d7498b2 is described below
commit 55d7498b2478f5988d53c2ec0bd1b282a8298fe1
Author: Noemi Pap-Takacs <[email protected]>
AuthorDate: Mon Jan 13 14:57:11 2025 +0100
IMPALA-13656: MERGE redundantly accumulates memory in HDFS WRITER
When IcebergMergeImpl created the table sink it didn't set
'inputIsClustered' to true. Therefore HdfsTableSink expected
random input and kept the output writers open for every partition,
which resulted in high memory consumption and potentially a
Memory Limit Exceeded error when the number of partitions is high.
Since we actually sort the rows before the sink we can set
'inputIsClustered' to true, which means HdfsTableSink can write
files one by one, because whenever it gets a row that belongs
to a new partition it knows that it can close the current output
writer, and open a new one.
Testing:
- e2e regression test
Change-Id: I7bad0310e96eb482af9d09ba0d41e44c07bf8e4d
Reviewed-on: http://gerrit.cloudera.org:8080/22332
Reviewed-by: Peter Rozsa <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
---
.../apache/impala/analysis/IcebergMergeImpl.java | 2 +-
.../queries/QueryTest/iceberg-merge-partition.test | 23 ++++++++++++++++++++++
2 files changed, 24 insertions(+), 1 deletion(-)
diff --git a/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java
b/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java
index 5956e59ec..7ee91178e 100644
--- a/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java
+++ b/fe/src/main/java/org/apache/impala/analysis/IcebergMergeImpl.java
@@ -279,7 +279,7 @@ public class IcebergMergeImpl implements MergeImpl {
public TableSink createInsertSink() {
return TableSink.create(icebergTable_, TableSink.Op.INSERT,
targetPartitionExpressions_, targetExpressions_,
Collections.emptyList(), false,
- false, targetSorting_.sortingColumnsAndOrder(), -1, null,
+ true, targetSorting_.sortingColumnsAndOrder(), -1, null,
mergeStmt_.maxTableSinks_);
}
diff --git
a/testdata/workloads/functional-query/queries/QueryTest/iceberg-merge-partition.test
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-merge-partition.test
index 70cb459e0..b3dcfa365 100644
---
a/testdata/workloads/functional-query/queries/QueryTest/iceberg-merge-partition.test
+++
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-merge-partition.test
@@ -339,3 +339,26 @@ when matched and target.id between 2 and 3 then update set
string_col = "differe
---- TYPES
INT,BOOLEAN,INT,FLOAT,DECIMAL,DATE,STRING
====
+---- QUERY
+# Regression test for IMPALA-13656.
+# Merging from a source table with many partitions (with low mem_limit).
+# Then we also check the number of files to verify that
+# the inputs of the writers are actually sorted(/clustered).
+CREATE TABLE ice_tpch_many_parts
+PARTITIONED BY SPEC(truncate(500, l_partkey))
+STORED BY ICEBERG
+TBLPROPERTIES ('format-version'='2')
+AS SELECT l_orderkey, l_partkey, l_linenumber, l_shipdate, l_comment
+FROM tpch_parquet.lineitem limit 1;
+
+SET MEM_LIMIT=400m;
+MERGE INTO ice_tpch_many_parts USING
+(SELECT l_orderkey, l_partkey, l_linenumber, l_shipdate, l_comment FROM
tpch_parquet.lineitem WHERE l_linenumber=1) l
+ON ice_tpch_many_parts.l_orderkey=l.l_orderkey
+WHEN NOT MATCHED THEN INSERT values(l.l_orderkey, l.l_partkey, l.l_linenumber,
l.l_shipdate, l.l_comment);
+SELECT count(*) FROM $DATABASE.ice_tpch_many_parts.`files`;
+---- RESULTS
+402
+---- TYPES
+BIGINT
+====