This is an automated email from the ASF dual-hosted git repository.

etudenhoefner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 8a7a1ed99b Spark: Remove deprecated AssertHelpers usage (#7483)
8a7a1ed99b is described below

commit 8a7a1ed99b80a74c4f0730a93e91809ca2eaa322
Author: Liu Xiao <[email protected]>
AuthorDate: Tue May 2 20:43:05 2023 +0800

    Spark: Remove deprecated AssertHelpers usage (#7483)
---
 .../apache/iceberg/spark/TestFunctionCatalog.java  |  40 ++++----
 .../actions/TestDeleteReachableFilesAction.java    |  21 ++--
 .../spark/actions/TestExpireSnapshotsAction.java   |  20 ++--
 .../spark/actions/TestRemoveOrphanFilesAction.java |  58 +++++------
 .../spark/actions/TestRewriteDataFilesAction.java  | 114 ++++++++++-----------
 .../spark/actions/TestRewriteManifestsAction.java  |  12 +--
 .../vectorized/TestParquetVectorizedReads.java     |  36 +++----
 .../spark/source/TestDataFrameWriterV2.java        |  34 ++----
 .../spark/source/TestDataSourceOptions.java        |  79 +++++++-------
 .../spark/source/TestForwardCompatibility.java     |  29 +++---
 .../spark/source/TestIcebergSourceTablesBase.java  |  22 ++--
 .../TestMetadataTablesWithPartitionEvolution.java  |  10 +-
 .../TestRequiredDistributionAndOrdering.java       |  27 +++--
 .../iceberg/spark/source/TestSparkDataWrite.java   |  24 ++---
 .../spark/source/TestSparkMetadataColumns.java     |  19 ++--
 .../spark/source/TestStructuredStreamingRead3.java |  19 ++--
 .../spark/source/TestTimestampWithoutZone.java     |  67 +++++-------
 .../spark/source/TestWriteMetricsConfig.java       |  12 +--
 18 files changed, 287 insertions(+), 356 deletions(-)
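
For context, the change applied throughout this patch swaps the deprecated
org.apache.iceberg.AssertHelpers.assertThrows(description, exceptionClass,
messageFragment, callable) helper for AssertJ's assertThatThrownBy chain. A
minimal sketch of the before/after pattern follows; the validate() helper and
its message are illustrative placeholders, not code from the Iceberg repository.

    import org.assertj.core.api.Assertions;

    class AssertHelpersMigrationSketch {

      void before() {
        // Deprecated pattern removed by this commit:
        // AssertHelpers.assertThrows(
        //     "Should reject a non-positive value",
        //     IllegalArgumentException.class,
        //     "must be positive",
        //     () -> validate(-1));
      }

      void after() {
        // AssertJ replacement used in the tests below: assert the exception
        // type and (part of) the message directly on the thrown throwable.
        Assertions.assertThatThrownBy(() -> validate(-1))
            .isInstanceOf(IllegalArgumentException.class)
            .hasMessageContaining("must be positive");
      }

      private void validate(int value) {
        // Hypothetical method under test, used only to make the sketch compile.
        if (value <= 0) {
          throw new IllegalArgumentException("Input must be positive, got: " + value);
        }
      }
    }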

diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestFunctionCatalog.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestFunctionCatalog.java
index 1db0fa41f7..be91c8d637 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestFunctionCatalog.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestFunctionCatalog.java
@@ -18,7 +18,6 @@
  */
 package org.apache.iceberg.spark;
 
-import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.IcebergBuild;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.spark.functions.IcebergVersionFunction;
@@ -68,11 +67,9 @@ public class TestFunctionCatalog extends SparkTestBaseWithCatalog {
         new Identifier[0],
         asFunctionCatalog.listFunctions(DEFAULT_NAMESPACE));
 
-    AssertHelpers.assertThrows(
-        "Listing functions in a namespace that does not exist should throw",
-        NoSuchNamespaceException.class,
-        "The schema `db` cannot be found",
-        () -> asFunctionCatalog.listFunctions(DB_NAMESPACE));
+    Assertions.assertThatThrownBy(() -> asFunctionCatalog.listFunctions(DB_NAMESPACE))
+        .isInstanceOf(NoSuchNamespaceException.class)
+        .hasMessageStartingWith("[SCHEMA_NOT_FOUND] The schema `db` cannot be found.");
   }
 
   @Test
@@ -87,24 +84,23 @@ public class TestFunctionCatalog extends SparkTestBaseWithCatalog {
           .isExactlyInstanceOf(IcebergVersionFunction.class);
     }
 
-    AssertHelpers.assertThrows(
-        "Cannot load a function if it's not used with the system namespace or 
the empty namespace",
-        NoSuchFunctionException.class,
-        "The function default.iceberg_version cannot be found",
-        () -> asFunctionCatalog.loadFunction(Identifier.of(DEFAULT_NAMESPACE, 
"iceberg_version")));
+    Assertions.assertThatThrownBy(
+            () ->
+                
asFunctionCatalog.loadFunction(Identifier.of(DEFAULT_NAMESPACE, 
"iceberg_version")))
+        .isInstanceOf(NoSuchFunctionException.class)
+        .hasMessageStartingWith(
+            "[ROUTINE_NOT_FOUND] The function default.iceberg_version cannot 
be found.");
 
     Identifier undefinedFunction = Identifier.of(SYSTEM_NAMESPACE, "undefined_function");
-    AssertHelpers.assertThrows(
-        "Cannot load a function that does not exist",
-        NoSuchFunctionException.class,
-        "The function system.undefined_function cannot be found",
-        () -> asFunctionCatalog.loadFunction(undefinedFunction));
-
-    AssertHelpers.assertThrows(
-        "Using an undefined function from SQL should fail analysis",
-        AnalysisException.class,
-        "Cannot resolve function",
-        () -> sql("SELECT undefined_function(1, 2)"));
+    Assertions.assertThatThrownBy(() -> asFunctionCatalog.loadFunction(undefinedFunction))
+        .isInstanceOf(NoSuchFunctionException.class)
+        .hasMessageStartingWith(
+            "[ROUTINE_NOT_FOUND] The function system.undefined_function cannot be found.");
+
+    Assertions.assertThatThrownBy(() -> sql("SELECT undefined_function(1, 2)"))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageStartingWith(
+            "[UNRESOLVED_ROUTINE] Cannot resolve function `undefined_function` on search path");
   }
 
   @Test
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestDeleteReachableFilesAction.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestDeleteReachableFilesAction.java
index 154e940519..18de71ce8e 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestDeleteReachableFilesAction.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestDeleteReachableFilesAction.java
@@ -27,7 +27,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.StreamSupport;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.DeleteFile;
@@ -51,6 +50,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.spark.SparkTestBase;
 import org.apache.iceberg.types.Types;
+import org.assertj.core.api.Assertions;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
@@ -366,22 +366,21 @@ public class TestDeleteReachableFilesAction extends SparkTestBase {
   public void testEmptyIOThrowsException() {
     DeleteReachableFiles baseRemoveFilesSparkAction =
         sparkActions().deleteReachableFiles(metadataLocation(table)).io(null);
-    AssertHelpers.assertThrows(
-        "FileIO can't be null in DeleteReachableFiles action",
-        IllegalArgumentException.class,
-        "File IO cannot be null",
-        baseRemoveFilesSparkAction::execute);
+
+    Assertions.assertThatThrownBy(baseRemoveFilesSparkAction::execute)
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("File IO cannot be null");
   }
 
   @Test
   public void testRemoveFilesActionWhenGarbageCollectionDisabled() {
     table.updateProperties().set(TableProperties.GC_ENABLED, "false").commit();
 
-    AssertHelpers.assertThrows(
-        "Should complain about removing files when GC is disabled",
-        ValidationException.class,
-        "Cannot delete files: GC is disabled (deleting files may corrupt other 
tables)",
-        () -> 
sparkActions().deleteReachableFiles(metadataLocation(table)).execute());
+    Assertions.assertThatThrownBy(
+            () -> 
sparkActions().deleteReachableFiles(metadataLocation(table)).execute())
+        .isInstanceOf(ValidationException.class)
+        .hasMessage(
+            "Cannot delete files: GC is disabled (deleting files may corrupt 
other tables)");
   }
 
   private String metadataLocation(Table tbl) {
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
index 25e6f6486a..e90626cbda 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
@@ -29,7 +29,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
@@ -57,6 +56,7 @@ import org.apache.iceberg.spark.SparkTestBase;
 import org.apache.iceberg.spark.data.TestHelpers;
 import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.Dataset;
+import org.assertj.core.api.Assertions;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
@@ -447,11 +447,10 @@ public class TestExpireSnapshotsAction extends SparkTestBase {
 
     table.newAppend().appendFile(FILE_A).commit();
 
-    AssertHelpers.assertThrows(
-        "Should complain about expiring snapshots",
-        ValidationException.class,
-        "Cannot expire snapshots: GC is disabled",
-        () -> SparkActions.get().expireSnapshots(table));
+    Assertions.assertThatThrownBy(() -> SparkActions.get().expireSnapshots(table))
+        .isInstanceOf(ValidationException.class)
+        .hasMessage(
+            "Cannot expire snapshots: GC is disabled (deleting files may corrupt other tables)");
   }
 
   @Test
@@ -529,11 +528,10 @@ public class TestExpireSnapshotsAction extends SparkTestBase {
 
   @Test
   public void testRetainZeroSnapshots() {
-    AssertHelpers.assertThrows(
-        "Should fail retain 0 snapshots " + "because number of snapshots to 
retain cannot be zero",
-        IllegalArgumentException.class,
-        "Number of snapshots to retain must be at least 1, cannot be: 0",
-        () -> 
SparkActions.get().expireSnapshots(table).retainLast(0).execute());
+    Assertions.assertThatThrownBy(
+            () -> 
SparkActions.get().expireSnapshots(table).retainLast(0).execute())
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Number of snapshots to retain must be at least 1, cannot 
be: 0");
   }
 
   @Test
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
index d91ac3606d..032ccfb14d 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
@@ -40,7 +40,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.Files;
 import org.apache.iceberg.GenericBlobMetadata;
 import org.apache.iceberg.GenericStatisticsFile;
@@ -751,11 +750,10 @@ public abstract class TestRemoveOrphanFilesAction extends SparkTestBase {
 
     table.updateProperties().set(TableProperties.GC_ENABLED, "false").commit();
 
-    AssertHelpers.assertThrows(
-        "Should complain about removing orphan files",
-        ValidationException.class,
-        "Cannot delete orphan files: GC is disabled",
-        () -> SparkActions.get().deleteOrphanFiles(table).execute());
+    Assertions.assertThatThrownBy(() -> SparkActions.get().deleteOrphanFiles(table).execute())
+        .isInstanceOf(ValidationException.class)
+        .hasMessage(
+            "Cannot delete orphan files: GC is disabled (deleting files may corrupt other tables)");
   }
 
   @Test
@@ -987,18 +985,18 @@ public abstract class TestRemoveOrphanFilesAction extends SparkTestBase {
   public void testPathsWithEqualSchemes() {
     List<String> validFiles = Lists.newArrayList("scheme1://bucket1/dir1/dir2/file1");
     List<String> actualFiles = Lists.newArrayList("scheme2://bucket1/dir1/dir2/file1");
-    AssertHelpers.assertThrows(
-        "Test remove orphan files with equal schemes",
-        ValidationException.class,
-        "Conflicting authorities/schemes: [(scheme1, scheme2)]",
-        () ->
-            executeTest(
-                validFiles,
-                actualFiles,
-                Lists.newArrayList(),
-                ImmutableMap.of(),
-                ImmutableMap.of(),
-                DeleteOrphanFiles.PrefixMismatchMode.ERROR));
+    Assertions.assertThatThrownBy(
+            () ->
+                executeTest(
+                    validFiles,
+                    actualFiles,
+                    Lists.newArrayList(),
+                    ImmutableMap.of(),
+                    ImmutableMap.of(),
+                    DeleteOrphanFiles.PrefixMismatchMode.ERROR))
+        .isInstanceOf(ValidationException.class)
+        .hasMessageStartingWith("Unable to determine whether certain files are 
orphan")
+        .hasMessageEndingWith("Conflicting authorities/schemes: [(scheme1, 
scheme2)].");
 
     Map<String, String> equalSchemes = Maps.newHashMap();
     equalSchemes.put("scheme1", "scheme");
@@ -1016,18 +1014,18 @@ public abstract class TestRemoveOrphanFilesAction extends SparkTestBase {
   public void testPathsWithEqualAuthorities() {
     List<String> validFiles = Lists.newArrayList("hdfs://servicename1/dir1/dir2/file1");
     List<String> actualFiles = Lists.newArrayList("hdfs://servicename2/dir1/dir2/file1");
-    AssertHelpers.assertThrows(
-        "Test remove orphan files with equal authorities",
-        ValidationException.class,
-        "Conflicting authorities/schemes: [(servicename1, servicename2)]",
-        () ->
-            executeTest(
-                validFiles,
-                actualFiles,
-                Lists.newArrayList(),
-                ImmutableMap.of(),
-                ImmutableMap.of(),
-                DeleteOrphanFiles.PrefixMismatchMode.ERROR));
+    Assertions.assertThatThrownBy(
+            () ->
+                executeTest(
+                    validFiles,
+                    actualFiles,
+                    Lists.newArrayList(),
+                    ImmutableMap.of(),
+                    ImmutableMap.of(),
+                    DeleteOrphanFiles.PrefixMismatchMode.ERROR))
+        .isInstanceOf(ValidationException.class)
+        .hasMessageStartingWith("Unable to determine whether certain files are 
orphan")
+        .hasMessageEndingWith("Conflicting authorities/schemes: 
[(servicename1, servicename2)].");
 
     Map<String, String> equalAuthorities = Maps.newHashMap();
     equalAuthorities.put("servicename1", "servicename");
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index 3ecd7ce371..a638776033 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -47,7 +47,6 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DeleteFile;
@@ -104,6 +103,7 @@ import org.apache.iceberg.util.Pair;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.internal.SQLConf;
+import org.assertj.core.api.Assertions;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -647,10 +647,9 @@ public class TestRewriteDataFilesAction extends SparkTestBase {
         .when(spyRewrite)
         .rewriteFiles(any(), argThat(failGroup));
 
-    AssertHelpers.assertThrows(
-        "Should fail entire rewrite if part fails",
-        RuntimeException.class,
-        () -> spyRewrite.execute());
+    Assertions.assertThatThrownBy(spyRewrite::execute)
+        .isInstanceOf(RuntimeException.class)
+        .hasMessage("Rewrite Failed");
 
     table.refresh();
 
@@ -682,10 +681,9 @@ public class TestRewriteDataFilesAction extends SparkTestBase {
 
     doReturn(util).when(spyRewrite).commitManager(table.currentSnapshot().snapshotId());
 
-    AssertHelpers.assertThrows(
-        "Should fail entire rewrite if commit fails",
-        RuntimeException.class,
-        () -> spyRewrite.execute());
+    Assertions.assertThatThrownBy(spyRewrite::execute)
+        .isInstanceOf(RuntimeException.class)
+        .hasMessage("Commit Failure");
 
     table.refresh();
 
@@ -718,10 +716,9 @@ public class TestRewriteDataFilesAction extends SparkTestBase {
         .when(spyRewrite)
         .rewriteFiles(any(), argThat(failGroup));
 
-    AssertHelpers.assertThrows(
-        "Should fail entire rewrite if part fails",
-        RuntimeException.class,
-        () -> spyRewrite.execute());
+    Assertions.assertThatThrownBy(spyRewrite::execute)
+        .isInstanceOf(RuntimeException.class)
+        .hasMessage("Rewrite Failed");
 
     table.refresh();
 
@@ -863,32 +860,35 @@ public class TestRewriteDataFilesAction extends SparkTestBase {
   public void testInvalidOptions() {
     Table table = createTable(20);
 
-    AssertHelpers.assertThrows(
-        "No negative values for partial progress max commits",
-        IllegalArgumentException.class,
-        () ->
-            basicRewrite(table)
-                .option(RewriteDataFiles.PARTIAL_PROGRESS_ENABLED, "true")
-                .option(RewriteDataFiles.PARTIAL_PROGRESS_MAX_COMMITS, "-5")
-                .execute());
-
-    AssertHelpers.assertThrows(
-        "No negative values for max concurrent groups",
-        IllegalArgumentException.class,
-        () ->
-            basicRewrite(table)
-                .option(RewriteDataFiles.MAX_CONCURRENT_FILE_GROUP_REWRITES, "-5")
-                .execute());
-
-    AssertHelpers.assertThrows(
-        "No unknown options allowed",
-        IllegalArgumentException.class,
-        () -> basicRewrite(table).option("foobarity", "-5").execute());
-
-    AssertHelpers.assertThrows(
-        "Cannot set rewrite-job-order to foo",
-        IllegalArgumentException.class,
-        () -> basicRewrite(table).option(RewriteDataFiles.REWRITE_JOB_ORDER, "foo").execute());
+    Assertions.assertThatThrownBy(
+            () ->
+                basicRewrite(table)
+                    .option(RewriteDataFiles.PARTIAL_PROGRESS_ENABLED, "true")
+                    .option(RewriteDataFiles.PARTIAL_PROGRESS_MAX_COMMITS, "-5")
+                    .execute())
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage(
+            "Cannot set partial-progress.max-commits to -5, "
+                + "the value must be positive when partial-progress.enabled is true");
+
+    Assertions.assertThatThrownBy(
+            () ->
+                basicRewrite(table)
+                    .option(RewriteDataFiles.MAX_CONCURRENT_FILE_GROUP_REWRITES, "-5")
+                    .execute())
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage(
+            "Cannot set max-concurrent-file-group-rewrites to -5, the value must be positive.");
+
+    Assertions.assertThatThrownBy(() -> basicRewrite(table).option("foobarity", "-5").execute())
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage(
+            "Cannot use options [foobarity], they are not supported by the action or the rewriter BIN-PACK");
+
+    Assertions.assertThatThrownBy(
+            () -> basicRewrite(table).option(RewriteDataFiles.REWRITE_JOB_ORDER, "foo").execute())
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid rewrite job order name: foo");
   }
 
   @Test
@@ -1124,10 +1124,10 @@ public class TestRewriteDataFilesAction extends SparkTestBase {
 
     doReturn(util).when(spyAction).commitManager(table.currentSnapshot().snapshotId());
 
-    AssertHelpers.assertThrows(
-        "Should propagate CommitStateUnknown Exception",
-        CommitStateUnknownException.class,
-        () -> spyAction.execute());
+    Assertions.assertThatThrownBy(spyAction::execute)
+        .isInstanceOf(CommitStateUnknownException.class)
+        .hasMessageStartingWith(
+            "Unknown State\n" + "Cannot determine whether the commit was 
successful or not");
 
     List<Object[]> postRewriteData = currentData();
     assertEquals("We shouldn't have changed the data", originalData, 
postRewriteData);
@@ -1243,23 +1243,17 @@ public class TestRewriteDataFilesAction extends SparkTestBase {
 
     SortOrder sortOrder = SortOrder.builderFor(table.schema()).asc("c2").build();
 
-    AssertHelpers.assertThrows(
-        "Should be unable to set Strategy more than once",
-        IllegalArgumentException.class,
-        "Must use only one rewriter type",
-        () -> actions().rewriteDataFiles(table).binPack().sort());
-
-    AssertHelpers.assertThrows(
-        "Should be unable to set Strategy more than once",
-        IllegalArgumentException.class,
-        "Must use only one rewriter type",
-        () -> actions().rewriteDataFiles(table).sort(sortOrder).binPack());
-
-    AssertHelpers.assertThrows(
-        "Should be unable to set Strategy more than once",
-        IllegalArgumentException.class,
-        "Must use only one rewriter type",
-        () -> actions().rewriteDataFiles(table).sort(sortOrder).binPack());
+    Assertions.assertThatThrownBy(() -> actions().rewriteDataFiles(table).binPack().sort())
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Must use only one rewriter type (bin-pack, sort, zorder)");
+
+    Assertions.assertThatThrownBy(() -> actions().rewriteDataFiles(table).sort(sortOrder).binPack())
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Must use only one rewriter type (bin-pack, sort, zorder)");
+
+    Assertions.assertThatThrownBy(() -> actions().rewriteDataFiles(table).sort(sortOrder).binPack())
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Must use only one rewriter type (bin-pack, sort, zorder)");
   }
 
   @Test
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
index 4aafb72ace..64dbf42d4c 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
@@ -33,7 +33,6 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.PartitionSpec;
@@ -57,6 +56,7 @@ import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.TableIdentifier;
+import org.assertj.core.api.Assertions;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
@@ -207,11 +207,11 @@ public class TestRewriteManifestsAction extends SparkTestBase {
     Table spyTable = spy(table);
     when(spyTable.rewriteManifests()).thenReturn(spyNewRewriteManifests);
 
-    AssertHelpers.assertThrowsCause(
-        "Should throw a Commit State Unknown Exception",
-        RuntimeException.class,
-        "Datacenter on Fire",
-        () -> actions.rewriteManifests(spyTable).rewriteIf(manifest -> true).execute());
+    Assertions.assertThatThrownBy(
+            () -> actions.rewriteManifests(spyTable).rewriteIf(manifest -> true).execute())
+        .cause()
+        .isInstanceOf(RuntimeException.class)
+        .hasMessage("Datacenter on Fire");
 
     table.refresh();
 
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetVectorizedReads.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetVectorizedReads.java
index 56e9490b99..5f86332120 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetVectorizedReads.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetVectorizedReads.java
@@ -25,7 +25,6 @@ import java.io.File;
 import java.io.IOException;
 import java.util.Iterator;
 import org.apache.avro.generic.GenericData;
-import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.Files;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.io.CloseableIterable;
@@ -46,6 +45,7 @@ import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.Type;
 import org.apache.spark.sql.vectorized.ColumnarBatch;
+import org.assertj.core.api.Assertions;
 import org.junit.Assert;
 import org.junit.Assume;
 import org.junit.Ignore;
@@ -220,17 +220,16 @@ public class TestParquetVectorizedReads extends AvroDataTest {
   @Test
   @Override
   public void testNestedStruct() {
-    AssertHelpers.assertThrows(
-        "Vectorized reads are not supported yet for struct fields",
-        UnsupportedOperationException.class,
-        "Vectorized reads are not supported yet for struct fields",
-        () ->
-            VectorizedSparkParquetReaders.buildReader(
-                TypeUtil.assignIncreasingFreshIds(
-                    new Schema(required(1, "struct", SUPPORTED_PRIMITIVES))),
-                new MessageType(
-                    "struct", new GroupType(Type.Repetition.OPTIONAL, 
"struct").withId(1)),
-                false));
+    Assertions.assertThatThrownBy(
+            () ->
+                VectorizedSparkParquetReaders.buildReader(
+                    TypeUtil.assignIncreasingFreshIds(
+                        new Schema(required(1, "struct", 
SUPPORTED_PRIMITIVES))),
+                    new MessageType(
+                        "struct", new GroupType(Type.Repetition.OPTIONAL, 
"struct").withId(1)),
+                    false))
+        .isInstanceOf(UnsupportedOperationException.class)
+        .hasMessage("Vectorized reads are not supported yet for struct 
fields");
   }
 
   @Test
@@ -350,13 +349,10 @@ public class TestParquetVectorizedReads extends AvroDataTest {
     try (FileAppender<GenericData.Record> writer = getParquetV2Writer(schema, dataFile)) {
       writer.addAll(data);
     }
-    AssertHelpers.assertThrows(
-        "Vectorized reads not supported",
-        UnsupportedOperationException.class,
-        "Cannot support vectorized reads for column",
-        () -> {
-          assertRecordsMatch(schema, 30000, data, dataFile, false, true, BATCH_SIZE);
-          return null;
-        });
+    Assertions.assertThatThrownBy(
+            () -> assertRecordsMatch(schema, 30000, data, dataFile, false, true, BATCH_SIZE))
+        .isInstanceOf(UnsupportedOperationException.class)
+        .hasMessageStartingWith("Cannot support vectorized reads for column")
+        .hasMessageEndingWith("Disable vectorized reads to read this table/file");
   }
 }
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWriterV2.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWriterV2.java
index a9b4f0d3ad..76b138ced7 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWriterV2.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWriterV2.java
@@ -19,7 +19,6 @@
 package org.apache.iceberg.spark.source;
 
 import java.util.List;
-import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.spark.Spark3Util;
@@ -33,6 +32,7 @@ import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
 import org.apache.spark.sql.catalyst.parser.ParseException;
 import org.apache.spark.sql.internal.SQLConf;
+import org.assertj.core.api.Assertions;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -76,18 +76,9 @@ public class TestDataFrameWriterV2 extends SparkTestBaseWithCatalog {
 
     // this has a different error message than the case without accept-any-schema because it uses
     // Iceberg checks
-    AssertHelpers.assertThrows(
-        "Should fail when merge-schema is not enabled on the writer",
-        IllegalArgumentException.class,
-        "Field new_col not found in source schema",
-        () -> {
-          try {
-            threeColDF.writeTo(tableName).append();
-          } catch (NoSuchTableException e) {
-            // needed because append has checked exceptions
-            throw new RuntimeException(e);
-          }
-        });
+    Assertions.assertThatThrownBy(() -> threeColDF.writeTo(tableName).append())
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Field new_col not found in source schema");
   }
 
   @Test
@@ -111,18 +102,11 @@ public class TestDataFrameWriterV2 extends SparkTestBaseWithCatalog {
             "{ \"id\": 3, \"data\": \"c\", \"new_col\": 12.06 }",
             "{ \"id\": 4, \"data\": \"d\", \"new_col\": 14.41 }");
 
-    AssertHelpers.assertThrows(
-        "Should fail when accept-any-schema is not enabled on the table",
-        AnalysisException.class,
-        "too many data columns",
-        () -> {
-          try {
-            threeColDF.writeTo(tableName).option("merge-schema", 
"true").append();
-          } catch (NoSuchTableException e) {
-            // needed because append has checked exceptions
-            throw new RuntimeException(e);
-          }
-        });
+    Assertions.assertThatThrownBy(
+            () -> threeColDF.writeTo(tableName).option("merge-schema", 
"true").append())
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageStartingWith(
+            "Cannot write to 'testhadoop.default.table', too many data 
columns");
   }
 
   @Test
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
index 60dd716c63..44400b5ad4 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
@@ -26,7 +26,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.FileScanTask;
@@ -52,6 +51,7 @@ import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.SparkSession;
+import org.assertj.core.api.Assertions;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -242,50 +242,47 @@ public class TestDataSourceOptions {
     List<Long> snapshotIds = SnapshotUtil.currentAncestorIds(table);
 
     // start-snapshot-id and snapshot-id are both configured.
-    AssertHelpers.assertThrows(
-        "Check both start-snapshot-id and snapshot-id are configured",
-        IllegalArgumentException.class,
-        "Cannot set start-snapshot-id and end-snapshot-id for incremental 
scans",
-        () -> {
-          spark
-              .read()
-              .format("iceberg")
-              .option("snapshot-id", snapshotIds.get(3).toString())
-              .option("start-snapshot-id", snapshotIds.get(3).toString())
-              .load(tableLocation)
-              .explain();
-        });
+    Assertions.assertThatThrownBy(
+            () ->
+                spark
+                    .read()
+                    .format("iceberg")
+                    .option("snapshot-id", snapshotIds.get(3).toString())
+                    .option("start-snapshot-id", snapshotIds.get(3).toString())
+                    .load(tableLocation)
+                    .explain())
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage(
+            "Cannot set start-snapshot-id and end-snapshot-id for incremental 
scans when either snapshot-id or as-of-timestamp is set");
 
     // end-snapshot-id and as-of-timestamp are both configured.
-    AssertHelpers.assertThrows(
-        "Check both start-snapshot-id and snapshot-id are configured",
-        IllegalArgumentException.class,
-        "Cannot set start-snapshot-id and end-snapshot-id for incremental 
scans",
-        () -> {
-          spark
-              .read()
-              .format("iceberg")
-              .option(
-                  SparkReadOptions.AS_OF_TIMESTAMP,
-                  Long.toString(table.snapshot(snapshotIds.get(3)).timestampMillis()))
-              .option("end-snapshot-id", snapshotIds.get(2).toString())
-              .load(tableLocation)
-              .explain();
-        });
+    Assertions.assertThatThrownBy(
+            () ->
+                spark
+                    .read()
+                    .format("iceberg")
+                    .option(
+                        SparkReadOptions.AS_OF_TIMESTAMP,
+                        Long.toString(table.snapshot(snapshotIds.get(3)).timestampMillis()))
+                    .option("end-snapshot-id", snapshotIds.get(2).toString())
+                    .load(tableLocation)
+                    .explain())
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage(
+            "Cannot set start-snapshot-id and end-snapshot-id for incremental 
scans when either snapshot-id or as-of-timestamp is set");
 
     // only end-snapshot-id is configured.
-    AssertHelpers.assertThrows(
-        "Check both start-snapshot-id and snapshot-id are configured",
-        IllegalArgumentException.class,
-        "Cannot set only end-snapshot-id for incremental scans",
-        () -> {
-          spark
-              .read()
-              .format("iceberg")
-              .option("end-snapshot-id", snapshotIds.get(2).toString())
-              .load(tableLocation)
-              .explain();
-        });
+    Assertions.assertThatThrownBy(
+            () ->
+                spark
+                    .read()
+                    .format("iceberg")
+                    .option("end-snapshot-id", snapshotIds.get(2).toString())
+                    .load(tableLocation)
+                    .explain())
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage(
+            "Cannot set only end-snapshot-id for incremental scans. Please, 
set start-snapshot-id too.");
 
     // test (1st snapshot, current snapshot] incremental scan.
     List<SimpleRecord> result =
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestForwardCompatibility.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestForwardCompatibility.java
index fe44023590..73e572ecae 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestForwardCompatibility.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestForwardCompatibility.java
@@ -28,7 +28,6 @@ import java.util.UUID;
 import java.util.concurrent.TimeoutException;
 import org.apache.avro.generic.GenericData;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.FileFormat;
@@ -56,6 +55,7 @@ import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.execution.streaming.MemoryStream;
 import org.apache.spark.sql.streaming.StreamingQuery;
 import org.apache.spark.sql.streaming.StreamingQueryException;
+import org.assertj.core.api.Assertions;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -116,16 +116,15 @@ public class TestForwardCompatibility {
 
     Dataset<Row> df = spark.createDataFrame(expected, SimpleRecord.class);
 
-    AssertHelpers.assertThrows(
-        "Should reject write with unsupported transform",
-        UnsupportedOperationException.class,
-        "Cannot write using unsupported transforms: zero",
-        () ->
-            df.select("id", "data")
-                .write()
-                .format("iceberg")
-                .mode("append")
-                .save(location.toString()));
+    Assertions.assertThatThrownBy(
+            () ->
+                df.select("id", "data")
+                    .write()
+                    .format("iceberg")
+                    .mode("append")
+                    .save(location.toString()))
+        .isInstanceOf(UnsupportedOperationException.class)
+        .hasMessageEndingWith("Cannot write using unsupported transforms: 
zero");
   }
 
   @Test
@@ -155,11 +154,9 @@ public class TestForwardCompatibility {
     List<Integer> batch1 = Lists.newArrayList(1, 2);
     send(batch1, inputStream);
 
-    AssertHelpers.assertThrows(
-        "Should reject streaming write with unsupported transform",
-        StreamingQueryException.class,
-        "Cannot write using unsupported transforms: zero",
-        query::processAllAvailable);
+    Assertions.assertThatThrownBy(query::processAllAvailable)
+        .isInstanceOf(StreamingQueryException.class)
+        .hasMessageEndingWith("Cannot write using unsupported transforms: 
zero");
   }
 
   @Test
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index fc023cdfdb..ac53f09511 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -36,7 +36,6 @@ import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecordBuilder;
-import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.ManifestFile;
@@ -1113,17 +1112,16 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
 
     if (!spark.version().startsWith("2")) {
       // Spark 2 isn't able to actually push down nested struct projections so this will not break
-      AssertHelpers.assertThrows(
-          "Can't prune struct inside list",
-          SparkException.class,
-          "Cannot project a partial list element struct",
-          () ->
-              spark
-                  .read()
-                  .format("iceberg")
-                  .load(loadLocation(tableIdentifier, "manifests"))
-                  .select("partition_spec_id", "path", 
"partition_summaries.contains_null")
-                  .collectAsList());
+      Assertions.assertThatThrownBy(
+              () ->
+                  spark
+                      .read()
+                      .format("iceberg")
+                      .load(loadLocation(tableIdentifier, "manifests"))
+                      .select("partition_spec_id", "path", 
"partition_summaries.contains_null")
+                      .collectAsList())
+          .isInstanceOf(SparkException.class)
+          .hasMessageContaining("Cannot project a partial list element 
struct");
     }
 
     Dataset<Row> actualDf =
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestMetadataTablesWithPartitionEvolution.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestMetadataTablesWithPartitionEvolution.java
index 82c9a58e33..1b96245019 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestMetadataTablesWithPartitionEvolution.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestMetadataTablesWithPartitionEvolution.java
@@ -32,7 +32,6 @@ import static org.apache.iceberg.TableProperties.FORMAT_VERSION;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
-import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.HasTableOperations;
 import org.apache.iceberg.MetadataTableType;
@@ -53,6 +52,7 @@ import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.parser.ParseException;
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.StructType;
+import org.assertj.core.api.Assertions;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Assume;
@@ -667,11 +667,9 @@ public class TestMetadataTablesWithPartitionEvolution extends SparkCatalogTestBa
     sql("REFRESH TABLE %s", tableName);
 
     for (MetadataTableType tableType : Arrays.asList(FILES, ALL_DATA_FILES, ENTRIES, ALL_ENTRIES)) {
-      AssertHelpers.assertThrows(
-          "Should complain about the partition type",
-          ValidationException.class,
-          "Cannot build table partition type, unknown transforms",
-          () -> loadMetadataTable(tableType));
+      Assertions.assertThatThrownBy(() -> loadMetadataTable(tableType))
+          .isInstanceOf(ValidationException.class)
+          .hasMessage("Cannot build table partition type, unknown transforms: 
[zero]");
     }
   }
 
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestRequiredDistributionAndOrdering.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestRequiredDistributionAndOrdering.java
index ac481ca473..6c96a33a25 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestRequiredDistributionAndOrdering.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestRequiredDistributionAndOrdering.java
@@ -20,7 +20,6 @@ package org.apache.iceberg.spark.source;
 
 import java.util.List;
 import java.util.Map;
-import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -29,6 +28,7 @@ import org.apache.iceberg.spark.SparkWriteOptions;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.assertj.core.api.Assertions;
 import org.junit.After;
 import org.junit.Test;
 
@@ -162,20 +162,17 @@ public class TestRequiredDistributionAndOrdering extends SparkCatalogTestBase {
     Dataset<Row> inputDF = ds.coalesce(1).sortWithinPartitions("c1");
 
     // should fail if ordering is disabled
-    AssertHelpers.assertThrowsCause(
-        "Should reject writes without ordering",
-        IllegalStateException.class,
-        "Incoming records violate the writer assumption",
-        () -> {
-          try {
-            inputDF
-                .writeTo(tableName)
-                .option(SparkWriteOptions.USE_TABLE_DISTRIBUTION_AND_ORDERING, "false")
-                .append();
-          } catch (NoSuchTableException e) {
-            throw new RuntimeException(e);
-          }
-        });
+    Assertions.assertThatThrownBy(
+            () ->
+                inputDF
+                    .writeTo(tableName)
+                    .option(SparkWriteOptions.USE_TABLE_DISTRIBUTION_AND_ORDERING, "false")
+                    .append())
+        .cause()
+        .isInstanceOf(IllegalStateException.class)
+        .hasMessageStartingWith(
+            "Incoming records violate the writer assumption that records are clustered by spec "
+                + "and by partition within each spec. Either cluster the incoming records or switch to fanout writers.");
   }
 
   @Test
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
index 991719d616..d6a235674d 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
@@ -30,7 +30,6 @@ import java.util.List;
 import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.AppendFiles;
-import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.ManifestFile;
@@ -690,18 +689,17 @@ public class TestSparkDataWrite {
     ManualSource.setTable(manualTableName, sparkTable);
 
     // Although an exception is thrown here, write and commit have succeeded
-    AssertHelpers.assertThrows(
-        "Should throw a Commit State Unknown Exception",
-        CommitStateUnknownException.class,
-        "Datacenter on Fire",
-        () ->
-            df2.select("id", "data")
-                .sort("data")
-                .write()
-                .format("org.apache.iceberg.spark.source.ManualSource")
-                .option(ManualSource.TABLE_NAME, manualTableName)
-                .mode(SaveMode.Append)
-                .save(targetLocation));
+    Assertions.assertThatThrownBy(
+            () ->
+                df2.select("id", "data")
+                    .sort("data")
+                    .write()
+                    .format("org.apache.iceberg.spark.source.ManualSource")
+                    .option(ManualSource.TABLE_NAME, manualTableName)
+                    .mode(SaveMode.Append)
+                    .save(targetLocation))
+        .isInstanceOf(CommitStateUnknownException.class)
+        .hasMessageStartingWith("Datacenter on Fire");
 
     // Since write and commit succeeded, the rows should be readable
     Dataset<Row> result = spark.read().format("iceberg").load(targetLocation);
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java
index e399852285..1b46759590 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java
@@ -29,7 +29,6 @@ import static org.apache.spark.sql.functions.lit;
 import java.io.IOException;
 import java.util.List;
 import java.util.stream.Collectors;
-import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.HasTableOperations;
 import org.apache.iceberg.MetadataColumns;
@@ -52,6 +51,7 @@ import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.assertj.core.api.Assertions;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Assume;
@@ -220,11 +220,9 @@ public class TestSparkMetadataColumns extends SparkTestBase {
     TableMetadata base = ops.current();
     ops.commit(base, base.updatePartitionSpec(UNKNOWN_SPEC));
 
-    AssertHelpers.assertThrows(
-        "Should fail to query the partition metadata column",
-        ValidationException.class,
-        "Cannot build table partition type, unknown transforms",
-        () -> sql("SELECT _partition FROM %s", TABLE_NAME));
+    Assertions.assertThatThrownBy(() -> sql("SELECT _partition FROM %s", 
TABLE_NAME))
+        .isInstanceOf(ValidationException.class)
+        .hasMessage("Cannot build table partition type, unknown transforms: 
[zero]");
   }
 
   @Test
@@ -242,11 +240,10 @@ public class TestSparkMetadataColumns extends SparkTestBase {
         ImmutableList.of(row(1L, "a1")),
         sql("SELECT id, category FROM %s", TABLE_NAME));
 
-    AssertHelpers.assertThrows(
-        "Should fail to query conflicting columns",
-        ValidationException.class,
-        "column names conflict",
-        () -> sql("SELECT * FROM %s", TABLE_NAME));
+    Assertions.assertThatThrownBy(() -> sql("SELECT * FROM %s", TABLE_NAME))
+        .isInstanceOf(ValidationException.class)
+        .hasMessageStartingWith(
+            "Table column names conflict with names reserved for Iceberg 
metadata columns: [_spec_id, _file].");
 
     table.refresh();
 
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
index a7f18e1cb0..1c08744f56 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
@@ -29,7 +29,6 @@ import java.util.Map;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.IntStream;
-import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.DataOperations;
 import org.apache.iceberg.DeleteFile;
@@ -492,11 +491,10 @@ public final class TestStructuredStreamingRead3 extends SparkCatalogTestBase {
 
     StreamingQuery query = startStream();
 
-    AssertHelpers.assertThrowsCause(
-        "Streaming should fail with IllegalStateException, as the snapshot is 
not of type APPEND",
-        IllegalStateException.class,
-        "Cannot process overwrite snapshot",
-        () -> query.processAllAvailable());
+    Assertions.assertThatThrownBy(query::processAllAvailable)
+        .cause()
+        .isInstanceOf(IllegalStateException.class)
+        .hasMessageStartingWith("Cannot process overwrite snapshot");
   }
 
   @Test
@@ -533,11 +531,10 @@ public final class TestStructuredStreamingRead3 extends SparkCatalogTestBase {
 
     StreamingQuery query = startStream();
 
-    AssertHelpers.assertThrowsCause(
-        "Streaming should fail with IllegalStateException, as the snapshot is 
not of type APPEND",
-        IllegalStateException.class,
-        "Cannot process delete snapshot",
-        () -> query.processAllAvailable());
+    Assertions.assertThatThrownBy(query::processAllAvailable)
+        .cause()
+        .isInstanceOf(IllegalStateException.class)
+        .hasMessageStartingWith("Cannot process delete snapshot");
   }
 
   @Test
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java
index 053f6dbaea..428e6b5a7f 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java
@@ -28,7 +28,6 @@ import java.util.UUID;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.FileFormat;
@@ -53,6 +52,7 @@ import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.SparkSession;
+import org.assertj.core.api.Assertions;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
@@ -159,21 +159,17 @@ public class TestTimestampWithoutZone extends SparkTestBase {
 
   @Test
   public void testUnpartitionedTimestampWithoutZoneError() {
-    AssertHelpers.assertThrows(
-        String.format(
-            "Read operation performed on a timestamp without timezone field 
while "
-                + "'%s' set to false should throw exception",
-            SparkReadOptions.HANDLE_TIMESTAMP_WITHOUT_TIMEZONE),
-        IllegalArgumentException.class,
-        SparkUtil.TIMESTAMP_WITHOUT_TIMEZONE_ERROR,
-        () ->
-            spark
-                .read()
-                .format("iceberg")
-                .option(SparkReadOptions.VECTORIZATION_ENABLED, String.valueOf(vectorized))
-                .option(SparkReadOptions.HANDLE_TIMESTAMP_WITHOUT_TIMEZONE, "false")
-                .load(unpartitioned.toString())
-                .collectAsList());
+    Assertions.assertThatThrownBy(
+            () ->
+                spark
+                    .read()
+                    .format("iceberg")
+                    .option(SparkReadOptions.VECTORIZATION_ENABLED, String.valueOf(vectorized))
+                    .option(SparkReadOptions.HANDLE_TIMESTAMP_WITHOUT_TIMEZONE, "false")
+                    .load(unpartitioned.toString())
+                    .collectAsList())
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage(SparkUtil.TIMESTAMP_WITHOUT_TIMEZONE_ERROR);
   }
 
   @Test
@@ -198,30 +194,21 @@ public class TestTimestampWithoutZone extends SparkTestBase {
 
   @Test
   public void testUnpartitionedTimestampWithoutZoneWriteError() {
-    String errorMessage =
-        String.format(
-            "Write operation performed on a timestamp without timezone field 
while "
-                + "'%s' set to false should throw exception",
-            SparkSQLProperties.HANDLE_TIMESTAMP_WITHOUT_TIMEZONE);
-    Runnable writeOperation =
-        () ->
-            spark
-                .read()
-                .format("iceberg")
-                .option(SparkReadOptions.HANDLE_TIMESTAMP_WITHOUT_TIMEZONE, "true")
-                .option(SparkReadOptions.VECTORIZATION_ENABLED, String.valueOf(vectorized))
-                .load(unpartitioned.toString())
-                .write()
-                .format("iceberg")
-                .option(SparkWriteOptions.HANDLE_TIMESTAMP_WITHOUT_TIMEZONE, "false")
-                .mode(SaveMode.Append)
-                .save(unpartitioned.toString());
-
-    AssertHelpers.assertThrows(
-        errorMessage,
-        IllegalArgumentException.class,
-        SparkUtil.TIMESTAMP_WITHOUT_TIMEZONE_ERROR,
-        writeOperation);
+    Assertions.assertThatThrownBy(
+            () ->
+                spark
+                    .read()
+                    .format("iceberg")
+                    
.option(SparkReadOptions.HANDLE_TIMESTAMP_WITHOUT_TIMEZONE, "true")
+                    .option(SparkReadOptions.VECTORIZATION_ENABLED, 
String.valueOf(vectorized))
+                    .load(unpartitioned.toString())
+                    .write()
+                    .format("iceberg")
+                    
.option(SparkWriteOptions.HANDLE_TIMESTAMP_WITHOUT_TIMEZONE, "false")
+                    .mode(SaveMode.Append)
+                    .save(unpartitioned.toString()))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage(SparkUtil.TIMESTAMP_WITHOUT_TIMEZONE_ERROR);
   }
 
   @Test
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestWriteMetricsConfig.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestWriteMetricsConfig.java
index 9bf00f1b13..1e2a825d8e 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestWriteMetricsConfig.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestWriteMetricsConfig.java
@@ -27,7 +27,6 @@ import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.PartitionSpec;
@@ -49,6 +48,7 @@ import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.catalyst.InternalRow;
+import org.assertj.core.api.Assertions;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -231,11 +231,11 @@ public class TestWriteMetricsConfig {
     properties.put(TableProperties.DEFAULT_WRITE_METRICS_MODE, "counts");
     properties.put("write.metadata.metrics.column.ids", "full");
 
-    AssertHelpers.assertThrows(
-        "Creating a table with invalid metrics should fail",
-        ValidationException.class,
-        null,
-        () -> tables.create(SIMPLE_SCHEMA, spec, properties, tableLocation));
+    Assertions.assertThatThrownBy(
+            () -> tables.create(SIMPLE_SCHEMA, spec, properties, tableLocation))
+        .isInstanceOf(ValidationException.class)
+        .hasMessageStartingWith(
+            "Invalid metrics config, could not find column ids from table prop write.metadata.metrics.column.ids in schema table");
   }
 
   @Test
