This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new d295a45f07 Spark 3.4: Fix writing of default values in CoW for rows with NULL columns which are unmatched (#9556)
d295a45f07 is described below

commit d295a45f07ea9af37aabd1afacd64755d0e84f82
Author: Amogh Jahagirdar <[email protected]>
AuthorDate: Tue Jan 30 09:18:03 2024 -0800

    Spark 3.4: Fix writing of default values in CoW for rows with NULL columns which are unmatched (#9556)
---
 .../apache/iceberg/spark/extensions/TestMerge.java | 72 ++++++++++++++++++++++
 .../catalyst/analysis/RewriteMergeIntoTable.scala  |  2 +-
 .../apache/iceberg/spark/extensions/TestMerge.java | 72 ++++++++++++++++++++++
 .../apache/iceberg/spark/extensions/TestMerge.java | 72 ++++++++++++++++++++++
 4 files changed, 217 insertions(+), 1 deletion(-)

diff --git a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
index dc1e96be48..e855eb49a1 100644
--- a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
+++ b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
@@ -322,6 +322,78 @@ public abstract class TestMerge extends SparkRowLevelOperationsTestBase {
         sql("SELECT * FROM %s ORDER BY id", selectTarget()));
   }
 
+  @Test
+  public void testMergeWithOnlyUpdateNullUnmatchedValues() {
+    createAndInitTable(
+        "id INT, value INT", "{ \"id\": 1, \"value\": 2 }\n" + "{ \"id\": 6, 
\"value\": null }");
+
+    createOrReplaceView("source", "id INT NOT NULL, value INT", "{ \"id\": 1, 
\"value\": 100 }\n");
+    sql(
+        "MERGE INTO %s t USING source s "
+            + "ON t.id == s.id "
+            + "WHEN MATCHED THEN "
+            + "  UPDATE SET id=123, value=456",
+        commitTarget());
+
+    sql("SELECT * FROM %s", commitTarget());
+
+    ImmutableList<Object[]> expectedRows =
+        ImmutableList.of(
+            row(6, null), // kept
+            row(123, 456)); // updated
+
+    assertEquals(
+        "Should have expected rows",
+        expectedRows,
+        sql("SELECT * FROM %s ORDER BY id", selectTarget()));
+  }
+
+  @Test
+  public void testMergeWithOnlyUpdateSingleFieldNullUnmatchedValues() {
+    createAndInitTable(
+        "id INT, value INT", "{ \"id\": 1, \"value\": 2 }\n" + "{ \"id\": 6, 
\"value\": null }");
+
+    createOrReplaceView("source", "id INT NOT NULL, value INT", "{ \"id\": 1, 
\"value\": 100 }\n");
+    sql(
+        "MERGE INTO %s t USING source s "
+            + "ON t.id == s.id "
+            + "WHEN MATCHED THEN "
+            + "  UPDATE SET id=123",
+        commitTarget());
+
+    sql("SELECT * FROM %s", commitTarget());
+
+    ImmutableList<Object[]> expectedRows =
+        ImmutableList.of(
+            row(6, null), // kept
+            row(123, 2)); // updated
+
+    assertEquals(
+        "Should have expected rows",
+        expectedRows,
+        sql("SELECT * FROM %s ORDER BY id", selectTarget()));
+  }
+
+  @Test
+  public void testMergeWithOnlyDeleteNullUnmatchedValues() {
+    createAndInitTable(
+        "id INT, value INT", "{ \"id\": 1, \"value\": 2 }\n" + "{ \"id\": 6, 
\"value\": null }");
+
+    createOrReplaceView("source", "id INT NOT NULL, value INT", "{ \"id\": 1, 
\"value\": 100 }\n");
+    sql(
+        "MERGE INTO %s t USING source s " + "ON t.id == s.id " + "WHEN MATCHED 
THEN " + "DELETE",
+        commitTarget());
+
+    sql("SELECT * FROM %s", commitTarget());
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(row(6, null)); // 
kept
+
+    assertEquals(
+        "Should have expected rows",
+        expectedRows,
+        sql("SELECT * FROM %s ORDER BY id", selectTarget()));
+  }
+
   @Test
   public void testMergeWithOnlyDeleteClause() {
     createAndInitTable(
diff --git a/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala
index 2a14c3144e..4aae44ecf1 100644
--- a/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala
+++ b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala
@@ -225,7 +225,7 @@ object RewriteMergeIntoTable extends RewriteRowLevelIcebergCommand with Predicat
       targetOutput = readAttrs,
       performCardinalityCheck = performCardinalityCheck,
       emitNotMatchedTargetRows = true,
-      output = buildMergeRowsOutput(matchedOutputs, notMatchedOutputs, 
readAttrs),
+      output = buildMergeRowsOutput(matchedOutputs, notMatchedOutputs :+ 
readAttrs, readAttrs),
       joinPlan)
 
     // build a plan to replace read groups in the table
diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
index 6c5906f79f..bded60b956 100644
--- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
@@ -523,6 +523,78 @@ public abstract class TestMerge extends SparkRowLevelOperationsTestBase {
         sql("SELECT * FROM %s ORDER BY id", selectTarget()));
   }
 
+  @Test
+  public void testMergeWithOnlyUpdateNullUnmatchedValues() {
+    createAndInitTable(
+        "id INT, value INT", "{ \"id\": 1, \"value\": 2 }\n" + "{ \"id\": 6, 
\"value\": null }");
+
+    createOrReplaceView("source", "id INT NOT NULL, value INT", "{ \"id\": 1, 
\"value\": 100 }\n");
+    sql(
+        "MERGE INTO %s t USING source s "
+            + "ON t.id == s.id "
+            + "WHEN MATCHED THEN "
+            + "  UPDATE SET id=123, value=456",
+        commitTarget());
+
+    sql("SELECT * FROM %s", commitTarget());
+
+    ImmutableList<Object[]> expectedRows =
+        ImmutableList.of(
+            row(6, null), // kept
+            row(123, 456)); // updated
+
+    assertEquals(
+        "Should have expected rows",
+        expectedRows,
+        sql("SELECT * FROM %s ORDER BY id", selectTarget()));
+  }
+
+  @Test
+  public void testMergeWithOnlyUpdateSingleFieldNullUnmatchedValues() {
+    createAndInitTable(
+        "id INT, value INT", "{ \"id\": 1, \"value\": 2 }\n" + "{ \"id\": 6, 
\"value\": null }");
+
+    createOrReplaceView("source", "id INT NOT NULL, value INT", "{ \"id\": 1, 
\"value\": 100 }\n");
+    sql(
+        "MERGE INTO %s t USING source s "
+            + "ON t.id == s.id "
+            + "WHEN MATCHED THEN "
+            + "  UPDATE SET id=123",
+        commitTarget());
+
+    sql("SELECT * FROM %s", commitTarget());
+
+    ImmutableList<Object[]> expectedRows =
+        ImmutableList.of(
+            row(6, null), // kept
+            row(123, 2)); // updated
+
+    assertEquals(
+        "Should have expected rows",
+        expectedRows,
+        sql("SELECT * FROM %s ORDER BY id", selectTarget()));
+  }
+
+  @Test
+  public void testMergeWithOnlyDeleteNullUnmatchedValues() {
+    createAndInitTable(
+        "id INT, value INT", "{ \"id\": 1, \"value\": 2 }\n" + "{ \"id\": 6, 
\"value\": null }");
+
+    createOrReplaceView("source", "id INT NOT NULL, value INT", "{ \"id\": 1, 
\"value\": 100 }\n");
+    sql(
+        "MERGE INTO %s t USING source s " + "ON t.id == s.id " + "WHEN MATCHED 
THEN " + "DELETE",
+        commitTarget());
+
+    sql("SELECT * FROM %s", commitTarget());
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(row(6, null)); // 
kept
+
+    assertEquals(
+        "Should have expected rows",
+        expectedRows,
+        sql("SELECT * FROM %s ORDER BY id", selectTarget()));
+  }
+
   @Test
   public void testMergeWithOnlyDeleteClause() {
     createAndInitTable(
diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
index 38ad8b0da5..2694d522c9 100644
--- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
+++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
@@ -608,6 +608,78 @@ public abstract class TestMerge extends SparkRowLevelOperationsTestBase {
         sql("SELECT * FROM %s ORDER BY id", selectTarget()));
   }
 
+  @Test
+  public void testMergeWithOnlyUpdateNullUnmatchedValues() {
+    createAndInitTable(
+        "id INT, value INT", "{ \"id\": 1, \"value\": 2 }\n" + "{ \"id\": 6, 
\"value\": null }");
+
+    createOrReplaceView("source", "id INT NOT NULL, value INT", "{ \"id\": 1, 
\"value\": 100 }\n");
+    sql(
+        "MERGE INTO %s t USING source s "
+            + "ON t.id == s.id "
+            + "WHEN MATCHED THEN "
+            + "  UPDATE SET id=123, value=456",
+        commitTarget());
+
+    sql("SELECT * FROM %s", commitTarget());
+
+    ImmutableList<Object[]> expectedRows =
+        ImmutableList.of(
+            row(6, null), // kept
+            row(123, 456)); // updated
+
+    assertEquals(
+        "Should have expected rows",
+        expectedRows,
+        sql("SELECT * FROM %s ORDER BY id", selectTarget()));
+  }
+
+  @Test
+  public void testMergeWithOnlyUpdateSingleFieldNullUnmatchedValues() {
+    createAndInitTable(
+        "id INT, value INT", "{ \"id\": 1, \"value\": 2 }\n" + "{ \"id\": 6, 
\"value\": null }");
+
+    createOrReplaceView("source", "id INT NOT NULL, value INT", "{ \"id\": 1, 
\"value\": 100 }\n");
+    sql(
+        "MERGE INTO %s t USING source s "
+            + "ON t.id == s.id "
+            + "WHEN MATCHED THEN "
+            + "  UPDATE SET id=123",
+        commitTarget());
+
+    sql("SELECT * FROM %s", commitTarget());
+
+    ImmutableList<Object[]> expectedRows =
+        ImmutableList.of(
+            row(6, null), // kept
+            row(123, 2)); // updated
+
+    assertEquals(
+        "Should have expected rows",
+        expectedRows,
+        sql("SELECT * FROM %s ORDER BY id", selectTarget()));
+  }
+
+  @Test
+  public void testMergeWithOnlyDeleteNullUnmatchedValues() {
+    createAndInitTable(
+        "id INT, value INT", "{ \"id\": 1, \"value\": 2 }\n" + "{ \"id\": 6, 
\"value\": null }");
+
+    createOrReplaceView("source", "id INT NOT NULL, value INT", "{ \"id\": 1, 
\"value\": 100 }\n");
+    sql(
+        "MERGE INTO %s t USING source s " + "ON t.id == s.id " + "WHEN MATCHED 
THEN " + "DELETE",
+        commitTarget());
+
+    sql("SELECT * FROM %s", commitTarget());
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(row(6, null)); // 
kept
+
+    assertEquals(
+        "Should have expected rows",
+        expectedRows,
+        sql("SELECT * FROM %s ORDER BY id", selectTarget()));
+  }
+
   @Test
   public void testMergeWithOnlyUpdateClauseAndNullValues() {
     createAndInitTable(

Reply via email to