This is an automated email from the ASF dual-hosted git repository.
etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 8387b508f9 Spark 3.4: Remove usage of AssertHelpers (#8963)
8387b508f9 is described below
commit 8387b508f96e9f38d45a982f75c262fdd8e2b621
Author: Ashok <[email protected]>
AuthorDate: Wed Nov 1 19:00:35 2023 +0530
Spark 3.4: Remove usage of AssertHelpers (#8963)
---
.../spark/extensions/TestChangelogTable.java | 10 +-
.../apache/iceberg/spark/extensions/TestMerge.java | 918 ++++++++++-----------
.../TestRequiredDistributionAndOrdering.java | 28 +-
.../iceberg/spark/extensions/TestTagDDL.java | 118 ++-
.../iceberg/spark/extensions/TestUpdate.java | 133 ++-
5 files changed, 561 insertions(+), 646 deletions(-)
diff --git
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogTable.java
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogTable.java
index cc81b4b3d3..ab22eee006 100644
---
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogTable.java
+++
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogTable.java
@@ -18,7 +18,6 @@
*/
package org.apache.iceberg.spark.extensions;
-import static org.apache.iceberg.AssertHelpers.assertThrows;
import static org.apache.iceberg.TableProperties.FORMAT_VERSION;
import static org.apache.iceberg.TableProperties.MANIFEST_MERGE_ENABLED;
import static org.apache.iceberg.TableProperties.MANIFEST_MIN_MERGE_COUNT;
@@ -35,6 +34,7 @@ import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.iceberg.spark.source.SparkChangelogTable;
import org.apache.spark.sql.DataFrameReader;
import org.apache.spark.sql.Row;
+import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
@@ -202,10 +202,10 @@ public class TestChangelogTable extends
SparkExtensionsTestBase {
Snapshot snap3 = table.currentSnapshot();
long rightAfterSnap3 = waitUntilAfter(snap3.timestampMillis());
- assertThrows(
- "Should fail if start time is after end time",
- IllegalArgumentException.class,
- () -> changelogRecords(snap3.timestampMillis(),
snap2.timestampMillis()));
+ Assertions.assertThatThrownBy(
+ () -> changelogRecords(snap3.timestampMillis(),
snap2.timestampMillis()))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Cannot set start-timestamp to be greater than
end-timestamp for changelogs");
}
@Test
diff --git
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
index a710ac5d4e..6c5906f79f 100644
---
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
+++
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
@@ -43,7 +43,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.iceberg.AppendFiles;
-import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.PlanningMode;
@@ -713,7 +712,8 @@ public abstract class TestMerge extends
SparkRowLevelOperationsTestBase {
+ "{ \"id\": 1, \"state\": \"off\" }\n"
+ "{ \"id\": 10, \"state\": \"on\" }");
- String errorMsg = "a single row from the target table with multiple rows
of the source table";
+ String errorMsg =
+ "MERGE statement matched a single row from the target table with
multiple rows of the source table.";
Assertions.assertThatThrownBy(
() ->
sql(
@@ -747,23 +747,23 @@ public abstract class TestMerge extends
SparkRowLevelOperationsTestBase {
Dataset<Integer> ds = spark.createDataset(sourceIds, Encoders.INT());
ds.union(ds).createOrReplaceTempView("source");
- String errorMsg = "a single row from the target table with multiple rows
of the source table";
- AssertHelpers.assertThrowsCause(
- "Should complain about multiple matches",
- SparkException.class,
- errorMsg,
- () -> {
- sql(
- "MERGE INTO %s AS t USING source AS s "
- + "ON t.id == s.value "
- + "WHEN MATCHED AND t.id = 1 THEN "
- + " UPDATE SET id = 10 "
- + "WHEN MATCHED AND t.id = 6 THEN "
- + " DELETE "
- + "WHEN NOT MATCHED AND s.value = 2 THEN "
- + " INSERT (id, dep) VALUES (s.value, null)",
- commitTarget());
- });
+ String errorMsg =
+ "MERGE statement matched a single row from the target table with
multiple rows of the source table.";
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "MERGE INTO %s AS t USING source AS s "
+ + "ON t.id == s.value "
+ + "WHEN MATCHED AND t.id = 1 THEN "
+ + " UPDATE SET id = 10 "
+ + "WHEN MATCHED AND t.id = 6 THEN "
+ + " DELETE "
+ + "WHEN NOT MATCHED AND s.value = 2 THEN "
+ + " INSERT (id, dep) VALUES (s.value, null)",
+ commitTarget()))
+ .cause()
+ .isInstanceOf(SparkException.class)
+ .hasMessageContaining(errorMsg);
assertEquals(
"Target should be unchanged",
@@ -789,23 +789,22 @@ public abstract class TestMerge extends
SparkRowLevelOperationsTestBase {
ImmutableMap.of(SQLConf.PREFER_SORTMERGEJOIN().key(), "false"),
() -> {
String errorMsg =
- "a single row from the target table with multiple rows of the
source table";
- AssertHelpers.assertThrowsCause(
- "Should complain about multiple matches",
- SparkException.class,
- errorMsg,
- () -> {
- sql(
- "MERGE INTO %s AS t USING source AS s "
- + "ON t.id == s.value "
- + "WHEN MATCHED AND t.id = 1 THEN "
- + " UPDATE SET id = 10 "
- + "WHEN MATCHED AND t.id = 6 THEN "
- + " DELETE "
- + "WHEN NOT MATCHED AND s.value = 2 THEN "
- + " INSERT (id, dep) VALUES (s.value, null)",
- commitTarget());
- });
+ "MERGE statement matched a single row from the target table with
multiple rows of the source table.";
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "MERGE INTO %s AS t USING source AS s "
+ + "ON t.id == s.value "
+ + "WHEN MATCHED AND t.id = 1 THEN "
+ + " UPDATE SET id = 10 "
+ + "WHEN MATCHED AND t.id = 6 THEN "
+ + " DELETE "
+ + "WHEN NOT MATCHED AND s.value = 2 THEN "
+ + " INSERT (id, dep) VALUES (s.value, null)",
+ commitTarget()))
+ .cause()
+ .isInstanceOf(SparkException.class)
+ .hasMessageContaining(errorMsg);
});
assertEquals(
@@ -829,23 +828,22 @@ public abstract class TestMerge extends
SparkRowLevelOperationsTestBase {
ImmutableMap.of(SQLConf.PREFER_SORTMERGEJOIN().key(), "false"),
() -> {
String errorMsg =
- "a single row from the target table with multiple rows of the
source table";
- AssertHelpers.assertThrowsCause(
- "Should complain about multiple matches",
- SparkException.class,
- errorMsg,
- () -> {
- sql(
- "MERGE INTO %s AS t USING source AS s "
- + "ON t.id > s.value "
- + "WHEN MATCHED AND t.id = 1 THEN "
- + " UPDATE SET id = 10 "
- + "WHEN MATCHED AND t.id = 6 THEN "
- + " DELETE "
- + "WHEN NOT MATCHED AND s.value = 2 THEN "
- + " INSERT (id, dep) VALUES (s.value, null)",
- commitTarget());
- });
+ "MERGE statement matched a single row from the target table with
multiple rows of the source table.";
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "MERGE INTO %s AS t USING source AS s "
+ + "ON t.id > s.value "
+ + "WHEN MATCHED AND t.id = 1 THEN "
+ + " UPDATE SET id = 10 "
+ + "WHEN MATCHED AND t.id = 6 THEN "
+ + " DELETE "
+ + "WHEN NOT MATCHED AND s.value = 2 THEN "
+ + " INSERT (id, dep) VALUES (s.value, null)",
+ commitTarget()))
+ .cause()
+ .isInstanceOf(SparkException.class)
+ .hasMessageContaining(errorMsg);
});
assertEquals(
@@ -867,21 +865,21 @@ public abstract class TestMerge extends
SparkRowLevelOperationsTestBase {
Dataset<Integer> ds = spark.createDataset(sourceIds, Encoders.INT());
ds.union(ds).createOrReplaceTempView("source");
- String errorMsg = "a single row from the target table with multiple rows
of the source table";
- AssertHelpers.assertThrowsCause(
- "Should complain about multiple matches",
- SparkException.class,
- errorMsg,
- () -> {
- sql(
- "MERGE INTO %s AS t USING source AS s "
- + "ON t.id == s.value "
- + "WHEN MATCHED AND t.id = 1 THEN "
- + " UPDATE SET id = 10 "
- + "WHEN MATCHED AND t.id = 6 THEN "
- + " DELETE",
- commitTarget());
- });
+ String errorMsg =
+ "MERGE statement matched a single row from the target table with
multiple rows of the source table.";
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "MERGE INTO %s AS t USING source AS s "
+ + "ON t.id == s.value "
+ + "WHEN MATCHED AND t.id = 1 THEN "
+ + " UPDATE SET id = 10 "
+ + "WHEN MATCHED AND t.id = 6 THEN "
+ + " DELETE",
+ commitTarget()))
+ .cause()
+ .isInstanceOf(SparkException.class)
+ .hasMessageContaining(errorMsg);
assertEquals(
"Target should be unchanged",
@@ -901,21 +899,21 @@ public abstract class TestMerge extends
SparkRowLevelOperationsTestBase {
Dataset<Integer> ds = spark.createDataset(sourceIds, Encoders.INT());
ds.union(ds).createOrReplaceTempView("source");
- String errorMsg = "a single row from the target table with multiple rows
of the source table";
- AssertHelpers.assertThrowsCause(
- "Should complain about multiple matches",
- SparkException.class,
- errorMsg,
- () -> {
- sql(
- "MERGE INTO %s AS t USING source AS s "
- + "ON t.id > s.value "
- + "WHEN MATCHED AND t.id = 1 THEN "
- + " UPDATE SET id = 10 "
- + "WHEN MATCHED AND t.id = 6 THEN "
- + " DELETE",
- commitTarget());
- });
+ String errorMsg =
+ "MERGE statement matched a single row from the target table with
multiple rows of the source table.";
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "MERGE INTO %s AS t USING source AS s "
+ + "ON t.id > s.value "
+ + "WHEN MATCHED AND t.id = 1 THEN "
+ + " UPDATE SET id = 10 "
+ + "WHEN MATCHED AND t.id = 6 THEN "
+ + " DELETE",
+ commitTarget()))
+ .cause()
+ .isInstanceOf(SparkException.class)
+ .hasMessageContaining(errorMsg);
assertEquals(
"Target should be unchanged",
@@ -937,23 +935,23 @@ public abstract class TestMerge extends
SparkRowLevelOperationsTestBase {
+ "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n"
+ "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
- String errorMsg = "a single row from the target table with multiple rows
of the source table";
- AssertHelpers.assertThrowsCause(
- "Should complain about multiple matches",
- SparkException.class,
- errorMsg,
- () -> {
- sql(
- "MERGE INTO %s AS t USING source AS s "
- + "ON t.id == s.id "
- + "WHEN MATCHED AND t.id = 1 THEN "
- + " UPDATE SET * "
- + "WHEN MATCHED AND t.id = 6 THEN "
- + " DELETE "
- + "WHEN NOT MATCHED AND s.id = 2 THEN "
- + " INSERT *",
- commitTarget());
- });
+ String errorMsg =
+ "MERGE statement matched a single row from the target table with
multiple rows of the source table.";
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "MERGE INTO %s AS t USING source AS s "
+ + "ON t.id == s.id "
+ + "WHEN MATCHED AND t.id = 1 THEN "
+ + " UPDATE SET * "
+ + "WHEN MATCHED AND t.id = 6 THEN "
+ + " DELETE "
+ + "WHEN NOT MATCHED AND s.id = 2 THEN "
+ + " INSERT *",
+ commitTarget()))
+ .cause()
+ .isInstanceOf(SparkException.class)
+ .hasMessageContaining(errorMsg);
assertEquals(
"Target should be unchanged",
@@ -1008,21 +1006,21 @@ public abstract class TestMerge extends
SparkRowLevelOperationsTestBase {
+ "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n"
+ "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
- String errorMsg = "a single row from the target table with multiple rows
of the source table";
- AssertHelpers.assertThrowsCause(
- "Should complain about multiple matches",
- SparkException.class,
- errorMsg,
- () -> {
- sql(
- "MERGE INTO %s AS t USING source AS s "
- + "ON t.id == s.id "
- + "WHEN MATCHED AND t.id = 1 THEN "
- + " DELETE "
- + "WHEN NOT MATCHED AND s.id = 2 THEN "
- + " INSERT *",
- commitTarget());
- });
+ String errorMsg =
+ "MERGE statement matched a single row from the target table with
multiple rows of the source table.";
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "MERGE INTO %s AS t USING source AS s "
+ + "ON t.id == s.id "
+ + "WHEN MATCHED AND t.id = 1 THEN "
+ + " DELETE "
+ + "WHEN NOT MATCHED AND s.id = 2 THEN "
+ + " INSERT *",
+ commitTarget()))
+ .cause()
+ .isInstanceOf(SparkException.class)
+ .hasMessageContaining(errorMsg);
assertEquals(
"Target should be unchanged",
@@ -2153,46 +2151,40 @@ public abstract class TestMerge extends
SparkRowLevelOperationsTestBase {
"{ \"id\": 1, \"c\": { \"n1\": 2, \"n2\": { \"dn1\": 3, \"dn2\": 4 } }
}");
createOrReplaceView("source", "{ \"c1\": -100, \"c2\": -200 }");
- AssertHelpers.assertThrows(
- "Should complain about the invalid top-level column",
- AnalysisException.class,
- "cannot resolve t.invalid_col",
- () -> {
- sql(
- "MERGE INTO %s t USING source s "
- + "ON t.id == s.c1 "
- + "WHEN MATCHED THEN "
- + " UPDATE SET t.invalid_col = s.c2",
- commitTarget());
- });
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "MERGE INTO %s t USING source s "
+ + "ON t.id == s.c1 "
+ + "WHEN MATCHED THEN "
+ + " UPDATE SET t.invalid_col = s.c2",
+ commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining("cannot resolve t.invalid_col in MERGE command");
- AssertHelpers.assertThrows(
- "Should complain about the invalid nested column",
- AnalysisException.class,
- "No such struct field `invalid_col`",
- () -> {
- sql(
- "MERGE INTO %s t USING source s "
- + "ON t.id == s.c1 "
- + "WHEN MATCHED THEN "
- + " UPDATE SET t.c.n2.invalid_col = s.c2",
- commitTarget());
- });
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "MERGE INTO %s t USING source s "
+ + "ON t.id == s.c1 "
+ + "WHEN MATCHED THEN "
+ + " UPDATE SET t.c.n2.invalid_col = s.c2",
+ commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining("No such struct field `invalid_col`");
- AssertHelpers.assertThrows(
- "Should complain about the invalid top-level column",
- AnalysisException.class,
- "cannot resolve invalid_col",
- () -> {
- sql(
- "MERGE INTO %s t USING source s "
- + "ON t.id == s.c1 "
- + "WHEN MATCHED THEN "
- + " UPDATE SET t.c.n2.dn1 = s.c2 "
- + "WHEN NOT MATCHED THEN "
- + " INSERT (id, invalid_col) VALUES (s.c1, null)",
- commitTarget());
- });
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "MERGE INTO %s t USING source s "
+ + "ON t.id == s.c1 "
+ + "WHEN MATCHED THEN "
+ + " UPDATE SET t.c.n2.dn1 = s.c2 "
+ + "WHEN NOT MATCHED THEN "
+ + " INSERT (id, invalid_col) VALUES (s.c1, null)",
+ commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining("cannot resolve invalid_col in MERGE command");
}
@Test
@@ -2202,48 +2194,42 @@ public abstract class TestMerge extends
SparkRowLevelOperationsTestBase {
"{ \"id\": 1, \"c\": { \"n1\": 2, \"n2\": { \"dn1\": 3, \"dn2\": 4 } }
}");
createOrReplaceView("source", "{ \"c1\": -100, \"c2\": -200 }");
- AssertHelpers.assertThrows(
- "Should complain about the nested column",
- AnalysisException.class,
- "Nested fields are not supported inside INSERT clauses",
- () -> {
- sql(
- "MERGE INTO %s t USING source s "
- + "ON t.id == s.c1 "
- + "WHEN MATCHED THEN "
- + " UPDATE SET t.c.n2.dn1 = s.c2 "
- + "WHEN NOT MATCHED THEN "
- + " INSERT (id, c.n2) VALUES (s.c1, null)",
- commitTarget());
- });
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "MERGE INTO %s t USING source s "
+ + "ON t.id == s.c1 "
+ + "WHEN MATCHED THEN "
+ + " UPDATE SET t.c.n2.dn1 = s.c2 "
+ + "WHEN NOT MATCHED THEN "
+ + " INSERT (id, c.n2) VALUES (s.c1, null)",
+ commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining("Nested fields are not supported inside INSERT
clauses");
- AssertHelpers.assertThrows(
- "Should complain about duplicate columns",
- AnalysisException.class,
- "Duplicate column names inside INSERT clause",
- () -> {
- sql(
- "MERGE INTO %s t USING source s "
- + "ON t.id == s.c1 "
- + "WHEN MATCHED THEN "
- + " UPDATE SET t.c.n2.dn1 = s.c2 "
- + "WHEN NOT MATCHED THEN "
- + " INSERT (id, id) VALUES (s.c1, null)",
- commitTarget());
- });
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "MERGE INTO %s t USING source s "
+ + "ON t.id == s.c1 "
+ + "WHEN MATCHED THEN "
+ + " UPDATE SET t.c.n2.dn1 = s.c2 "
+ + "WHEN NOT MATCHED THEN "
+ + " INSERT (id, id) VALUES (s.c1, null)",
+ commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining("Duplicate column names inside INSERT clause");
- AssertHelpers.assertThrows(
- "Should complain about missing columns",
- AnalysisException.class,
- "must provide values for all columns of the target table",
- () -> {
- sql(
- "MERGE INTO %s t USING source s "
- + "ON t.id == s.c1 "
- + "WHEN NOT MATCHED THEN "
- + " INSERT (id) VALUES (s.c1)",
- commitTarget());
- });
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "MERGE INTO %s t USING source s "
+ + "ON t.id == s.c1 "
+ + "WHEN NOT MATCHED THEN "
+ + " INSERT (id) VALUES (s.c1)",
+ commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining("must provide values for all columns of the
target table");
}
@Test
@@ -2253,31 +2239,27 @@ public abstract class TestMerge extends
SparkRowLevelOperationsTestBase {
"{ \"id\": 1, \"a\": [ { \"c1\": 2, \"c2\": 3 } ], \"m\": { \"k\":
\"v\"} }");
createOrReplaceView("source", "{ \"c1\": -100, \"c2\": -200 }");
- AssertHelpers.assertThrows(
- "Should complain about updating an array column",
- AnalysisException.class,
- "Updating nested fields is only supported for structs",
- () -> {
- sql(
- "MERGE INTO %s t USING source s "
- + "ON t.id == s.c1 "
- + "WHEN MATCHED THEN "
- + " UPDATE SET t.a.c1 = s.c2",
- commitTarget());
- });
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "MERGE INTO %s t USING source s "
+ + "ON t.id == s.c1 "
+ + "WHEN MATCHED THEN "
+ + " UPDATE SET t.a.c1 = s.c2",
+ commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining("Updating nested fields is only supported for
structs");
- AssertHelpers.assertThrows(
- "Should complain about updating a map column",
- AnalysisException.class,
- "Updating nested fields is only supported for structs",
- () -> {
- sql(
- "MERGE INTO %s t USING source s "
- + "ON t.id == s.c1 "
- + "WHEN MATCHED THEN "
- + " UPDATE SET t.m.key = 'new_key'",
- commitTarget());
- });
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "MERGE INTO %s t USING source s "
+ + "ON t.id == s.c1 "
+ + "WHEN MATCHED THEN "
+ + " UPDATE SET t.m.key = 'new_key'",
+ commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining("Updating nested fields is only supported for
structs");
}
@Test
@@ -2287,44 +2269,38 @@ public abstract class TestMerge extends
SparkRowLevelOperationsTestBase {
"{ \"id\": 1, \"c\": { \"n1\": 2, \"n2\": { \"dn1\": 3, \"dn2\": 4 } }
}");
createOrReplaceView("source", "{ \"c1\": -100, \"c2\": -200 }");
- AssertHelpers.assertThrows(
- "Should complain about conflicting updates to a top-level column",
- AnalysisException.class,
- "Updates are in conflict",
- () -> {
- sql(
- "MERGE INTO %s t USING source s "
- + "ON t.id == s.c1 "
- + "WHEN MATCHED THEN "
- + " UPDATE SET t.id = 1, t.c.n1 = 2, t.id = 2",
- commitTarget());
- });
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "MERGE INTO %s t USING source s "
+ + "ON t.id == s.c1 "
+ + "WHEN MATCHED THEN "
+ + " UPDATE SET t.id = 1, t.c.n1 = 2, t.id = 2",
+ commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining("Updates are in conflict");
- AssertHelpers.assertThrows(
- "Should complain about conflicting updates to a nested column",
- AnalysisException.class,
- "Updates are in conflict for these columns",
- () -> {
- sql(
- "MERGE INTO %s t USING source s "
- + "ON t.id == s.c1 "
- + "WHEN MATCHED THEN "
- + " UPDATE SET t.c.n1 = 1, t.id = 2, t.c.n1 = 2",
- commitTarget());
- });
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "MERGE INTO %s t USING source s "
+ + "ON t.id == s.c1 "
+ + "WHEN MATCHED THEN "
+ + " UPDATE SET t.c.n1 = 1, t.id = 2, t.c.n1 = 2",
+ commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining("Updates are in conflict for these columns");
- AssertHelpers.assertThrows(
- "Should complain about conflicting updates to a nested column",
- AnalysisException.class,
- "Updates are in conflict",
- () -> {
- sql(
- "MERGE INTO %s t USING source s "
- + "ON t.id == s.c1 "
- + "WHEN MATCHED THEN "
- + " UPDATE SET c.n1 = 1, c = named_struct('n1', 1, 'n2',
named_struct('dn1', 1, 'dn2', 2))",
- commitTarget());
- });
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "MERGE INTO %s t USING source s "
+ + "ON t.id == s.c1 "
+ + "WHEN MATCHED THEN "
+ + " UPDATE SET c.n1 = 1, c = named_struct('n1', 1,
'n2', named_struct('dn1', 1, 'dn2', 2))",
+ commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining("Updates are in conflict");
}
@Test
@@ -2341,70 +2317,60 @@ public abstract class TestMerge extends
SparkRowLevelOperationsTestBase {
withSQLConf(
ImmutableMap.of("spark.sql.storeAssignmentPolicy", policy),
() -> {
- AssertHelpers.assertThrows(
- "Should complain about writing nulls to a top-level column",
- AnalysisException.class,
- "Cannot write nullable values to non-null column",
- () -> {
- sql(
- "MERGE INTO %s t USING source s "
- + "ON t.id == s.c1 "
- + "WHEN MATCHED THEN "
- + " UPDATE SET t.id = NULL",
- commitTarget());
- });
-
- AssertHelpers.assertThrows(
- "Should complain about writing nulls to a nested column",
- AnalysisException.class,
- "Cannot write nullable values to non-null column",
- () -> {
- sql(
- "MERGE INTO %s t USING source s "
- + "ON t.id == s.c1 "
- + "WHEN MATCHED THEN "
- + " UPDATE SET t.s.n1 = NULL",
- commitTarget());
- });
-
- AssertHelpers.assertThrows(
- "Should complain about writing missing fields in structs",
- AnalysisException.class,
- "missing fields",
- () -> {
- sql(
- "MERGE INTO %s t USING source s "
- + "ON t.id == s.c1 "
- + "WHEN MATCHED THEN "
- + " UPDATE SET t.s = s.c2",
- commitTarget());
- });
-
- AssertHelpers.assertThrows(
- "Should complain about writing invalid data types",
- AnalysisException.class,
- "Cannot safely cast",
- () -> {
- sql(
- "MERGE INTO %s t USING source s "
- + "ON t.id == s.c1 "
- + "WHEN MATCHED THEN "
- + " UPDATE SET t.s.n1 = s.c3",
- commitTarget());
- });
-
- AssertHelpers.assertThrows(
- "Should complain about writing incompatible structs",
- AnalysisException.class,
- "field name does not match",
- () -> {
- sql(
- "MERGE INTO %s t USING source s "
- + "ON t.id == s.c1 "
- + "WHEN MATCHED THEN "
- + " UPDATE SET t.s.n2 = s.c4",
- commitTarget());
- });
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "MERGE INTO %s t USING source s "
+ + "ON t.id == s.c1 "
+ + "WHEN MATCHED THEN "
+ + " UPDATE SET t.id = NULL",
+ commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining("Cannot write nullable values to
non-null column");
+
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "MERGE INTO %s t USING source s "
+ + "ON t.id == s.c1 "
+ + "WHEN MATCHED THEN "
+ + " UPDATE SET t.s.n1 = NULL",
+ commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining("Cannot write nullable values to
non-null column");
+
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "MERGE INTO %s t USING source s "
+ + "ON t.id == s.c1 "
+ + "WHEN MATCHED THEN "
+ + " UPDATE SET t.s = s.c2",
+ commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining("missing fields");
+
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "MERGE INTO %s t USING source s "
+ + "ON t.id == s.c1 "
+ + "WHEN MATCHED THEN "
+ + " UPDATE SET t.s.n1 = s.c3",
+ commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageEndingWith("Cannot safely cast 'n1': string to
int");
+
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "MERGE INTO %s t USING source s "
+ + "ON t.id == s.c1 "
+ + "WHEN MATCHED THEN "
+ + " UPDATE SET t.s.n2 = s.c4",
+ commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining("field name does not match");
});
}
}
@@ -2416,57 +2382,53 @@ public abstract class TestMerge extends
SparkRowLevelOperationsTestBase {
"{ \"id\": 1, \"c\": { \"n1\": 2, \"n2\": { \"dn1\": 3, \"dn2\": 4 } }
}");
createOrReplaceView("source", "{ \"c1\": -100, \"c2\": -200 }");
- AssertHelpers.assertThrows(
- "Should complain about non-deterministic search conditions",
- AnalysisException.class,
- "Non-deterministic functions are not supported",
- () -> {
- sql(
- "MERGE INTO %s t USING source s "
- + "ON t.id == s.c1 AND rand() > t.id "
- + "WHEN MATCHED THEN "
- + " UPDATE SET t.c.n1 = -1",
- commitTarget());
- });
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "MERGE INTO %s t USING source s "
+ + "ON t.id == s.c1 AND rand() > t.id "
+ + "WHEN MATCHED THEN "
+ + " UPDATE SET t.c.n1 = -1",
+ commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining(
+ "Non-deterministic functions are not supported in SEARCH
conditions of MERGE operations");
- AssertHelpers.assertThrows(
- "Should complain about non-deterministic update conditions",
- AnalysisException.class,
- "Non-deterministic functions are not supported",
- () -> {
- sql(
- "MERGE INTO %s t USING source s "
- + "ON t.id == s.c1 "
- + "WHEN MATCHED AND rand() > t.id THEN "
- + " UPDATE SET t.c.n1 = -1",
- commitTarget());
- });
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "MERGE INTO %s t USING source s "
+ + "ON t.id == s.c1 "
+ + "WHEN MATCHED AND rand() > t.id THEN "
+ + " UPDATE SET t.c.n1 = -1",
+ commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining(
+ "Non-deterministic functions are not supported in UPDATE
conditions of MERGE operations");
- AssertHelpers.assertThrows(
- "Should complain about non-deterministic delete conditions",
- AnalysisException.class,
- "Non-deterministic functions are not supported",
- () -> {
- sql(
- "MERGE INTO %s t USING source s "
- + "ON t.id == s.c1 "
- + "WHEN MATCHED AND rand() > t.id THEN "
- + " DELETE",
- commitTarget());
- });
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "MERGE INTO %s t USING source s "
+ + "ON t.id == s.c1 "
+ + "WHEN MATCHED AND rand() > t.id THEN "
+ + " DELETE",
+ commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining(
+ "Non-deterministic functions are not supported in DELETE
conditions of MERGE operations");
- AssertHelpers.assertThrows(
- "Should complain about non-deterministic insert conditions",
- AnalysisException.class,
- "Non-deterministic functions are not supported",
- () -> {
- sql(
- "MERGE INTO %s t USING source s "
- + "ON t.id == s.c1 "
- + "WHEN NOT MATCHED AND rand() > c1 THEN "
- + " INSERT (id, c) VALUES (1, null)",
- commitTarget());
- });
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "MERGE INTO %s t USING source s "
+ + "ON t.id == s.c1 "
+ + "WHEN NOT MATCHED AND rand() > c1 THEN "
+ + " INSERT (id, c) VALUES (1, null)",
+ commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining(
+ "Non-deterministic functions are not supported in INSERT
conditions of MERGE operations");
}
@Test
@@ -2476,57 +2438,53 @@ public abstract class TestMerge extends
SparkRowLevelOperationsTestBase {
"{ \"id\": 1, \"c\": { \"n1\": 2, \"n2\": { \"dn1\": 3, \"dn2\": 4 } }
}");
createOrReplaceView("source", "{ \"c1\": -100, \"c2\": -200 }");
- AssertHelpers.assertThrows(
- "Should complain about agg expressions in search conditions",
- AnalysisException.class,
- "Agg functions are not supported",
- () -> {
- sql(
- "MERGE INTO %s t USING source s "
- + "ON t.id == s.c1 AND max(t.id) == 1 "
- + "WHEN MATCHED THEN "
- + " UPDATE SET t.c.n1 = -1",
- commitTarget());
- });
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "MERGE INTO %s t USING source s "
+ + "ON t.id == s.c1 AND max(t.id) == 1 "
+ + "WHEN MATCHED THEN "
+ + " UPDATE SET t.c.n1 = -1",
+ commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining(
+ "Agg functions are not supported in SEARCH conditions of MERGE
operations");
- AssertHelpers.assertThrows(
- "Should complain about agg expressions in update conditions",
- AnalysisException.class,
- "Agg functions are not supported",
- () -> {
- sql(
- "MERGE INTO %s t USING source s "
- + "ON t.id == s.c1 "
- + "WHEN MATCHED AND sum(t.id) < 1 THEN "
- + " UPDATE SET t.c.n1 = -1",
- commitTarget());
- });
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "MERGE INTO %s t USING source s "
+ + "ON t.id == s.c1 "
+ + "WHEN MATCHED AND sum(t.id) < 1 THEN "
+ + " UPDATE SET t.c.n1 = -1",
+ commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining(
+ "Agg functions are not supported in UPDATE conditions of MERGE
operations");
- AssertHelpers.assertThrows(
- "Should complain about agg expressions in delete conditions",
- AnalysisException.class,
- "Agg functions are not supported",
- () -> {
- sql(
- "MERGE INTO %s t USING source s "
- + "ON t.id == s.c1 "
- + "WHEN MATCHED AND sum(t.id) THEN "
- + " DELETE",
- commitTarget());
- });
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "MERGE INTO %s t USING source s "
+ + "ON t.id == s.c1 "
+ + "WHEN MATCHED AND sum(t.id) THEN "
+ + " DELETE",
+ commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining(
+ "Agg functions are not supported in DELETE conditions of MERGE
operations");
- AssertHelpers.assertThrows(
- "Should complain about agg expressions in insert conditions",
- AnalysisException.class,
- "Agg functions are not supported",
- () -> {
- sql(
- "MERGE INTO %s t USING source s "
- + "ON t.id == s.c1 "
- + "WHEN NOT MATCHED AND sum(c1) < 1 THEN "
- + " INSERT (id, c) VALUES (1, null)",
- commitTarget());
- });
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "MERGE INTO %s t USING source s "
+ + "ON t.id == s.c1 "
+ + "WHEN NOT MATCHED AND sum(c1) < 1 THEN "
+ + " INSERT (id, c) VALUES (1, null)",
+ commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining(
+ "Agg functions are not supported in INSERT conditions of MERGE
operations");
}
@Test
@@ -2536,57 +2494,53 @@ public abstract class TestMerge extends
SparkRowLevelOperationsTestBase {
"{ \"id\": 1, \"c\": { \"n1\": 2, \"n2\": { \"dn1\": 3, \"dn2\": 4 } }
}");
createOrReplaceView("source", "{ \"c1\": -100, \"c2\": -200 }");
- AssertHelpers.assertThrows(
- "Should complain about subquery expressions",
- AnalysisException.class,
- "Subqueries are not supported in conditions",
- () -> {
- sql(
- "MERGE INTO %s t USING source s "
- + "ON t.id == s.c1 AND t.id < (SELECT max(c2) FROM source) "
- + "WHEN MATCHED THEN "
- + " UPDATE SET t.c.n1 = s.c2",
- commitTarget());
- });
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "MERGE INTO %s t USING source s "
+ + "ON t.id == s.c1 AND t.id < (SELECT max(c2) FROM
source) "
+ + "WHEN MATCHED THEN "
+ + " UPDATE SET t.c.n1 = s.c2",
+ commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining(
+ "Subqueries are not supported in conditions of MERGE operations.
Found a subquery in the SEARCH condition");
- AssertHelpers.assertThrows(
- "Should complain about subquery expressions",
- AnalysisException.class,
- "Subqueries are not supported in conditions",
- () -> {
- sql(
- "MERGE INTO %s t USING source s "
- + "ON t.id == s.c1 "
- + "WHEN MATCHED AND t.id < (SELECT max(c2) FROM source) THEN
"
- + " UPDATE SET t.c.n1 = s.c2",
- commitTarget());
- });
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "MERGE INTO %s t USING source s "
+ + "ON t.id == s.c1 "
+ + "WHEN MATCHED AND t.id < (SELECT max(c2) FROM
source) THEN "
+ + " UPDATE SET t.c.n1 = s.c2",
+ commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining(
+ "Subqueries are not supported in conditions of MERGE operations.
Found a subquery in the UPDATE condition");
- AssertHelpers.assertThrows(
- "Should complain about subquery expressions",
- AnalysisException.class,
- "Subqueries are not supported in conditions",
- () -> {
- sql(
- "MERGE INTO %s t USING source s "
- + "ON t.id == s.c1 "
- + "WHEN MATCHED AND t.id NOT IN (SELECT c2 FROM source) THEN
"
- + " DELETE",
- commitTarget());
- });
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "MERGE INTO %s t USING source s "
+ + "ON t.id == s.c1 "
+ + "WHEN MATCHED AND t.id NOT IN (SELECT c2 FROM
source) THEN "
+ + " DELETE",
+ commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining(
+ "Subqueries are not supported in conditions of MERGE operations.
Found a subquery in the DELETE condition");
- AssertHelpers.assertThrows(
- "Should complain about subquery expressions",
- AnalysisException.class,
- "Subqueries are not supported in conditions",
- () -> {
- sql(
- "MERGE INTO %s t USING source s "
- + "ON t.id == s.c1 "
- + "WHEN NOT MATCHED AND s.c1 IN (SELECT c2 FROM source) THEN
"
- + " INSERT (id, c) VALUES (1, null)",
- commitTarget());
- });
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "MERGE INTO %s t USING source s "
+ + "ON t.id == s.c1 "
+ + "WHEN NOT MATCHED AND s.c1 IN (SELECT c2 FROM
source) THEN "
+ + " INSERT (id, c) VALUES (1, null)",
+ commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining(
+ "Subqueries are not supported in conditions of MERGE operations.
Found a subquery in the INSERT condition");
}
@Test
@@ -2594,18 +2548,16 @@ public abstract class TestMerge extends
SparkRowLevelOperationsTestBase {
createAndInitTable("id INT, c2 INT", "{ \"id\": 1, \"c2\": 2 }");
createOrReplaceView("source", "{ \"id\": 1, \"value\": 11 }");
- AssertHelpers.assertThrows(
- "Should complain about the target column",
- AnalysisException.class,
- "Cannot resolve [c2]",
- () -> {
- sql(
- "MERGE INTO %s t USING source s "
- + "ON t.id == s.id "
- + "WHEN NOT MATCHED AND c2 = 1 THEN "
- + " INSERT (id, c2) VALUES (s.id, null)",
- commitTarget());
- });
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "MERGE INTO %s t USING source s "
+ + "ON t.id == s.id "
+ + "WHEN NOT MATCHED AND c2 = 1 THEN "
+ + " INSERT (id, c2) VALUES (s.id, null)",
+ commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining("Cannot resolve [c2] in INSERT condition of
MERGE operation");
}
@Test
@@ -2613,17 +2565,15 @@ public abstract class TestMerge extends
SparkRowLevelOperationsTestBase {
createOrReplaceView("target", "{ \"c1\": -100, \"c2\": -200 }");
createOrReplaceView("source", "{ \"c1\": -100, \"c2\": -200 }");
- AssertHelpers.assertThrows(
- "Should complain non iceberg target table",
- UnsupportedOperationException.class,
- "MERGE INTO TABLE is not supported temporarily.",
- () -> {
- sql(
- "MERGE INTO target t USING source s "
- + "ON t.c1 == s.c1 "
- + "WHEN MATCHED THEN "
- + " UPDATE SET *");
- });
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "MERGE INTO target t USING source s "
+ + "ON t.c1 == s.c1 "
+ + "WHEN MATCHED THEN "
+ + " UPDATE SET *"))
+ .isInstanceOf(UnsupportedOperationException.class)
+ .hasMessage("MERGE INTO TABLE is not supported temporarily.");
}
/**
diff --git
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRequiredDistributionAndOrdering.java
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRequiredDistributionAndOrdering.java
index fcdf9bf992..4c678ce9b7 100644
---
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRequiredDistributionAndOrdering.java
+++
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRequiredDistributionAndOrdering.java
@@ -21,13 +21,13 @@ package org.apache.iceberg.spark.extensions;
import java.math.BigDecimal;
import java.util.List;
import java.util.Map;
-import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.spark.SparkWriteOptions;
import org.apache.iceberg.spark.source.ThreeColumnRecord;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Test;
@@ -186,20 +186,18 @@ public class TestRequiredDistributionAndOrdering extends
SparkExtensionsTestBase
Dataset<Row> inputDF = ds.coalesce(1).sortWithinPartitions("c1");
// should fail if ordering is disabled
- AssertHelpers.assertThrowsCause(
- "Should reject writes without ordering",
- IllegalStateException.class,
- "Encountered records that belong to already closed files",
- () -> {
- try {
- inputDF
- .writeTo(tableName)
- .option(SparkWriteOptions.USE_TABLE_DISTRIBUTION_AND_ORDERING,
"false")
- .append();
- } catch (NoSuchTableException e) {
- throw new RuntimeException(e);
- }
- });
+ Assertions.assertThatThrownBy(
+ () ->
+ inputDF
+ .writeTo(tableName)
+
.option(SparkWriteOptions.USE_TABLE_DISTRIBUTION_AND_ORDERING, "false")
+ .option(SparkWriteOptions.FANOUT_ENABLED, "false")
+ .append())
+ .cause()
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessageStartingWith(
+ "Incoming records violate the writer assumption that records are
clustered by spec "
+ + "and by partition within each spec. Either cluster the
incoming records or switch to fanout writers.");
}
@Test
diff --git
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestTagDDL.java
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestTagDDL.java
index 866a965e33..52b9134089 100644
---
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestTagDDL.java
+++
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestTagDDL.java
@@ -24,7 +24,6 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;
-import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.Table;
import org.apache.iceberg.exceptions.ValidationException;
@@ -97,29 +96,26 @@ public class TestTagDDL extends SparkExtensionsTestBase {
}
String tagName = "t1";
- AssertHelpers.assertThrows(
- "Illegal statement",
- IcebergParseException.class,
- "mismatched input",
- () ->
- sql(
- "ALTER TABLE %s CREATE TAG %s AS OF VERSION %d RETAIN",
- tableName, tagName, firstSnapshotId, maxRefAge));
-
- AssertHelpers.assertThrows(
- "Illegal statement",
- IcebergParseException.class,
- "mismatched input",
- () -> sql("ALTER TABLE %s CREATE TAG %s RETAIN %s DAYS", tableName,
tagName, "abc"));
-
- AssertHelpers.assertThrows(
- "Illegal statement",
- IcebergParseException.class,
- "mismatched input 'SECONDS' expecting {'DAYS', 'HOURS', 'MINUTES'}",
- () ->
- sql(
- "ALTER TABLE %s CREATE TAG %s AS OF VERSION %d RETAIN %d
SECONDS",
- tableName, tagName, firstSnapshotId, maxRefAge));
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "ALTER TABLE %s CREATE TAG %s AS OF VERSION %d RETAIN",
+ tableName, tagName, firstSnapshotId, maxRefAge))
+ .isInstanceOf(IcebergParseException.class)
+ .hasMessageContaining("mismatched input");
+
+ Assertions.assertThatThrownBy(
+ () -> sql("ALTER TABLE %s CREATE TAG %s RETAIN %s DAYS",
tableName, tagName, "abc"))
+ .isInstanceOf(IcebergParseException.class)
+ .hasMessageContaining("mismatched input");
+
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "ALTER TABLE %s CREATE TAG %s AS OF VERSION %d RETAIN %d
SECONDS",
+ tableName, tagName, firstSnapshotId, maxRefAge))
+ .isInstanceOf(IcebergParseException.class)
+ .hasMessageContaining("mismatched input 'SECONDS' expecting {'DAYS',
'HOURS', 'MINUTES'}");
}
@Test
@@ -137,11 +133,10 @@ public class TestTagDDL extends SparkExtensionsTestBase {
long snapshotId = table.currentSnapshot().snapshotId();
String tagName = "t1";
- AssertHelpers.assertThrows(
- "unknown snapshot",
- ValidationException.class,
- "unknown snapshot: -1",
- () -> sql("ALTER TABLE %s CREATE TAG %s AS OF VERSION %d", tableName,
tagName, -1));
+ Assertions.assertThatThrownBy(
+ () -> sql("ALTER TABLE %s CREATE TAG %s AS OF VERSION %d",
tableName, tagName, -1))
+ .isInstanceOf(ValidationException.class)
+ .hasMessage("Cannot set " + tagName + " to unknown snapshot: -1");
sql("ALTER TABLE %s CREATE TAG %s", tableName, tagName);
table.refresh();
@@ -151,17 +146,13 @@ public class TestTagDDL extends SparkExtensionsTestBase {
Assert.assertNull(
"The tag needs to have the default max ref age, which is null.",
ref.maxRefAgeMs());
- AssertHelpers.assertThrows(
- "Cannot create an exist tag",
- IllegalArgumentException.class,
- "already exists",
- () -> sql("ALTER TABLE %s CREATE TAG %s", tableName, tagName));
+ Assertions.assertThatThrownBy(() -> sql("ALTER TABLE %s CREATE TAG %s",
tableName, tagName))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("already exists");
- AssertHelpers.assertThrows(
- "Non-conforming tag name",
- IcebergParseException.class,
- "mismatched input '123'",
- () -> sql("ALTER TABLE %s CREATE TAG %s", tableName, "123"));
+ Assertions.assertThatThrownBy(() -> sql("ALTER TABLE %s CREATE TAG %s",
tableName, "123"))
+ .isInstanceOf(IcebergParseException.class)
+ .hasMessageContaining("mismatched input '123'");
table.manageSnapshots().removeTag(tagName).commit();
List<SimpleRecord> records =
@@ -207,11 +198,10 @@ public class TestTagDDL extends SparkExtensionsTestBase {
insertRows();
long second = table.currentSnapshot().snapshotId();
- AssertHelpers.assertThrows(
- "Cannot perform replace tag on branches",
- IllegalArgumentException.class,
- "Ref branch1 is a branch not a tag",
- () -> sql("ALTER TABLE %s REPLACE Tag %s", tableName, branchName,
second));
+ Assertions.assertThatThrownBy(
+ () -> sql("ALTER TABLE %s REPLACE Tag %s", tableName, branchName,
second))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Ref branch1 is a branch not a tag");
}
@Test
@@ -244,14 +234,13 @@ public class TestTagDDL extends SparkExtensionsTestBase {
public void testReplaceTagDoesNotExist() throws NoSuchTableException {
Table table = insertRows();
- AssertHelpers.assertThrows(
- "Cannot perform replace tag on tag which does not exist",
- IllegalArgumentException.class,
- "Tag does not exist",
- () ->
- sql(
- "ALTER TABLE %s REPLACE Tag %s AS OF VERSION %d",
- tableName, "someTag", table.currentSnapshot().snapshotId()));
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "ALTER TABLE %s REPLACE Tag %s AS OF VERSION %d",
+ tableName, "someTag",
table.currentSnapshot().snapshotId()))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Tag does not exist");
}
@Test
@@ -316,20 +305,17 @@ public class TestTagDDL extends SparkExtensionsTestBase {
@Test
public void testDropTagNonConformingName() {
- AssertHelpers.assertThrows(
- "Non-conforming tag name",
- IcebergParseException.class,
- "mismatched input '123'",
- () -> sql("ALTER TABLE %s DROP TAG %s", tableName, "123"));
+ Assertions.assertThatThrownBy(() -> sql("ALTER TABLE %s DROP TAG %s",
tableName, "123"))
+ .isInstanceOf(IcebergParseException.class)
+ .hasMessageContaining("mismatched input '123'");
}
@Test
public void testDropTagDoesNotExist() {
- AssertHelpers.assertThrows(
- "Cannot perform drop tag on tag which does not exist",
- IllegalArgumentException.class,
- "Tag does not exist: nonExistingTag",
- () -> sql("ALTER TABLE %s DROP TAG %s", tableName, "nonExistingTag"));
+ Assertions.assertThatThrownBy(
+ () -> sql("ALTER TABLE %s DROP TAG %s", tableName,
"nonExistingTag"))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Tag does not exist: nonExistingTag");
}
@Test
@@ -338,11 +324,9 @@ public class TestTagDDL extends SparkExtensionsTestBase {
Table table = insertRows();
table.manageSnapshots().createBranch(branchName,
table.currentSnapshot().snapshotId()).commit();
- AssertHelpers.assertThrows(
- "Cannot perform drop tag on branch",
- IllegalArgumentException.class,
- "Ref b1 is a branch not a tag",
- () -> sql("ALTER TABLE %s DROP TAG %s", tableName, branchName));
+ Assertions.assertThatThrownBy(() -> sql("ALTER TABLE %s DROP TAG %s",
tableName, branchName))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Ref b1 is a branch not a tag");
}
@Test
diff --git
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java
index 7845d8e2e3..8b5e1cd1d4 100644
---
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java
+++
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java
@@ -45,7 +45,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.iceberg.AppendFiles;
-import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.PlanningMode;
@@ -1316,17 +1315,13 @@ public abstract class TestUpdate extends
SparkRowLevelOperationsTestBase {
"id INT, a ARRAY<STRUCT<c1:INT,c2:INT>>, m MAP<STRING,STRING>",
"{ \"id\": 0, \"a\": null, \"m\": null }");
- AssertHelpers.assertThrows(
- "Should complain about updating an array column",
- AnalysisException.class,
- "Updating nested fields is only supported for structs",
- () -> sql("UPDATE %s SET a.c1 = 1", commitTarget()));
-
- AssertHelpers.assertThrows(
- "Should complain about updating a map column",
- AnalysisException.class,
- "Updating nested fields is only supported for structs",
- () -> sql("UPDATE %s SET m.key = 'new_key'", commitTarget()));
+ Assertions.assertThatThrownBy(() -> sql("UPDATE %s SET a.c1 = 1",
commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining("Updating nested fields is only supported for
structs");
+
+ Assertions.assertThatThrownBy(() -> sql("UPDATE %s SET m.key = 'new_key'",
commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining("Updating nested fields is only supported for
structs");
}
@Test
@@ -1334,27 +1329,23 @@ public abstract class TestUpdate extends
SparkRowLevelOperationsTestBase {
createAndInitTable(
"id INT, c STRUCT<n1:INT,n2:STRUCT<dn1:INT,dn2:INT>>", "{ \"id\": 0,
\"s\": null }");
- AssertHelpers.assertThrows(
- "Should complain about conflicting updates to a top-level column",
- AnalysisException.class,
- "Updates are in conflict",
- () -> sql("UPDATE %s t SET t.id = 1, t.c.n1 = 2, t.id = 2",
commitTarget()));
-
- AssertHelpers.assertThrows(
- "Should complain about conflicting updates to a nested column",
- AnalysisException.class,
- "Updates are in conflict for these columns",
- () -> sql("UPDATE %s t SET t.c.n1 = 1, t.id = 2, t.c.n1 = 2",
commitTarget()));
-
- AssertHelpers.assertThrows(
- "Should complain about conflicting updates to a nested column",
- AnalysisException.class,
- "Updates are in conflict",
- () -> {
- sql(
- "UPDATE %s SET c.n1 = 1, c = named_struct('n1', 1, 'n2',
named_struct('dn1', 1, 'dn2', 2))",
- commitTarget());
- });
+ Assertions.assertThatThrownBy(
+ () -> sql("UPDATE %s t SET t.id = 1, t.c.n1 = 2, t.id = 2",
commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageStartingWith("Updates are in conflict for these columns");
+
+ Assertions.assertThatThrownBy(
+ () -> sql("UPDATE %s t SET t.c.n1 = 1, t.id = 2, t.c.n1 = 2",
commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageStartingWith("Updates are in conflict for these columns");
+
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "UPDATE %s SET c.n1 = 1, c = named_struct('n1', 1, 'n2',
named_struct('dn1', 1, 'dn2', 2))",
+ commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageStartingWith("Updates are in conflict for these columns");
}
@Test
@@ -1367,38 +1358,32 @@ public abstract class TestUpdate extends
SparkRowLevelOperationsTestBase {
withSQLConf(
ImmutableMap.of("spark.sql.storeAssignmentPolicy", policy),
() -> {
- AssertHelpers.assertThrows(
- "Should complain about writing nulls to a top-level column",
- AnalysisException.class,
- "Cannot write nullable values to non-null column",
- () -> sql("UPDATE %s t SET t.id = NULL", commitTarget()));
-
- AssertHelpers.assertThrows(
- "Should complain about writing nulls to a nested column",
- AnalysisException.class,
- "Cannot write nullable values to non-null column",
- () -> sql("UPDATE %s t SET t.s.n1 = NULL", commitTarget()));
-
- AssertHelpers.assertThrows(
- "Should complain about writing missing fields in structs",
- AnalysisException.class,
- "missing fields",
- () -> sql("UPDATE %s t SET t.s = named_struct('n1', 1)",
commitTarget()));
-
- AssertHelpers.assertThrows(
- "Should complain about writing invalid data types",
- AnalysisException.class,
- "Cannot safely cast",
- () -> sql("UPDATE %s t SET t.s.n1 = 'str'", commitTarget()));
-
- AssertHelpers.assertThrows(
- "Should complain about writing incompatible structs",
- AnalysisException.class,
- "field name does not match",
- () ->
- sql(
- "UPDATE %s t SET t.s.n2 = named_struct('dn2', 1,
'dn1', 2)",
- commitTarget()));
+ Assertions.assertThatThrownBy(() -> sql("UPDATE %s t SET t.id =
NULL", commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageStartingWith("Cannot write nullable values to
non-null column");
+
+ Assertions.assertThatThrownBy(
+ () -> sql("UPDATE %s t SET t.s.n1 = NULL", commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageStartingWith("Cannot write nullable values to
non-null column");
+
+ Assertions.assertThatThrownBy(
+ () -> sql("UPDATE %s t SET t.s = named_struct('n1', 1)",
commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageStartingWith("Cannot write incompatible data:");
+
+ Assertions.assertThatThrownBy(
+ () -> sql("UPDATE %s t SET t.s.n1 = 'str'",
commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining("Cannot safely cast");
+
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "UPDATE %s t SET t.s.n2 = named_struct('dn3', 1,
'dn1', 2)",
+ commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageStartingWith("Cannot write incompatible data:");
});
}
}
@@ -1407,22 +1392,20 @@ public abstract class TestUpdate extends
SparkRowLevelOperationsTestBase {
public void testUpdateWithNonDeterministicCondition() {
createAndInitTable("id INT, dep STRING", "{ \"id\": 1, \"dep\": \"hr\" }");
- AssertHelpers.assertThrows(
- "Should complain about non-deterministic expressions",
- AnalysisException.class,
- "nondeterministic expressions are only allowed",
- () -> sql("UPDATE %s SET id = -1 WHERE id = 1 AND rand() > 0.5",
commitTarget()));
+ Assertions.assertThatThrownBy(
+ () -> sql("UPDATE %s SET id = -1 WHERE id = 1 AND rand() > 0.5",
commitTarget()))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageStartingWith(
+ "nondeterministic expressions are only allowed in Project, Filter,
Aggregate or Window");
}
@Test
public void testUpdateOnNonIcebergTableNotSupported() {
createOrReplaceView("testtable", "{ \"c1\": -100, \"c2\": -200 }");
- AssertHelpers.assertThrows(
- "UPDATE is not supported for non iceberg table",
- UnsupportedOperationException.class,
- "not supported temporarily",
- () -> sql("UPDATE %s SET c1 = -1 WHERE c2 = 1", "testtable"));
+ Assertions.assertThatThrownBy(() -> sql("UPDATE %s SET c1 = -1 WHERE c2 =
1", "testtable"))
+ .isInstanceOf(UnsupportedOperationException.class)
+ .hasMessage("UPDATE TABLE is not supported temporarily.");
}
@Test