This is an automated email from the ASF dual-hosted git repository.

dbecker pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 79f51b018 IMPALA-12588: Don't UPDATE rows that already have the desired value
79f51b018 is described below

commit 79f51b018f374fb08c5c8e2a52251b6cbe14fd18
Author: Noemi Pap-Takacs <[email protected]>
AuthorDate: Tue Jan 21 14:24:45 2025 +0100

    IMPALA-12588: Don't UPDATE rows that already have the desired value
    
    When UPDATEing an Iceberg or Kudu table, we should change as few rows
    as possible. For Iceberg tables, this means writing as few new data
    records and delete records as possible.
    Therefore, rows that already have the new values should be skipped.
    One way to achieve this is to add extra predicates, e.g.:
    
      UPDATE tbl SET k = 3 WHERE i > 4;
        ==>
      UPDATE tbl SET k = 3 WHERE i > 4 AND k IS DISTINCT FROM 3;
    
    So we won't write new data/delete records for the rows that already have
    the desired value.
    
    Explanation on how to create extra predicates to filter out these rows:
    
    If there are multiple assignments in the SET list, we can only skip
    updating a row if every assigned column already equals its new value.
    If any of the values needs to be updated, the entire row does.
    Therefore we can think of the SET list as equality predicates connected
    with AND: a row can be skipped only if all of them hold.
    To negate this conjunction, we negate the individual SET assignments
    and connect them with OR (De Morgan's law).
    Then we add this new compound predicate to the original WHERE predicate
    with an AND (if there was none, it becomes the WHERE predicate).
    
                    AND
                  /     \
          original        OR
     WHERE predicate    /    \
                      !a       OR
                             /    \
                           !b     !c
    
    This simple graph illustrates how the WHERE predicate is rewritten
    for an UPDATE statement that sets 3 columns.
    '!a', '!b' and '!c' are the negations of the individual assignments in
    the SET list. So the extended WHERE predicate is:
    (original WHERE predicate) AND (!a OR !b OR !c)
    To handle NULL values correctly, we use IS DISTINCT FROM instead of
    simply negating the assignment with operator '!=': a '!=' comparison
    evaluates to NULL when either side is NULL, so rows that need to change
    from or to NULL would be filtered out.
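
A minimal sketch of the rewrite described above, written in Python rather than the Java used by the patch. It works on SQL strings instead of Impala's expression tree, and models NULL as `None`. The names `Assignment`, `rewrite_where`, `sql_not_equal` and `sql_is_distinct_from` are hypothetical and exist only for illustration; they are not part of Impala.

```python
from typing import List, NamedTuple, Optional


class Assignment(NamedTuple):
    """One item of the SET list: `column = value_sql` (hypothetical helper type)."""
    column: str
    value_sql: str


def rewrite_where(assignments: List[Assignment], where: Optional[str]) -> Optional[str]:
    """Return the extended WHERE predicate as a SQL string.

    Each assignment is negated with IS DISTINCT FROM, the negations are
    OR-ed together, and the result is AND-ed to the original predicate.
    """
    if not assignments:
        return where
    negated = [f"{a.column} IS DISTINCT FROM {a.value_sql}" for a in assignments]
    skip_filter = " OR ".join(negated)
    if where is None:
        return skip_filter
    # Parenthesize both sides so operator precedence cannot change the meaning.
    return f"({where}) AND ({skip_filter})"


def sql_not_equal(a, b):
    """SQL `a != b` with None standing in for NULL: NULL if either side is NULL."""
    if a is None or b is None:
        return None
    return a != b


def sql_is_distinct_from(a, b):
    """SQL `a IS DISTINCT FROM b`: never NULL; two NULLs are not distinct."""
    if a is None or b is None:
        return (a is None) != (b is None)
    return a != b
```

For example, `rewrite_where([Assignment("k", "3")], "i > 4")` yields `(i > 4) AND (k IS DISTINCT FROM 3)`. For a row where `k` is NULL, `sql_not_equal(None, 3)` is `None` (the row would be dropped by a WHERE clause), while `sql_is_distinct_from(None, 3)` is `True`, so the row is still updated.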
    
    If the assignments contain UDFs, the result might be inconsistent
    because UDFs can be non-deterministic or stateful. In that case we
    do not rewrite the WHERE predicate at all.
    
    Evaluating the extra expressions can be expensive, therefore this
    optimization can be limited or switched off entirely using the Query
    Option SKIP_UNNEEDED_UPDATES_COL_LIMIT. The default is 10: there is no
    filtering if more than 10 assignments are in the SET list. A
    non-positive value turns the optimization off.
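
The two conditions that disable the rewrite (too many assignments, or a UDF in any assignment) can be sketched as a single check. This is an illustrative Python model, not the patch's Java code; `should_add_skip_filter` and its parameters are hypothetical names, and the default of 10 is taken from the commit message.

```python
from typing import Sequence

# Default of SKIP_UNNEEDED_UPDATES_COL_LIMIT according to the commit message.
DEFAULT_COL_LIMIT = 10


def should_add_skip_filter(contains_udf: Sequence[bool],
                           col_limit: int = DEFAULT_COL_LIMIT) -> bool:
    """Decide whether the extra IS DISTINCT FROM filter is added.

    `contains_udf` holds one flag per SET-list item, telling whether the
    right-hand side of that assignment contains a UDF (hypothetical input).
    """
    # More assignments than the limit: skip the optimization. A real UPDATE
    # has at least one assignment, so any non-positive limit disables it.
    if len(contains_udf) > col_limit:
        return False
    # A single UDF anywhere in the SET list keeps the original WHERE predicate.
    return not any(contains_udf)
```

Impala query options are set with a SET statement, so switching the optimization off for a session would presumably look like `SET SKIP_UNNEEDED_UPDATES_COL_LIMIT=0;`.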
    
    -------------------------------------------------------------------
    Some performance measurements on a tpch lineitem table:
    
    - predicates in HASH join, all updates can be skipped
    Q1/[Q2] (similar, but Q2 adds 4 extra items to the SET list):
    update t set t.l_suppkey = s.l_suppkey,
    [ t.l_partkey=s.l_partkey,
      t.l_quantity=s.l_quantity,
      t.l_returnflag=s.l_returnflag,
      t.l_shipmode=s.l_shipmode ]
    from ice_lineitem t join ice_lineitem s
    on t.l_orderkey=s.l_orderkey and t.l_linenumber=s.l_linenumber;
    
    - predicates in HASH join, all rows need to be updated
    Q3: update t set
     t.l_suppkey = s.l_suppkey,
     t.l_partkey=s.l_partkey,
     t.l_quantity=s.l_quantity,
     t.l_returnflag=s.l_returnflag,
     t.l_shipmode=concat(s.l_shipmode,' ')
     from ice_lineitem t join ice_lineitem s
     on t.l_orderkey=s.l_orderkey and t.l_linenumber=s.l_linenumber;
    
    - predicates pushed down to the scanner, all rows updated
    Q4/[Q5] (similar, but Q5 adds 8 extra items to the SET list):
    update ice_lineitem set
    [ l_suppkey = l_suppkey + 0,
      l_partkey=l_partkey + 0,
      l_quantity=l_quantity,
      l_returnflag=l_returnflag,
      l_tax = l_tax,
      l_discount= l_discount,
      l_comment = l_comment,
      l_receiptdate = l_receiptdate, ]
    l_shipmode=concat(l_shipmode,' ');
    
    +=======+============+==========+======+
    | Query | unfiltered | filtered | diff |
    +=======+============+==========+======+
    | Q1    |       4.1s |     1.9s | -54% |
    +-------+------------+----------+------+
    | Q2    |       4.2s |     2.1s | -50% |
    +-------+------------+----------+------+
    | Q3    |       4.3s |     4.7s | +10% |
    +-------+------------+----------+------+
    | Q4    |       3.0s |     3.0s | +0%  |
    +-------+------------+----------+------+
    | Q5    |       3.1s |     3.1s | +0%  |
    +-------+------------+----------+------+
    
    The results show that in the best case (all rows can be skipped)
    this change gives a significant performance improvement of ~50%,
    since 0 rows are written. See Q1 and Q2.
    If the predicates are evaluated in the join operator but no rows can
    be skipped (worst case scenario), we can lose about 10%. (Q3)
    If all the predicates can be pushed down to the scanners, the change
    does not seem to cause a significant difference (~0% in Q4 and Q5)
    even if all rows have to be updated.
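
The diff column in the table is the relative change of the filtered run against the unfiltered one. The short Python sketch below recomputes it from the rounded timings shown in the table; `relative_diff_percent` is a hypothetical helper name. From these rounded values Q3 works out to roughly +9%, which is in line with the "about 10%" above; the table's figure was presumably derived from unrounded measurements (an assumption).

```python
# (unfiltered seconds, filtered seconds) as printed in the table above.
timings = {
    "Q1": (4.1, 1.9),
    "Q2": (4.2, 2.1),
    "Q3": (4.3, 4.7),
    "Q4": (3.0, 3.0),
    "Q5": (3.1, 3.1),
}


def relative_diff_percent(unfiltered_s: float, filtered_s: float) -> float:
    """Relative runtime change in percent; negative means the filtered run is faster."""
    return (filtered_s - unfiltered_s) / unfiltered_s * 100.0


# Rounded to whole percents for comparison with the table.
diffs = {q: round(relative_diff_percent(u, f)) for q, (u, f) in timings.items()}
```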
    
    Testing:
     - Analysis
     - Planner
     - E2E
     - Kudu
     - Iceberg
     - testing the new query option: SKIP_UNNEEDED_UPDATES_COL_LIMIT
    Change-Id: I926c80e8110de5a4615a3624a81a330f54317c8b
    Reviewed-on: http://gerrit.cloudera.org:8080/22407
    Reviewed-by: Zoltan Borok-Nagy <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/service/query-options.cc                    |   6 +
 be/src/service/query-options.h                     |   4 +-
 common/thrift/ImpalaService.thrift                 |   7 +
 common/thrift/Query.thrift                         |   3 +
 .../org/apache/impala/analysis/ModifyImpl.java     |   4 +
 .../org/apache/impala/analysis/ModifyStmt.java     |   5 +-
 .../org/apache/impala/analysis/UpdateStmt.java     |  61 ++++++-
 .../apache/impala/analysis/ExprRewriterTest.java   |   7 +-
 .../org/apache/impala/planner/ColumnsTest.java     |   3 +-
 .../queries/PlannerTest/iceberg-v2-update.test     | 177 ++++++++++++++++-----
 .../PlannerTest/kudu-dml-with-utc-conversion.test  |   1 +
 .../queries/PlannerTest/kudu-update.test           |  12 ++
 .../queries/QueryTest/iceberg-update-basic.test    | 137 +++++++++++++++-
 .../queries/QueryTest/kudu_update.test             |  24 ++-
 tests/query_test/test_iceberg.py                   |  14 +-
 15 files changed, 408 insertions(+), 57 deletions(-)

diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index 5a58a09cf..834b87765 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -1345,6 +1345,12 @@ Status impala::SetQueryOption(TImpalaQueryOptions::type option, const string& va
         query_options->__set_sync_hms_events_strict_mode(IsTrue(value));
         break;
       }
+      case TImpalaQueryOptions::SKIP_UNNEEDED_UPDATES_COL_LIMIT: {
+        int32_t int32_t_val = 0;
+        RETURN_IF_ERROR(QueryOptionParser::Parse<int32_t>(option, value, &int32_t_val));
+        query_options->__set_skip_unneeded_updates_col_limit(int32_t_val);
+        break;
+      }
       default:
         string key = to_string(option);
         if (IsRemovedQueryOption(key)) {
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index acd0e213f..77c969822 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -51,7 +51,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
 // plus one. Thus, the second argument to the DCHECK has to be updated every
 // time we add or remove a query option to/from the enum TImpalaQueryOptions.
 constexpr unsigned NUM_QUERY_OPTIONS =
-    TImpalaQueryOptions::SYNC_HMS_EVENTS_STRICT_MODE + 1;
+    TImpalaQueryOptions::SKIP_UNNEEDED_UPDATES_COL_LIMIT + 1;
 #define QUERY_OPTS_TABLE                                                                 \
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(), NUM_QUERY_OPTIONS);             \
   REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED) \
@@ -370,6 +370,8 @@ constexpr unsigned NUM_QUERY_OPTIONS =
       TQueryOptionLevel::ADVANCED)                                                       \
   QUERY_OPT_FN(sync_hms_events_strict_mode, SYNC_HMS_EVENTS_STRICT_MODE,                 \
       TQueryOptionLevel::ADVANCED)                                                       \
+  QUERY_OPT_FN(skip_unneeded_updates_col_limit,                                          \
+      SKIP_UNNEEDED_UPDATES_COL_LIMIT, TQueryOptionLevel::ADVANCED)                      \
   ;
 
 /// Enforce practical limits on some query options to avoid undesired query state.
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index f4ac57a95..41a602f2b 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -998,6 +998,13 @@ enum TImpalaQueryOptions {
   // SYNC_HMS_EVENTS_WAIT_TIME_S seconds. Defaults to false, which means coordinator will
   // start query planning regardless of the failure.
   SYNC_HMS_EVENTS_STRICT_MODE = 188
+
+  // Turns on an optimization in UPDATE operation that skips updating rows which already
+  // have the desired value. The filtering is on by default, skipping unnecessary updates
+  // if there are up to 10 items in the SET list. If more columns are updated than the
+  // given value, then no filtering is done.
+  // Set it non-positive to turn this feature off completely.
+  SKIP_UNNEEDED_UPDATES_COL_LIMIT = 189
 }
 
 // The summary of a DML statement.
diff --git a/common/thrift/Query.thrift b/common/thrift/Query.thrift
index ccbff8f99..bda38fa9c 100644
--- a/common/thrift/Query.thrift
+++ b/common/thrift/Query.thrift
@@ -768,6 +768,9 @@ struct TQueryOptions {
 
   // See comment in ImpalaService.thrift
   189: optional bool sync_hms_events_strict_mode = false
+
+  // See comment in ImpalaService.thrift
+  190: optional i32 skip_unneeded_updates_col_limit = 10
 }
 
 // Impala currently has three types of sessions: Beeswax, HiveServer2 and external
diff --git a/fe/src/main/java/org/apache/impala/analysis/ModifyImpl.java b/fe/src/main/java/org/apache/impala/analysis/ModifyImpl.java
index 8098ee20b..8cbc7f793 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ModifyImpl.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ModifyImpl.java
@@ -101,6 +101,10 @@ abstract class ModifyImpl {
       // Builds the select list and column position mapping for the target table.
       ArrayList<SelectListItem> selectList = new ArrayList<>();
       buildAndValidateSelectExprs(analyzer, selectList);
+      // Filter out rows that would be unnecessary to update.
+      if (modifyStmt_ instanceof UpdateStmt) {
+        ((UpdateStmt) modifyStmt_).rewriteWherePredicate(analyzer);
+      }
 
       // Analyze the generated select statement.
       sourceStmt_ = new SelectStmt(new SelectList(selectList), modifyStmt_.fromClause_,
diff --git a/fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java b/fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java
index 307c1ac76..167350b70 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java
@@ -64,7 +64,7 @@ public abstract class ModifyStmt extends DmlStatementBase {
   protected final List<Pair<SlotRef, Expr>> assignments_;
 
   // Optional WHERE clause of the statement
-  protected final Expr wherePredicate_;
+  protected Expr wherePredicate_;
 
   // Path identifying the target table.
   protected final List<String> targetTablePath_;
@@ -166,14 +166,13 @@ public abstract class ModifyStmt extends DmlStatementBase {
     table_ = dstTbl;
     if (modifyImpl_ == null) createModifyImpl();
     modifyImpl_.analyze(analyzer);
+    sqlString_ = toSql();
     // Create and analyze the source statement.
     modifyImpl_.createSourceStmt(analyzer);
     // Add target table to descriptor table.
     analyzer.getDescTbl().setTargetTable(table_);
 
     analyzer_.addWhereColumns(wherePredicate_);
-
-    sqlString_ = toSql();
   }
 
   /**
diff --git a/fe/src/main/java/org/apache/impala/analysis/UpdateStmt.java b/fe/src/main/java/org/apache/impala/analysis/UpdateStmt.java
index 3f22d17dc..fd5349467 100644
--- a/fe/src/main/java/org/apache/impala/analysis/UpdateStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/UpdateStmt.java
@@ -25,6 +25,7 @@ import java.util.List;
 import org.apache.impala.catalog.FeIcebergTable;
 import org.apache.impala.catalog.FeKuduTable;
 import org.apache.impala.common.AnalysisException;
+import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.common.Pair;
 import org.apache.impala.planner.DataSink;
 
@@ -42,12 +43,13 @@ import com.google.common.base.Preconditions;
  *
  * An update statement consists of four major parts. First, the target table path,
  * second, the list of assignments, the optional FROM clause, and the optional where
- * clause. The type of the right-hand side of each assignments must be
+ * clause. The type of the right-hand side of each assignment must be
  * assignment compatible with the left-hand side column type.
  *
  * Currently, only Kudu and Iceberg tables can be updated.
  */
 public class UpdateStmt extends ModifyStmt {
+
   public UpdateStmt(List<String> targetTablePath, FromClause tableRefs,
       List<Pair<SlotRef, Expr>> assignmentExprs, Expr wherePredicate) {
     super(targetTablePath, tableRefs, assignmentExprs, wherePredicate);
@@ -123,4 +125,61 @@ public class UpdateStmt extends ModifyStmt {
     }
     return b.toString();
   }
+
+  // Rewrite or create WHERE predicate to filter out rows that already have the desired
+  // value, thus skipping unnecessary updates.
+  protected void rewriteWherePredicate(Analyzer analyzer) {
+    // If there are too many columns to update, the expression tree might grow too large
+    // and the cost of evaluating all the extra expressions might not be worth it.
+    // Therefore we can limit or switch off the optimization using the Query Option
+    // SKIP_UNNEEDED_UPDATES_COL_LIMIT.
+    int col_limit = analyzer.getQueryOptions().skip_unneeded_updates_col_limit;
+    if (assignments_.size() > col_limit) {
+      return;
+    }
+    // Form predicates ('A', 'B', 'C') to check that the two sides of the assignment(s)
+    // are distinct. (e.g. 'A': col_a IS DISTINCT FROM new_value_a)
+    // If there are multiple assignments in the SET list, connect these predicates with OR
+    // (if at least one value needs to be changed, the entire row needs to be updated).
+    // Then create or add them to the existing wherePredicate_ list with an AND.
+    //                 AND
+    //               /     \
+    //        original        OR
+    // WHERE predicates     /    \
+    //                     A       OR
+    //                           /    \
+    //                          B      C
+    // Result: (original where predicates) AND (A OR B OR C)
+    Predicate pred = negateAssignment(assignments_.get(0));
+    // In case of UDFs in the SET list, keep the original WHERE predicate.
+    if (pred == null) {
+      return;
+    }
+
+    for (int i = 1; i < assignments_.size(); i++) {
+      Predicate next = negateAssignment(assignments_.get(i));
+      if (next == null) {
+        return;
+      }
+      pred = new CompoundPredicate(CompoundPredicate.Operator.OR, pred, next);
+    }
+    if (wherePredicate_ != null) {
+      wherePredicate_ = new CompoundPredicate(CompoundPredicate.Operator.AND,
+          wherePredicate_, pred);
+    } else {
+      wherePredicate_ = pred;
+    }
+  }
+
+  // Create an extra predicate to filter out rows that already have the value we want to
+  // SET. We need to check whether the two sides of the assignment are distinct or not.
+  private Predicate negateAssignment(Pair<SlotRef, Expr> assignment) {
+    // Do not add UDFs to the predicate, because they could be evaluated differently in
+    // the WHERE predicate than the SET list causing inconsistent results.
+    if (assignment.second.contains(Expr.IS_UDF_PREDICATE)) {
+      return null;
+    }
+    return new BinaryPredicate(BinaryPredicate.Operator.DISTINCT_FROM,
+            assignment.first.clone(), assignment.second.clone());
+  }
 }
diff --git a/fe/src/test/java/org/apache/impala/analysis/ExprRewriterTest.java b/fe/src/test/java/org/apache/impala/analysis/ExprRewriterTest.java
index 4c9457a39..56e16593d 100644
--- a/fe/src/test/java/org/apache/impala/analysis/ExprRewriterTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/ExprRewriterTest.java
@@ -189,9 +189,9 @@ public class ExprRewriterTest extends AnalyzerTest {
     // Update.
     RewritesOk("update t2 set name = 'test' from " +
         "functional.alltypes t1 join functional_kudu.dimtbl t2 on (t1.id = t2.id) " +
-        "where t2.id < 10", 10, 5);
+        "where t2.id < 10", 14, 5);
     RewritesOk("update functional_kudu.dimtbl set name = 'test', zip = 4711 " +
-        "where exists (" + stmt_ + ")", 28, 16);
+        "where exists (" + stmt_ + ")", 35, 17);
     // Delete.
     RewritesOk("delete a from " +
         "functional_kudu.testtbl a join functional.testtbl b on a.zip = b.zip", 4, 2);
@@ -359,7 +359,8 @@ public class ExprRewriterTest extends AnalyzerTest {
             + "FROM functional_kudu.alltypes WHERE id = (SELECT 1 + 1)",
         "UPDATE functional_kudu.alltypes SET string_col = 'test' "
             + "FROM functional_kudu.alltypes LEFT SEMI JOIN (SELECT 2) `$a$1` (`$c$1`) "
-            + "ON id = `$a$1`.`$c$1` WHERE id = (SELECT 2)");
+            + "ON id = `$a$1`.`$c$1` "
+            + "WHERE id = (SELECT 2) AND string_col IS DISTINCT FROM 'test'");
 
     // Delete
     assertToSql(ctx,
diff --git a/fe/src/test/java/org/apache/impala/planner/ColumnsTest.java b/fe/src/test/java/org/apache/impala/planner/ColumnsTest.java
index 2063a2736..8bef90341 100644
--- a/fe/src/test/java/org/apache/impala/planner/ColumnsTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/ColumnsTest.java
@@ -274,7 +274,8 @@ public class ColumnsTest extends FrontendTestBase {
             "functional_parquet.iceberg_v2_delete_positional.file__position",
             "functional_parquet.iceberg_v2_delete_positional.id",
             "functional_parquet.iceberg_v2_delete_positional.input__file__name"),
-        ImmutableList.of("functional_parquet.iceberg_v2_delete_positional.id"),
+        ImmutableList.of("functional_parquet.iceberg_v2_delete_positional.data",
+            "functional_parquet.iceberg_v2_delete_positional.id"),
         ImmutableList.of(
             "functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE-01."
                 + "file_path",
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-update.test b/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-update.test
index 57c27f3ef..49257574e 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-update.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-update.test
@@ -6,7 +6,7 @@ MULTI DATA SINK
 |
 00:SCAN HDFS [functional_parquet.iceberg_v2_no_deletes]
    HDFS partitions=1/1 files=1 size=625B
-   predicates: i = 3
+   predicates: i = 3, s IS DISTINCT FROM concat(s, s)
    Iceberg snapshot id: 728158873687794725
    row-size=36B cardinality=1
 ---- DISTRIBUTEDPLAN
@@ -16,7 +16,7 @@ MULTI DATA SINK
 |
 00:SCAN HDFS [functional_parquet.iceberg_v2_no_deletes]
    HDFS partitions=1/1 files=1 size=625B
-   predicates: i = 3
+   predicates: i = 3, s IS DISTINCT FROM concat(s, s)
    Iceberg snapshot id: 728158873687794725
    row-size=36B cardinality=1
 ====
@@ -36,7 +36,7 @@ MULTI DATA SINK
 |
 00:SCAN HDFS [functional_parquet.iceberg_v2_delete_positional]
    HDFS partitions=1/1 files=1 size=662B
-   predicates: id = 15
+   predicates: id = 15, `data` IS DISTINCT FROM concat(`data`, 'a')
    Iceberg snapshot id: 5725822353600261755
    row-size=40B cardinality=1
 ---- DISTRIBUTEDPLAN
@@ -56,7 +56,7 @@ MULTI DATA SINK
 |
 00:SCAN HDFS [functional_parquet.iceberg_v2_delete_positional]
    HDFS partitions=1/1 files=1 size=662B
-   predicates: id = 15
+   predicates: id = 15, `data` IS DISTINCT FROM concat(`data`, 'a')
    Iceberg snapshot id: 5725822353600261755
    row-size=40B cardinality=1
 ====
@@ -67,7 +67,7 @@ MULTI DATA SINK
 |->BUFFERED DELETE FROM ICEBERG [functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE]
 |
 02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN]
-|  row-size=40B cardinality=2
+|  row-size=40B cardinality=1
 |
 |--01:SCAN HDFS [functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE-01 functional_parquet.iceberg_v2_delete_positional-position-delete]
 |     HDFS partitions=1/1 files=1 size=1.54KB
@@ -76,15 +76,16 @@ MULTI DATA SINK
 |
 00:SCAN HDFS [functional_parquet.iceberg_v2_delete_positional]
    HDFS partitions=1/1 files=1 size=662B
+   predicates: id IS DISTINCT FROM CAST(id + 1 AS INT)
    Iceberg snapshot id: 5725822353600261755
-   row-size=40B cardinality=3
+   row-size=40B cardinality=1
 ---- DISTRIBUTEDPLAN
 MULTI DATA SINK
 |->WRITE TO HDFS [functional_parquet.iceberg_v2_delete_positional, OVERWRITE=false]
 |->BUFFERED DELETE FROM ICEBERG [functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE]
 |
 02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, DIRECTED]
-|  row-size=40B cardinality=2
+|  row-size=40B cardinality=1
 |
 |--03:EXCHANGE [DIRECTED]
 |  |
@@ -95,8 +96,9 @@ MULTI DATA SINK
 |
 00:SCAN HDFS [functional_parquet.iceberg_v2_delete_positional]
    HDFS partitions=1/1 files=1 size=662B
+   predicates: id IS DISTINCT FROM CAST(id + 1 AS INT)
    Iceberg snapshot id: 5725822353600261755
-   row-size=40B cardinality=3
+   row-size=40B cardinality=1
 ====
 UPDATE iceberg_v2_delete_positional SET id = 42 WHERE FILE__POSITION = id
 ---- PLAN
@@ -114,7 +116,7 @@ MULTI DATA SINK
 |
 00:SCAN HDFS [functional_parquet.iceberg_v2_delete_positional]
    HDFS partitions=1/1 files=1 size=662B
-   predicates: FILE__POSITION = id
+   predicates: FILE__POSITION = id, functional_parquet.iceberg_v2_delete_positional.file__position IS DISTINCT FROM 42, id IS DISTINCT FROM 42
    Iceberg snapshot id: 5725822353600261755
    row-size=40B cardinality=1
 ---- DISTRIBUTEDPLAN
@@ -134,7 +136,7 @@ MULTI DATA SINK
 |
 00:SCAN HDFS [functional_parquet.iceberg_v2_delete_positional]
    HDFS partitions=1/1 files=1 size=662B
-   predicates: FILE__POSITION = id
+   predicates: FILE__POSITION = id, functional_parquet.iceberg_v2_delete_positional.file__position IS DISTINCT FROM 42, id IS DISTINCT FROM 42
    Iceberg snapshot id: 5725822353600261755
    row-size=40B cardinality=1
 ====
@@ -146,10 +148,10 @@ MULTI DATA SINK
 |
 03:SORT
 |  order by: action ASC NULLS LAST
-|  row-size=76B cardinality=10
+|  row-size=76B cardinality=1
 |
 02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN]
-|  row-size=76B cardinality=10
+|  row-size=80B cardinality=1
 |
 |--01:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE-01 functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete]
 |     HDFS partitions=1/1 files=3 size=9.47KB
@@ -158,8 +160,9 @@ MULTI DATA SINK
 |
 00:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes]
    HDFS partitions=1/1 files=3 size=3.48KB
+   predicates: id IS DISTINCT FROM length(action)
    Iceberg snapshot id: 8885697082976537578
-   row-size=76B cardinality=20
+   row-size=80B cardinality=2
 ---- DISTRIBUTEDPLAN
 MULTI DATA SINK
 |->WRITE TO HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes, OVERWRITE=false, PARTITION-KEYS=(action)]
@@ -167,12 +170,12 @@ MULTI DATA SINK
 |
 05:SORT
 |  order by: action ASC NULLS LAST
-|  row-size=76B cardinality=10
+|  row-size=76B cardinality=1
 |
 04:EXCHANGE [HASH(functional_parquet.iceberg_v2_partitioned_position_deletes.action)]
 |
 02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, DIRECTED]
-|  row-size=76B cardinality=10
+|  row-size=80B cardinality=1
 |
 |--03:EXCHANGE [DIRECTED]
 |  |
@@ -183,8 +186,59 @@ MULTI DATA SINK
 |
 00:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes]
    HDFS partitions=1/1 files=3 size=3.48KB
+   predicates: id IS DISTINCT FROM length(action)
    Iceberg snapshot id: 8885697082976537578
-   row-size=76B cardinality=20
+   row-size=80B cardinality=2
+====
+UPDATE iceberg_v2_partitioned_position_deletes set id = length(action) where user like "impala"
+---- PLAN
+MULTI DATA SINK
+|->WRITE TO HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes, OVERWRITE=false, PARTITION-KEYS=(action)]
+|->BUFFERED DELETE FROM ICEBERG [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE]
+|
+03:SORT
+|  order by: action ASC NULLS LAST
+|  row-size=76B cardinality=1
+|
+02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN]
+|  row-size=80B cardinality=1
+|
+|--01:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE-01 functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete]
+|     HDFS partitions=1/1 files=3 size=9.47KB
+|     Iceberg snapshot id: 8885697082976537578
+|     row-size=204B cardinality=10
+|
+00:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes]
+   HDFS partitions=1/1 files=3 size=3.48KB
+   predicates: id IS DISTINCT FROM length(action), `user` LIKE 'impala'
+   Iceberg snapshot id: 8885697082976537578
+   row-size=80B cardinality=2
+---- DISTRIBUTEDPLAN
+MULTI DATA SINK
+|->WRITE TO HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes, OVERWRITE=false, PARTITION-KEYS=(action)]
+|->BUFFERED DELETE FROM ICEBERG [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE]
+|
+05:SORT
+|  order by: action ASC NULLS LAST
+|  row-size=76B cardinality=1
+|
+04:EXCHANGE [HASH(functional_parquet.iceberg_v2_partitioned_position_deletes.action)]
+|
+02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, DIRECTED]
+|  row-size=80B cardinality=1
+|
+|--03:EXCHANGE [DIRECTED]
+|  |
+|  01:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE-01 functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete]
+|     HDFS partitions=1/1 files=3 size=9.47KB
+|     Iceberg snapshot id: 8885697082976537578
+|     row-size=204B cardinality=10
+|
+00:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes]
+   HDFS partitions=1/1 files=3 size=3.48KB
+   predicates: id IS DISTINCT FROM length(action), `user` LIKE 'impala'
+   Iceberg snapshot id: 8885697082976537578
+   row-size=80B cardinality=2
 ====
 UPDATE iceberg_v2_partitioned_position_deletes set id = length(action) where action = 'click'
 ---- PLAN
@@ -194,10 +248,10 @@ MULTI DATA SINK
 |
 03:SORT
 |  order by: action ASC NULLS LAST
-|  row-size=76B cardinality=3
+|  row-size=76B cardinality=1
 |
 02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN]
-|  row-size=76B cardinality=3
+|  row-size=80B cardinality=1
 |
 |--01:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE-01 functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete]
 |     HDFS partitions=1/1 files=1 size=3.15KB
@@ -206,9 +260,10 @@ MULTI DATA SINK
 |
 00:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes]
    HDFS partitions=1/1 files=1 size=1.15KB
+   predicates: id IS DISTINCT FROM 5
    Iceberg snapshot id: 8885697082976537578
    skipped Iceberg predicates: action = 'click'
-   row-size=76B cardinality=6
+   row-size=80B cardinality=1
 ---- DISTRIBUTEDPLAN
 MULTI DATA SINK
 |->WRITE TO HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes, OVERWRITE=false, PARTITION-KEYS=(action)]
@@ -216,10 +271,10 @@ MULTI DATA SINK
 |
 04:SORT
 |  order by: action ASC NULLS LAST
-|  row-size=76B cardinality=3
+|  row-size=76B cardinality=1
 |
 02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, DIRECTED]
-|  row-size=76B cardinality=3
+|  row-size=80B cardinality=1
 |
 |--03:EXCHANGE [DIRECTED]
 |  |
@@ -230,9 +285,10 @@ MULTI DATA SINK
 |
 00:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes]
    HDFS partitions=1/1 files=1 size=1.15KB
+   predicates: id IS DISTINCT FROM 5
    Iceberg snapshot id: 8885697082976537578
    skipped Iceberg predicates: action = 'click'
-   row-size=76B cardinality=6
+   row-size=80B cardinality=1
 ====
 UPDATE target set user = s from iceberg_v2_partitioned_position_deletes target, iceberg_v2_positional_update_all_rows source where target.id = source.i
 ---- PLAN
@@ -246,8 +302,9 @@ MULTI DATA SINK
 |
 08:HASH JOIN [INNER JOIN]
 |  hash predicates: target.id = source.i
+|  other predicates: `user` IS DISTINCT FROM s
 |  runtime filters: RF000 <- source.i
-|  row-size=104B cardinality=10
+|  row-size=116B cardinality=10
 |
 |--07:UNION
 |  |  pass-through-operands: all
@@ -272,7 +329,7 @@ MULTI DATA SINK
 |     row-size=36B cardinality=3
 |
 02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN]
-|  row-size=68B cardinality=10
+|  row-size=80B cardinality=10
 |
 |--01:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE-01 target-position-delete]
 |     HDFS partitions=1/1 files=3 size=9.47KB
@@ -283,7 +340,7 @@ MULTI DATA SINK
    HDFS partitions=1/1 files=3 size=3.48KB
    runtime filters: RF000 -> target.id
    Iceberg snapshot id: 8885697082976537578
-   row-size=68B cardinality=20
+   row-size=80B cardinality=20
 ---- DISTRIBUTEDPLAN
 MULTI DATA SINK
 |->WRITE TO HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes, OVERWRITE=false, PARTITION-KEYS=(action)]
@@ -297,8 +354,9 @@ MULTI DATA SINK
 |
 08:HASH JOIN [INNER JOIN, BROADCAST]
 |  hash predicates: target.id = source.i
+|  other predicates: `user` IS DISTINCT FROM s
 |  runtime filters: RF000 <- source.i
-|  row-size=104B cardinality=10
+|  row-size=116B cardinality=10
 |
 |--11:EXCHANGE [BROADCAST]
 |  |
@@ -327,7 +385,7 @@ MULTI DATA SINK
 |     row-size=36B cardinality=3
 |
 02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, DIRECTED]
-|  row-size=68B cardinality=10
+|  row-size=80B cardinality=10
 |
 |--09:EXCHANGE [DIRECTED]
 |  |
@@ -340,7 +398,7 @@ MULTI DATA SINK
    HDFS partitions=1/1 files=3 size=3.48KB
    runtime filters: RF000 -> target.id
    Iceberg snapshot id: 8885697082976537578
-   row-size=68B cardinality=20
+   row-size=80B cardinality=20
 ====
 update iceberg_partition_transforms_zorder set ts = days_add(ts, 10), i = cast(i + 1000 as int)
 ---- PLAN
@@ -354,6 +412,7 @@ MULTI DATA SINK
 |
 00:SCAN HDFS [functional_parquet.iceberg_partition_transforms_zorder]
    HDFS partitions=1/1 files=1 size=1.08KB
+   predicates: ts IS DISTINCT FROM days_add(ts, 10) OR i IS DISTINCT FROM CAST(i + 1000 AS INT)
    Iceberg snapshot id: 7350750578864730166
    row-size=72B cardinality=1
 ---- DISTRIBUTEDPLAN
@@ -367,6 +426,7 @@ MULTI DATA SINK
 |
 00:SCAN HDFS [functional_parquet.iceberg_partition_transforms_zorder]
    HDFS partitions=1/1 files=1 size=1.08KB
+   predicates: ts IS DISTINCT FROM days_add(ts, 10) OR i IS DISTINCT FROM CAST(i + 1000 AS INT)
    Iceberg snapshot id: 7350750578864730166
    row-size=72B cardinality=1
 ====
@@ -384,13 +444,14 @@ MULTI DATA SINK
 |
 02:HASH JOIN [INNER JOIN]
 |  hash predicates: source.id = ice_zorder.i
+|  other predicates: j IS DISTINCT FROM length(action)
 |  runtime filters: RF000 <- ice_zorder.i
-|  row-size=84B cardinality=20
+|  row-size=88B cardinality=20
 |
 |--00:SCAN HDFS [functional_parquet.iceberg_partition_transforms_zorder ice_zorder]
 |     HDFS partitions=1/1 files=1 size=1.08KB
 |     Iceberg snapshot id: 7350750578864730166
-|     row-size=68B cardinality=1
+|     row-size=72B cardinality=1
 |
 01:SCAN HDFS [functional_parquet.iceberg_partitioned source]
    HDFS partitions=1/1 files=20 size=22.90KB
@@ -410,15 +471,16 @@ MULTI DATA SINK
 |
 02:HASH JOIN [INNER JOIN, BROADCAST]
 |  hash predicates: source.id = ice_zorder.i
+|  other predicates: j IS DISTINCT FROM length(action)
 |  runtime filters: RF000 <- ice_zorder.i
-|  row-size=84B cardinality=20
+|  row-size=88B cardinality=20
 |
 |--03:EXCHANGE [BROADCAST]
 |  |
 |  00:SCAN HDFS [functional_parquet.iceberg_partition_transforms_zorder ice_zorder]
 |     HDFS partitions=1/1 files=1 size=1.08KB
 |     Iceberg snapshot id: 7350750578864730166
-|     row-size=68B cardinality=1
+|     row-size=72B cardinality=1
 |
 01:SCAN HDFS [functional_parquet.iceberg_partitioned source]
    HDFS partitions=1/1 files=20 size=22.90KB
@@ -440,13 +502,14 @@ MULTI DATA SINK
 |
 02:HASH JOIN [INNER JOIN]
 |  hash predicates: source.id = ice_zorder.i
+|  other predicates: j IS DISTINCT FROM length(action)
 |  runtime filters: RF000 <- ice_zorder.i
-|  row-size=84B cardinality=20
+|  row-size=88B cardinality=20
 |
 |--00:SCAN HDFS [functional_parquet.iceberg_partition_transforms_zorder ice_zorder]
 |     HDFS partitions=1/1 files=1 size=1.08KB
 |     Iceberg snapshot id: 7350750578864730166
-|     row-size=68B cardinality=1
+|     row-size=72B cardinality=1
 |
 01:SCAN HDFS [functional_parquet.iceberg_partitioned source]
    HDFS partitions=1/1 files=20 size=22.90KB
@@ -466,15 +529,16 @@ MULTI DATA SINK
 |
 02:HASH JOIN [INNER JOIN, BROADCAST]
 |  hash predicates: source.id = ice_zorder.i
+|  other predicates: j IS DISTINCT FROM length(action)
 |  runtime filters: RF000 <- ice_zorder.i
-|  row-size=84B cardinality=20
+|  row-size=88B cardinality=20
 |
 |--03:EXCHANGE [BROADCAST]
 |  |
 |  00:SCAN HDFS [functional_parquet.iceberg_partition_transforms_zorder ice_zorder]
 |     HDFS partitions=1/1 files=1 size=1.08KB
 |     Iceberg snapshot id: 7350750578864730166
-|     row-size=68B cardinality=1
+|     row-size=72B cardinality=1
 |
 01:SCAN HDFS [functional_parquet.iceberg_partitioned source]
    HDFS partitions=1/1 files=20 size=22.90KB
@@ -482,3 +546,44 @@ MULTI DATA SINK
    Iceberg snapshot id: 8270633197658268308
    row-size=16B cardinality=20
 ====
+# If there are too many items in the SET list, do not create extra filtering predicates.
+update iceberg_lineitem_multiblock
+set l_orderkey=1, l_partkey=1, l_suppkey=1, l_linenumber=1, l_tax=NULL, l_comment='',
+l_shipmode='', l_shipdate=NULL, l_commitdate=NULL, l_returnflag=NULL, l_discount=NULL
+---- PLAN
+MULTI DATA SINK
+|->WRITE TO HDFS [functional_parquet.iceberg_lineitem_multiblock, OVERWRITE=false]
+|->BUFFERED DELETE FROM ICEBERG [functional_parquet.iceberg_lineitem_multiblock-POSITION-DELETE]
+|
+02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN]
+|  row-size=72B cardinality=17.89K
+|
+|--01:SCAN HDFS [functional_parquet.iceberg_lineitem_multiblock-POSITION-DELETE-01 functional_parquet.iceberg_lineitem_multiblock-position-delete]
+|     HDFS partitions=1/1 files=1 size=13.29KB
+|     Iceberg snapshot id: 4821756033809199889
+|     row-size=238B cardinality=2.11K
+|
+00:SCAN HDFS [functional_parquet.iceberg_lineitem_multiblock]
+   HDFS partitions=1/1 files=1 size=1.73MB
+   Iceberg snapshot id: 4821756033809199889
+   row-size=72B cardinality=20.00K
+---- DISTRIBUTEDPLAN
+MULTI DATA SINK
+|->WRITE TO HDFS [functional_parquet.iceberg_lineitem_multiblock, OVERWRITE=false]
+|->BUFFERED DELETE FROM ICEBERG [functional_parquet.iceberg_lineitem_multiblock-POSITION-DELETE]
+|
+02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, DIRECTED]
+|  row-size=72B cardinality=17.89K
+|
+|--03:EXCHANGE [DIRECTED]
+|  |
+|  01:SCAN HDFS [functional_parquet.iceberg_lineitem_multiblock-POSITION-DELETE-01 functional_parquet.iceberg_lineitem_multiblock-position-delete]
+|     HDFS partitions=1/1 files=1 size=13.29KB
+|     Iceberg snapshot id: 4821756033809199889
+|     row-size=238B cardinality=2.11K
+|
+00:SCAN HDFS [functional_parquet.iceberg_lineitem_multiblock]
+   HDFS partitions=1/1 files=1 size=1.73MB
+   Iceberg snapshot id: 4821756033809199889
+   row-size=72B cardinality=20.00K
+====
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/kudu-dml-with-utc-conversion.test b/testdata/workloads/functional-planner/queries/PlannerTest/kudu-dml-with-utc-conversion.test
index 55f196125..6e0a5067a 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/kudu-dml-with-utc-conversion.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/kudu-dml-with-utc-conversion.test
@@ -92,6 +92,7 @@ Per-Host Resources: mem-estimate=20.75MB mem-reservation=0B thread-reservation=2
   |  mem-estimate=20.00MB mem-reservation=0B thread-reservation=0
   |
   00:SCAN KUDU [functional_kudu.timestamp_primary_key]
+     predicates: t IS DISTINCT FROM TIMESTAMP '1970-01-01 00:00:00'
      kudu predicates: tkey = TIMESTAMP '1970-01-01 00:00:00'
      mem-estimate=768.00KB mem-reservation=0B thread-reservation=1
      tuple-ids=0 row-size=16B cardinality=1
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/kudu-update.test b/testdata/workloads/functional-planner/queries/PlannerTest/kudu-update.test
index 7d2f845a9..deb945921 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/kudu-update.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/kudu-update.test
@@ -3,12 +3,14 @@ update functional_kudu.testtbl set name = 'peter' where zip > 94549
 UPDATE KUDU [functional_kudu.testtbl]
 |
 00:SCAN KUDU [functional_kudu.testtbl]
+   predicates: name IS DISTINCT FROM 'peter'
    kudu predicates: zip > 94549
    row-size=8B cardinality=0
 ---- DISTRIBUTEDPLAN
 UPDATE KUDU [functional_kudu.testtbl]
 |
 00:SCAN KUDU [functional_kudu.testtbl]
+   predicates: name IS DISTINCT FROM 'peter'
    kudu predicates: zip > 94549
    row-size=8B cardinality=0
 ====
@@ -18,12 +20,14 @@ update functional_kudu.testtbl set name = 'peter' where zip > 94549 and id = 5
 UPDATE KUDU [functional_kudu.testtbl]
 |
 00:SCAN KUDU [functional_kudu.testtbl]
+   predicates: name IS DISTINCT FROM 'peter'
    kudu predicates: id = 5, zip > 94549
    row-size=8B cardinality=0
 ---- DISTRIBUTEDPLAN
 UPDATE KUDU [functional_kudu.testtbl]
 |
 00:SCAN KUDU [functional_kudu.testtbl]
+   predicates: name IS DISTINCT FROM 'peter'
    kudu predicates: id = 5, zip > 94549
    row-size=8B cardinality=0
 ====
@@ -33,12 +37,14 @@ update functional_kudu.testtbl set zip = 94546 where zip > 94549
 UPDATE KUDU [functional_kudu.testtbl]
 |
 00:SCAN KUDU [functional_kudu.testtbl]
+   predicates: zip IS DISTINCT FROM 94546
    kudu predicates: zip > 94549
    row-size=8B cardinality=0
 ---- DISTRIBUTEDPLAN
 UPDATE KUDU [functional_kudu.testtbl]
 |
 00:SCAN KUDU [functional_kudu.testtbl]
+   predicates: zip IS DISTINCT FROM 94546
    kudu predicates: zip > 94549
    row-size=8B cardinality=0
 ====
@@ -51,6 +57,7 @@ UPDATE KUDU [functional_kudu.testtbl]
 |
 02:HASH JOIN [INNER JOIN]
 |  hash predicates: a.id = b.id
+|  other predicates: a.name IS DISTINCT FROM b.name
 |  runtime filters: RF000 <- b.id, RF001 <- b.id
 |  row-size=28B cardinality=0
 |
@@ -68,6 +75,7 @@ UPDATE KUDU [functional_kudu.testtbl]
 |
 02:HASH JOIN [INNER JOIN, BROADCAST]
 |  hash predicates: a.id = b.id
+|  other predicates: a.name IS DISTINCT FROM b.name
 |  runtime filters: RF000 <- b.id, RF001 <- b.id
 |  row-size=28B cardinality=0
 |
@@ -94,6 +102,7 @@ UPDATE KUDU [functional_kudu.testtbl]
 |  row-size=9B cardinality=1
 |
 |--00:SCAN KUDU [functional_kudu.testtbl a]
+|     predicates: a.name IS DISTINCT FROM 'values'
 |     row-size=8B cardinality=0
 |
 01:UNION
@@ -116,6 +125,7 @@ UPDATE KUDU [functional_kudu.testtbl]
 03:EXCHANGE [HASH(a.id)]
 |
 00:SCAN KUDU [functional_kudu.testtbl a]
+   predicates: a.name IS DISTINCT FROM 'values'
    runtime filters: RF000 -> a.id, RF001 -> a.id
    row-size=8B cardinality=0
 ====
@@ -137,6 +147,7 @@ UPDATE KUDU [functional_kudu.testtbl]
 |     row-size=4B cardinality=0
 |
 00:SCAN KUDU [functional_kudu.testtbl a]
+   predicates: a.name IS DISTINCT FROM 'values'
    runtime filters: RF000 -> a.zip, RF001 -> a.zip
    row-size=12B cardinality=0
 ---- DISTRIBUTEDPLAN
@@ -158,6 +169,7 @@ UPDATE KUDU [functional_kudu.testtbl]
 |     row-size=4B cardinality=0
 |
 00:SCAN KUDU [functional_kudu.testtbl a]
+   predicates: a.name IS DISTINCT FROM 'values'
    runtime filters: RF000 -> a.zip, RF001 -> a.zip
    row-size=12B cardinality=0
 ====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-update-basic.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-update-basic.test
index 2bcf91cd8..fd91279ce 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-update-basic.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-update-basic.test
@@ -163,9 +163,9 @@ NumModifiedRows: 1
 NumDeletedRows: 1
 ====
 ---- QUERY
-update ice_alltypes set bigint_col = 33, int_col = 3, string_col = 'three';
+update ice_alltypes set bigint_col = 33, int_col = 30, string_col = 'three';
 ---- DML_RESULTS: ice_alltypes
-true,3,33,1,1,1,1.00,2001-01-01,2001-01-01 01:01:01,'three','oneb'
+true,30,33,1,1,1,1.00,2001-01-01,2001-01-01 01:01:01,'three','oneb'
 ---- TYPES
 BOOLEAN,INT,BIGINT,FLOAT,DOUBLE,DECIMAL,DECIMAL,DATE,TIMESTAMP,STRING,BINARY
 ---- RUNTIME_PROFILE
@@ -173,6 +173,18 @@ NumModifiedRows: 1
 NumDeletedRows: 1
 ====
 ---- QUERY
+# The row already has the desired values, no need to update. (IMPALA-12588)
+update ice_alltypes set bigint_col = 33, int_col = 30, string_col = 'three';
+---- DML_RESULTS: ice_alltypes
+true,30,33,1,1,1,1.00,2001-01-01,2001-01-01 01:01:01,'three','oneb'
+---- TYPES
+BOOLEAN,INT,BIGINT,FLOAT,DOUBLE,DECIMAL,DECIMAL,DATE,TIMESTAMP,STRING,BINARY
+---- RUNTIME_PROFILE
+NumModifiedRows: 0
+NumDeletedRows: 0
+====
+---- QUERY
+# Only one field changes.
 update ice_alltypes set bigint_col = 33, int_col = 3, string_col = 'three';
 ---- DML_RESULTS: ice_alltypes
 true,3,33,1,1,1,1.00,2001-01-01,2001-01-01 01:01:01,'three','oneb'
@@ -210,12 +222,61 @@ NumDeletedRows: 2
 ====
 ---- QUERY
 # If the JOIN in UPDATE has multiple matches Impala should raise an error.
+# Changing bigint_col to actually have rows to update.
 insert into ref_table values (0, 1111, 'IIMMPPAALLAA', '2023-12-01');
-update ice_alltypes set bigint_col=bi, string_col=s, date_col=d from ice_alltypes, ref_table where int_col = i;
+update ice_alltypes set bigint_col=bi+1, string_col=s, date_col=d from ice_alltypes, ref_table where int_col = i;
 ---- CATCH
 Duplicated row in DELETE sink.
 ====
 ---- QUERY
+# Check that NULL values and expressions evaluating to NULL in assignments are handled correctly.
+update ice_alltypes set dec_10_0_col = NULL, dec_8_2_col = cast(123.123 + NULL as decimal(8, 2));
+---- DML_RESULTS: ice_alltypes
+false,0,111,0,0,NULL,NULL,2023-11-07,2000-01-01 00:00:00,'IMPALA','zerob'
+true,3,222,1,1,NULL,NULL,2023-11-08,2001-01-01 01:01:01,'ICEBERG','oneb'
+---- TYPES
+BOOLEAN,INT,BIGINT,FLOAT,DOUBLE,DECIMAL,DECIMAL,DATE,TIMESTAMP,STRING,BINARY
+---- RUNTIME_PROFILE
+NumModifiedRows: 2
+NumDeletedRows: 2
+====
+---- QUERY
+# Check that NULL assignments and WHERE condition filter rows correctly.
+update ice_alltypes set dec_10_0_col = cast(123.123 as decimal(10, 0)), dec_8_2_col = NULL where int_col < 2;
+---- DML_RESULTS: ice_alltypes
+false,0,111,0,0,123,NULL,2023-11-07,2000-01-01 00:00:00,'IMPALA','zerob'
+true,3,222,1,1,NULL,NULL,2023-11-08,2001-01-01 01:01:01,'ICEBERG','oneb'
+---- TYPES
+BOOLEAN,INT,BIGINT,FLOAT,DOUBLE,DECIMAL,DECIMAL,DATE,TIMESTAMP,STRING,BINARY
+---- RUNTIME_PROFILE
+NumModifiedRows: 1
+NumDeletedRows: 1
+====
+---- QUERY
+# Swap two columns
+update ice_alltypes set dec_10_0_col = cast(dec_8_2_col as decimal(10, 0)), dec_8_2_col = cast(dec_10_0_col as decimal(8, 2));
+---- DML_RESULTS: ice_alltypes
+false,0,111,0,0,NULL,123.00,2023-11-07,2000-01-01 00:00:00,'IMPALA','zerob'
+true,3,222,1,1,NULL,NULL,2023-11-08,2001-01-01 01:01:01,'ICEBERG','oneb'
+---- TYPES
+BOOLEAN,INT,BIGINT,FLOAT,DOUBLE,DECIMAL,DECIMAL,DATE,TIMESTAMP,STRING,BINARY
+---- RUNTIME_PROFILE
+NumModifiedRows: 1
+NumDeletedRows: 1
+====
+---- QUERY
+# Check col_a = col_b + col_c
+update ice_alltypes set dec_10_0_col = cast(bigint_col + dec_8_2_col as decimal(10, 0));
+---- DML_RESULTS: ice_alltypes
+false,0,111,0,0,234,123.00,2023-11-07,2000-01-01 00:00:00,'IMPALA','zerob'
+true,3,222,1,1,NULL,NULL,2023-11-08,2001-01-01 01:01:01,'ICEBERG','oneb'
+---- TYPES
+BOOLEAN,INT,BIGINT,FLOAT,DOUBLE,DECIMAL,DECIMAL,DATE,TIMESTAMP,STRING,BINARY
+---- RUNTIME_PROFILE
+NumModifiedRows: 1
+NumDeletedRows: 1
+====
+---- QUERY
 create table ice_id_partitioned (i int, p int, s string)
 partitioned by spec(p)
 stored by iceberg
@@ -266,6 +327,74 @@ FROM ice_id_partitioned JOIN ref_view ON (i = ri);
 INT,INT,STRING
 ====
 ---- QUERY
+# Update partitioned Iceberg table based on VIEW
+UPDATE ice_id_partitioned SET s = upper(rs)
+FROM ice_id_partitioned JOIN ref_view ON (i = ri)
+WHERE p < 1 AND rs = (select min(rs) from ref_view);
+---- DML_RESULTS: ice_id_partitioned
+1,0,'APACHE IMPALA'
+2,0,'iceberg'
+3,0,'hive'
+4,1,'Apache Spark'
+5,2,'Kudu'
+---- TYPES
+INT,INT,STRING
+---- RUNTIME_PROFILE
+NumModifiedRows: 1
+NumDeletedRows: 1
+====
+---- QUERY
+# Test if changing SKIP_UNNEEDED_UPDATES_COL_LIMIT query option takes effect: the 4th row
+# is updated even though the table content remained the same.
+SET SKIP_UNNEEDED_UPDATES_COL_LIMIT = 1;
+UPDATE ice_id_partitioned SET i=ri, p=1
+FROM ice_id_partitioned JOIN ref_table_2 ON (i = ri);
+---- DML_RESULTS: ice_id_partitioned
+1,0,'APACHE IMPALA'
+2,0,'iceberg'
+3,0,'hive'
+4,1,'Apache Spark'
+5,2,'Kudu'
+---- TYPES
+INT,INT,STRING
+---- RUNTIME_PROFILE
+NumModifiedRows: 1
+NumDeletedRows: 1
+====
+---- QUERY
+# Setting SKIP_UNNEEDED_UPDATES_COL_LIMIT back to default. No rows should be updated.
+SET SKIP_UNNEEDED_UPDATES_COL_LIMIT = 10;
+UPDATE ice_id_partitioned SET i=ri, p=1
+FROM ice_id_partitioned JOIN ref_table_2 ON (i = ri);
+---- DML_RESULTS: ice_id_partitioned
+1,0,'APACHE IMPALA'
+2,0,'iceberg'
+3,0,'hive'
+4,1,'Apache Spark'
+5,2,'Kudu'
+---- TYPES
+INT,INT,STRING
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsInserted): 0
+====
+---- QUERY
+# Update partitioned Iceberg table with inline view in FROM clause
+UPDATE ice_id_partitioned SET i=cast(a.max_val*(p+1) as int)
+FROM ice_id_partitioned JOIN ref_view ON (i = ri), (SELECT max(i) max_val FROM ice_id_partitioned) a
+WHERE p = 1;
+---- DML_RESULTS: ice_id_partitioned
+1,0,'APACHE IMPALA'
+2,0,'iceberg'
+3,0,'hive'
+10,1,'Apache Spark'
+5,2,'Kudu'
+---- TYPES
+INT,INT,STRING
+---- RUNTIME_PROFILE
+NumModifiedRows: 1
+NumDeletedRows: 1
+====
+---- QUERY
 # Negative test for UPDATE part 3:
 # updating partition column AND right side is non-constant value AND we have a FROM clause with multiple table refs.
 # For such operations, if there are multiple matches in the JOIN, the duplicated records can
@@ -412,6 +541,8 @@ STRING, STRING, STRING, STRING
 # query is closed, when releasing resources. This test is a valid regression test because
 # even if it passes, there will be a crash: some later queries are likely to fail and
 # there will be a minidump in the build artifacts, so the build will be marked FAILED.
+# Also testing that rows which already have the desired value should not be skipped
+# because there is a UDF in the SET list.
 create function if not exists identity(int) returns int location 'UDF_LOCATION' symbol='Identity';
 create table update_with_udf(int_col INT)
 stored by iceberg
diff --git a/testdata/workloads/functional-query/queries/QueryTest/kudu_update.test b/testdata/workloads/functional-query/queries/QueryTest/kudu_update.test
index 0052e1dd7..7631bdc0a 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/kudu_update.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/kudu_update.test
@@ -22,6 +22,25 @@ NumModifiedRows: 5
 NumRowErrors: 0
 ====
 ---- QUERY
+# No rows should be modified, because they already have the desired values.
+update tdata set vali=43 where id = 40
+---- RUNTIME_PROFILE
+NumModifiedRows: 0
+NumRowErrors: 0
+---- LABELS
+ID, NAME, VALF, VALI, VALV, VALB, VALT, VALS, VALD, VALDEC4, VALDEC8, VALDEC16, VALDATE, VALVC
+---- DML_RESULTS: tdata
+1,'martin',1.0,232232323,'a',true,1,2,3,0.000000001,2.22,3,1970-01-01,'martin'
+2,'david',1.0,99398493939,'b',false,4,5,6,0.000000004,5.55,6,1970-01-02,'david'
+3,'todd',1.0,993393939,'c',true,7,8,9,0.000000007,8.88,9,1970-01-03,'todd'
+40,'he',0.0,43,'e',false,50,60,70,0.000000050,66.60,70,1970-01-04,'he'
+120,'she',0.0,99,'f',true,-1,0,1,-0.000000001,0.00,1,1970-01-05,'she'
+---- TYPES
+INT,STRING,FLOAT,BIGINT,STRING,BOOLEAN,TINYINT,SMALLINT,DOUBLE,DECIMAL,DECIMAL,DECIMAL,DATE,STRING
+---- HS2_TYPES
+INT,STRING,FLOAT,BIGINT,STRING,BOOLEAN,TINYINT,SMALLINT,DOUBLE,DECIMAL,DECIMAL,DECIMAL,DATE,VARCHAR
+====
+---- QUERY
 # single row, equality on key, bigint
 # TODO: Verify row count in RESULTS after fixing IMPALA-3713, and supporting RESULTS and
 # DML_RESULTS in the same test case.
@@ -254,10 +273,11 @@ INT,STRING,FLOAT,BIGINT,STRING,BOOLEAN,TINYINT,SMALLINT,DOUBLE,DECIMAL,DECIMAL,D
 INT,STRING,FLOAT,BIGINT,STRING,BOOLEAN,TINYINT,SMALLINT,DOUBLE,DECIMAL,DECIMAL,DECIMAL,DATE,VARCHAR
 ====
 ---- QUERY
-# multiple rows, predicate on non-key
+# predicate on non-key
+# Two rows match, but only one needs to be updated, the other is already false
 update tdata set valb=false where name LIKE '%he'
 ---- RUNTIME_PROFILE
-NumModifiedRows: 2
+NumModifiedRows: 1
 NumRowErrors: 0
 ---- LABELS
 ID, NAME, VALF, VALI, VALV, VALB, VALT, VALS, VALD, VALDEC4, VALDEC8, VALDEC16, VALDATE, VALVC
diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py
index 1d286b045..ac6c11669 100644
--- a/tests/query_test/test_iceberg.py
+++ b/tests/query_test/test_iceberg.py
@@ -1708,8 +1708,8 @@ class TestIcebergV2Table(IcebergTestSuite):
     the parent ids match the previous snapshot ids. See IMPALA-12708."""
 
     self.validate_snapshots(db, "single_col", 3)
-    self.validate_snapshots(db, "ice_alltypes", 17)
-    self.validate_snapshots(db, "ice_id_partitioned", 4)
+    self.validate_snapshots(db, "ice_alltypes", 21)
+    self.validate_snapshots(db, "ice_id_partitioned", 7)
 
   def validate_snapshots(self, db, tbl, expected_snapshots):
     tbl_name = "{}.{}".format(db, tbl)
@@ -1730,16 +1730,16 @@ class TestIcebergV2Table(IcebergTestSuite):
 
     hive_results = get_hive_results("ice_alltypes", "bool_col")
     assert hive_results == \
-        "false,0,111,0.0,0.0,0,0.00,2023-11-07,2000-01-01 00:00:00.0,IMPALA,zerob\n" \
-        "true,3,222,1.0,1.0,23,123.12,2023-11-08,2001-01-01 01:01:01.0,ICEBERG,oneb\n"
+        "false,0,111,0.0,0.0,234,123.00,2023-11-07,2000-01-01 00:00:00.0,IMPALA,zerob\n" \
+        "true,3,222,1.0,1.0,NULL,NULL,2023-11-08,2001-01-01 01:01:01.0,ICEBERG,oneb\n"
 
     hive_results = get_hive_results("ice_id_partitioned", "i")
     assert hive_results == \
-        "1,0,Apache Impala\n"     \
+        "1,0,APACHE IMPALA\n"     \
         "2,0,iceberg\n"    \
         "3,0,hive\n"       \
-        "4,1,Apache Spark\n"      \
-        "5,2,Kudu\n"
+        "5,2,Kudu\n"      \
+        "10,1,Apache Spark\n"
 
     hive_results = get_hive_results("ice_bucket_transform", "i")
     assert hive_results == \
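
The expected plans and row counts in the tests above follow from the rewrite
described in the commit message: each SET assignment becomes a
"col IS DISTINCT FROM expr" term, the terms are joined with OR, and the result
is ANDed onto the original WHERE predicate. The Python below is a minimal
sketch of that logic, not Impala's analyzer code. The function names
(is_distinct_from, add_skip_predicate, row_needs_update) are hypothetical, and
the "longer than the limit" comparison is an assumption that merely agrees
with the SKIP_UNNEEDED_UPDATES_COL_LIMIT tests above (2 assignments with
limit 1, and 11 assignments with the default of 10, are both left unfiltered).

```python
# Illustrative model only; names and structure are assumptions, not Impala code.

def is_distinct_from(a, b):
    """NULL-safe inequality, with None standing in for SQL NULL."""
    if a is None or b is None:
        # Two NULLs are not distinct; NULL versus a value is distinct.
        return (a is None) != (b is None)
    return a != b


def add_skip_predicate(where, assignments, col_limit=10):
    """Return WHERE text extended with 'col IS DISTINCT FROM expr' terms.

    assignments is a list of (column, expression) strings from the SET list.
    Assumption: a SET list longer than col_limit is left unfiltered.
    """
    if len(assignments) > col_limit:
        return where
    terms = ["{} IS DISTINCT FROM {}".format(c, e) for c, e in assignments]
    extra = " OR ".join(terms)
    if where is None:
        return extra
    if len(terms) > 1:
        # Parenthesize so the OR group binds as one conjunct.
        extra = "(" + extra + ")"
    return "{} AND {}".format(where, extra)


def row_needs_update(row, new_values, col_limit=10):
    """Decide whether a row would still be written under the extra predicate."""
    if len(new_values) > col_limit:
        return True
    return any(is_distinct_from(row[c], v) for c, v in new_values.items())


print(add_skip_predicate("i > 4", [("k", "3")]))
# prints: i > 4 AND k IS DISTINCT FROM 3
```

With these definitions, a row that already holds bigint_col = 33, int_col = 30
and string_col = 'three' is skipped when the same three values are assigned
again, while assigning int_col = 3 makes it eligible, which is the behaviour
the iceberg-update-basic.test cases check through NumModifiedRows.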
