This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new d295a45f07 Spark 3.4: Fix writing of default values in CoW for rows
with NULL columns which are unmatched (#9556)
d295a45f07 is described below
commit d295a45f07ea9af37aabd1afacd64755d0e84f82
Author: Amogh Jahagirdar <[email protected]>
AuthorDate: Tue Jan 30 09:18:03 2024 -0800
Spark 3.4: Fix writing of default values in CoW for rows with NULL columns
which are unmatched (#9556)
---
.../apache/iceberg/spark/extensions/TestMerge.java | 72 ++++++++++++++++++++++
.../catalyst/analysis/RewriteMergeIntoTable.scala | 2 +-
.../apache/iceberg/spark/extensions/TestMerge.java | 72 ++++++++++++++++++++++
.../apache/iceberg/spark/extensions/TestMerge.java | 72 ++++++++++++++++++++++
4 files changed, 217 insertions(+), 1 deletion(-)
diff --git a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
index dc1e96be48..e855eb49a1 100644
--- a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
+++ b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
@@ -322,6 +322,75 @@ public abstract class TestMerge extends SparkRowLevelOperationsTestBase {
         sql("SELECT * FROM %s ORDER BY id", selectTarget()));
   }
 
+  /** The unmatched row (6, NULL) must be kept as-is by a MERGE updating every column. */
+  @Test
+  public void testMergeWithOnlyUpdateNullUnmatchedValues() {
+    createAndInitTable(
+        "id INT, value INT", "{ \"id\": 1, \"value\": 2 }\n" + "{ \"id\": 6, \"value\": null }");
+
+    createOrReplaceView("source", "id INT NOT NULL, value INT", "{ \"id\": 1, \"value\": 100 }\n");
+    sql(
+        "MERGE INTO %s t USING source s "
+            + "ON t.id == s.id "
+            + "WHEN MATCHED THEN "
+            + " UPDATE SET id=123, value=456",
+        commitTarget());
+
+    ImmutableList<Object[]> expectedRows =
+        ImmutableList.of(
+            row(6, null), // kept
+            row(123, 456)); // updated
+
+    assertEquals(
+        "Should have expected rows",
+        expectedRows,
+        sql("SELECT * FROM %s ORDER BY id", selectTarget()));
+  }
+
+  /** The unmatched row (6, NULL) must be kept as-is by a MERGE updating only {@code id}. */
+  @Test
+  public void testMergeWithOnlyUpdateSingleFieldNullUnmatchedValues() {
+    createAndInitTable(
+        "id INT, value INT", "{ \"id\": 1, \"value\": 2 }\n" + "{ \"id\": 6, \"value\": null }");
+
+    createOrReplaceView("source", "id INT NOT NULL, value INT", "{ \"id\": 1, \"value\": 100 }\n");
+    sql(
+        "MERGE INTO %s t USING source s "
+            + "ON t.id == s.id "
+            + "WHEN MATCHED THEN "
+            + " UPDATE SET id=123",
+        commitTarget());
+
+    ImmutableList<Object[]> expectedRows =
+        ImmutableList.of(
+            row(6, null), // kept
+            row(123, 2)); // updated
+
+    assertEquals(
+        "Should have expected rows",
+        expectedRows,
+        sql("SELECT * FROM %s ORDER BY id", selectTarget()));
+  }
+
+  /** The unmatched row (6, NULL) must be kept as-is by a MERGE deleting the matched row. */
+  @Test
+  public void testMergeWithOnlyDeleteNullUnmatchedValues() {
+    createAndInitTable(
+        "id INT, value INT", "{ \"id\": 1, \"value\": 2 }\n" + "{ \"id\": 6, \"value\": null }");
+
+    createOrReplaceView("source", "id INT NOT NULL, value INT", "{ \"id\": 1, \"value\": 100 }\n");
+    sql(
+        "MERGE INTO %s t USING source s " + "ON t.id == s.id " + "WHEN MATCHED THEN " + "DELETE",
+        commitTarget());
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(row(6, null)); // kept
+
+    assertEquals(
+        "Should have expected rows",
+        expectedRows,
+        sql("SELECT * FROM %s ORDER BY id", selectTarget()));
+  }
+
   @Test
   public void testMergeWithOnlyDeleteClause() {
     createAndInitTable(
diff --git a/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala
index 2a14c3144e..4aae44ecf1 100644
--- a/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala
+++ b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala
@@ -225,7 +225,10 @@ object RewriteMergeIntoTable extends RewriteRowLevelIcebergCommand with Predicat
       targetOutput = readAttrs,
       performCardinalityCheck = performCardinalityCheck,
       emitNotMatchedTargetRows = true,
-      output = buildMergeRowsOutput(matchedOutputs, notMatchedOutputs, readAttrs),
+      // unmatched target rows are emitted unchanged (emitNotMatchedTargetRows = true), so their
+      // attributes must also be considered when building the output; omitting them caused NULL
+      // columns of such rows to be written as default values in copy-on-write (#9556)
+      output = buildMergeRowsOutput(matchedOutputs, notMatchedOutputs :+ readAttrs, readAttrs),
       joinPlan)
 
     // build a plan to replace read groups in the table
diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
index 6c5906f79f..bded60b956 100644
--- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
@@ -523,6 +523,75 @@ public abstract class TestMerge extends SparkRowLevelOperationsTestBase {
         sql("SELECT * FROM %s ORDER BY id", selectTarget()));
   }
 
+  /** The unmatched row (6, NULL) must be kept as-is by a MERGE updating every column. */
+  @Test
+  public void testMergeWithOnlyUpdateNullUnmatchedValues() {
+    createAndInitTable(
+        "id INT, value INT", "{ \"id\": 1, \"value\": 2 }\n" + "{ \"id\": 6, \"value\": null }");
+
+    createOrReplaceView("source", "id INT NOT NULL, value INT", "{ \"id\": 1, \"value\": 100 }\n");
+    sql(
+        "MERGE INTO %s t USING source s "
+            + "ON t.id == s.id "
+            + "WHEN MATCHED THEN "
+            + " UPDATE SET id=123, value=456",
+        commitTarget());
+
+    ImmutableList<Object[]> expectedRows =
+        ImmutableList.of(
+            row(6, null), // kept
+            row(123, 456)); // updated
+
+    assertEquals(
+        "Should have expected rows",
+        expectedRows,
+        sql("SELECT * FROM %s ORDER BY id", selectTarget()));
+  }
+
+  /** The unmatched row (6, NULL) must be kept as-is by a MERGE updating only {@code id}. */
+  @Test
+  public void testMergeWithOnlyUpdateSingleFieldNullUnmatchedValues() {
+    createAndInitTable(
+        "id INT, value INT", "{ \"id\": 1, \"value\": 2 }\n" + "{ \"id\": 6, \"value\": null }");
+
+    createOrReplaceView("source", "id INT NOT NULL, value INT", "{ \"id\": 1, \"value\": 100 }\n");
+    sql(
+        "MERGE INTO %s t USING source s "
+            + "ON t.id == s.id "
+            + "WHEN MATCHED THEN "
+            + " UPDATE SET id=123",
+        commitTarget());
+
+    ImmutableList<Object[]> expectedRows =
+        ImmutableList.of(
+            row(6, null), // kept
+            row(123, 2)); // updated
+
+    assertEquals(
+        "Should have expected rows",
+        expectedRows,
+        sql("SELECT * FROM %s ORDER BY id", selectTarget()));
+  }
+
+  /** The unmatched row (6, NULL) must be kept as-is by a MERGE deleting the matched row. */
+  @Test
+  public void testMergeWithOnlyDeleteNullUnmatchedValues() {
+    createAndInitTable(
+        "id INT, value INT", "{ \"id\": 1, \"value\": 2 }\n" + "{ \"id\": 6, \"value\": null }");
+
+    createOrReplaceView("source", "id INT NOT NULL, value INT", "{ \"id\": 1, \"value\": 100 }\n");
+    sql(
+        "MERGE INTO %s t USING source s " + "ON t.id == s.id " + "WHEN MATCHED THEN " + "DELETE",
+        commitTarget());
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(row(6, null)); // kept
+
+    assertEquals(
+        "Should have expected rows",
+        expectedRows,
+        sql("SELECT * FROM %s ORDER BY id", selectTarget()));
+  }
+
   @Test
   public void testMergeWithOnlyDeleteClause() {
     createAndInitTable(
diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
index 38ad8b0da5..2694d522c9 100644
--- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
+++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
@@ -608,6 +608,75 @@ public abstract class TestMerge extends SparkRowLevelOperationsTestBase {
         sql("SELECT * FROM %s ORDER BY id", selectTarget()));
   }
 
+  /** The unmatched row (6, NULL) must be kept as-is by a MERGE updating every column. */
+  @Test
+  public void testMergeWithOnlyUpdateNullUnmatchedValues() {
+    createAndInitTable(
+        "id INT, value INT", "{ \"id\": 1, \"value\": 2 }\n" + "{ \"id\": 6, \"value\": null }");
+
+    createOrReplaceView("source", "id INT NOT NULL, value INT", "{ \"id\": 1, \"value\": 100 }\n");
+    sql(
+        "MERGE INTO %s t USING source s "
+            + "ON t.id == s.id "
+            + "WHEN MATCHED THEN "
+            + " UPDATE SET id=123, value=456",
+        commitTarget());
+
+    ImmutableList<Object[]> expectedRows =
+        ImmutableList.of(
+            row(6, null), // kept
+            row(123, 456)); // updated
+
+    assertEquals(
+        "Should have expected rows",
+        expectedRows,
+        sql("SELECT * FROM %s ORDER BY id", selectTarget()));
+  }
+
+  /** The unmatched row (6, NULL) must be kept as-is by a MERGE updating only {@code id}. */
+  @Test
+  public void testMergeWithOnlyUpdateSingleFieldNullUnmatchedValues() {
+    createAndInitTable(
+        "id INT, value INT", "{ \"id\": 1, \"value\": 2 }\n" + "{ \"id\": 6, \"value\": null }");
+
+    createOrReplaceView("source", "id INT NOT NULL, value INT", "{ \"id\": 1, \"value\": 100 }\n");
+    sql(
+        "MERGE INTO %s t USING source s "
+            + "ON t.id == s.id "
+            + "WHEN MATCHED THEN "
+            + " UPDATE SET id=123",
+        commitTarget());
+
+    ImmutableList<Object[]> expectedRows =
+        ImmutableList.of(
+            row(6, null), // kept
+            row(123, 2)); // updated
+
+    assertEquals(
+        "Should have expected rows",
+        expectedRows,
+        sql("SELECT * FROM %s ORDER BY id", selectTarget()));
+  }
+
+  /** The unmatched row (6, NULL) must be kept as-is by a MERGE deleting the matched row. */
+  @Test
+  public void testMergeWithOnlyDeleteNullUnmatchedValues() {
+    createAndInitTable(
+        "id INT, value INT", "{ \"id\": 1, \"value\": 2 }\n" + "{ \"id\": 6, \"value\": null }");
+
+    createOrReplaceView("source", "id INT NOT NULL, value INT", "{ \"id\": 1, \"value\": 100 }\n");
+    sql(
+        "MERGE INTO %s t USING source s " + "ON t.id == s.id " + "WHEN MATCHED THEN " + "DELETE",
+        commitTarget());
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(row(6, null)); // kept
+
+    assertEquals(
+        "Should have expected rows",
+        expectedRows,
+        sql("SELECT * FROM %s ORDER BY id", selectTarget()));
+  }
+
   @Test
   public void testMergeWithOnlyUpdateClauseAndNullValues() {
     createAndInitTable(