This is an automated email from the ASF dual-hosted git repository.

etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 8387b508f9 Spark 3.4: Remove usage of AssertHelpers (#8963)
8387b508f9 is described below

commit 8387b508f96e9f38d45a982f75c262fdd8e2b621
Author: Ashok <[email protected]>
AuthorDate: Wed Nov 1 19:00:35 2023 +0530

    Spark 3.4: Remove usage of AssertHelpers (#8963)
---
 .../spark/extensions/TestChangelogTable.java       |  10 +-
 .../apache/iceberg/spark/extensions/TestMerge.java | 918 ++++++++++-----------
 .../TestRequiredDistributionAndOrdering.java       |  28 +-
 .../iceberg/spark/extensions/TestTagDDL.java       | 118 ++-
 .../iceberg/spark/extensions/TestUpdate.java       | 133 ++-
 5 files changed, 561 insertions(+), 646 deletions(-)

diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogTable.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogTable.java
index cc81b4b3d3..ab22eee006 100644
--- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogTable.java
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogTable.java
@@ -18,7 +18,6 @@
  */
 package org.apache.iceberg.spark.extensions;
 
-import static org.apache.iceberg.AssertHelpers.assertThrows;
 import static org.apache.iceberg.TableProperties.FORMAT_VERSION;
 import static org.apache.iceberg.TableProperties.MANIFEST_MERGE_ENABLED;
 import static org.apache.iceberg.TableProperties.MANIFEST_MIN_MERGE_COUNT;
@@ -35,6 +34,7 @@ import org.apache.iceberg.spark.SparkReadOptions;
 import org.apache.iceberg.spark.source.SparkChangelogTable;
 import org.apache.spark.sql.DataFrameReader;
 import org.apache.spark.sql.Row;
+import org.assertj.core.api.Assertions;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
@@ -202,10 +202,10 @@ public class TestChangelogTable extends SparkExtensionsTestBase {
     Snapshot snap3 = table.currentSnapshot();
     long rightAfterSnap3 = waitUntilAfter(snap3.timestampMillis());
 
-    assertThrows(
-        "Should fail if start time is after end time",
-        IllegalArgumentException.class,
-        () -> changelogRecords(snap3.timestampMillis(), snap2.timestampMillis()));
+    Assertions.assertThatThrownBy(
+            () -> changelogRecords(snap3.timestampMillis(), snap2.timestampMillis()))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Cannot set start-timestamp to be greater than end-timestamp for changelogs");
   }
 
   @Test
diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
index a710ac5d4e..6c5906f79f 100644
--- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
@@ -43,7 +43,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.iceberg.AppendFiles;
-import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DistributionMode;
 import org.apache.iceberg.PlanningMode;
@@ -713,7 +712,8 @@ public abstract class TestMerge extends SparkRowLevelOperationsTestBase {
             + "{ \"id\": 1, \"state\": \"off\" }\n"
             + "{ \"id\": 10, \"state\": \"on\" }");
 
-    String errorMsg = "a single row from the target table with multiple rows of the source table";
+    String errorMsg =
+        "MERGE statement matched a single row from the target table with multiple rows of the source table.";
     Assertions.assertThatThrownBy(
             () ->
                 sql(
@@ -747,23 +747,23 @@ public abstract class TestMerge extends SparkRowLevelOperationsTestBase {
     Dataset<Integer> ds = spark.createDataset(sourceIds, Encoders.INT());
     ds.union(ds).createOrReplaceTempView("source");
 
-    String errorMsg = "a single row from the target table with multiple rows of the source table";
-    AssertHelpers.assertThrowsCause(
-        "Should complain about multiple matches",
-        SparkException.class,
-        errorMsg,
-        () -> {
-          sql(
-              "MERGE INTO %s AS t USING source AS s "
-                  + "ON t.id == s.value "
-                  + "WHEN MATCHED AND t.id = 1 THEN "
-                  + "  UPDATE SET id = 10 "
-                  + "WHEN MATCHED AND t.id = 6 THEN "
-                  + "  DELETE "
-                  + "WHEN NOT MATCHED AND s.value = 2 THEN "
-                  + "  INSERT (id, dep) VALUES (s.value, null)",
-              commitTarget());
-        });
+    String errorMsg =
+        "MERGE statement matched a single row from the target table with multiple rows of the source table.";
+    Assertions.assertThatThrownBy(
+            () ->
+                sql(
+                    "MERGE INTO %s AS t USING source AS s "
+                        + "ON t.id == s.value "
+                        + "WHEN MATCHED AND t.id = 1 THEN "
+                        + "  UPDATE SET id = 10 "
+                        + "WHEN MATCHED AND t.id = 6 THEN "
+                        + "  DELETE "
+                        + "WHEN NOT MATCHED AND s.value = 2 THEN "
+                        + "  INSERT (id, dep) VALUES (s.value, null)",
+                    commitTarget()))
+        .cause()
+        .isInstanceOf(SparkException.class)
+        .hasMessageContaining(errorMsg);
 
     assertEquals(
         "Target should be unchanged",
@@ -789,23 +789,22 @@ public abstract class TestMerge extends SparkRowLevelOperationsTestBase {
         ImmutableMap.of(SQLConf.PREFER_SORTMERGEJOIN().key(), "false"),
         () -> {
           String errorMsg =
-              "a single row from the target table with multiple rows of the source table";
-          AssertHelpers.assertThrowsCause(
-              "Should complain about multiple matches",
-              SparkException.class,
-              errorMsg,
-              () -> {
-                sql(
-                    "MERGE INTO %s AS t USING source AS s "
-                        + "ON t.id == s.value "
-                        + "WHEN MATCHED AND t.id = 1 THEN "
-                        + "  UPDATE SET id = 10 "
-                        + "WHEN MATCHED AND t.id = 6 THEN "
-                        + "  DELETE "
-                        + "WHEN NOT MATCHED AND s.value = 2 THEN "
-                        + "  INSERT (id, dep) VALUES (s.value, null)",
-                    commitTarget());
-              });
+              "MERGE statement matched a single row from the target table with multiple rows of the source table.";
+          Assertions.assertThatThrownBy(
+                  () ->
+                      sql(
+                          "MERGE INTO %s AS t USING source AS s "
+                              + "ON t.id == s.value "
+                              + "WHEN MATCHED AND t.id = 1 THEN "
+                              + "  UPDATE SET id = 10 "
+                              + "WHEN MATCHED AND t.id = 6 THEN "
+                              + "  DELETE "
+                              + "WHEN NOT MATCHED AND s.value = 2 THEN "
+                              + "  INSERT (id, dep) VALUES (s.value, null)",
+                          commitTarget()))
+              .cause()
+              .isInstanceOf(SparkException.class)
+              .hasMessageContaining(errorMsg);
         });
 
     assertEquals(
@@ -829,23 +828,22 @@ public abstract class TestMerge extends SparkRowLevelOperationsTestBase {
         ImmutableMap.of(SQLConf.PREFER_SORTMERGEJOIN().key(), "false"),
         () -> {
           String errorMsg =
-              "a single row from the target table with multiple rows of the source table";
-          AssertHelpers.assertThrowsCause(
-              "Should complain about multiple matches",
-              SparkException.class,
-              errorMsg,
-              () -> {
-                sql(
-                    "MERGE INTO %s AS t USING source AS s "
-                        + "ON t.id > s.value "
-                        + "WHEN MATCHED AND t.id = 1 THEN "
-                        + "  UPDATE SET id = 10 "
-                        + "WHEN MATCHED AND t.id = 6 THEN "
-                        + "  DELETE "
-                        + "WHEN NOT MATCHED AND s.value = 2 THEN "
-                        + "  INSERT (id, dep) VALUES (s.value, null)",
-                    commitTarget());
-              });
+              "MERGE statement matched a single row from the target table with multiple rows of the source table.";
+          Assertions.assertThatThrownBy(
+                  () ->
+                      sql(
+                          "MERGE INTO %s AS t USING source AS s "
+                              + "ON t.id > s.value "
+                              + "WHEN MATCHED AND t.id = 1 THEN "
+                              + "  UPDATE SET id = 10 "
+                              + "WHEN MATCHED AND t.id = 6 THEN "
+                              + "  DELETE "
+                              + "WHEN NOT MATCHED AND s.value = 2 THEN "
+                              + "  INSERT (id, dep) VALUES (s.value, null)",
+                          commitTarget()))
+              .cause()
+              .isInstanceOf(SparkException.class)
+              .hasMessageContaining(errorMsg);
         });
 
     assertEquals(
@@ -867,21 +865,21 @@ public abstract class TestMerge extends SparkRowLevelOperationsTestBase {
     Dataset<Integer> ds = spark.createDataset(sourceIds, Encoders.INT());
     ds.union(ds).createOrReplaceTempView("source");
 
-    String errorMsg = "a single row from the target table with multiple rows of the source table";
-    AssertHelpers.assertThrowsCause(
-        "Should complain about multiple matches",
-        SparkException.class,
-        errorMsg,
-        () -> {
-          sql(
-              "MERGE INTO %s AS t USING source AS s "
-                  + "ON t.id == s.value "
-                  + "WHEN MATCHED AND t.id = 1 THEN "
-                  + "  UPDATE SET id = 10 "
-                  + "WHEN MATCHED AND t.id = 6 THEN "
-                  + "  DELETE",
-              commitTarget());
-        });
+    String errorMsg =
+        "MERGE statement matched a single row from the target table with multiple rows of the source table.";
+    Assertions.assertThatThrownBy(
+            () ->
+                sql(
+                    "MERGE INTO %s AS t USING source AS s "
+                        + "ON t.id == s.value "
+                        + "WHEN MATCHED AND t.id = 1 THEN "
+                        + "  UPDATE SET id = 10 "
+                        + "WHEN MATCHED AND t.id = 6 THEN "
+                        + "  DELETE",
+                    commitTarget()))
+        .cause()
+        .isInstanceOf(SparkException.class)
+        .hasMessageContaining(errorMsg);
 
     assertEquals(
         "Target should be unchanged",
@@ -901,21 +899,21 @@ public abstract class TestMerge extends SparkRowLevelOperationsTestBase {
     Dataset<Integer> ds = spark.createDataset(sourceIds, Encoders.INT());
     ds.union(ds).createOrReplaceTempView("source");
 
-    String errorMsg = "a single row from the target table with multiple rows of the source table";
-    AssertHelpers.assertThrowsCause(
-        "Should complain about multiple matches",
-        SparkException.class,
-        errorMsg,
-        () -> {
-          sql(
-              "MERGE INTO %s AS t USING source AS s "
-                  + "ON t.id > s.value "
-                  + "WHEN MATCHED AND t.id = 1 THEN "
-                  + "  UPDATE SET id = 10 "
-                  + "WHEN MATCHED AND t.id = 6 THEN "
-                  + "  DELETE",
-              commitTarget());
-        });
+    String errorMsg =
+        "MERGE statement matched a single row from the target table with multiple rows of the source table.";
+    Assertions.assertThatThrownBy(
+            () ->
+                sql(
+                    "MERGE INTO %s AS t USING source AS s "
+                        + "ON t.id > s.value "
+                        + "WHEN MATCHED AND t.id = 1 THEN "
+                        + "  UPDATE SET id = 10 "
+                        + "WHEN MATCHED AND t.id = 6 THEN "
+                        + "  DELETE",
+                    commitTarget()))
+        .cause()
+        .isInstanceOf(SparkException.class)
+        .hasMessageContaining(errorMsg);
 
     assertEquals(
         "Target should be unchanged",
@@ -937,23 +935,23 @@ public abstract class TestMerge extends SparkRowLevelOperationsTestBase {
             + "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n"
             + "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
 
-    String errorMsg = "a single row from the target table with multiple rows of the source table";
-    AssertHelpers.assertThrowsCause(
-        "Should complain about multiple matches",
-        SparkException.class,
-        errorMsg,
-        () -> {
-          sql(
-              "MERGE INTO %s AS t USING source AS s "
-                  + "ON t.id == s.id "
-                  + "WHEN MATCHED AND t.id = 1 THEN "
-                  + "  UPDATE SET * "
-                  + "WHEN MATCHED AND t.id = 6 THEN "
-                  + "  DELETE "
-                  + "WHEN NOT MATCHED AND s.id = 2 THEN "
-                  + "  INSERT *",
-              commitTarget());
-        });
+    String errorMsg =
+        "MERGE statement matched a single row from the target table with multiple rows of the source table.";
+    Assertions.assertThatThrownBy(
+            () ->
+                sql(
+                    "MERGE INTO %s AS t USING source AS s "
+                        + "ON t.id == s.id "
+                        + "WHEN MATCHED AND t.id = 1 THEN "
+                        + "  UPDATE SET * "
+                        + "WHEN MATCHED AND t.id = 6 THEN "
+                        + "  DELETE "
+                        + "WHEN NOT MATCHED AND s.id = 2 THEN "
+                        + "  INSERT *",
+                    commitTarget()))
+        .cause()
+        .isInstanceOf(SparkException.class)
+        .hasMessageContaining(errorMsg);
 
     assertEquals(
         "Target should be unchanged",
@@ -1008,21 +1006,21 @@ public abstract class TestMerge extends SparkRowLevelOperationsTestBase {
             + "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n"
             + "{ \"id\": 6, \"dep\": \"emp-id-6\" }");
 
-    String errorMsg = "a single row from the target table with multiple rows of the source table";
-    AssertHelpers.assertThrowsCause(
-        "Should complain about multiple matches",
-        SparkException.class,
-        errorMsg,
-        () -> {
-          sql(
-              "MERGE INTO %s AS t USING source AS s "
-                  + "ON t.id == s.id "
-                  + "WHEN MATCHED AND t.id = 1 THEN "
-                  + "  DELETE "
-                  + "WHEN NOT MATCHED AND s.id = 2 THEN "
-                  + "  INSERT *",
-              commitTarget());
-        });
+    String errorMsg =
+        "MERGE statement matched a single row from the target table with multiple rows of the source table.";
+    Assertions.assertThatThrownBy(
+            () ->
+                sql(
+                    "MERGE INTO %s AS t USING source AS s "
+                        + "ON t.id == s.id "
+                        + "WHEN MATCHED AND t.id = 1 THEN "
+                        + "  DELETE "
+                        + "WHEN NOT MATCHED AND s.id = 2 THEN "
+                        + "  INSERT *",
+                    commitTarget()))
+        .cause()
+        .isInstanceOf(SparkException.class)
+        .hasMessageContaining(errorMsg);
 
     assertEquals(
         "Target should be unchanged",
@@ -2153,46 +2151,40 @@ public abstract class TestMerge extends SparkRowLevelOperationsTestBase {
         "{ \"id\": 1, \"c\": { \"n1\": 2, \"n2\": { \"dn1\": 3, \"dn2\": 4 } } }");
     createOrReplaceView("source", "{ \"c1\": -100, \"c2\": -200 }");
 
-    AssertHelpers.assertThrows(
-        "Should complain about the invalid top-level column",
-        AnalysisException.class,
-        "cannot resolve t.invalid_col",
-        () -> {
-          sql(
-              "MERGE INTO %s t USING source s "
-                  + "ON t.id == s.c1 "
-                  + "WHEN MATCHED THEN "
-                  + "  UPDATE SET t.invalid_col = s.c2",
-              commitTarget());
-        });
+    Assertions.assertThatThrownBy(
+            () ->
+                sql(
+                    "MERGE INTO %s t USING source s "
+                        + "ON t.id == s.c1 "
+                        + "WHEN MATCHED THEN "
+                        + "  UPDATE SET t.invalid_col = s.c2",
+                    commitTarget()))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining("cannot resolve t.invalid_col in MERGE command");
 
-    AssertHelpers.assertThrows(
-        "Should complain about the invalid nested column",
-        AnalysisException.class,
-        "No such struct field `invalid_col`",
-        () -> {
-          sql(
-              "MERGE INTO %s t USING source s "
-                  + "ON t.id == s.c1 "
-                  + "WHEN MATCHED THEN "
-                  + "  UPDATE SET t.c.n2.invalid_col = s.c2",
-              commitTarget());
-        });
+    Assertions.assertThatThrownBy(
+            () ->
+                sql(
+                    "MERGE INTO %s t USING source s "
+                        + "ON t.id == s.c1 "
+                        + "WHEN MATCHED THEN "
+                        + "  UPDATE SET t.c.n2.invalid_col = s.c2",
+                    commitTarget()))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining("No such struct field `invalid_col`");
 
-    AssertHelpers.assertThrows(
-        "Should complain about the invalid top-level column",
-        AnalysisException.class,
-        "cannot resolve invalid_col",
-        () -> {
-          sql(
-              "MERGE INTO %s t USING source s "
-                  + "ON t.id == s.c1 "
-                  + "WHEN MATCHED THEN "
-                  + "  UPDATE SET t.c.n2.dn1 = s.c2 "
-                  + "WHEN NOT MATCHED THEN "
-                  + "  INSERT (id, invalid_col) VALUES (s.c1, null)",
-              commitTarget());
-        });
+    Assertions.assertThatThrownBy(
+            () ->
+                sql(
+                    "MERGE INTO %s t USING source s "
+                        + "ON t.id == s.c1 "
+                        + "WHEN MATCHED THEN "
+                        + "  UPDATE SET t.c.n2.dn1 = s.c2 "
+                        + "WHEN NOT MATCHED THEN "
+                        + "  INSERT (id, invalid_col) VALUES (s.c1, null)",
+                    commitTarget()))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining("cannot resolve invalid_col in MERGE command");
   }
 
   @Test
@@ -2202,48 +2194,42 @@ public abstract class TestMerge extends SparkRowLevelOperationsTestBase {
         "{ \"id\": 1, \"c\": { \"n1\": 2, \"n2\": { \"dn1\": 3, \"dn2\": 4 } } }");
     createOrReplaceView("source", "{ \"c1\": -100, \"c2\": -200 }");
 
-    AssertHelpers.assertThrows(
-        "Should complain about the nested column",
-        AnalysisException.class,
-        "Nested fields are not supported inside INSERT clauses",
-        () -> {
-          sql(
-              "MERGE INTO %s t USING source s "
-                  + "ON t.id == s.c1 "
-                  + "WHEN MATCHED THEN "
-                  + "  UPDATE SET t.c.n2.dn1 = s.c2 "
-                  + "WHEN NOT MATCHED THEN "
-                  + "  INSERT (id, c.n2) VALUES (s.c1, null)",
-              commitTarget());
-        });
+    Assertions.assertThatThrownBy(
+            () ->
+                sql(
+                    "MERGE INTO %s t USING source s "
+                        + "ON t.id == s.c1 "
+                        + "WHEN MATCHED THEN "
+                        + "  UPDATE SET t.c.n2.dn1 = s.c2 "
+                        + "WHEN NOT MATCHED THEN "
+                        + "  INSERT (id, c.n2) VALUES (s.c1, null)",
+                    commitTarget()))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining("Nested fields are not supported inside INSERT clauses");
 
-    AssertHelpers.assertThrows(
-        "Should complain about duplicate columns",
-        AnalysisException.class,
-        "Duplicate column names inside INSERT clause",
-        () -> {
-          sql(
-              "MERGE INTO %s t USING source s "
-                  + "ON t.id == s.c1 "
-                  + "WHEN MATCHED THEN "
-                  + "  UPDATE SET t.c.n2.dn1 = s.c2 "
-                  + "WHEN NOT MATCHED THEN "
-                  + "  INSERT (id, id) VALUES (s.c1, null)",
-              commitTarget());
-        });
+    Assertions.assertThatThrownBy(
+            () ->
+                sql(
+                    "MERGE INTO %s t USING source s "
+                        + "ON t.id == s.c1 "
+                        + "WHEN MATCHED THEN "
+                        + "  UPDATE SET t.c.n2.dn1 = s.c2 "
+                        + "WHEN NOT MATCHED THEN "
+                        + "  INSERT (id, id) VALUES (s.c1, null)",
+                    commitTarget()))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining("Duplicate column names inside INSERT clause");
 
-    AssertHelpers.assertThrows(
-        "Should complain about missing columns",
-        AnalysisException.class,
-        "must provide values for all columns of the target table",
-        () -> {
-          sql(
-              "MERGE INTO %s t USING source s "
-                  + "ON t.id == s.c1 "
-                  + "WHEN NOT MATCHED THEN "
-                  + "  INSERT (id) VALUES (s.c1)",
-              commitTarget());
-        });
+    Assertions.assertThatThrownBy(
+            () ->
+                sql(
+                    "MERGE INTO %s t USING source s "
+                        + "ON t.id == s.c1 "
+                        + "WHEN NOT MATCHED THEN "
+                        + "  INSERT (id) VALUES (s.c1)",
+                    commitTarget()))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining("must provide values for all columns of the target table");
   }
 
   @Test
@@ -2253,31 +2239,27 @@ public abstract class TestMerge extends SparkRowLevelOperationsTestBase {
         "{ \"id\": 1, \"a\": [ { \"c1\": 2, \"c2\": 3 } ], \"m\": { \"k\": \"v\"} }");
     createOrReplaceView("source", "{ \"c1\": -100, \"c2\": -200 }");
 
-    AssertHelpers.assertThrows(
-        "Should complain about updating an array column",
-        AnalysisException.class,
-        "Updating nested fields is only supported for structs",
-        () -> {
-          sql(
-              "MERGE INTO %s t USING source s "
-                  + "ON t.id == s.c1 "
-                  + "WHEN MATCHED THEN "
-                  + "  UPDATE SET t.a.c1 = s.c2",
-              commitTarget());
-        });
+    Assertions.assertThatThrownBy(
+            () ->
+                sql(
+                    "MERGE INTO %s t USING source s "
+                        + "ON t.id == s.c1 "
+                        + "WHEN MATCHED THEN "
+                        + "  UPDATE SET t.a.c1 = s.c2",
+                    commitTarget()))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining("Updating nested fields is only supported for structs");
 
-    AssertHelpers.assertThrows(
-        "Should complain about updating a map column",
-        AnalysisException.class,
-        "Updating nested fields is only supported for structs",
-        () -> {
-          sql(
-              "MERGE INTO %s t USING source s "
-                  + "ON t.id == s.c1 "
-                  + "WHEN MATCHED THEN "
-                  + "  UPDATE SET t.m.key = 'new_key'",
-              commitTarget());
-        });
+    Assertions.assertThatThrownBy(
+            () ->
+                sql(
+                    "MERGE INTO %s t USING source s "
+                        + "ON t.id == s.c1 "
+                        + "WHEN MATCHED THEN "
+                        + "  UPDATE SET t.m.key = 'new_key'",
+                    commitTarget()))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining("Updating nested fields is only supported for structs");
   }
 
   @Test
@@ -2287,44 +2269,38 @@ public abstract class TestMerge extends SparkRowLevelOperationsTestBase {
         "{ \"id\": 1, \"c\": { \"n1\": 2, \"n2\": { \"dn1\": 3, \"dn2\": 4 } } }");
     createOrReplaceView("source", "{ \"c1\": -100, \"c2\": -200 }");
 
-    AssertHelpers.assertThrows(
-        "Should complain about conflicting updates to a top-level column",
-        AnalysisException.class,
-        "Updates are in conflict",
-        () -> {
-          sql(
-              "MERGE INTO %s t USING source s "
-                  + "ON t.id == s.c1 "
-                  + "WHEN MATCHED THEN "
-                  + "  UPDATE SET t.id = 1, t.c.n1 = 2, t.id = 2",
-              commitTarget());
-        });
+    Assertions.assertThatThrownBy(
+            () ->
+                sql(
+                    "MERGE INTO %s t USING source s "
+                        + "ON t.id == s.c1 "
+                        + "WHEN MATCHED THEN "
+                        + "  UPDATE SET t.id = 1, t.c.n1 = 2, t.id = 2",
+                    commitTarget()))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining("Updates are in conflict");
 
-    AssertHelpers.assertThrows(
-        "Should complain about conflicting updates to a nested column",
-        AnalysisException.class,
-        "Updates are in conflict for these columns",
-        () -> {
-          sql(
-              "MERGE INTO %s t USING source s "
-                  + "ON t.id == s.c1 "
-                  + "WHEN MATCHED THEN "
-                  + "  UPDATE SET t.c.n1 = 1, t.id = 2, t.c.n1 = 2",
-              commitTarget());
-        });
+    Assertions.assertThatThrownBy(
+            () ->
+                sql(
+                    "MERGE INTO %s t USING source s "
+                        + "ON t.id == s.c1 "
+                        + "WHEN MATCHED THEN "
+                        + "  UPDATE SET t.c.n1 = 1, t.id = 2, t.c.n1 = 2",
+                    commitTarget()))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining("Updates are in conflict for these columns");
 
-    AssertHelpers.assertThrows(
-        "Should complain about conflicting updates to a nested column",
-        AnalysisException.class,
-        "Updates are in conflict",
-        () -> {
-          sql(
-              "MERGE INTO %s t USING source s "
-                  + "ON t.id == s.c1 "
-                  + "WHEN MATCHED THEN "
-                  + "  UPDATE SET c.n1 = 1, c = named_struct('n1', 1, 'n2', named_struct('dn1', 1, 'dn2', 2))",
-              commitTarget());
-        });
+    Assertions.assertThatThrownBy(
+            () ->
+                sql(
+                    "MERGE INTO %s t USING source s "
+                        + "ON t.id == s.c1 "
+                        + "WHEN MATCHED THEN "
+                        + "  UPDATE SET c.n1 = 1, c = named_struct('n1', 1, 'n2', named_struct('dn1', 1, 'dn2', 2))",
+                    commitTarget()))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining("Updates are in conflict");
   }
 
   @Test
@@ -2341,70 +2317,60 @@ public abstract class TestMerge extends SparkRowLevelOperationsTestBase {
       withSQLConf(
           ImmutableMap.of("spark.sql.storeAssignmentPolicy", policy),
           () -> {
-            AssertHelpers.assertThrows(
-                "Should complain about writing nulls to a top-level column",
-                AnalysisException.class,
-                "Cannot write nullable values to non-null column",
-                () -> {
-                  sql(
-                      "MERGE INTO %s t USING source s "
-                          + "ON t.id == s.c1 "
-                          + "WHEN MATCHED THEN "
-                          + "  UPDATE SET t.id = NULL",
-                      commitTarget());
-                });
-
-            AssertHelpers.assertThrows(
-                "Should complain about writing nulls to a nested column",
-                AnalysisException.class,
-                "Cannot write nullable values to non-null column",
-                () -> {
-                  sql(
-                      "MERGE INTO %s t USING source s "
-                          + "ON t.id == s.c1 "
-                          + "WHEN MATCHED THEN "
-                          + "  UPDATE SET t.s.n1 = NULL",
-                      commitTarget());
-                });
-
-            AssertHelpers.assertThrows(
-                "Should complain about writing missing fields in structs",
-                AnalysisException.class,
-                "missing fields",
-                () -> {
-                  sql(
-                      "MERGE INTO %s t USING source s "
-                          + "ON t.id == s.c1 "
-                          + "WHEN MATCHED THEN "
-                          + "  UPDATE SET t.s = s.c2",
-                      commitTarget());
-                });
-
-            AssertHelpers.assertThrows(
-                "Should complain about writing invalid data types",
-                AnalysisException.class,
-                "Cannot safely cast",
-                () -> {
-                  sql(
-                      "MERGE INTO %s t USING source s "
-                          + "ON t.id == s.c1 "
-                          + "WHEN MATCHED THEN "
-                          + "  UPDATE SET t.s.n1 = s.c3",
-                      commitTarget());
-                });
-
-            AssertHelpers.assertThrows(
-                "Should complain about writing incompatible structs",
-                AnalysisException.class,
-                "field name does not match",
-                () -> {
-                  sql(
-                      "MERGE INTO %s t USING source s "
-                          + "ON t.id == s.c1 "
-                          + "WHEN MATCHED THEN "
-                          + "  UPDATE SET t.s.n2 = s.c4",
-                      commitTarget());
-                });
+            Assertions.assertThatThrownBy(
+                    () ->
+                        sql(
+                            "MERGE INTO %s t USING source s "
+                                + "ON t.id == s.c1 "
+                                + "WHEN MATCHED THEN "
+                                + "  UPDATE SET t.id = NULL",
+                            commitTarget()))
+                .isInstanceOf(AnalysisException.class)
+                .hasMessageContaining("Cannot write nullable values to non-null column");
+
+            Assertions.assertThatThrownBy(
+                    () ->
+                        sql(
+                            "MERGE INTO %s t USING source s "
+                                + "ON t.id == s.c1 "
+                                + "WHEN MATCHED THEN "
+                                + "  UPDATE SET t.s.n1 = NULL",
+                            commitTarget()))
+                .isInstanceOf(AnalysisException.class)
+                .hasMessageContaining("Cannot write nullable values to non-null column");
+
+            Assertions.assertThatThrownBy(
+                    () ->
+                        sql(
+                            "MERGE INTO %s t USING source s "
+                                + "ON t.id == s.c1 "
+                                + "WHEN MATCHED THEN "
+                                + "  UPDATE SET t.s = s.c2",
+                            commitTarget()))
+                .isInstanceOf(AnalysisException.class)
+                .hasMessageContaining("missing fields");
+
+            Assertions.assertThatThrownBy(
+                    () ->
+                        sql(
+                            "MERGE INTO %s t USING source s "
+                                + "ON t.id == s.c1 "
+                                + "WHEN MATCHED THEN "
+                                + "  UPDATE SET t.s.n1 = s.c3",
+                            commitTarget()))
+                .isInstanceOf(AnalysisException.class)
+                .hasMessageEndingWith("Cannot safely cast 'n1': string to int");
+
+            Assertions.assertThatThrownBy(
+                    () ->
+                        sql(
+                            "MERGE INTO %s t USING source s "
+                                + "ON t.id == s.c1 "
+                                + "WHEN MATCHED THEN "
+                                + "  UPDATE SET t.s.n2 = s.c4",
+                            commitTarget()))
+                .isInstanceOf(AnalysisException.class)
+                .hasMessageContaining("field name does not match");
           });
     }
   }
@@ -2416,57 +2382,53 @@ public abstract class TestMerge extends SparkRowLevelOperationsTestBase {
         "{ \"id\": 1, \"c\": { \"n1\": 2, \"n2\": { \"dn1\": 3, \"dn2\": 4 } } }");
     createOrReplaceView("source", "{ \"c1\": -100, \"c2\": -200 }");
 
-    AssertHelpers.assertThrows(
-        "Should complain about non-deterministic search conditions",
-        AnalysisException.class,
-        "Non-deterministic functions are not supported",
-        () -> {
-          sql(
-              "MERGE INTO %s t USING source s "
-                  + "ON t.id == s.c1 AND rand() > t.id "
-                  + "WHEN MATCHED THEN "
-                  + "  UPDATE SET t.c.n1 = -1",
-              commitTarget());
-        });
+    Assertions.assertThatThrownBy(
+            () ->
+                sql(
+                    "MERGE INTO %s t USING source s "
+                        + "ON t.id == s.c1 AND rand() > t.id "
+                        + "WHEN MATCHED THEN "
+                        + "  UPDATE SET t.c.n1 = -1",
+                    commitTarget()))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining(
+            "Non-deterministic functions are not supported in SEARCH conditions of MERGE operations");
 
-    AssertHelpers.assertThrows(
-        "Should complain about non-deterministic update conditions",
-        AnalysisException.class,
-        "Non-deterministic functions are not supported",
-        () -> {
-          sql(
-              "MERGE INTO %s t USING source s "
-                  + "ON t.id == s.c1 "
-                  + "WHEN MATCHED AND rand() > t.id THEN "
-                  + "  UPDATE SET t.c.n1 = -1",
-              commitTarget());
-        });
+    Assertions.assertThatThrownBy(
+            () ->
+                sql(
+                    "MERGE INTO %s t USING source s "
+                        + "ON t.id == s.c1 "
+                        + "WHEN MATCHED AND rand() > t.id THEN "
+                        + "  UPDATE SET t.c.n1 = -1",
+                    commitTarget()))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining(
+            "Non-deterministic functions are not supported in UPDATE conditions of MERGE operations");
 
-    AssertHelpers.assertThrows(
-        "Should complain about non-deterministic delete conditions",
-        AnalysisException.class,
-        "Non-deterministic functions are not supported",
-        () -> {
-          sql(
-              "MERGE INTO %s t USING source s "
-                  + "ON t.id == s.c1 "
-                  + "WHEN MATCHED AND rand() > t.id THEN "
-                  + "  DELETE",
-              commitTarget());
-        });
+    Assertions.assertThatThrownBy(
+            () ->
+                sql(
+                    "MERGE INTO %s t USING source s "
+                        + "ON t.id == s.c1 "
+                        + "WHEN MATCHED AND rand() > t.id THEN "
+                        + "  DELETE",
+                    commitTarget()))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining(
+            "Non-deterministic functions are not supported in DELETE conditions of MERGE operations");
 
-    AssertHelpers.assertThrows(
-        "Should complain about non-deterministic insert conditions",
-        AnalysisException.class,
-        "Non-deterministic functions are not supported",
-        () -> {
-          sql(
-              "MERGE INTO %s t USING source s "
-                  + "ON t.id == s.c1 "
-                  + "WHEN NOT MATCHED AND rand() > c1 THEN "
-                  + "  INSERT (id, c) VALUES (1, null)",
-              commitTarget());
-        });
+    Assertions.assertThatThrownBy(
+            () ->
+                sql(
+                    "MERGE INTO %s t USING source s "
+                        + "ON t.id == s.c1 "
+                        + "WHEN NOT MATCHED AND rand() > c1 THEN "
+                        + "  INSERT (id, c) VALUES (1, null)",
+                    commitTarget()))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining(
+            "Non-deterministic functions are not supported in INSERT conditions of MERGE operations");
   }
 
   @Test
@@ -2476,57 +2438,53 @@ public abstract class TestMerge extends SparkRowLevelOperationsTestBase {
         "{ \"id\": 1, \"c\": { \"n1\": 2, \"n2\": { \"dn1\": 3, \"dn2\": 4 } } }");
     createOrReplaceView("source", "{ \"c1\": -100, \"c2\": -200 }");
 
-    AssertHelpers.assertThrows(
-        "Should complain about agg expressions in search conditions",
-        AnalysisException.class,
-        "Agg functions are not supported",
-        () -> {
-          sql(
-              "MERGE INTO %s t USING source s "
-                  + "ON t.id == s.c1 AND max(t.id) == 1 "
-                  + "WHEN MATCHED THEN "
-                  + "  UPDATE SET t.c.n1 = -1",
-              commitTarget());
-        });
+    Assertions.assertThatThrownBy(
+            () ->
+                sql(
+                    "MERGE INTO %s t USING source s "
+                        + "ON t.id == s.c1 AND max(t.id) == 1 "
+                        + "WHEN MATCHED THEN "
+                        + "  UPDATE SET t.c.n1 = -1",
+                    commitTarget()))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining(
+            "Agg functions are not supported in SEARCH conditions of MERGE operations");
 
-    AssertHelpers.assertThrows(
-        "Should complain about agg expressions in update conditions",
-        AnalysisException.class,
-        "Agg functions are not supported",
-        () -> {
-          sql(
-              "MERGE INTO %s t USING source s "
-                  + "ON t.id == s.c1 "
-                  + "WHEN MATCHED AND sum(t.id) < 1 THEN "
-                  + "  UPDATE SET t.c.n1 = -1",
-              commitTarget());
-        });
+    Assertions.assertThatThrownBy(
+            () ->
+                sql(
+                    "MERGE INTO %s t USING source s "
+                        + "ON t.id == s.c1 "
+                        + "WHEN MATCHED AND sum(t.id) < 1 THEN "
+                        + "  UPDATE SET t.c.n1 = -1",
+                    commitTarget()))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining(
+            "Agg functions are not supported in UPDATE conditions of MERGE operations");
 
-    AssertHelpers.assertThrows(
-        "Should complain about agg expressions in delete conditions",
-        AnalysisException.class,
-        "Agg functions are not supported",
-        () -> {
-          sql(
-              "MERGE INTO %s t USING source s "
-                  + "ON t.id == s.c1 "
-                  + "WHEN MATCHED AND sum(t.id) THEN "
-                  + "  DELETE",
-              commitTarget());
-        });
+    Assertions.assertThatThrownBy(
+            () ->
+                sql(
+                    "MERGE INTO %s t USING source s "
+                        + "ON t.id == s.c1 "
+                        + "WHEN MATCHED AND sum(t.id) THEN "
+                        + "  DELETE",
+                    commitTarget()))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining(
+            "Agg functions are not supported in DELETE conditions of MERGE operations");
 
-    AssertHelpers.assertThrows(
-        "Should complain about agg expressions in insert conditions",
-        AnalysisException.class,
-        "Agg functions are not supported",
-        () -> {
-          sql(
-              "MERGE INTO %s t USING source s "
-                  + "ON t.id == s.c1 "
-                  + "WHEN NOT MATCHED AND sum(c1) < 1 THEN "
-                  + "  INSERT (id, c) VALUES (1, null)",
-              commitTarget());
-        });
+    Assertions.assertThatThrownBy(
+            () ->
+                sql(
+                    "MERGE INTO %s t USING source s "
+                        + "ON t.id == s.c1 "
+                        + "WHEN NOT MATCHED AND sum(c1) < 1 THEN "
+                        + "  INSERT (id, c) VALUES (1, null)",
+                    commitTarget()))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining(
+            "Agg functions are not supported in INSERT conditions of MERGE operations");
   }
 
   @Test
@@ -2536,57 +2494,53 @@ public abstract class TestMerge extends SparkRowLevelOperationsTestBase {
         "{ \"id\": 1, \"c\": { \"n1\": 2, \"n2\": { \"dn1\": 3, \"dn2\": 4 } } }");
     createOrReplaceView("source", "{ \"c1\": -100, \"c2\": -200 }");
 
-    AssertHelpers.assertThrows(
-        "Should complain about subquery expressions",
-        AnalysisException.class,
-        "Subqueries are not supported in conditions",
-        () -> {
-          sql(
-              "MERGE INTO %s t USING source s "
-                  + "ON t.id == s.c1 AND t.id < (SELECT max(c2) FROM source) "
-                  + "WHEN MATCHED THEN "
-                  + "  UPDATE SET t.c.n1 = s.c2",
-              commitTarget());
-        });
+    Assertions.assertThatThrownBy(
+            () ->
+                sql(
+                    "MERGE INTO %s t USING source s "
+                        + "ON t.id == s.c1 AND t.id < (SELECT max(c2) FROM source) "
+                        + "WHEN MATCHED THEN "
+                        + "  UPDATE SET t.c.n1 = s.c2",
+                    commitTarget()))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining(
+            "Subqueries are not supported in conditions of MERGE operations. Found a subquery in the SEARCH condition");
 
-    AssertHelpers.assertThrows(
-        "Should complain about subquery expressions",
-        AnalysisException.class,
-        "Subqueries are not supported in conditions",
-        () -> {
-          sql(
-              "MERGE INTO %s t USING source s "
-                  + "ON t.id == s.c1 "
-                  + "WHEN MATCHED AND t.id < (SELECT max(c2) FROM source) THEN "
-                  + "  UPDATE SET t.c.n1 = s.c2",
-              commitTarget());
-        });
+    Assertions.assertThatThrownBy(
+            () ->
+                sql(
+                    "MERGE INTO %s t USING source s "
+                        + "ON t.id == s.c1 "
+                        + "WHEN MATCHED AND t.id < (SELECT max(c2) FROM source) THEN "
+                        + "  UPDATE SET t.c.n1 = s.c2",
+                    commitTarget()))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining(
+            "Subqueries are not supported in conditions of MERGE operations. Found a subquery in the UPDATE condition");
 
-    AssertHelpers.assertThrows(
-        "Should complain about subquery expressions",
-        AnalysisException.class,
-        "Subqueries are not supported in conditions",
-        () -> {
-          sql(
-              "MERGE INTO %s t USING source s "
-                  + "ON t.id == s.c1 "
-                  + "WHEN MATCHED AND t.id NOT IN (SELECT c2 FROM source) THEN "
-                  + "  DELETE",
-              commitTarget());
-        });
+    Assertions.assertThatThrownBy(
+            () ->
+                sql(
+                    "MERGE INTO %s t USING source s "
+                        + "ON t.id == s.c1 "
+                        + "WHEN MATCHED AND t.id NOT IN (SELECT c2 FROM source) THEN "
+                        + "  DELETE",
+                    commitTarget()))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining(
+            "Subqueries are not supported in conditions of MERGE operations. Found a subquery in the DELETE condition");
 
-    AssertHelpers.assertThrows(
-        "Should complain about subquery expressions",
-        AnalysisException.class,
-        "Subqueries are not supported in conditions",
-        () -> {
-          sql(
-              "MERGE INTO %s t USING source s "
-                  + "ON t.id == s.c1 "
-                  + "WHEN NOT MATCHED AND s.c1 IN (SELECT c2 FROM source) THEN "
-                  + "  INSERT (id, c) VALUES (1, null)",
-              commitTarget());
-        });
+    Assertions.assertThatThrownBy(
+            () ->
+                sql(
+                    "MERGE INTO %s t USING source s "
+                        + "ON t.id == s.c1 "
+                        + "WHEN NOT MATCHED AND s.c1 IN (SELECT c2 FROM source) THEN "
+                        + "  INSERT (id, c) VALUES (1, null)",
+                    commitTarget()))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining(
+            "Subqueries are not supported in conditions of MERGE operations. Found a subquery in the INSERT condition");
   }
 
   @Test
@@ -2594,18 +2548,16 @@ public abstract class TestMerge extends SparkRowLevelOperationsTestBase {
     createAndInitTable("id INT, c2 INT", "{ \"id\": 1, \"c2\": 2 }");
     createOrReplaceView("source", "{ \"id\": 1, \"value\": 11 }");
 
-    AssertHelpers.assertThrows(
-        "Should complain about the target column",
-        AnalysisException.class,
-        "Cannot resolve [c2]",
-        () -> {
-          sql(
-              "MERGE INTO %s t USING source s "
-                  + "ON t.id == s.id "
-                  + "WHEN NOT MATCHED AND c2 = 1 THEN "
-                  + "  INSERT (id, c2) VALUES (s.id, null)",
-              commitTarget());
-        });
+    Assertions.assertThatThrownBy(
+            () ->
+                sql(
+                    "MERGE INTO %s t USING source s "
+                        + "ON t.id == s.id "
+                        + "WHEN NOT MATCHED AND c2 = 1 THEN "
+                        + "  INSERT (id, c2) VALUES (s.id, null)",
+                    commitTarget()))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining("Cannot resolve [c2] in INSERT condition of MERGE operation");
   }
 
   @Test
@@ -2613,17 +2565,15 @@ public abstract class TestMerge extends SparkRowLevelOperationsTestBase {
     createOrReplaceView("target", "{ \"c1\": -100, \"c2\": -200 }");
     createOrReplaceView("source", "{ \"c1\": -100, \"c2\": -200 }");
 
-    AssertHelpers.assertThrows(
-        "Should complain non iceberg target table",
-        UnsupportedOperationException.class,
-        "MERGE INTO TABLE is not supported temporarily.",
-        () -> {
-          sql(
-              "MERGE INTO target t USING source s "
-                  + "ON t.c1 == s.c1 "
-                  + "WHEN MATCHED THEN "
-                  + "  UPDATE SET *");
-        });
+    Assertions.assertThatThrownBy(
+            () ->
+                sql(
+                    "MERGE INTO target t USING source s "
+                        + "ON t.c1 == s.c1 "
+                        + "WHEN MATCHED THEN "
+                        + "  UPDATE SET *"))
+        .isInstanceOf(UnsupportedOperationException.class)
+        .hasMessage("MERGE INTO TABLE is not supported temporarily.");
   }
 
   /**
diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRequiredDistributionAndOrdering.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRequiredDistributionAndOrdering.java
index fcdf9bf992..4c678ce9b7 100644
--- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRequiredDistributionAndOrdering.java
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRequiredDistributionAndOrdering.java
@@ -21,13 +21,13 @@ package org.apache.iceberg.spark.extensions;
 import java.math.BigDecimal;
 import java.util.List;
 import java.util.Map;
-import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.spark.SparkWriteOptions;
 import org.apache.iceberg.spark.source.ThreeColumnRecord;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.assertj.core.api.Assertions;
 import org.junit.After;
 import org.junit.Test;
 
@@ -186,20 +186,18 @@ public class TestRequiredDistributionAndOrdering extends SparkExtensionsTestBase
     Dataset<Row> inputDF = ds.coalesce(1).sortWithinPartitions("c1");
 
     // should fail if ordering is disabled
-    AssertHelpers.assertThrowsCause(
-        "Should reject writes without ordering",
-        IllegalStateException.class,
-        "Encountered records that belong to already closed files",
-        () -> {
-          try {
-            inputDF
-                .writeTo(tableName)
-                .option(SparkWriteOptions.USE_TABLE_DISTRIBUTION_AND_ORDERING, "false")
-                .append();
-          } catch (NoSuchTableException e) {
-            throw new RuntimeException(e);
-          }
-        });
+    Assertions.assertThatThrownBy(
+            () ->
+                inputDF
+                    .writeTo(tableName)
+                    .option(SparkWriteOptions.USE_TABLE_DISTRIBUTION_AND_ORDERING, "false")
+                    .option(SparkWriteOptions.FANOUT_ENABLED, "false")
+                    .append())
+        .cause()
+        .isInstanceOf(IllegalStateException.class)
+        .hasMessageStartingWith(
+            "Incoming records violate the writer assumption that records are clustered by spec "
+                + "and by partition within each spec. Either cluster the incoming records or switch to fanout writers.");
   }
 
   @Test
diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestTagDDL.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestTagDDL.java
index 866a965e33..52b9134089 100644
--- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestTagDDL.java
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestTagDDL.java
@@ -24,7 +24,6 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
-import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.SnapshotRef;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.exceptions.ValidationException;
@@ -97,29 +96,26 @@ public class TestTagDDL extends SparkExtensionsTestBase {
     }
 
     String tagName = "t1";
-    AssertHelpers.assertThrows(
-        "Illegal statement",
-        IcebergParseException.class,
-        "mismatched input",
-        () ->
-            sql(
-                "ALTER TABLE %s CREATE TAG %s AS OF VERSION %d RETAIN",
-                tableName, tagName, firstSnapshotId, maxRefAge));
-
-    AssertHelpers.assertThrows(
-        "Illegal statement",
-        IcebergParseException.class,
-        "mismatched input",
-        () -> sql("ALTER TABLE %s CREATE TAG %s RETAIN %s DAYS", tableName, tagName, "abc"));
-
-    AssertHelpers.assertThrows(
-        "Illegal statement",
-        IcebergParseException.class,
-        "mismatched input 'SECONDS' expecting {'DAYS', 'HOURS', 'MINUTES'}",
-        () ->
-            sql(
-                "ALTER TABLE %s CREATE TAG %s AS OF VERSION %d RETAIN %d SECONDS",
-                tableName, tagName, firstSnapshotId, maxRefAge));
+    Assertions.assertThatThrownBy(
+            () ->
+                sql(
+                    "ALTER TABLE %s CREATE TAG %s AS OF VERSION %d RETAIN",
+                    tableName, tagName, firstSnapshotId, maxRefAge))
+        .isInstanceOf(IcebergParseException.class)
+        .hasMessageContaining("mismatched input");
+
+    Assertions.assertThatThrownBy(
+            () -> sql("ALTER TABLE %s CREATE TAG %s RETAIN %s DAYS", tableName, tagName, "abc"))
+        .isInstanceOf(IcebergParseException.class)
+        .hasMessageContaining("mismatched input");
+
+    Assertions.assertThatThrownBy(
+            () ->
+                sql(
+                    "ALTER TABLE %s CREATE TAG %s AS OF VERSION %d RETAIN %d SECONDS",
+                    tableName, tagName, firstSnapshotId, maxRefAge))
+        .isInstanceOf(IcebergParseException.class)
+        .hasMessageContaining("mismatched input 'SECONDS' expecting {'DAYS', 'HOURS', 'MINUTES'}");
   }
 
   @Test
@@ -137,11 +133,10 @@ public class TestTagDDL extends SparkExtensionsTestBase {
     long snapshotId = table.currentSnapshot().snapshotId();
     String tagName = "t1";
 
-    AssertHelpers.assertThrows(
-        "unknown snapshot",
-        ValidationException.class,
-        "unknown snapshot: -1",
-        () -> sql("ALTER TABLE %s CREATE TAG %s AS OF VERSION %d", tableName, tagName, -1));
+    Assertions.assertThatThrownBy(
+            () -> sql("ALTER TABLE %s CREATE TAG %s AS OF VERSION %d", tableName, tagName, -1))
+        .isInstanceOf(ValidationException.class)
+        .hasMessage("Cannot set " + tagName + " to unknown snapshot: -1");
 
     sql("ALTER TABLE %s CREATE TAG %s", tableName, tagName);
     table.refresh();
@@ -151,17 +146,13 @@ public class TestTagDDL extends SparkExtensionsTestBase {
     Assert.assertNull(
         "The tag needs to have the default max ref age, which is null.", ref.maxRefAgeMs());
 
-    AssertHelpers.assertThrows(
-        "Cannot create an exist tag",
-        IllegalArgumentException.class,
-        "already exists",
-        () -> sql("ALTER TABLE %s CREATE TAG %s", tableName, tagName));
+    Assertions.assertThatThrownBy(() -> sql("ALTER TABLE %s CREATE TAG %s", tableName, tagName))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("already exists");
 
-    AssertHelpers.assertThrows(
-        "Non-conforming tag name",
-        IcebergParseException.class,
-        "mismatched input '123'",
-        () -> sql("ALTER TABLE %s CREATE TAG %s", tableName, "123"));
+    Assertions.assertThatThrownBy(() -> sql("ALTER TABLE %s CREATE TAG %s", tableName, "123"))
+        .isInstanceOf(IcebergParseException.class)
+        .hasMessageContaining("mismatched input '123'");
 
     table.manageSnapshots().removeTag(tagName).commit();
     List<SimpleRecord> records =
@@ -207,11 +198,10 @@ public class TestTagDDL extends SparkExtensionsTestBase {
     insertRows();
     long second = table.currentSnapshot().snapshotId();
 
-    AssertHelpers.assertThrows(
-        "Cannot perform replace tag on branches",
-        IllegalArgumentException.class,
-        "Ref branch1 is a branch not a tag",
-        () -> sql("ALTER TABLE %s REPLACE Tag %s", tableName, branchName, second));
+    Assertions.assertThatThrownBy(
+            () -> sql("ALTER TABLE %s REPLACE Tag %s", tableName, branchName, second))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Ref branch1 is a branch not a tag");
   }
 
   @Test
@@ -244,14 +234,13 @@ public class TestTagDDL extends SparkExtensionsTestBase {
   public void testReplaceTagDoesNotExist() throws NoSuchTableException {
     Table table = insertRows();
 
-    AssertHelpers.assertThrows(
-        "Cannot perform replace tag on tag which does not exist",
-        IllegalArgumentException.class,
-        "Tag does not exist",
-        () ->
-            sql(
-                "ALTER TABLE %s REPLACE Tag %s AS OF VERSION %d",
-                tableName, "someTag", table.currentSnapshot().snapshotId()));
+    Assertions.assertThatThrownBy(
+            () ->
+                sql(
+                    "ALTER TABLE %s REPLACE Tag %s AS OF VERSION %d",
+                    tableName, "someTag", table.currentSnapshot().snapshotId()))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Tag does not exist");
   }
 
   @Test
@@ -316,20 +305,17 @@ public class TestTagDDL extends SparkExtensionsTestBase {
 
   @Test
   public void testDropTagNonConformingName() {
-    AssertHelpers.assertThrows(
-        "Non-conforming tag name",
-        IcebergParseException.class,
-        "mismatched input '123'",
-        () -> sql("ALTER TABLE %s DROP TAG %s", tableName, "123"));
+    Assertions.assertThatThrownBy(() -> sql("ALTER TABLE %s DROP TAG %s", tableName, "123"))
+        .isInstanceOf(IcebergParseException.class)
+        .hasMessageContaining("mismatched input '123'");
   }
 
   @Test
   public void testDropTagDoesNotExist() {
-    AssertHelpers.assertThrows(
-        "Cannot perform drop tag on tag which does not exist",
-        IllegalArgumentException.class,
-        "Tag does not exist: nonExistingTag",
-        () -> sql("ALTER TABLE %s DROP TAG %s", tableName, "nonExistingTag"));
+    Assertions.assertThatThrownBy(
+            () -> sql("ALTER TABLE %s DROP TAG %s", tableName, "nonExistingTag"))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Tag does not exist: nonExistingTag");
   }
 
   @Test
@@ -338,11 +324,9 @@ public class TestTagDDL extends SparkExtensionsTestBase {
     Table table = insertRows();
     table.manageSnapshots().createBranch(branchName, table.currentSnapshot().snapshotId()).commit();
 
-    AssertHelpers.assertThrows(
-        "Cannot perform drop tag on branch",
-        IllegalArgumentException.class,
-        "Ref b1 is a branch not a tag",
-        () -> sql("ALTER TABLE %s DROP TAG %s", tableName, branchName));
+    Assertions.assertThatThrownBy(() -> sql("ALTER TABLE %s DROP TAG %s", tableName, branchName))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Ref b1 is a branch not a tag");
   }
 
   @Test
diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java
index 7845d8e2e3..8b5e1cd1d4 100644
--- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java
@@ -45,7 +45,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.iceberg.AppendFiles;
-import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DistributionMode;
 import org.apache.iceberg.PlanningMode;
@@ -1316,17 +1315,13 @@ public abstract class TestUpdate extends SparkRowLevelOperationsTestBase {
         "id INT, a ARRAY<STRUCT<c1:INT,c2:INT>>, m MAP<STRING,STRING>",
         "{ \"id\": 0, \"a\": null, \"m\": null }");
 
-    AssertHelpers.assertThrows(
-        "Should complain about updating an array column",
-        AnalysisException.class,
-        "Updating nested fields is only supported for structs",
-        () -> sql("UPDATE %s SET a.c1 = 1", commitTarget()));
-
-    AssertHelpers.assertThrows(
-        "Should complain about updating a map column",
-        AnalysisException.class,
-        "Updating nested fields is only supported for structs",
-        () -> sql("UPDATE %s SET m.key = 'new_key'", commitTarget()));
+    Assertions.assertThatThrownBy(() -> sql("UPDATE %s SET a.c1 = 1", commitTarget()))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining("Updating nested fields is only supported for structs");
+
+    Assertions.assertThatThrownBy(() -> sql("UPDATE %s SET m.key = 'new_key'", commitTarget()))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining("Updating nested fields is only supported for structs");
   }
 
   @Test
@@ -1334,27 +1329,23 @@ public abstract class TestUpdate extends SparkRowLevelOperationsTestBase {
     createAndInitTable(
         "id INT, c STRUCT<n1:INT,n2:STRUCT<dn1:INT,dn2:INT>>", "{ \"id\": 0, \"s\": null }");
 
-    AssertHelpers.assertThrows(
-        "Should complain about conflicting updates to a top-level column",
-        AnalysisException.class,
-        "Updates are in conflict",
-        () -> sql("UPDATE %s t SET t.id = 1, t.c.n1 = 2, t.id = 2", commitTarget()));
-
-    AssertHelpers.assertThrows(
-        "Should complain about conflicting updates to a nested column",
-        AnalysisException.class,
-        "Updates are in conflict for these columns",
-        () -> sql("UPDATE %s t SET t.c.n1 = 1, t.id = 2, t.c.n1 = 2", commitTarget()));
-
-    AssertHelpers.assertThrows(
-        "Should complain about conflicting updates to a nested column",
-        AnalysisException.class,
-        "Updates are in conflict",
-        () -> {
-          sql(
-              "UPDATE %s SET c.n1 = 1, c = named_struct('n1', 1, 'n2', named_struct('dn1', 1, 'dn2', 2))",
-              commitTarget());
-        });
+    Assertions.assertThatThrownBy(
+            () -> sql("UPDATE %s t SET t.id = 1, t.c.n1 = 2, t.id = 2", commitTarget()))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageStartingWith("Updates are in conflict for these columns");
+
+    Assertions.assertThatThrownBy(
+            () -> sql("UPDATE %s t SET t.c.n1 = 1, t.id = 2, t.c.n1 = 2", commitTarget()))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageStartingWith("Updates are in conflict for these columns");
+
+    Assertions.assertThatThrownBy(
+            () ->
+                sql(
+                    "UPDATE %s SET c.n1 = 1, c = named_struct('n1', 1, 'n2', named_struct('dn1', 1, 'dn2', 2))",
+                    commitTarget()))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageStartingWith("Updates are in conflict for these columns");
   }
 
   @Test
@@ -1367,38 +1358,32 @@ public abstract class TestUpdate extends SparkRowLevelOperationsTestBase {
       withSQLConf(
           ImmutableMap.of("spark.sql.storeAssignmentPolicy", policy),
           () -> {
-            AssertHelpers.assertThrows(
-                "Should complain about writing nulls to a top-level column",
-                AnalysisException.class,
-                "Cannot write nullable values to non-null column",
-                () -> sql("UPDATE %s t SET t.id = NULL", commitTarget()));
-
-            AssertHelpers.assertThrows(
-                "Should complain about writing nulls to a nested column",
-                AnalysisException.class,
-                "Cannot write nullable values to non-null column",
-                () -> sql("UPDATE %s t SET t.s.n1 = NULL", commitTarget()));
-
-            AssertHelpers.assertThrows(
-                "Should complain about writing missing fields in structs",
-                AnalysisException.class,
-                "missing fields",
-                () -> sql("UPDATE %s t SET t.s = named_struct('n1', 1)", commitTarget()));
-
-            AssertHelpers.assertThrows(
-                "Should complain about writing invalid data types",
-                AnalysisException.class,
-                "Cannot safely cast",
-                () -> sql("UPDATE %s t SET t.s.n1 = 'str'", commitTarget()));
-
-            AssertHelpers.assertThrows(
-                "Should complain about writing incompatible structs",
-                AnalysisException.class,
-                "field name does not match",
-                () ->
-                    sql(
-                        "UPDATE %s t SET t.s.n2 = named_struct('dn2', 1, 'dn1', 2)",
-                        commitTarget()));
+            Assertions.assertThatThrownBy(() -> sql("UPDATE %s t SET t.id = NULL", commitTarget()))
+                .isInstanceOf(AnalysisException.class)
+                .hasMessageStartingWith("Cannot write nullable values to non-null column");
+
+            Assertions.assertThatThrownBy(
+                    () -> sql("UPDATE %s t SET t.s.n1 = NULL", commitTarget()))
+                .isInstanceOf(AnalysisException.class)
+                .hasMessageStartingWith("Cannot write nullable values to non-null column");
+
+            Assertions.assertThatThrownBy(
+                    () -> sql("UPDATE %s t SET t.s = named_struct('n1', 1)", commitTarget()))
+                .isInstanceOf(AnalysisException.class)
+                .hasMessageStartingWith("Cannot write incompatible data:");
+
+            Assertions.assertThatThrownBy(
+                    () -> sql("UPDATE %s t SET t.s.n1 = 'str'", commitTarget()))
+                .isInstanceOf(AnalysisException.class)
+                .hasMessageContaining("Cannot safely cast");
+
+            Assertions.assertThatThrownBy(
+                    () ->
+                        sql(
+                            "UPDATE %s t SET t.s.n2 = named_struct('dn3', 1, 'dn1', 2)",
+                            commitTarget()))
+                .isInstanceOf(AnalysisException.class)
+                .hasMessageStartingWith("Cannot write incompatible data:");
           });
     }
   }
@@ -1407,22 +1392,20 @@ public abstract class TestUpdate extends SparkRowLevelOperationsTestBase {
   public void testUpdateWithNonDeterministicCondition() {
     createAndInitTable("id INT, dep STRING", "{ \"id\": 1, \"dep\": \"hr\" }");
 
-    AssertHelpers.assertThrows(
-        "Should complain about non-deterministic expressions",
-        AnalysisException.class,
-        "nondeterministic expressions are only allowed",
-        () -> sql("UPDATE %s SET id = -1 WHERE id = 1 AND rand() > 0.5", commitTarget()));
+    Assertions.assertThatThrownBy(
+            () -> sql("UPDATE %s SET id = -1 WHERE id = 1 AND rand() > 0.5", commitTarget()))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageStartingWith(
+            "nondeterministic expressions are only allowed in Project, Filter, Aggregate or Window");
   }
 
   @Test
   public void testUpdateOnNonIcebergTableNotSupported() {
     createOrReplaceView("testtable", "{ \"c1\": -100, \"c2\": -200 }");
 
-    AssertHelpers.assertThrows(
-        "UPDATE is not supported for non iceberg table",
-        UnsupportedOperationException.class,
-        "not supported temporarily",
-        () -> sql("UPDATE %s SET c1 = -1 WHERE c2 = 1", "testtable"));
+    Assertions.assertThatThrownBy(() -> sql("UPDATE %s SET c1 = -1 WHERE c2 = 1", "testtable"))
+        .isInstanceOf(UnsupportedOperationException.class)
+        .hasMessage("UPDATE TABLE is not supported temporarily.");
   }
 
   @Test

Reply via email to