This is an automated email from the ASF dual-hosted git repository.

etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 2c157cbaf5 Spark 3.4: Migrate TestBase-related remaining tests in actions (#12579)
2c157cbaf5 is described below

commit 2c157cbaf552f48a18d959f0c6b95be36b9dd9e3
Author: Tom Tanaka <[email protected]>
AuthorDate: Fri Mar 21 18:50:50 2025 +0900

    Spark 3.4: Migrate TestBase-related remaining tests in actions (#12579)
---
 .../spark/actions/TestRemoveOrphanFilesAction.java | 268 +++++++-------
 .../actions/TestRemoveOrphanFilesAction3.java      | 107 +++---
 .../spark/actions/TestRewriteDataFilesAction.java  | 387 +++++++++++----------
 .../spark/actions/TestRewriteManifestsAction.java  | 228 ++++++------
 .../spark/actions/TestRemoveOrphanFilesAction.java |  26 +-
 .../actions/TestRemoveOrphanFilesAction3.java      |  20 +-
 .../spark/actions/TestRewriteDataFilesAction.java  |  15 +-
 .../spark/actions/TestRewriteManifestsAction.java  |   1 -
 8 files changed, 533 insertions(+), 519 deletions(-)

diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
index d322f1d67b..8788940b09 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.spark.actions;
 import static org.apache.iceberg.types.Types.NestedField.optional;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assumptions.assumeThat;
 
 import java.io.File;
 import java.io.IOException;
@@ -32,12 +33,12 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
-import java.util.stream.StreamSupport;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -45,6 +46,9 @@ import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.Files;
 import org.apache.iceberg.GenericBlobMetadata;
 import org.apache.iceberg.GenericStatisticsFile;
+import org.apache.iceberg.Parameter;
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.Parameters;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
@@ -64,12 +68,10 @@ import org.apache.iceberg.puffin.Puffin;
 import org.apache.iceberg.puffin.PuffinWriter;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
-import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.spark.SparkSQLProperties;
-import org.apache.iceberg.spark.SparkTestBase;
+import org.apache.iceberg.spark.TestBase;
 import 
org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction.StringToFileURI;
 import org.apache.iceberg.spark.source.FilePathLastModifiedRecord;
 import org.apache.iceberg.spark.source.ThreeColumnRecord;
@@ -80,13 +82,13 @@ import org.apache.spark.sql.Row;
 import org.apache.spark.sql.RowFactory;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.StructType;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
 
-public abstract class TestRemoveOrphanFilesAction extends SparkTestBase {
+@ExtendWith(ParameterizedTestExtension.class)
+public abstract class TestRemoveOrphanFilesAction extends TestBase {
 
   private static final HadoopTables TABLES = new HadoopTables(new 
Configuration());
   protected static final Schema SCHEMA =
@@ -97,17 +99,23 @@ public abstract class TestRemoveOrphanFilesAction extends 
SparkTestBase {
   protected static final PartitionSpec SPEC =
       PartitionSpec.builderFor(SCHEMA).truncate("c2", 
2).identity("c3").build();
 
-  @Rule public TemporaryFolder temp = new TemporaryFolder();
-  private File tableDir = null;
+  @TempDir private File tableDir = null;
   protected String tableLocation = null;
+  protected Map<String, String> properties;
+  @Parameter private int formatVersion;
 
-  @Before
+  @Parameters(name = "formatVersion = {0}")
+  protected static List<Object> parameters() {
+    return Arrays.asList(2, 3);
+  }
+
+  @BeforeEach
   public void setupTableLocation() throws Exception {
-    this.tableDir = temp.newFolder();
     this.tableLocation = tableDir.toURI().toString();
+    properties = ImmutableMap.of(TableProperties.FORMAT_VERSION, 
String.valueOf(formatVersion));
   }
 
-  @Test
+  @TestTemplate
   public void testDryRun() throws IOException, InterruptedException {
     Table table =
         TABLES.create(SCHEMA, PartitionSpec.unpartitioned(), 
Maps.newHashMap(), tableLocation);
@@ -129,7 +137,7 @@ public abstract class TestRemoveOrphanFilesAction extends 
SparkTestBase {
             .select("file_path")
             .as(Encoders.STRING())
             .collectAsList();
-    Assert.assertEquals("Should be 2 valid files", 2, validFiles.size());
+    assertThat(validFiles).as("Should be 2 valid files").hasSize(2);
 
     df.write().mode("append").parquet(tableLocation + "/data");
 
@@ -140,11 +148,11 @@ public abstract class TestRemoveOrphanFilesAction extends 
SparkTestBase {
             .filter(FileStatus::isFile)
             .map(file -> file.getPath().toString())
             .collect(Collectors.toList());
-    Assert.assertEquals("Should be 3 files", 3, allFiles.size());
+    assertThat(allFiles).as("Should be 3 valid files").hasSize(3);
 
     List<String> invalidFiles = Lists.newArrayList(allFiles);
     invalidFiles.removeAll(validFiles);
-    Assert.assertEquals("Should be 1 invalid file", 1, invalidFiles.size());
+    assertThat(invalidFiles).as("Should be 1 invalid file").hasSize(1);
 
     waitUntilAfter(System.currentTimeMillis());
 
@@ -152,9 +160,9 @@ public abstract class TestRemoveOrphanFilesAction extends 
SparkTestBase {
 
     DeleteOrphanFiles.Result result1 =
         actions.deleteOrphanFiles(table).deleteWith(s -> {}).execute();
-    Assert.assertTrue(
-        "Default olderThan interval should be safe",
-        Iterables.isEmpty(result1.orphanFileLocations()));
+    assertThat(result1.orphanFileLocations())
+        .as("Default olderThan interval should be safe")
+        .isEmpty();
 
     DeleteOrphanFiles.Result result2 =
         actions
@@ -162,14 +170,21 @@ public abstract class TestRemoveOrphanFilesAction extends 
SparkTestBase {
             .olderThan(System.currentTimeMillis())
             .deleteWith(s -> {})
             .execute();
-    Assert.assertEquals("Action should find 1 file", invalidFiles, 
result2.orphanFileLocations());
-    Assert.assertTrue("Invalid file should be present", fs.exists(new 
Path(invalidFiles.get(0))));
+    assertThat(result2.orphanFileLocations())
+        .as("Action should find 1 file")
+        .isEqualTo(invalidFiles);
+    assertThat(fs.exists(new Path(invalidFiles.get(0))))
+        .as("Invalid file should be present")
+        .isTrue();
 
     DeleteOrphanFiles.Result result3 =
         
actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).execute();
-    Assert.assertEquals("Action should delete 1 file", invalidFiles, 
result3.orphanFileLocations());
-    Assert.assertFalse(
-        "Invalid file should not be present", fs.exists(new 
Path(invalidFiles.get(0))));
+    assertThat(result3.orphanFileLocations())
+        .as("Action should delete 1 file")
+        .isEqualTo(invalidFiles);
+    assertThat(fs.exists(new Path(invalidFiles.get(0))))
+        .as("Invalid file should not be present")
+        .isFalse();
 
     List<ThreeColumnRecord> expectedRecords = Lists.newArrayList();
     expectedRecords.addAll(records);
@@ -178,10 +193,10 @@ public abstract class TestRemoveOrphanFilesAction extends 
SparkTestBase {
     Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
     List<ThreeColumnRecord> actualRecords =
         resultDF.as(Encoders.bean(ThreeColumnRecord.class)).collectAsList();
-    Assert.assertEquals("Rows must match", expectedRecords, actualRecords);
+    assertThat(actualRecords).isEqualTo(expectedRecords);
   }
 
-  @Test
+  @TestTemplate
   public void testAllValidFilesAreKept() throws IOException, 
InterruptedException {
     Table table = TABLES.create(SCHEMA, SPEC, Maps.newHashMap(), 
tableLocation);
 
@@ -205,13 +220,13 @@ public abstract class TestRemoveOrphanFilesAction extends 
SparkTestBase {
     List<Snapshot> snapshots = Lists.newArrayList(table.snapshots());
 
     List<String> snapshotFiles1 = snapshotFiles(snapshots.get(0).snapshotId());
-    Assert.assertEquals(1, snapshotFiles1.size());
+    assertThat(snapshotFiles1).hasSize(1);
 
     List<String> snapshotFiles2 = snapshotFiles(snapshots.get(1).snapshotId());
-    Assert.assertEquals(1, snapshotFiles2.size());
+    assertThat(snapshotFiles2).hasSize(1);
 
     List<String> snapshotFiles3 = snapshotFiles(snapshots.get(2).snapshotId());
-    Assert.assertEquals(2, snapshotFiles3.size());
+    assertThat(snapshotFiles3).hasSize(2);
 
     df2.coalesce(1).write().mode("append").parquet(tableLocation + "/data");
     df2.coalesce(1).write().mode("append").parquet(tableLocation + 
"/data/c2_trunc=AA");
@@ -225,25 +240,25 @@ public abstract class TestRemoveOrphanFilesAction extends 
SparkTestBase {
     DeleteOrphanFiles.Result result =
         
actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).execute();
 
-    Assert.assertEquals("Should delete 4 files", 4, 
Iterables.size(result.orphanFileLocations()));
+    assertThat(result.orphanFileLocations()).as("Should delete 4 
files").hasSize(4);
 
     Path dataPath = new Path(tableLocation + "/data");
     FileSystem fs = 
dataPath.getFileSystem(spark.sessionState().newHadoopConf());
 
     for (String fileLocation : snapshotFiles1) {
-      Assert.assertTrue("All snapshot files must remain", fs.exists(new 
Path(fileLocation)));
+      assertThat(fs.exists(new Path(fileLocation))).as("All snapshot files 
must remain").isTrue();
     }
 
     for (String fileLocation : snapshotFiles2) {
-      Assert.assertTrue("All snapshot files must remain", fs.exists(new 
Path(fileLocation)));
+      assertThat(fs.exists(new Path(fileLocation))).as("All snapshot files 
must remain").isTrue();
     }
 
     for (String fileLocation : snapshotFiles3) {
-      Assert.assertTrue("All snapshot files must remain", fs.exists(new 
Path(fileLocation)));
+      assertThat(fs.exists(new Path(fileLocation))).as("All snapshot files 
must remain").isTrue();
     }
   }
 
-  @Test
+  @TestTemplate
   public void orphanedFileRemovedWithParallelTasks() throws 
InterruptedException, IOException {
     Table table = TABLES.create(SCHEMA, SPEC, Maps.newHashMap(), 
tableLocation);
 
@@ -299,16 +314,15 @@ public abstract class TestRemoveOrphanFilesAction extends 
SparkTestBase {
 
     // Verifies that the delete methods ran in the threads created by the 
provided ExecutorService
     // ThreadFactory
-    Assert.assertEquals(
-        deleteThreads,
-        Sets.newHashSet(
-            "remove-orphan-0", "remove-orphan-1", "remove-orphan-2", 
"remove-orphan-3"));
-
-    Assert.assertEquals("Should delete 4 files", 4, deletedFiles.size());
+    assertThat(deleteThreads)
+        .containsExactlyInAnyOrder(
+            "remove-orphan-0", "remove-orphan-1", "remove-orphan-2", 
"remove-orphan-3");
+    assertThat(deletedFiles).hasSize(4);
   }
 
-  @Test
+  @TestTemplate
   public void testWapFilesAreKept() throws InterruptedException {
+    assumeThat(formatVersion).as("currently fails with DVs").isEqualTo(2);
     Map<String, String> props = Maps.newHashMap();
     props.put(TableProperties.WRITE_AUDIT_PUBLISH_ENABLED, "true");
     Table table = TABLES.create(SCHEMA, SPEC, props, tableLocation);
@@ -328,7 +342,9 @@ public abstract class TestRemoveOrphanFilesAction extends 
SparkTestBase {
     Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
     List<ThreeColumnRecord> actualRecords =
         resultDF.as(Encoders.bean(ThreeColumnRecord.class)).collectAsList();
-    Assert.assertEquals("Should not return data from the staged snapshot", 
records, actualRecords);
+    assertThat(actualRecords)
+        .as("Should not return data from the staged snapshot")
+        .isEqualTo(records);
 
     waitUntilAfter(System.currentTimeMillis());
 
@@ -337,11 +353,10 @@ public abstract class TestRemoveOrphanFilesAction extends 
SparkTestBase {
     DeleteOrphanFiles.Result result =
         
actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).execute();
 
-    Assert.assertTrue(
-        "Should not delete any files", 
Iterables.isEmpty(result.orphanFileLocations()));
+    assertThat(result.orphanFileLocations()).as("Should not delete any 
files").isEmpty();
   }
 
-  @Test
+  @TestTemplate
   public void testMetadataFolderIsIntact() throws InterruptedException {
     // write data directly to the table location
     Map<String, String> props = Maps.newHashMap();
@@ -363,15 +378,15 @@ public abstract class TestRemoveOrphanFilesAction extends 
SparkTestBase {
     DeleteOrphanFiles.Result result =
         
actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).execute();
 
-    Assert.assertEquals("Should delete 1 file", 1, 
Iterables.size(result.orphanFileLocations()));
+    assertThat(result.orphanFileLocations()).as("Should delete 1 
file").hasSize(1);
 
     Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
     List<ThreeColumnRecord> actualRecords =
         resultDF.as(Encoders.bean(ThreeColumnRecord.class)).collectAsList();
-    Assert.assertEquals("Rows must match", records, actualRecords);
+    assertThat(actualRecords).as("Rows must match").isEqualTo(records);
   }
 
-  @Test
+  @TestTemplate
   public void testOlderThanTimestamp() throws InterruptedException {
     Table table = TABLES.create(SCHEMA, SPEC, Maps.newHashMap(), 
tableLocation);
 
@@ -397,11 +412,10 @@ public abstract class TestRemoveOrphanFilesAction extends 
SparkTestBase {
     DeleteOrphanFiles.Result result =
         actions.deleteOrphanFiles(table).olderThan(timestamp).execute();
 
-    Assert.assertEquals(
-        "Should delete only 2 files", 2, 
Iterables.size(result.orphanFileLocations()));
+    assertThat(result.orphanFileLocations()).as("Should delete only 2 
files").hasSize(2);
   }
 
-  @Test
+  @TestTemplate
   public void testRemoveUnreachableMetadataVersionFiles() throws 
InterruptedException {
     Map<String, String> props = Maps.newHashMap();
     props.put(TableProperties.WRITE_DATA_LOCATION, tableLocation);
@@ -423,11 +437,8 @@ public abstract class TestRemoveOrphanFilesAction extends 
SparkTestBase {
     DeleteOrphanFiles.Result result =
         
actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).execute();
 
-    Assert.assertEquals("Should delete 1 file", 1, 
Iterables.size(result.orphanFileLocations()));
-    Assert.assertTrue(
-        "Should remove v1 file",
-        StreamSupport.stream(result.orphanFileLocations().spliterator(), false)
-            .anyMatch(file -> file.contains("v1.metadata.json")));
+    assertThat(result.orphanFileLocations())
+        .containsExactly(tableLocation + "metadata/v1.metadata.json");
 
     List<ThreeColumnRecord> expectedRecords = Lists.newArrayList();
     expectedRecords.addAll(records);
@@ -436,10 +447,10 @@ public abstract class TestRemoveOrphanFilesAction extends 
SparkTestBase {
     Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
     List<ThreeColumnRecord> actualRecords =
         resultDF.as(Encoders.bean(ThreeColumnRecord.class)).collectAsList();
-    Assert.assertEquals("Rows must match", expectedRecords, actualRecords);
+    assertThat(actualRecords).as("Rows must match").isEqualTo(expectedRecords);
   }
 
-  @Test
+  @TestTemplate
   public void testManyTopLevelPartitions() throws InterruptedException {
     Table table = TABLES.create(SCHEMA, SPEC, Maps.newHashMap(), 
tableLocation);
 
@@ -459,14 +470,13 @@ public abstract class TestRemoveOrphanFilesAction extends 
SparkTestBase {
     DeleteOrphanFiles.Result result =
         
actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).execute();
 
-    Assert.assertTrue(
-        "Should not delete any files", 
Iterables.isEmpty(result.orphanFileLocations()));
+    assertThat(result.orphanFileLocations()).as("Should not delete any 
files").isEmpty();
 
     Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
-    Assert.assertEquals("Rows count must match", records.size(), 
resultDF.count());
+    assertThat(resultDF.count()).as("Rows count must 
match").isEqualTo(records.size());
   }
 
-  @Test
+  @TestTemplate
   public void testManyLeafPartitions() throws InterruptedException {
     Table table = TABLES.create(SCHEMA, SPEC, Maps.newHashMap(), 
tableLocation);
 
@@ -486,14 +496,13 @@ public abstract class TestRemoveOrphanFilesAction extends 
SparkTestBase {
     DeleteOrphanFiles.Result result =
         
actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).execute();
 
-    Assert.assertTrue(
-        "Should not delete any files", 
Iterables.isEmpty(result.orphanFileLocations()));
+    assertThat(result.orphanFileLocations()).as("Should not delete any 
files").isEmpty();
 
     Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
-    Assert.assertEquals("Row count must match", records.size(), 
resultDF.count());
+    assertThat(resultDF.count()).as("Row count must 
match").isEqualTo(records.size());
   }
 
-  @Test
+  @TestTemplate
   public void testHiddenPartitionPaths() throws InterruptedException {
     Schema schema =
         new Schema(
@@ -523,10 +532,10 @@ public abstract class TestRemoveOrphanFilesAction extends 
SparkTestBase {
     DeleteOrphanFiles.Result result =
         
actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).execute();
 
-    Assert.assertEquals("Should delete 2 files", 2, 
Iterables.size(result.orphanFileLocations()));
+    assertThat(result.orphanFileLocations()).as("Should delete 2 
files").hasSize(2);
   }
 
-  @Test
+  @TestTemplate
   public void testHiddenPartitionPathsWithPartitionEvolution() throws 
InterruptedException {
     Schema schema =
         new Schema(
@@ -559,10 +568,10 @@ public abstract class TestRemoveOrphanFilesAction extends 
SparkTestBase {
     DeleteOrphanFiles.Result result =
         
actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).execute();
 
-    Assert.assertEquals("Should delete 2 files", 2, 
Iterables.size(result.orphanFileLocations()));
+    assertThat(result.orphanFileLocations()).as("Should delete 2 
files").hasSize(2);
   }
 
-  @Test
+  @TestTemplate
   public void testHiddenPathsStartingWithPartitionNamesAreIgnored()
       throws InterruptedException, IOException {
     Schema schema =
@@ -595,8 +604,8 @@ public abstract class TestRemoveOrphanFilesAction extends 
SparkTestBase {
     DeleteOrphanFiles.Result result =
         
actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).execute();
 
-    Assert.assertEquals("Should delete 0 files", 0, 
Iterables.size(result.orphanFileLocations()));
-    Assert.assertTrue(fs.exists(pathToFileInHiddenFolder));
+    assertThat(result.orphanFileLocations()).as("Should delete 0 
files").isEmpty();
+    assertThat(fs.exists(pathToFileInHiddenFolder)).isTrue();
   }
 
   private List<String> snapshotFiles(long snapshotId) {
@@ -610,7 +619,7 @@ public abstract class TestRemoveOrphanFilesAction extends 
SparkTestBase {
         .collectAsList();
   }
 
-  @Test
+  @TestTemplate
   public void testRemoveOrphanFilesWithRelativeFilePath() throws IOException, 
InterruptedException {
     Table table =
         TABLES.create(
@@ -635,7 +644,7 @@ public abstract class TestRemoveOrphanFilesAction extends 
SparkTestBase {
             .select("file_path")
             .as(Encoders.STRING())
             .collectAsList();
-    Assert.assertEquals("Should be 1 valid files", 1, validFiles.size());
+    assertThat(validFiles).as("Should be 1 valid file").hasSize(1);
     String validFile = validFiles.get(0);
 
     df.write().mode("append").parquet(tableLocation + "/data");
@@ -647,11 +656,11 @@ public abstract class TestRemoveOrphanFilesAction extends 
SparkTestBase {
             .filter(FileStatus::isFile)
             .map(file -> file.getPath().toString())
             .collect(Collectors.toList());
-    Assert.assertEquals("Should be 2 files", 2, allFiles.size());
+    assertThat(allFiles).as("Should be 2 files").hasSize(2);
 
     List<String> invalidFiles = Lists.newArrayList(allFiles);
     invalidFiles.removeIf(file -> file.contains(validFile));
-    Assert.assertEquals("Should be 1 invalid file", 1, invalidFiles.size());
+    assertThat(invalidFiles).as("Should be 1 invalid file").hasSize(1);
 
     waitUntilAfter(System.currentTimeMillis());
 
@@ -662,11 +671,15 @@ public abstract class TestRemoveOrphanFilesAction extends 
SparkTestBase {
             .olderThan(System.currentTimeMillis())
             .deleteWith(s -> {})
             .execute();
-    Assert.assertEquals("Action should find 1 file", invalidFiles, 
result.orphanFileLocations());
-    Assert.assertTrue("Invalid file should be present", fs.exists(new 
Path(invalidFiles.get(0))));
+    assertThat(result.orphanFileLocations())
+        .as("Action should find 1 file")
+        .isEqualTo(invalidFiles);
+    assertThat(fs.exists(new Path(invalidFiles.get(0))))
+        .as("Invalid file should be present")
+        .isTrue();
   }
 
-  @Test
+  @TestTemplate
   public void testRemoveOrphanFilesWithHadoopCatalog() throws 
InterruptedException {
     HadoopCatalog catalog = new HadoopCatalog(new Configuration(), 
tableLocation);
     String namespaceName = "testDb";
@@ -693,24 +706,18 @@ public abstract class TestRemoveOrphanFilesAction extends 
SparkTestBase {
     DeleteOrphanFiles.Result result =
         
SparkActions.get().deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).execute();
 
-    Assert.assertEquals(
-        "Should delete only 1 files", 1, 
Iterables.size(result.orphanFileLocations()));
+    assertThat(result.orphanFileLocations()).as("Should delete only 1 
file").hasSize(1);
 
     Dataset<Row> resultDF = 
spark.read().format("iceberg").load(table.location());
     List<ThreeColumnRecord> actualRecords =
         resultDF.as(Encoders.bean(ThreeColumnRecord.class)).collectAsList();
-    Assert.assertEquals("Rows must match", records, actualRecords);
+    assertThat(actualRecords).as("Rows must match").isEqualTo(records);
   }
 
-  @Test
+  @TestTemplate
   public void testHiveCatalogTable() throws IOException {
-    Table table =
-        catalog.createTable(
-            TableIdentifier.of("default", "hivetestorphan"),
-            SCHEMA,
-            SPEC,
-            tableLocation,
-            Maps.newHashMap());
+    TableIdentifier identifier = TableIdentifier.of("default", 
randomName("hivetestorphan"));
+    Table table = catalog.createTable(identifier, SCHEMA, SPEC, tableLocation, 
properties);
 
     List<ThreeColumnRecord> records =
         Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));
@@ -721,7 +728,7 @@ public abstract class TestRemoveOrphanFilesAction extends 
SparkTestBase {
         .write()
         .format("iceberg")
         .mode("append")
-        .save("default.hivetestorphan");
+        .save(identifier.toString());
 
     String location = table.location().replaceFirst("file:", "");
     new File(location + "/data/trashfile").createNewFile();
@@ -731,13 +738,12 @@ public abstract class TestRemoveOrphanFilesAction extends 
SparkTestBase {
             .deleteOrphanFiles(table)
             .olderThan(System.currentTimeMillis() + 1000)
             .execute();
-    Assert.assertTrue(
-        "trash file should be removed",
-        StreamSupport.stream(result.orphanFileLocations().spliterator(), false)
-            .anyMatch(file -> file.contains("file:" + location + 
"/data/trashfile")));
+    assertThat(result.orphanFileLocations())
+        .as("trash file should be removed")
+        .contains("file:" + location + "/data/trashfile");
   }
 
-  @Test
+  @TestTemplate
   public void testGarbageCollectionDisabled() {
     Table table =
         TABLES.create(SCHEMA, PartitionSpec.unpartitioned(), 
Maps.newHashMap(), tableLocation);
@@ -757,7 +763,7 @@ public abstract class TestRemoveOrphanFilesAction extends 
SparkTestBase {
             "Cannot delete orphan files: GC is disabled (deleting files may 
corrupt other tables)");
   }
 
-  @Test
+  @TestTemplate
   public void testCompareToFileList() throws IOException, InterruptedException 
{
     Table table =
         TABLES.create(SCHEMA, PartitionSpec.unpartitioned(), 
Maps.newHashMap(), tableLocation);
@@ -782,7 +788,7 @@ public abstract class TestRemoveOrphanFilesAction extends 
SparkTestBase {
                         file.getPath().toString(), new 
Timestamp(file.getModificationTime())))
             .collect(Collectors.toList());
 
-    Assert.assertEquals("Should be 2 valid files", 2, validFiles.size());
+    assertThat(validFiles).as("Should be 2 valid files").hasSize(2);
 
     df.write().mode("append").parquet(tableLocation + "/data");
 
@@ -795,7 +801,7 @@ public abstract class TestRemoveOrphanFilesAction extends 
SparkTestBase {
                         file.getPath().toString(), new 
Timestamp(file.getModificationTime())))
             .collect(Collectors.toList());
 
-    Assert.assertEquals("Should be 3 files", 3, allFiles.size());
+    assertThat(allFiles).as("Should be 3 files").hasSize(3);
 
     List<FilePathLastModifiedRecord> invalidFiles = 
Lists.newArrayList(allFiles);
     invalidFiles.removeAll(validFiles);
@@ -803,7 +809,7 @@ public abstract class TestRemoveOrphanFilesAction extends 
SparkTestBase {
         invalidFiles.stream()
             .map(FilePathLastModifiedRecord::getFilePath)
             .collect(Collectors.toList());
-    Assert.assertEquals("Should be 1 invalid file", 1, invalidFiles.size());
+    assertThat(invalidFiles).as("Should be 1 invalid file").hasSize(1);
 
     // sleep for 1 second to ensure files will be old enough
     waitUntilAfter(System.currentTimeMillis());
@@ -822,9 +828,9 @@ public abstract class TestRemoveOrphanFilesAction extends 
SparkTestBase {
             .compareToFileList(compareToFileList)
             .deleteWith(s -> {})
             .execute();
-    Assert.assertTrue(
-        "Default olderThan interval should be safe",
-        Iterables.isEmpty(result1.orphanFileLocations()));
+    assertThat(result1.orphanFileLocations())
+        .as("Default olderThan interval should be safe")
+        .isEmpty();
 
     DeleteOrphanFiles.Result result2 =
         actions
@@ -833,10 +839,12 @@ public abstract class TestRemoveOrphanFilesAction extends 
SparkTestBase {
             .olderThan(System.currentTimeMillis())
             .deleteWith(s -> {})
             .execute();
-    Assert.assertEquals(
-        "Action should find 1 file", invalidFilePaths, 
result2.orphanFileLocations());
-    Assert.assertTrue(
-        "Invalid file should be present", fs.exists(new 
Path(invalidFilePaths.get(0))));
+    assertThat(result2.orphanFileLocations())
+        .as("Action should find 1 file")
+        .isEqualTo(invalidFilePaths);
+    assertThat(fs.exists(new Path(invalidFilePaths.get(0))))
+        .as("Invalid file should be present")
+        .isTrue();
 
     DeleteOrphanFiles.Result result3 =
         actions
@@ -844,10 +852,12 @@ public abstract class TestRemoveOrphanFilesAction extends 
SparkTestBase {
             .compareToFileList(compareToFileList)
             .olderThan(System.currentTimeMillis())
             .execute();
-    Assert.assertEquals(
-        "Action should delete 1 file", invalidFilePaths, 
result3.orphanFileLocations());
-    Assert.assertFalse(
-        "Invalid file should not be present", fs.exists(new 
Path(invalidFilePaths.get(0))));
+    assertThat(result3.orphanFileLocations())
+        .as("Action should delete 1 file")
+        .isEqualTo(invalidFilePaths);
+    assertThat(fs.exists(new Path(invalidFilePaths.get(0))))
+        .as("Invalid file should not be present")
+        .isFalse();
 
     List<ThreeColumnRecord> expectedRecords = Lists.newArrayList();
     expectedRecords.addAll(records);
@@ -856,7 +866,7 @@ public abstract class TestRemoveOrphanFilesAction extends 
SparkTestBase {
     Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
     List<ThreeColumnRecord> actualRecords =
         resultDF.as(Encoders.bean(ThreeColumnRecord.class)).collectAsList();
-    Assert.assertEquals("Rows must match", expectedRecords, actualRecords);
+    assertThat(actualRecords).as("Rows must match").isEqualTo(expectedRecords);
 
     List<FilePathLastModifiedRecord> outsideLocationMockFiles =
         Lists.newArrayList(new FilePathLastModifiedRecord("/tmp/mock1", new 
Timestamp(0L)));
@@ -873,8 +883,7 @@ public abstract class TestRemoveOrphanFilesAction extends 
SparkTestBase {
             .compareToFileList(compareToFileListWithOutsideLocation)
             .deleteWith(s -> {})
             .execute();
-    Assert.assertEquals(
-        "Action should find nothing", Lists.newArrayList(), 
result4.orphanFileLocations());
+    assertThat(result4.orphanFileLocations()).as("Action should find 
nothing").isEmpty();
   }
 
   protected long waitUntilAfter(long timestampMillis) {
@@ -885,7 +894,7 @@ public abstract class TestRemoveOrphanFilesAction extends 
SparkTestBase {
     return current;
   }
 
-  @Test
+  @TestTemplate
   public void testRemoveOrphanFilesWithStatisticFiles() throws Exception {
     Table table =
         TABLES.create(
@@ -954,35 +963,32 @@ public abstract class TestRemoveOrphanFilesAction extends 
SparkTestBase {
             .olderThan(System.currentTimeMillis() + 1000)
             .execute();
     Iterable<String> orphanFileLocations = result.orphanFileLocations();
-    assertThat(orphanFileLocations).as("Should be orphan files").hasSize(1);
-    assertThat(Iterables.getOnlyElement(orphanFileLocations))
-        .as("Deleted file")
-        .isEqualTo(statsLocation.toURI().toString());
-    assertThat(statsLocation.exists()).as("stats file should be 
deleted").isFalse();
+    
assertThat(orphanFileLocations).hasSize(1).containsExactly(statsLocation.toURI().toString());
+    assertThat(statsLocation).as("stats file should be 
deleted").doesNotExist();
   }
 
-  @Test
+  @TestTemplate
   public void testPathsWithExtraSlashes() {
     List<String> validFiles = Lists.newArrayList("file:///dir1/dir2/file1");
     List<String> actualFiles = 
Lists.newArrayList("file:///dir1/////dir2///file1");
     executeTest(validFiles, actualFiles, Lists.newArrayList());
   }
 
-  @Test
+  @TestTemplate
   public void testPathsWithValidFileHavingNoAuthority() {
     List<String> validFiles = Lists.newArrayList("hdfs:///dir1/dir2/file1");
     List<String> actualFiles = 
Lists.newArrayList("hdfs://servicename/dir1/dir2/file1");
     executeTest(validFiles, actualFiles, Lists.newArrayList());
   }
 
-  @Test
+  @TestTemplate
   public void testPathsWithActualFileHavingNoAuthority() {
     List<String> validFiles = 
Lists.newArrayList("hdfs://servicename/dir1/dir2/file1");
     List<String> actualFiles = Lists.newArrayList("hdfs:///dir1/dir2/file1");
     executeTest(validFiles, actualFiles, Lists.newArrayList());
   }
 
-  @Test
+  @TestTemplate
   public void testPathsWithEqualSchemes() {
     List<String> validFiles = 
Lists.newArrayList("scheme1://bucket1/dir1/dir2/file1");
     List<String> actualFiles = 
Lists.newArrayList("scheme2://bucket1/dir1/dir2/file1");
@@ -1011,7 +1017,7 @@ public abstract class TestRemoveOrphanFilesAction extends 
SparkTestBase {
         DeleteOrphanFiles.PrefixMismatchMode.ERROR);
   }
 
-  @Test
+  @TestTemplate
   public void testPathsWithEqualAuthorities() {
     List<String> validFiles = 
Lists.newArrayList("hdfs://servicename1/dir1/dir2/file1");
     List<String> actualFiles = 
Lists.newArrayList("hdfs://servicename2/dir1/dir2/file1");
@@ -1040,7 +1046,7 @@ public abstract class TestRemoveOrphanFilesAction extends 
SparkTestBase {
         DeleteOrphanFiles.PrefixMismatchMode.ERROR);
   }
 
-  @Test
+  @TestTemplate
   public void testRemoveOrphanFileActionWithDeleteMode() {
     List<String> validFiles = 
Lists.newArrayList("hdfs://servicename1/dir1/dir2/file1");
     List<String> actualFiles = 
Lists.newArrayList("hdfs://servicename2/dir1/dir2/file1");
@@ -1054,6 +1060,10 @@ public abstract class TestRemoveOrphanFilesAction 
extends SparkTestBase {
         DeleteOrphanFiles.PrefixMismatchMode.DELETE);
   }
 
+  protected String randomName(String prefix) {
+    return prefix + UUID.randomUUID().toString().replace("-", "");
+  }
+
   private void executeTest(
       List<String> validFiles, List<String> actualFiles, List<String> 
expectedOrphanFiles) {
     executeTest(
@@ -1081,6 +1091,6 @@ public abstract class TestRemoveOrphanFilesAction extends 
SparkTestBase {
     List<String> orphanFiles =
         DeleteOrphanFilesSparkAction.findOrphanFiles(
             spark, toFileUri.apply(actualFileDS), 
toFileUri.apply(validFileDS), mode);
-    Assert.assertEquals(expectedOrphanFiles, orphanFiles);
+    assertThat(orphanFiles).isEqualTo(expectedOrphanFiles);
   }
 }
diff --git 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction3.java
 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction3.java
index 0abfd79d5d..646e5f8e70 100644
--- 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction3.java
+++ 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction3.java
@@ -18,23 +18,21 @@
  */
 package org.apache.iceberg.spark.actions;
 
+import static org.assertj.core.api.Assertions.assertThat;
+
 import java.io.File;
-import java.util.Map;
-import java.util.stream.StreamSupport;
 import org.apache.iceberg.actions.DeleteOrphanFiles;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.spark.SparkCatalog;
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.SparkSessionCatalog;
 import org.apache.iceberg.spark.source.SparkTable;
 import org.apache.spark.sql.connector.catalog.Identifier;
 import org.apache.spark.sql.connector.expressions.Transform;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.TestTemplate;
 
 public class TestRemoveOrphanFilesAction3 extends TestRemoveOrphanFilesAction {
-  @Test
+  @TestTemplate
   public void testSparkCatalogTable() throws Exception {
     spark.conf().set("spark.sql.catalog.mycat", 
"org.apache.iceberg.spark.SparkCatalog");
     spark.conf().set("spark.sql.catalog.mycat.type", "hadoop");
@@ -42,29 +40,28 @@ public class TestRemoveOrphanFilesAction3 extends 
TestRemoveOrphanFilesAction {
     SparkCatalog cat = (SparkCatalog) 
spark.sessionState().catalogManager().catalog("mycat");
 
     String[] database = {"default"};
-    Identifier id = Identifier.of(database, "table");
-    Map<String, String> options = Maps.newHashMap();
+    Identifier id = Identifier.of(database, randomName("table"));
     Transform[] transforms = {};
-    cat.createTable(id, SparkSchemaUtil.convert(SCHEMA), transforms, options);
+    cat.createTable(id, SparkSchemaUtil.convert(SCHEMA), transforms, 
properties);
     SparkTable table = (SparkTable) cat.loadTable(id);
 
-    spark.sql("INSERT INTO mycat.default.table VALUES (1,1,1)");
+    sql("INSERT INTO mycat.default.%s VALUES (1,1,1)", id.name());
 
     String location = table.table().location().replaceFirst("file:", "");
-    new File(location + "/data/trashfile").createNewFile();
+    String trashFile = randomName("/data/trashfile");
+    new File(location + trashFile).createNewFile();
 
     DeleteOrphanFiles.Result results =
         SparkActions.get()
             .deleteOrphanFiles(table.table())
             .olderThan(System.currentTimeMillis() + 1000)
             .execute();
-    Assert.assertTrue(
-        "trash file should be removed",
-        StreamSupport.stream(results.orphanFileLocations().spliterator(), 
false)
-            .anyMatch(file -> file.contains("file:" + location + 
"/data/trashfile")));
+    assertThat(results.orphanFileLocations())
+        .as("trash file should be removed")
+        .contains("file:" + location + trashFile);
   }
 
-  @Test
+  @TestTemplate
   public void testSparkCatalogNamedHadoopTable() throws Exception {
     spark.conf().set("spark.sql.catalog.hadoop", 
"org.apache.iceberg.spark.SparkCatalog");
     spark.conf().set("spark.sql.catalog.hadoop.type", "hadoop");
@@ -72,29 +69,28 @@ public class TestRemoveOrphanFilesAction3 extends 
TestRemoveOrphanFilesAction {
     SparkCatalog cat = (SparkCatalog) 
spark.sessionState().catalogManager().catalog("hadoop");
 
     String[] database = {"default"};
-    Identifier id = Identifier.of(database, "table");
-    Map<String, String> options = Maps.newHashMap();
+    Identifier id = Identifier.of(database, randomName("table"));
     Transform[] transforms = {};
-    cat.createTable(id, SparkSchemaUtil.convert(SCHEMA), transforms, options);
+    cat.createTable(id, SparkSchemaUtil.convert(SCHEMA), transforms, 
properties);
     SparkTable table = (SparkTable) cat.loadTable(id);
 
-    spark.sql("INSERT INTO hadoop.default.table VALUES (1,1,1)");
+    sql("INSERT INTO hadoop.default.%s VALUES (1,1,1)", id.name());
 
     String location = table.table().location().replaceFirst("file:", "");
-    new File(location + "/data/trashfile").createNewFile();
+    String trashFile = randomName("/data/trashfile");
+    new File(location + trashFile).createNewFile();
 
     DeleteOrphanFiles.Result results =
         SparkActions.get()
             .deleteOrphanFiles(table.table())
             .olderThan(System.currentTimeMillis() + 1000)
             .execute();
-    Assert.assertTrue(
-        "trash file should be removed",
-        StreamSupport.stream(results.orphanFileLocations().spliterator(), 
false)
-            .anyMatch(file -> file.contains("file:" + location + 
"/data/trashfile")));
+    assertThat(results.orphanFileLocations())
+        .as("trash file should be removed")
+        .contains("file:" + location + trashFile);
   }
 
-  @Test
+  @TestTemplate
   public void testSparkCatalogNamedHiveTable() throws Exception {
     spark.conf().set("spark.sql.catalog.hive", 
"org.apache.iceberg.spark.SparkCatalog");
     spark.conf().set("spark.sql.catalog.hive.type", "hadoop");
@@ -102,29 +98,28 @@ public class TestRemoveOrphanFilesAction3 extends 
TestRemoveOrphanFilesAction {
     SparkCatalog cat = (SparkCatalog) 
spark.sessionState().catalogManager().catalog("hive");
 
     String[] database = {"default"};
-    Identifier id = Identifier.of(database, "table");
-    Map<String, String> options = Maps.newHashMap();
+    Identifier id = Identifier.of(database, randomName("table"));
     Transform[] transforms = {};
-    cat.createTable(id, SparkSchemaUtil.convert(SCHEMA), transforms, options);
+    cat.createTable(id, SparkSchemaUtil.convert(SCHEMA), transforms, 
properties);
     SparkTable table = (SparkTable) cat.loadTable(id);
 
-    spark.sql("INSERT INTO hive.default.table VALUES (1,1,1)");
+    sql("INSERT INTO hive.default.%s VALUES (1,1,1)", id.name());
 
     String location = table.table().location().replaceFirst("file:", "");
-    new File(location + "/data/trashfile").createNewFile();
+    String trashFile = randomName("/data/trashfile");
+    new File(location + trashFile).createNewFile();
 
     DeleteOrphanFiles.Result results =
         SparkActions.get()
             .deleteOrphanFiles(table.table())
             .olderThan(System.currentTimeMillis() + 1000)
             .execute();
-    Assert.assertTrue(
-        "trash file should be removed",
-        StreamSupport.stream(results.orphanFileLocations().spliterator(), 
false)
-            .anyMatch(file -> file.contains("file:" + location + 
"/data/trashfile")));
+    assertThat(results.orphanFileLocations())
+        .as("trash file should be removed")
+        .contains("file:" + location + trashFile);
   }
 
-  @Test
+  @TestTemplate
   public void testSparkSessionCatalogHadoopTable() throws Exception {
     spark
         .conf()
@@ -135,29 +130,28 @@ public class TestRemoveOrphanFilesAction3 extends 
TestRemoveOrphanFilesAction {
         (SparkSessionCatalog) 
spark.sessionState().catalogManager().v2SessionCatalog();
 
     String[] database = {"default"};
-    Identifier id = Identifier.of(database, "table");
-    Map<String, String> options = Maps.newHashMap();
+    Identifier id = Identifier.of(database, randomName("table"));
     Transform[] transforms = {};
-    cat.createTable(id, SparkSchemaUtil.convert(SCHEMA), transforms, options);
+    cat.createTable(id, SparkSchemaUtil.convert(SCHEMA), transforms, 
properties);
     SparkTable table = (SparkTable) cat.loadTable(id);
 
-    spark.sql("INSERT INTO default.table VALUES (1,1,1)");
+    sql("INSERT INTO default.%s VALUES (1,1,1)", id.name());
 
     String location = table.table().location().replaceFirst("file:", "");
-    new File(location + "/data/trashfile").createNewFile();
+    String trashFile = randomName("/data/trashfile");
+    new File(location + trashFile).createNewFile();
 
     DeleteOrphanFiles.Result results =
         SparkActions.get()
             .deleteOrphanFiles(table.table())
             .olderThan(System.currentTimeMillis() + 1000)
             .execute();
-    Assert.assertTrue(
-        "trash file should be removed",
-        StreamSupport.stream(results.orphanFileLocations().spliterator(), 
false)
-            .anyMatch(file -> file.contains("file:" + location + 
"/data/trashfile")));
+    assertThat(results.orphanFileLocations())
+        .as("trash file should be removed")
+        .contains("file:" + location + trashFile);
   }
 
-  @Test
+  @TestTemplate
   public void testSparkSessionCatalogHiveTable() throws Exception {
     spark
         .conf()
@@ -168,30 +162,29 @@ public class TestRemoveOrphanFilesAction3 extends 
TestRemoveOrphanFilesAction {
 
     String[] database = {"default"};
     Identifier id = Identifier.of(database, "sessioncattest");
-    Map<String, String> options = Maps.newHashMap();
     Transform[] transforms = {};
     cat.dropTable(id);
-    cat.createTable(id, SparkSchemaUtil.convert(SCHEMA), transforms, options);
+    cat.createTable(id, SparkSchemaUtil.convert(SCHEMA), transforms, 
properties);
     SparkTable table = (SparkTable) cat.loadTable(id);
 
-    spark.sql("INSERT INTO default.sessioncattest VALUES (1,1,1)");
+    sql("INSERT INTO default.sessioncattest VALUES (1,1,1)");
 
     String location = table.table().location().replaceFirst("file:", "");
-    new File(location + "/data/trashfile").createNewFile();
+    String trashFile = randomName("/data/trashfile");
+    new File(location + trashFile).createNewFile();
 
     DeleteOrphanFiles.Result results =
         SparkActions.get()
             .deleteOrphanFiles(table.table())
             .olderThan(System.currentTimeMillis() + 1000)
             .execute();
-    Assert.assertTrue(
-        "trash file should be removed",
-        StreamSupport.stream(results.orphanFileLocations().spliterator(), 
false)
-            .anyMatch(file -> file.contains("file:" + location + 
"/data/trashfile")));
+    assertThat(results.orphanFileLocations())
+        .as("trash file should be removed")
+        .contains("file:" + location + trashFile);
   }
 
-  @After
-  public void resetSparkSessionCatalog() throws Exception {
+  @AfterEach
+  public void resetSparkSessionCatalog() {
     spark.conf().unset("spark.sql.catalog.spark_catalog");
     spark.conf().unset("spark.sql.catalog.spark_catalog.type");
     spark.conf().unset("spark.sql.catalog.spark_catalog.warehouse");
diff --git 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index 490c711930..43e5cb37da 100644
--- 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++ 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -38,6 +38,7 @@ import static org.mockito.Mockito.spy;
 import java.io.File;
 import java.io.IOException;
 import java.io.UncheckedIOException;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
@@ -55,6 +56,9 @@ import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.Parameter;
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.Parameters;
 import org.apache.iceberg.PartitionData;
 import org.apache.iceberg.PartitionKey;
 import org.apache.iceberg.PartitionSpec;
@@ -90,7 +94,6 @@ import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.io.OutputFileFactory;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
@@ -100,8 +103,8 @@ import 
org.apache.iceberg.relocated.com.google.common.collect.Streams;
 import org.apache.iceberg.spark.FileRewriteCoordinator;
 import org.apache.iceberg.spark.ScanTaskSetManager;
 import org.apache.iceberg.spark.SparkTableUtil;
-import org.apache.iceberg.spark.SparkTestBase;
 import org.apache.iceberg.spark.SparkWriteOptions;
+import org.apache.iceberg.spark.TestBase;
 import 
org.apache.iceberg.spark.actions.RewriteDataFilesSparkAction.RewriteExecutionContext;
 import org.apache.iceberg.spark.data.TestHelpers;
 import org.apache.iceberg.spark.source.ThreeColumnRecord;
@@ -116,17 +119,18 @@ import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.internal.SQLConf;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
 import org.mockito.ArgumentMatcher;
 import org.mockito.Mockito;
 
-public class TestRewriteDataFilesAction extends SparkTestBase {
+@ExtendWith(ParameterizedTestExtension.class)
+public class TestRewriteDataFilesAction extends TestBase {
 
+  @TempDir private File tableDir;
   private static final int SCALE = 400000;
 
   private static final HadoopTables TABLES = new HadoopTables(new 
Configuration());
@@ -138,21 +142,25 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
 
   private static final PartitionSpec SPEC = 
PartitionSpec.builderFor(SCHEMA).identity("c1").build();
 
-  @Rule public TemporaryFolder temp = new TemporaryFolder();
+  @Parameter private int formatVersion;
+
+  @Parameters(name = "formatVersion = {0}")
+  protected static List<Object> parameters() {
+    return Arrays.asList(2, 3);
+  }
 
   private final FileRewriteCoordinator coordinator = 
FileRewriteCoordinator.get();
   private final ScanTaskSetManager manager = ScanTaskSetManager.get();
   private String tableLocation = null;
 
-  @BeforeClass
+  @BeforeAll
   public static void setupSpark() {
     // disable AQE as tests assume that writes generate a particular number of 
files
     spark.conf().set(SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), "false");
   }
 
-  @Before
-  public void setupTableLocation() throws Exception {
-    File tableDir = temp.newFolder();
+  @BeforeEach
+  public void setupTableLocation() {
     this.tableLocation = tableDir.toURI().toString();
   }
 
@@ -162,20 +170,20 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
     return 
actions().rewriteDataFiles(table).option(SizeBasedFileRewriter.MIN_INPUT_FILES, 
"1");
   }
 
-  @Test
+  @TestTemplate
   public void testEmptyTable() {
     PartitionSpec spec = PartitionSpec.unpartitioned();
     Map<String, String> options = Maps.newHashMap();
     Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
 
-    Assert.assertNull("Table must be empty", table.currentSnapshot());
+    assertThat(table.currentSnapshot()).as("Table must be empty").isNull();
 
     basicRewrite(table).execute();
 
-    Assert.assertNull("Table must stay empty", table.currentSnapshot());
+    assertThat(table.currentSnapshot()).as("Table must stay empty").isNull();
   }
 
-  @Test
+  @TestTemplate
   public void testBinPackUnpartitionedTable() {
     Table table = createTable(4);
     shouldHaveFiles(table, 4);
@@ -183,8 +191,10 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
     long dataSizeBefore = testDataSize(table);
 
     Result result = basicRewrite(table).execute();
-    Assert.assertEquals("Action should rewrite 4 data files", 4, 
result.rewrittenDataFilesCount());
-    Assert.assertEquals("Action should add 1 data file", 1, 
result.addedDataFilesCount());
+    assertThat(result.rewrittenDataFilesCount())
+        .as("Action should rewrite 4 data files")
+        .isEqualTo(4);
+    assertThat(result.addedDataFilesCount()).as("Action should add 1 data 
file").isOne();
     assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
 
     shouldHaveFiles(table, 1);
@@ -193,7 +203,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
     assertEquals("Rows must match", expectedRecords, actual);
   }
 
-  @Test
+  @TestTemplate
   public void testBinPackPartitionedTable() {
     Table table = createTablePartitioned(4, 2);
     shouldHaveFiles(table, 8);
@@ -201,8 +211,10 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
     long dataSizeBefore = testDataSize(table);
 
     Result result = basicRewrite(table).execute();
-    Assert.assertEquals("Action should rewrite 8 data files", 8, 
result.rewrittenDataFilesCount());
-    Assert.assertEquals("Action should add 4 data file", 4, 
result.addedDataFilesCount());
+    assertThat(result.rewrittenDataFilesCount())
+        .as("Action should rewrite 8 data files")
+        .isEqualTo(8);
+    assertThat(result.addedDataFilesCount()).as("Action should add 4 data 
file").isEqualTo(4);
     assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
 
     shouldHaveFiles(table, 4);
@@ -211,7 +223,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
     assertEquals("Rows must match", expectedRecords, actualRecords);
   }
 
-  @Test
+  @TestTemplate
   public void testBinPackWithFilter() {
     Table table = createTablePartitioned(4, 2);
     shouldHaveFiles(table, 8);
@@ -224,8 +236,10 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
             .filter(Expressions.startsWith("c2", "foo"))
             .execute();
 
-    Assert.assertEquals("Action should rewrite 2 data files", 2, 
result.rewrittenDataFilesCount());
-    Assert.assertEquals("Action should add 1 data file", 1, 
result.addedDataFilesCount());
+    assertThat(result.rewrittenDataFilesCount())
+        .as("Action should rewrite 2 data files")
+        .isEqualTo(2);
+    assertThat(result.addedDataFilesCount()).as("Action should add 1 data 
file").isOne();
     
assertThat(result.rewrittenBytesCount()).isGreaterThan(0L).isLessThan(dataSizeBefore);
 
     shouldHaveFiles(table, 7);
@@ -234,7 +248,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
     assertEquals("Rows must match", expectedRecords, actualRecords);
   }
 
-  @Test
+  @TestTemplate
   public void testBinPackWithFilterOnBucketExpression() {
     Table table = createTablePartitioned(4, 2);
 
@@ -260,7 +274,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
     assertEquals("Rows must match", expectedRecords, actualRecords);
   }
 
-  @Test
+  @TestTemplate
   public void testBinPackAfterPartitionChange() {
     Table table = createTable();
 
@@ -282,10 +296,9 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
                 Integer.toString(averageFileSize(table) + 1001))
             .execute();
 
-    Assert.assertEquals(
-        "Should have 1 fileGroup because all files were not correctly 
partitioned",
-        1,
-        result.rewriteResults().size());
+    assertThat(result.rewriteResults())
+        .as("Should have 1 fileGroup because all files were not correctly 
partitioned")
+        .hasSize(1);
     assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
 
     List<Object[]> postRewriteData = currentData();
@@ -296,7 +309,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
     shouldHaveFiles(table, 20);
   }
 
-  @Test
+  @TestTemplate
   public void testBinPackWithDeletes() {
     Table table = createTablePartitioned(4, 2);
     table.updateProperties().set(TableProperties.FORMAT_VERSION, "2").commit();
@@ -331,15 +344,15 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
             .option(SizeBasedFileRewriter.MAX_FILE_SIZE_BYTES, 
Long.toString(Long.MAX_VALUE))
             .option(SizeBasedDataRewriter.DELETE_FILE_THRESHOLD, "2")
             .execute();
-    Assert.assertEquals("Action should rewrite 2 data files", 2, 
result.rewrittenDataFilesCount());
+    assertThat(result.rewrittenDataFilesCount()).isEqualTo(2);
     
assertThat(result.rewrittenBytesCount()).isGreaterThan(0L).isLessThan(dataSizeBefore);
 
     List<Object[]> actualRecords = currentData();
     assertEquals("Rows must match", expectedRecords, actualRecords);
-    Assert.assertEquals("7 rows are removed", total - 7, actualRecords.size());
+    assertThat(actualRecords).hasSize(total - 7);
   }
 
-  @Test
+  @TestTemplate
   public void testRemoveDangledEqualityDeletesPartitionEvolution() {
     Table table =
         TABLES.create(
@@ -398,7 +411,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
     shouldHaveFiles(table, 5);
   }
 
-  @Test
+  @TestTemplate
   public void testRemoveDangledPositionDeletesPartitionEvolution() {
     Table table =
         TABLES.create(
@@ -437,11 +450,11 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
         .containsExactly(1, 2, 1);
     shouldHaveMinSequenceNumberInPartition(table, "data_file.partition.c1 == 
1", 3);
     shouldHaveSnapshots(table, 5);
-    
assertThat(table.currentSnapshot().summary().get("total-position-deletes")).isEqualTo("0");
+    
assertThat(table.currentSnapshot().summary()).containsEntry("total-position-deletes",
 "0");
     assertEquals("Rows must match", expectedRecords, currentData());
   }
 
-  @Test
+  @TestTemplate
   public void testBinPackWithDeleteAllData() {
     Map<String, String> options = Maps.newHashMap();
     options.put(TableProperties.FORMAT_VERSION, "2");
@@ -466,26 +479,25 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
             .rewriteDataFiles(table)
             .option(SizeBasedDataRewriter.DELETE_FILE_THRESHOLD, "1")
             .execute();
-    Assert.assertEquals("Action should rewrite 1 data files", 1, 
result.rewrittenDataFilesCount());
+    assertThat(result.rewrittenDataFilesCount()).as("Action should rewrite 1 
data files").isOne();
     assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
 
     List<Object[]> actualRecords = currentData();
     assertEquals("Rows must match", expectedRecords, actualRecords);
-    Assert.assertEquals(
-        "Data manifest should not have existing data file",
-        0,
-        (long) 
table.currentSnapshot().dataManifests(table.io()).get(0).existingFilesCount());
-    Assert.assertEquals(
-        "Data manifest should have 1 delete data file",
-        1L,
-        (long) 
table.currentSnapshot().dataManifests(table.io()).get(0).deletedFilesCount());
-    Assert.assertEquals(
-        "Delete manifest added row count should equal total count",
-        total,
-        (long) 
table.currentSnapshot().deleteManifests(table.io()).get(0).addedRowsCount());
-  }
-
-  @Test
+    
assertThat(table.currentSnapshot().dataManifests(table.io()).get(0).existingFilesCount())
+        .as("Data manifest should not have existing data file")
+        .isZero();
+
+    assertThat((long) 
table.currentSnapshot().dataManifests(table.io()).get(0).deletedFilesCount())
+        .as("Data manifest should have 1 delete data file")
+        .isEqualTo(1L);
+
+    
assertThat(table.currentSnapshot().deleteManifests(table.io()).get(0).addedRowsCount())
+        .as("Delete manifest added row count should equal total count")
+        .isEqualTo(total);
+  }
+
+  @TestTemplate
   public void testBinPackWithStartingSequenceNumber() {
     Table table = createTablePartitioned(4, 2);
     shouldHaveFiles(table, 8);
@@ -497,8 +509,10 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
 
     Result result =
         
basicRewrite(table).option(RewriteDataFiles.USE_STARTING_SEQUENCE_NUMBER, 
"true").execute();
-    Assert.assertEquals("Action should rewrite 8 data files", 8, 
result.rewrittenDataFilesCount());
-    Assert.assertEquals("Action should add 4 data file", 4, 
result.addedDataFilesCount());
+    assertThat(result.rewrittenDataFilesCount())
+        .as("Action should rewrite 8 data files")
+        .isEqualTo(8);
+    assertThat(result.addedDataFilesCount()).as("Action should add 4 data 
files").isEqualTo(4);
     assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
 
     shouldHaveFiles(table, 4);
@@ -506,20 +520,21 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
     assertEquals("Rows must match", expectedRecords, actualRecords);
 
     table.refresh();
-    Assert.assertTrue(
-        "Table sequence number should be incremented",
-        oldSequenceNumber < table.currentSnapshot().sequenceNumber());
+    assertThat(table.currentSnapshot().sequenceNumber())
+        .as("Table sequence number should be incremented")
+        .isGreaterThan(oldSequenceNumber);
 
     Dataset<Row> rows = SparkTableUtil.loadMetadataTable(spark, table, 
MetadataTableType.ENTRIES);
     for (Row row : rows.collectAsList()) {
       if (row.getInt(0) == 1) {
-        Assert.assertEquals(
-            "Expect old sequence number for added entries", oldSequenceNumber, 
row.getLong(2));
+        assertThat(row.getLong(2))
+            .as("Expect old sequence number for added entries")
+            .isEqualTo(oldSequenceNumber);
       }
     }
   }
 
-  @Test
+  @TestTemplate
   public void testBinPackWithStartingSequenceNumberV1Compatibility() {
     Map<String, String> properties = 
ImmutableMap.of(TableProperties.FORMAT_VERSION, "1");
     Table table = createTablePartitioned(4, 2, SCALE, properties);
@@ -527,13 +542,15 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
     List<Object[]> expectedRecords = currentData();
     table.refresh();
     long oldSequenceNumber = table.currentSnapshot().sequenceNumber();
-    Assert.assertEquals("Table sequence number should be 0", 0, 
oldSequenceNumber);
+    assertThat(oldSequenceNumber).as("Table sequence number should be 
0").isZero();
     long dataSizeBefore = testDataSize(table);
 
     Result result =
         
basicRewrite(table).option(RewriteDataFiles.USE_STARTING_SEQUENCE_NUMBER, 
"true").execute();
-    Assert.assertEquals("Action should rewrite 8 data files", 8, 
result.rewrittenDataFilesCount());
-    Assert.assertEquals("Action should add 4 data file", 4, 
result.addedDataFilesCount());
+    assertThat(result.rewrittenDataFilesCount())
+        .as("Action should rewrite 8 data files")
+        .isEqualTo(8);
+    assertThat(result.addedDataFilesCount()).as("Action should add 4 data 
files").isEqualTo(4);
     assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
 
     shouldHaveFiles(table, 4);
@@ -541,19 +558,19 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
     assertEquals("Rows must match", expectedRecords, actualRecords);
 
     table.refresh();
-    Assert.assertEquals(
-        "Table sequence number should still be 0",
-        oldSequenceNumber,
-        table.currentSnapshot().sequenceNumber());
+    assertThat(table.currentSnapshot().sequenceNumber())
+        .as("Table sequence number should still be 0")
+        .isEqualTo(oldSequenceNumber);
 
     Dataset<Row> rows = SparkTableUtil.loadMetadataTable(spark, table, 
MetadataTableType.ENTRIES);
     for (Row row : rows.collectAsList()) {
-      Assert.assertEquals(
-          "Expect sequence number 0 for all entries", oldSequenceNumber, 
row.getLong(2));
+      assertThat(row.getLong(2))
+          .as("Expect sequence number 0 for all entries")
+          .isEqualTo(oldSequenceNumber);
     }
   }
 
-  @Test
+  @TestTemplate
   public void testRewriteLargeTableHasResiduals() {
     PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).build();
     Map<String, String> options = Maps.newHashMap();
@@ -575,15 +592,19 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
     CloseableIterable<FileScanTask> tasks =
         table.newScan().ignoreResiduals().filter(Expressions.equal("c3", 
"0")).planFiles();
     for (FileScanTask task : tasks) {
-      Assert.assertEquals("Residuals must be ignored", 
Expressions.alwaysTrue(), task.residual());
+      assertThat(task.residual())
+          .as("Residuals must be ignored")
+          .isEqualTo(Expressions.alwaysTrue());
     }
 
     shouldHaveFiles(table, 2);
 
     long dataSizeBefore = testDataSize(table);
     Result result = basicRewrite(table).filter(Expressions.equal("c3", 
"0")).execute();
-    Assert.assertEquals("Action should rewrite 2 data files", 2, 
result.rewrittenDataFilesCount());
-    Assert.assertEquals("Action should add 1 data file", 1, 
result.addedDataFilesCount());
+    assertThat(result.rewrittenDataFilesCount())
+        .as("Action should rewrite 2 data files")
+        .isEqualTo(2);
+    assertThat(result.addedDataFilesCount()).as("Action should add 1 data 
file").isOne();
     assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
 
     List<Object[]> actualRecords = currentData();
@@ -591,7 +612,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
     assertEquals("Rows must match", expectedRecords, actualRecords);
   }
 
-  @Test
+  @TestTemplate
   public void testBinPackSplitLargeFile() {
     Table table = createTable(1);
     shouldHaveFiles(table, 1);
@@ -606,8 +627,8 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
             .option(SizeBasedFileRewriter.MAX_FILE_SIZE_BYTES, 
Long.toString(targetSize * 2 - 2000))
             .execute();
 
-    Assert.assertEquals("Action should delete 1 data files", 1, 
result.rewrittenDataFilesCount());
-    Assert.assertEquals("Action should add 2 data files", 2, 
result.addedDataFilesCount());
+    assertThat(result.rewrittenDataFilesCount()).as("Action should delete 1 
data files").isOne();
+    assertThat(result.addedDataFilesCount()).as("Action should add 2 data 
files").isEqualTo(2);
     assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
 
     shouldHaveFiles(table, 2);
@@ -616,7 +637,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
     assertEquals("Rows must match", expectedRecords, actualRecords);
   }
 
-  @Test
+  @TestTemplate
   public void testBinPackCombineMixedFiles() {
     Table table = createTable(1); // 400000
     shouldHaveFiles(table, 1);
@@ -638,10 +659,12 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
             .option(SizeBasedFileRewriter.MIN_FILE_SIZE_BYTES, 
Integer.toString(targetSize - 1000))
             .execute();
 
-    Assert.assertEquals("Action should delete 3 data files", 3, 
result.rewrittenDataFilesCount());
+    assertThat(result.rewrittenDataFilesCount())
+        .as("Action should delete 3 data files")
+        .isEqualTo(3);
     // Should Split the big files into 3 pieces, one of which should be 
combined with the two
     // smaller files
-    Assert.assertEquals("Action should add 3 data files", 3, 
result.addedDataFilesCount());
+    assertThat(result.addedDataFilesCount()).as("Action should add 3 data 
files").isEqualTo(3);
     assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
 
     shouldHaveFiles(table, 3);
@@ -650,7 +673,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
     assertEquals("Rows must match", expectedRecords, actualRecords);
   }
 
-  @Test
+  @TestTemplate
   public void testBinPackCombineMediumFiles() {
     Table table = createTable(4);
     shouldHaveFiles(table, 4);
@@ -671,8 +694,10 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
                 Integer.toString(targetSize - 100)) // All files too small
             .execute();
 
-    Assert.assertEquals("Action should delete 4 data files", 4, 
result.rewrittenDataFilesCount());
-    Assert.assertEquals("Action should add 3 data files", 3, 
result.addedDataFilesCount());
+    assertThat(result.rewrittenDataFilesCount())
+        .as("Action should delete 4 data files")
+        .isEqualTo(4);
+    assertThat(result.addedDataFilesCount()).as("Action should add 3 data 
files").isEqualTo(3);
     assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
 
     shouldHaveFiles(table, 3);
@@ -681,7 +706,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
     assertEquals("Rows must match", expectedRecords, actualRecords);
   }
 
-  @Test
+  @TestTemplate
   public void testPartialProgressEnabled() {
     Table table = createTable(20);
     int fileSize = averageFileSize(table);
@@ -700,7 +725,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
             .option(RewriteDataFiles.PARTIAL_PROGRESS_MAX_COMMITS, "10")
             .execute();
 
-    Assert.assertEquals("Should have 10 fileGroups", 
result.rewriteResults().size(), 10);
+    assertThat(result.rewriteResults()).as("Should have 10 
fileGroups").hasSize(10);
     assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
 
     table.refresh();
@@ -712,7 +737,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
     assertEquals("We shouldn't have changed the data", originalData, 
postRewriteData);
   }
 
-  @Test
+  @TestTemplate
   public void testMultipleGroups() {
     Table table = createTable(20);
     int fileSize = averageFileSize(table);
@@ -728,7 +753,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
             .option(SizeBasedFileRewriter.MIN_INPUT_FILES, "1")
             .execute();
 
-    Assert.assertEquals("Should have 10 fileGroups", 
result.rewriteResults().size(), 10);
+    assertThat(result.rewriteResults()).as("Should have 10 
fileGroups").hasSize(10);
     assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
 
     table.refresh();
@@ -740,7 +765,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
     shouldHaveACleanCache(table);
   }
 
-  @Test
+  @TestTemplate
   public void testPartialProgressMaxCommits() {
     Table table = createTable(20);
     int fileSize = averageFileSize(table);
@@ -757,7 +782,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
             .option(RewriteDataFiles.PARTIAL_PROGRESS_MAX_COMMITS, "3")
             .execute();
 
-    Assert.assertEquals("Should have 10 fileGroups", 
result.rewriteResults().size(), 10);
+    assertThat(result.rewriteResults()).as("Should have 10 
fileGroups").hasSize(10);
     assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
 
     table.refresh();
@@ -769,7 +794,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
     shouldHaveACleanCache(table);
   }
 
-  @Test
+  @TestTemplate
   public void testSingleCommitWithRewriteFailure() {
     Table table = createTable(20);
     int fileSize = averageFileSize(table);
@@ -803,7 +828,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
     shouldHaveACleanCache(table);
   }
 
-  @Test
+  @TestTemplate
   public void testSingleCommitWithCommitFailure() {
     Table table = createTable(20);
     int fileSize = averageFileSize(table);
@@ -837,7 +862,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
     shouldHaveACleanCache(table);
   }
 
-  @Test
+  @TestTemplate
   public void testCommitFailsWithUncleanableFailure() {
     Table table = createTable(20);
     int fileSize = averageFileSize(table);
@@ -871,7 +896,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
     shouldHaveACleanCache(table);
   }
 
-  @Test
+  @TestTemplate
   public void testParallelSingleCommitWithRewriteFailure() {
     Table table = createTable(20);
     int fileSize = averageFileSize(table);
@@ -906,7 +931,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
     shouldHaveACleanCache(table);
   }
 
-  @Test
+  @TestTemplate
   public void testPartialProgressWithRewriteFailure() {
     Table table = createTable(20);
     int fileSize = averageFileSize(table);
@@ -948,7 +973,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
     shouldHaveACleanCache(table);
   }
 
-  @Test
+  @TestTemplate
   public void testParallelPartialProgressWithRewriteFailure() {
     Table table = createTable(20);
     int fileSize = averageFileSize(table);
@@ -991,7 +1016,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
     shouldHaveACleanCache(table);
   }
 
-  @Test
+  @TestTemplate
   public void testParallelPartialProgressWithCommitFailure() {
     Table table = createTable(20);
     int fileSize = averageFileSize(table);
@@ -1021,8 +1046,8 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
 
     RewriteDataFiles.Result result = spyRewrite.execute();
 
-    // Commit 1: 4/4 + Commit 2 failed 0/4 + Commit 3: 2/2 == 6 out of 10 
total groups comitted
-    Assert.assertEquals("Should have 6 fileGroups", 6, 
result.rewriteResults().size());
+    // Commit 1: 4/4 + Commit 2 failed 0/4 + Commit 3: 2/2 == 6 out of 10 
total groups committed
+    assertThat(result.rewriteResults()).as("Should have 6 
fileGroups").hasSize(6);
     
assertThat(result.rewrittenBytesCount()).isGreaterThan(0L).isLessThan(dataSizeBefore);
 
     table.refresh();
@@ -1036,7 +1061,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
     shouldHaveACleanCache(table);
   }
 
-  @Test
+  @TestTemplate
   public void testInvalidOptions() {
     Table table = createTable(20);
 
@@ -1080,7 +1105,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
         .hasMessageContaining("requires enabling Iceberg Spark session 
extensions");
   }
 
-  @Test
+  @TestTemplate
   public void testSortMultipleGroups() {
     Table table = createTable(20);
     shouldHaveFiles(table, 20);
@@ -1100,7 +1125,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
                 RewriteDataFiles.MAX_FILE_GROUP_SIZE_BYTES, 
Integer.toString(fileSize * 2 + 1000))
             .execute();
 
-    Assert.assertEquals("Should have 10 fileGroups", 
result.rewriteResults().size(), 10);
+    assertThat(result.rewriteResults()).as("Should have 10 
fileGroups").hasSize(10);
     assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
 
     table.refresh();
@@ -1112,7 +1137,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
     shouldHaveACleanCache(table);
   }
 
-  @Test
+  @TestTemplate
   public void testSimpleSort() {
     Table table = createTable(20);
     shouldHaveFiles(table, 20);
@@ -1131,7 +1156,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
                 RewriteDataFiles.TARGET_FILE_SIZE_BYTES, 
Integer.toString(averageFileSize(table)))
             .execute();
 
-    Assert.assertEquals("Should have 1 fileGroups", 
result.rewriteResults().size(), 1);
+    assertThat(result.rewriteResults()).as("Should have 1 
fileGroups").hasSize(1);
     assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
 
     table.refresh();
@@ -1145,7 +1170,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
     shouldHaveLastCommitSorted(table, "c2");
   }
 
-  @Test
+  @TestTemplate
   public void testSortAfterPartitionChange() {
     Table table = createTable(20);
     shouldHaveFiles(table, 20);
@@ -1165,10 +1190,9 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
                 RewriteDataFiles.TARGET_FILE_SIZE_BYTES, 
Integer.toString(averageFileSize(table)))
             .execute();
 
-    Assert.assertEquals(
-        "Should have 1 fileGroup because all files were not correctly 
partitioned",
-        result.rewriteResults().size(),
-        1);
+    assertThat(result.rewriteResults())
+        .as("Should have 1 fileGroup because all files were not correctly 
partitioned")
+        .hasSize(1);
     assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
 
     table.refresh();
@@ -1182,7 +1206,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
     shouldHaveLastCommitSorted(table, "c2");
   }
 
-  @Test
+  @TestTemplate
   public void testSortCustomSortOrder() {
     Table table = createTable(20);
     shouldHaveLastCommitUnsorted(table, "c2");
@@ -1199,7 +1223,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
                 RewriteDataFiles.TARGET_FILE_SIZE_BYTES, 
Integer.toString(averageFileSize(table)))
             .execute();
 
-    Assert.assertEquals("Should have 1 fileGroups", 
result.rewriteResults().size(), 1);
+    assertThat(result.rewriteResults()).as("Should have 1 
fileGroups").hasSize(1);
     assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
 
     table.refresh();
@@ -1213,7 +1237,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
     shouldHaveLastCommitSorted(table, "c2");
   }
 
-  @Test
+  @TestTemplate
   public void testSortCustomSortOrderRequiresRepartition() {
     int partitions = 4;
     Table table = createTable();
@@ -1238,7 +1262,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
                 Integer.toString(averageFileSize(table) / partitions))
             .execute();
 
-    Assert.assertEquals("Should have 1 fileGroups", 
result.rewriteResults().size(), 1);
+    assertThat(result.rewriteResults()).as("Should have 1 
fileGroups").hasSize(1);
     assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
 
     table.refresh();
@@ -1253,7 +1277,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
     shouldHaveLastCommitSorted(table, "c3");
   }
 
-  @Test
+  @TestTemplate
   public void testAutoSortShuffleOutput() {
     Table table = createTable(20);
     shouldHaveLastCommitUnsorted(table, "c2");
@@ -1275,11 +1299,11 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
             .option(SizeBasedFileRewriter.MIN_INPUT_FILES, "1")
             .execute();
 
-    Assert.assertEquals("Should have 1 fileGroups", 
result.rewriteResults().size(), 1);
+    assertThat(result.rewriteResults()).as("Should have 1 
fileGroups").hasSize(1);
     assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
-    Assert.assertTrue(
-        "Should have written 40+ files",
-        Iterables.size(table.currentSnapshot().addedDataFiles(table.io())) >= 
40);
+    assertThat(table.currentSnapshot().addedDataFiles(table.io()))
+        .as("Should have written 40+ files")
+        .hasSizeGreaterThanOrEqualTo(40);
 
     table.refresh();
 
@@ -1292,7 +1317,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
     shouldHaveLastCommitSorted(table, "c2");
   }
 
-  @Test
+  @TestTemplate
   public void testCommitStateUnknownException() {
     Table table = createTable(20);
     shouldHaveFiles(table, 20);
@@ -1324,7 +1349,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
     shouldHaveSnapshots(table, 2); // Commit actually Succeeded
   }
 
-  @Test
+  @TestTemplate
   public void testZOrderSort() {
     int originalFiles = 20;
     Table table = createTable(originalFiles);
@@ -1337,8 +1362,8 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
     double originalFilesC2C3 =
         percentFilesRequired(table, new String[] {"c2", "c3"}, new String[] 
{"foo23", "bar23"});
 
-    Assert.assertTrue("Should require all files to scan c2", originalFilesC2 > 
0.99);
-    Assert.assertTrue("Should require all files to scan c3", originalFilesC3 > 
0.99);
+    assertThat(originalFilesC2).as("Should require all files to scan 
c2").isGreaterThan(0.99);
+    assertThat(originalFilesC3).as("Should require all files to scan 
c3").isGreaterThan(0.99);
 
     long dataSizeBefore = testDataSize(table);
     RewriteDataFiles.Result result =
@@ -1354,10 +1379,11 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
             .option(SizeBasedFileRewriter.MIN_INPUT_FILES, "1")
             .execute();
 
-    Assert.assertEquals("Should have 1 fileGroups", 1, 
result.rewriteResults().size());
+    assertThat(result.rewriteResults()).as("Should have 1 
fileGroups").hasSize(1);
     assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
-    int zOrderedFilesTotal = 
Iterables.size(table.currentSnapshot().addedDataFiles(table.io()));
-    Assert.assertTrue("Should have written 40+ files", zOrderedFilesTotal >= 
40);
+    assertThat(table.currentSnapshot().addedDataFiles(table.io()))
+        .as("Should have written 40+ files")
+        .hasSizeGreaterThanOrEqualTo(40);
 
     table.refresh();
 
@@ -1372,18 +1398,18 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
     double filesScannedC2C3 =
         percentFilesRequired(table, new String[] {"c2", "c3"}, new String[] 
{"foo23", "bar23"});
 
-    Assert.assertTrue(
-        "Should have reduced the number of files required for c2",
-        filesScannedC2 < originalFilesC2);
-    Assert.assertTrue(
-        "Should have reduced the number of files required for c3",
-        filesScannedC3 < originalFilesC3);
-    Assert.assertTrue(
-        "Should have reduced the number of files required for a c2,c3 
predicate",
-        filesScannedC2C3 < originalFilesC2C3);
+    assertThat(originalFilesC2)
+        .as("Should have reduced the number of files required for c2")
+        .isGreaterThan(filesScannedC2);
+    assertThat(originalFilesC3)
+        .as("Should have reduced the number of files required for c3")
+        .isGreaterThan(filesScannedC3);
+    assertThat(originalFilesC2C3)
+        .as("Should have reduced the number of files required for c2,c3 
predicate")
+        .isGreaterThan(filesScannedC2C3);
   }
 
-  @Test
+  @TestTemplate
   public void testZOrderAllTypesSort() {
     Table table = createTypeTestTable();
     shouldHaveFiles(table, 10);
@@ -1410,10 +1436,11 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
             .option(SizeBasedFileRewriter.REWRITE_ALL, "true")
             .execute();
 
-    Assert.assertEquals("Should have 1 fileGroups", 1, 
result.rewriteResults().size());
+    assertThat(result.rewriteResults()).as("Should have 1 
fileGroups").hasSize(1);
     assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
-    int zOrderedFilesTotal = 
Iterables.size(table.currentSnapshot().addedDataFiles(table.io()));
-    Assert.assertEquals("Should have written 1 file", 1, zOrderedFilesTotal);
+    assertThat(table.currentSnapshot().addedDataFiles(table.io()))
+        .as("Should have written 1 file")
+        .hasSize(1);
 
     table.refresh();
 
@@ -1426,7 +1454,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
     shouldHaveACleanCache(table);
   }
 
-  @Test
+  @TestTemplate
   public void testInvalidAPIUsage() {
     Table table = createTable(1);
 
@@ -1445,7 +1473,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
         .hasMessage("Must use only one rewriter type (bin-pack, sort, 
zorder)");
   }
 
-  @Test
+  @TestTemplate
   public void testRewriteJobOrderBytesAsc() {
     Table table = createTablePartitioned(4, 2);
     writeRecords(1, SCALE, 1);
@@ -1472,12 +1500,12 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
             .collect(Collectors.toList());
 
     expected.sort(Comparator.naturalOrder());
-    Assert.assertEquals("Size in bytes order should be ascending", actual, 
expected);
+    assertThat(actual).as("Size in bytes order should be 
ascending").isEqualTo(expected);
     Collections.reverse(expected);
-    Assert.assertNotEquals("Size in bytes order should not be descending", 
actual, expected);
+    assertThat(actual).as("Size in bytes order should not be 
descending").isNotEqualTo(expected);
   }
 
-  @Test
+  @TestTemplate
   public void testRewriteJobOrderBytesDesc() {
     Table table = createTablePartitioned(4, 2);
     writeRecords(1, SCALE, 1);
@@ -1504,12 +1532,12 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
             .collect(Collectors.toList());
 
     expected.sort(Comparator.reverseOrder());
-    Assert.assertEquals("Size in bytes order should be descending", actual, 
expected);
+    assertThat(actual).as("Size in bytes order should be 
descending").isEqualTo(expected);
     Collections.reverse(expected);
-    Assert.assertNotEquals("Size in bytes order should not be ascending", 
actual, expected);
+    assertThat(actual).as("Size in bytes order should not be 
ascending").isNotEqualTo(expected);
   }
 
-  @Test
+  @TestTemplate
   public void testRewriteJobOrderFilesAsc() {
     Table table = createTablePartitioned(4, 2);
     writeRecords(1, SCALE, 1);
@@ -1536,12 +1564,12 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
             .collect(Collectors.toList());
 
     expected.sort(Comparator.naturalOrder());
-    Assert.assertEquals("Number of files order should be ascending", actual, 
expected);
+    assertThat(actual).as("Number of files order should be 
ascending").isEqualTo(expected);
     Collections.reverse(expected);
-    Assert.assertNotEquals("Number of files order should not be descending", 
actual, expected);
+    assertThat(actual).as("Number of files order should not be 
descending").isNotEqualTo(expected);
   }
 
-  @Test
+  @TestTemplate
   public void testRewriteJobOrderFilesDesc() {
     Table table = createTablePartitioned(4, 2);
     writeRecords(1, SCALE, 1);
@@ -1568,12 +1596,12 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
             .collect(Collectors.toList());
 
     expected.sort(Comparator.reverseOrder());
-    Assert.assertEquals("Number of files order should be descending", actual, 
expected);
+    assertThat(actual).as("Number of files order should be 
descending").isEqualTo(expected);
     Collections.reverse(expected);
-    Assert.assertNotEquals("Number of files order should not be ascending", 
actual, expected);
+    assertThat(actual).as("Number of files order should not be 
ascending").isNotEqualTo(expected);
   }
 
-  @Test
+  @TestTemplate
   public void testBinPackRewriterWithSpecificUnparitionedOutputSpec() {
     Table table = createTable(10);
     shouldHaveFiles(table, 10);
@@ -1591,11 +1619,11 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
             .execute();
 
     assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
-    assertThat(currentData().size()).isEqualTo(count);
+    assertThat(currentData()).hasSize((int) count);
     shouldRewriteDataFilesWithPartitionSpec(table, outputSpecId);
   }
 
-  @Test
+  @TestTemplate
   public void testBinPackRewriterWithSpecificOutputSpec() {
     Table table = createTable(10);
     shouldHaveFiles(table, 10);
@@ -1614,11 +1642,11 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
             .execute();
 
     assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
-    assertThat(currentData().size()).isEqualTo(count);
+    assertThat(currentData()).hasSize((int) count);
     shouldRewriteDataFilesWithPartitionSpec(table, outputSpecId);
   }
 
-  @Test
+  @TestTemplate
   public void testBinpackRewriteWithInvalidOutputSpecId() {
     Table table = createTable(10);
     shouldHaveFiles(table, 10);
@@ -1634,7 +1662,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
             "Cannot use output spec id 1234 because the table does not contain 
a reference to this spec-id.");
   }
 
-  @Test
+  @TestTemplate
   public void testSortRewriterWithSpecificOutputSpecId() {
     Table table = createTable(10);
     shouldHaveFiles(table, 10);
@@ -1653,11 +1681,11 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
             .execute();
 
     assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
-    assertThat(currentData().size()).isEqualTo(count);
+    assertThat(currentData()).hasSize((int) count);
     shouldRewriteDataFilesWithPartitionSpec(table, outputSpecId);
   }
 
-  @Test
+  @TestTemplate
   public void testZOrderRewriteWithSpecificOutputSpecId() {
     Table table = createTable(10);
     shouldHaveFiles(table, 10);
@@ -1676,7 +1704,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
             .execute();
 
     assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
-    assertThat(currentData().size()).isEqualTo(count);
+    assertThat(currentData()).hasSize((int) count);
     shouldRewriteDataFilesWithPartitionSpec(table, outputSpecId);
   }
 
@@ -1718,13 +1746,15 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
   protected void shouldHaveMultipleFiles(Table table) {
     table.refresh();
     int numFiles = Iterables.size(table.newScan().planFiles());
-    Assert.assertTrue(String.format("Should have multiple files, had %d", 
numFiles), numFiles > 1);
+    assertThat(numFiles)
+        .as(String.format("Should have multiple files, had %d", numFiles))
+        .isGreaterThan(1);
   }
 
   protected void shouldHaveFiles(Table table, int numExpected) {
     table.refresh();
     int numFiles = Iterables.size(table.newScan().planFiles());
-    Assert.assertEquals("Did not have the expected number of files", 
numExpected, numFiles);
+    assertThat(numFiles).as("Did not have the expected number of 
files").isEqualTo(numExpected);
   }
 
   protected long shouldHaveMinSequenceNumberInPartition(
@@ -1744,20 +1774,20 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
 
   protected void shouldHaveSnapshots(Table table, int expectedSnapshots) {
     table.refresh();
-    int actualSnapshots = Iterables.size(table.snapshots());
-    Assert.assertEquals(
-        "Table did not have the expected number of snapshots", 
expectedSnapshots, actualSnapshots);
+    assertThat(table.snapshots())
+        .as("Table did not have the expected number of snapshots")
+        .hasSize(expectedSnapshots);
   }
 
   protected void shouldHaveNoOrphans(Table table) {
-    Assert.assertEquals(
-        "Should not have found any orphan files",
-        ImmutableList.of(),
-        actions()
-            .deleteOrphanFiles(table)
-            .olderThan(System.currentTimeMillis())
-            .execute()
-            .orphanFileLocations());
+    assertThat(
+            actions()
+                .deleteOrphanFiles(table)
+                .olderThan(System.currentTimeMillis())
+                .execute()
+                .orphanFileLocations())
+        .as("Should not have found any orphan files")
+        .isEmpty();
   }
 
   protected void shouldHaveOrphans(Table table) {
@@ -1772,20 +1802,19 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
   }
 
   protected void shouldHaveACleanCache(Table table) {
-    Assert.assertEquals(
-        "Should not have any entries in cache", ImmutableSet.of(), 
cacheContents(table));
+    assertThat(cacheContents(table)).as("Should not have any entries in 
cache").isEmpty();
   }
 
   protected <T> void shouldHaveLastCommitSorted(Table table, String column) {
     List<Pair<Pair<T, T>, Pair<T, T>>> overlappingFiles = 
checkForOverlappingFiles(table, column);
 
-    Assert.assertEquals("Found overlapping files", Collections.emptyList(), 
overlappingFiles);
+    assertThat(overlappingFiles).as("Found overlapping files").isEmpty();
   }
 
   protected <T> void shouldHaveLastCommitUnsorted(Table table, String column) {
     List<Pair<Pair<T, T>, Pair<T, T>>> overlappingFiles = 
checkForOverlappingFiles(table, column);
 
-    Assert.assertNotEquals("Found no overlapping files", 
Collections.emptyList(), overlappingFiles);
+    assertThat(overlappingFiles).as("Found no overlapping files").isNotEmpty();
   }
 
   private <T> Pair<T, T> boundsOf(DataFile file, NestedField field, Class<T> 
javaClass) {
@@ -1864,7 +1893,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
         .updateProperties()
         .set(TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, Integer.toString(20 
* 1024))
         .commit();
-    Assert.assertNull("Table must be empty", table.currentSnapshot());
+    assertThat(table.currentSnapshot()).as("Table must be empty").isNull();
     return table;
   }
 
@@ -1884,7 +1913,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
       int partitions, int files, int numRecords, Map<String, String> options) {
     PartitionSpec spec = 
PartitionSpec.builderFor(SCHEMA).identity("c1").truncate("c2", 2).build();
     Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
-    Assert.assertNull("Table must be empty", table.currentSnapshot());
+    assertThat(table.currentSnapshot()).as("Table must be empty").isNull();
 
     writeRecords(files, numRecords, partitions);
     return table;
diff --git 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
index bdc830e946..8a98a11704 100644
--- 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
+++ 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
@@ -33,6 +33,7 @@ import static org.mockito.Mockito.when;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Path;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -46,6 +47,9 @@ import org.apache.iceberg.ManifestContent;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.ManifestFiles;
 import org.apache.iceberg.ManifestWriter;
+import org.apache.iceberg.Parameter;
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.Parameters;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.RowDelta;
 import org.apache.iceberg.Schema;
@@ -66,8 +70,8 @@ import 
org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.spark.SparkTableUtil;
-import org.apache.iceberg.spark.SparkTestBase;
 import org.apache.iceberg.spark.SparkWriteOptions;
+import org.apache.iceberg.spark.TestBase;
 import org.apache.iceberg.spark.source.ThreeColumnRecord;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.CharSequenceSet;
@@ -76,17 +80,13 @@ import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.TableIdentifier;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-@RunWith(Parameterized.class)
-public class TestRewriteManifestsAction extends SparkTestBase {
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+
+@ExtendWith(ParameterizedTestExtension.class)
+public class TestRewriteManifestsAction extends TestBase {
 
   private static final HadoopTables TABLES = new HadoopTables(new 
Configuration());
   private static final Schema SCHEMA =
@@ -98,36 +98,37 @@ public class TestRewriteManifestsAction extends 
SparkTestBase {
   @Parameters(name = "snapshotIdInheritanceEnabled = {0}, useCaching = {1}, 
formatVersion = {2}")
   public static Object[] parameters() {
     return new Object[][] {
-      new Object[] {"true", "true", 1},
-      new Object[] {"false", "true", 1},
-      new Object[] {"true", "false", 2},
-      new Object[] {"false", "false", 2}
+      new Object[] {"true", "true", false, 1},
+      new Object[] {"false", "true", true, 1},
+      new Object[] {"true", "false", false, 2},
+      new Object[] {"false", "false", false, 2},
+      new Object[] {"true", "false", false, 3},
+      new Object[] {"false", "false", false, 3}
     };
   }
 
-  @Rule public TemporaryFolder temp = new TemporaryFolder();
+  @Parameter private String snapshotIdInheritanceEnabled;
+
+  @Parameter(index = 1)
+  private String useCaching;
+
+  @Parameter(index = 2)
+  private boolean shouldStageManifests;
+
+  @Parameter(index = 3)
+  private int formatVersion;
 
-  private final String snapshotIdInheritanceEnabled;
-  private final String useCaching;
-  private final int formatVersion;
-  private final boolean shouldStageManifests;
   private String tableLocation = null;
 
-  public TestRewriteManifestsAction(
-      String snapshotIdInheritanceEnabled, String useCaching, int 
formatVersion) {
-    this.snapshotIdInheritanceEnabled = snapshotIdInheritanceEnabled;
-    this.useCaching = useCaching;
-    this.formatVersion = formatVersion;
-    this.shouldStageManifests = formatVersion == 1 && 
snapshotIdInheritanceEnabled.equals("false");
-  }
+  @TempDir private Path temp;
+  @TempDir private File tableDir;
 
-  @Before
+  @BeforeEach
   public void setupTableLocation() throws Exception {
-    File tableDir = temp.newFolder();
     this.tableLocation = tableDir.toURI().toString();
   }
 
-  @Test
+  @TestTemplate
   public void testRewriteManifestsEmptyTable() throws IOException {
     PartitionSpec spec = PartitionSpec.unpartitioned();
     Map<String, String> options = Maps.newHashMap();
@@ -135,7 +136,7 @@ public class TestRewriteManifestsAction extends 
SparkTestBase {
     options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, 
snapshotIdInheritanceEnabled);
     Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
 
-    Assert.assertNull("Table must be empty", table.currentSnapshot());
+    assertThat(table.currentSnapshot()).as("Table must be empty").isNull();
 
     SparkActions actions = SparkActions.get();
 
@@ -143,13 +144,13 @@ public class TestRewriteManifestsAction extends 
SparkTestBase {
         .rewriteManifests(table)
         .rewriteIf(manifest -> true)
         .option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
-        .stagingLocation(temp.newFolder().toString())
+        .stagingLocation(java.nio.file.Files.createTempDirectory(temp, 
"junit").toString())
         .execute();
 
-    Assert.assertNull("Table must stay empty", table.currentSnapshot());
+    assertThat(table.currentSnapshot()).as("Table must stay empty").isNull();
   }
 
-  @Test
+  @TestTemplate
   public void testRewriteSmallManifestsNonPartitionedTable() {
     PartitionSpec spec = PartitionSpec.unpartitioned();
     Map<String, String> options = Maps.newHashMap();
@@ -171,7 +172,7 @@ public class TestRewriteManifestsAction extends 
SparkTestBase {
     table.refresh();
 
     List<ManifestFile> manifests = 
table.currentSnapshot().allManifests(table.io());
-    Assert.assertEquals("Should have 2 manifests before rewrite", 2, 
manifests.size());
+    assertThat(manifests).as("Should have 2 manifests before 
rewrite").hasSize(2);
 
     SparkActions actions = SparkActions.get();
 
@@ -182,20 +183,18 @@ public class TestRewriteManifestsAction extends 
SparkTestBase {
             .option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
             .execute();
 
-    Assert.assertEquals(
-        "Action should rewrite 2 manifests", 2, 
Iterables.size(result.rewrittenManifests()));
-    Assert.assertEquals(
-        "Action should add 1 manifests", 1, 
Iterables.size(result.addedManifests()));
+    assertThat(result.rewrittenManifests()).as("Action should rewrite 2 
manifests").hasSize(2);
+    assertThat(result.addedManifests()).as("Action should add 1 
manifests").hasSize(1);
     assertManifestsLocation(result.addedManifests());
 
     table.refresh();
 
     List<ManifestFile> newManifests = 
table.currentSnapshot().allManifests(table.io());
-    Assert.assertEquals("Should have 1 manifests after rewrite", 1, 
newManifests.size());
+    assertThat(newManifests).as("Should have 1 manifests after 
rewrite").hasSize(1);
 
-    Assert.assertEquals(4, (long) newManifests.get(0).existingFilesCount());
-    Assert.assertFalse(newManifests.get(0).hasAddedFiles());
-    Assert.assertFalse(newManifests.get(0).hasDeletedFiles());
+    assertThat(newManifests.get(0).existingFilesCount()).isEqualTo(4);
+    assertThat(newManifests.get(0).hasAddedFiles()).isFalse();
+    assertThat(newManifests.get(0).hasDeletedFiles()).isFalse();
 
     List<ThreeColumnRecord> expectedRecords = Lists.newArrayList();
     expectedRecords.addAll(records1);
@@ -205,10 +204,10 @@ public class TestRewriteManifestsAction extends 
SparkTestBase {
     List<ThreeColumnRecord> actualRecords =
         resultDF.sort("c1", 
"c2").as(Encoders.bean(ThreeColumnRecord.class)).collectAsList();
 
-    Assert.assertEquals("Rows must match", expectedRecords, actualRecords);
+    assertThat(actualRecords).as("Rows must match").isEqualTo(expectedRecords);
   }
 
-  @Test
+  @TestTemplate
   public void testRewriteManifestsWithCommitStateUnknownException() {
     PartitionSpec spec = PartitionSpec.unpartitioned();
     Map<String, String> options = Maps.newHashMap();
@@ -230,7 +229,7 @@ public class TestRewriteManifestsAction extends 
SparkTestBase {
     table.refresh();
 
     List<ManifestFile> manifests = 
table.currentSnapshot().allManifests(table.io());
-    Assert.assertEquals("Should have 2 manifests before rewrite", 2, 
manifests.size());
+    assertThat(manifests).as("Should have 2 manifests before 
rewrite").hasSize(2);
 
     SparkActions actions = SparkActions.get();
 
@@ -258,11 +257,11 @@ public class TestRewriteManifestsAction extends 
SparkTestBase {
 
     // table should reflect the changes, since the commit was successful
     List<ManifestFile> newManifests = 
table.currentSnapshot().allManifests(table.io());
-    Assert.assertEquals("Should have 1 manifests after rewrite", 1, 
newManifests.size());
+    assertThat(newManifests).as("Should have 1 manifests after 
rewrite").hasSize(1);
 
-    Assert.assertEquals(4, (long) newManifests.get(0).existingFilesCount());
-    Assert.assertFalse(newManifests.get(0).hasAddedFiles());
-    Assert.assertFalse(newManifests.get(0).hasDeletedFiles());
+    assertThat(newManifests.get(0).existingFilesCount()).isEqualTo(4);
+    assertThat(newManifests.get(0).hasAddedFiles()).isFalse();
+    assertThat(newManifests.get(0).hasDeletedFiles()).isFalse();
 
     List<ThreeColumnRecord> expectedRecords = Lists.newArrayList();
     expectedRecords.addAll(records1);
@@ -272,10 +271,10 @@ public class TestRewriteManifestsAction extends 
SparkTestBase {
     List<ThreeColumnRecord> actualRecords =
         resultDF.sort("c1", 
"c2").as(Encoders.bean(ThreeColumnRecord.class)).collectAsList();
 
-    Assert.assertEquals("Rows must match", expectedRecords, actualRecords);
+    assertThat(actualRecords).as("Rows must match").isEqualTo(expectedRecords);
   }
 
-  @Test
+  @TestTemplate
   public void testRewriteSmallManifestsPartitionedTable() {
     PartitionSpec spec = 
PartitionSpec.builderFor(SCHEMA).identity("c1").truncate("c2", 2).build();
     Map<String, String> options = Maps.newHashMap();
@@ -309,7 +308,7 @@ public class TestRewriteManifestsAction extends 
SparkTestBase {
     table.refresh();
 
     List<ManifestFile> manifests = 
table.currentSnapshot().allManifests(table.io());
-    Assert.assertEquals("Should have 4 manifests before rewrite", 4, 
manifests.size());
+    assertThat(manifests).as("Should have 4 manifests before 
rewrite").hasSize(4);
 
     SparkActions actions = SparkActions.get();
 
@@ -329,24 +328,22 @@ public class TestRewriteManifestsAction extends 
SparkTestBase {
             .option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
             .execute();
 
-    Assert.assertEquals(
-        "Action should rewrite 4 manifests", 4, 
Iterables.size(result.rewrittenManifests()));
-    Assert.assertEquals(
-        "Action should add 2 manifests", 2, 
Iterables.size(result.addedManifests()));
+    assertThat(result.rewrittenManifests()).as("Action should rewrite 4 
manifests").hasSize(4);
+    assertThat(result.addedManifests()).as("Action should add 2 
manifests").hasSize(2);
     assertManifestsLocation(result.addedManifests());
 
     table.refresh();
 
     List<ManifestFile> newManifests = 
table.currentSnapshot().allManifests(table.io());
-    Assert.assertEquals("Should have 2 manifests after rewrite", 2, 
newManifests.size());
+    assertThat(newManifests).as("Should have 2 manifests after 
rewrite").hasSize(2);
 
-    Assert.assertEquals(4, (long) newManifests.get(0).existingFilesCount());
-    Assert.assertFalse(newManifests.get(0).hasAddedFiles());
-    Assert.assertFalse(newManifests.get(0).hasDeletedFiles());
+    assertThat(newManifests.get(0).existingFilesCount()).isEqualTo(4);
+    assertThat(newManifests.get(0).hasAddedFiles()).isFalse();
+    assertThat(newManifests.get(0).hasDeletedFiles()).isFalse();
 
-    Assert.assertEquals(4, (long) newManifests.get(1).existingFilesCount());
-    Assert.assertFalse(newManifests.get(1).hasAddedFiles());
-    Assert.assertFalse(newManifests.get(1).hasDeletedFiles());
+    assertThat(newManifests.get(1).existingFilesCount()).isEqualTo(4);
+    assertThat(newManifests.get(1).hasAddedFiles()).isFalse();
+    assertThat(newManifests.get(1).hasDeletedFiles()).isFalse();
 
     List<ThreeColumnRecord> expectedRecords = Lists.newArrayList();
     expectedRecords.addAll(records1);
@@ -358,10 +355,10 @@ public class TestRewriteManifestsAction extends 
SparkTestBase {
     List<ThreeColumnRecord> actualRecords =
         resultDF.sort("c1", 
"c2").as(Encoders.bean(ThreeColumnRecord.class)).collectAsList();
 
-    Assert.assertEquals("Rows must match", expectedRecords, actualRecords);
+    assertThat(actualRecords).as("Rows must match").isEqualTo(expectedRecords);
   }
 
-  @Test
+  @TestTemplate
   public void testRewriteImportedManifests() throws IOException {
     PartitionSpec spec = 
PartitionSpec.builderFor(SCHEMA).identity("c3").build();
     Map<String, String> options = Maps.newHashMap();
@@ -372,7 +369,7 @@ public class TestRewriteManifestsAction extends 
SparkTestBase {
     List<ThreeColumnRecord> records =
         Lists.newArrayList(
             new ThreeColumnRecord(1, null, "AAAA"), new ThreeColumnRecord(1, 
"BBBBBBBBBB", "BBBB"));
-    File parquetTableDir = temp.newFolder("parquet_table");
+    File parquetTableDir = temp.resolve("parquet_table").toFile();
     String parquetTableLocation = parquetTableDir.toURI().toString();
 
     try {
@@ -386,7 +383,7 @@ public class TestRewriteManifestsAction extends 
SparkTestBase {
           .partitionBy("c3")
           .saveAsTable("parquet_table");
 
-      File stagingDir = temp.newFolder("staging-dir");
+      File stagingDir = temp.resolve("staging-dir").toFile();
       SparkTableUtil.importSparkTable(
           spark, new TableIdentifier("parquet_table"), table, 
stagingDir.toString());
 
@@ -398,7 +395,8 @@ public class TestRewriteManifestsAction extends 
SparkTestBase {
 
       SparkActions actions = SparkActions.get();
 
-      String rewriteStagingLocation = temp.newFolder().toString();
+      String rewriteStagingLocation =
+          java.nio.file.Files.createTempDirectory(temp, "junit").toString();
 
       RewriteManifests.Result result =
           actions
@@ -408,12 +406,10 @@ public class TestRewriteManifestsAction extends 
SparkTestBase {
               .stagingLocation(rewriteStagingLocation)
               .execute();
 
-      Assert.assertEquals(
-          "Action should rewrite all manifests",
-          snapshot.allManifests(table.io()),
-          result.rewrittenManifests());
-      Assert.assertEquals(
-          "Action should add 1 manifest", 1, 
Iterables.size(result.addedManifests()));
+      assertThat(result.rewrittenManifests())
+          .as("Action should rewrite all manifests")
+          .isEqualTo(snapshot.allManifests(table.io()));
+      assertThat(result.addedManifests()).as("Action should add 1 
manifest").hasSize(1);
       assertManifestsLocation(result.addedManifests(), rewriteStagingLocation);
 
     } finally {
@@ -421,7 +417,7 @@ public class TestRewriteManifestsAction extends 
SparkTestBase {
     }
   }
 
-  @Test
+  @TestTemplate
   public void testRewriteLargeManifestsPartitionedTable() throws IOException {
     PartitionSpec spec = 
PartitionSpec.builderFor(SCHEMA).identity("c3").build();
     Map<String, String> options = Maps.newHashMap();
@@ -437,7 +433,7 @@ public class TestRewriteManifestsAction extends 
SparkTestBase {
     table.newFastAppend().appendManifest(appendManifest).commit();
 
     List<ManifestFile> manifests = 
table.currentSnapshot().allManifests(table.io());
-    Assert.assertEquals("Should have 1 manifests before rewrite", 1, 
manifests.size());
+    assertThat(manifests).as("Should have 1 manifests before 
rewrite").hasSize(1);
 
     // set the target manifest size to a small value to force splitting 
records into multiple files
     table
@@ -449,7 +445,7 @@ public class TestRewriteManifestsAction extends 
SparkTestBase {
 
     SparkActions actions = SparkActions.get();
 
-    String stagingLocation = temp.newFolder().toString();
+    String stagingLocation = java.nio.file.Files.createTempDirectory(temp, 
"junit").toString();
 
     RewriteManifests.Result result =
         actions
@@ -469,7 +465,7 @@ public class TestRewriteManifestsAction extends 
SparkTestBase {
     assertThat(newManifests).hasSizeGreaterThanOrEqualTo(2);
   }
 
-  @Test
+  @TestTemplate
   public void testRewriteManifestsWithPredicate() throws IOException {
     PartitionSpec spec = 
PartitionSpec.builderFor(SCHEMA).identity("c1").truncate("c2", 2).build();
     Map<String, String> options = Maps.newHashMap();
@@ -493,11 +489,11 @@ public class TestRewriteManifestsAction extends 
SparkTestBase {
     table.refresh();
 
     List<ManifestFile> manifests = 
table.currentSnapshot().allManifests(table.io());
-    Assert.assertEquals("Should have 3 manifests before rewrite", 3, 
manifests.size());
+    assertThat(manifests).as("Should have 3 manifests before 
rewrite").hasSize(3);
 
     SparkActions actions = SparkActions.get();
 
-    String stagingLocation = temp.newFolder().toString();
+    String stagingLocation = java.nio.file.Files.createTempDirectory(temp, 
"junit").toString();
 
     // rewrite only the first manifest
     RewriteManifests.Result result =
@@ -511,22 +507,22 @@ public class TestRewriteManifestsAction extends 
SparkTestBase {
             .option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
             .execute();
 
-    Assert.assertEquals(
-        "Action should rewrite 2 manifest", 2, 
Iterables.size(result.rewrittenManifests()));
-    Assert.assertEquals(
-        "Action should add 1 manifests", 1, 
Iterables.size(result.addedManifests()));
+    assertThat(result.rewrittenManifests()).as("Action should rewrite 2 
manifest").hasSize(2);
+    assertThat(result.addedManifests()).as("Action should add 1 
manifests").hasSize(1);
     assertManifestsLocation(result.addedManifests(), stagingLocation);
 
     table.refresh();
 
     List<ManifestFile> newManifests = 
table.currentSnapshot().allManifests(table.io());
-    Assert.assertEquals("Should have 2 manifests after rewrite", 2, 
newManifests.size());
-
-    Assert.assertFalse("First manifest must be rewritten", 
newManifests.contains(manifests.get(0)));
-    Assert.assertFalse(
-        "Second manifest must be rewritten", 
newManifests.contains(manifests.get(1)));
-    Assert.assertTrue(
-        "Third manifest must not be rewritten", 
newManifests.contains(manifests.get(2)));
+    assertThat(newManifests)
+        .as("Should have 2 manifests after rewrite")
+        .hasSize(2)
+        .as("First manifest must be rewritten")
+        .doesNotContain(manifests.get(0))
+        .as("Second manifest must be rewritten")
+        .doesNotContain(manifests.get(1))
+        .as("Third manifest must not be rewritten")
+        .contains(manifests.get(2));
 
     List<ThreeColumnRecord> expectedRecords = Lists.newArrayList();
     expectedRecords.add(records1.get(0));
@@ -539,10 +535,10 @@ public class TestRewriteManifestsAction extends 
SparkTestBase {
     List<ThreeColumnRecord> actualRecords =
         resultDF.sort("c1", 
"c2").as(Encoders.bean(ThreeColumnRecord.class)).collectAsList();
 
-    Assert.assertEquals("Rows must match", expectedRecords, actualRecords);
+    assertThat(actualRecords).as("Rows must match").isEqualTo(expectedRecords);
   }
 
-  @Test
+  @TestTemplate
   public void testRewriteSmallManifestsNonPartitionedV2Table() {
     assumeThat(formatVersion).isGreaterThan(1);
 
@@ -567,7 +563,7 @@ public class TestRewriteManifestsAction extends 
SparkTestBase {
     DataFile file2 = 
Iterables.getOnlyElement(snapshot2.addedDataFiles(table.io()));
 
     List<ManifestFile> manifests = 
table.currentSnapshot().allManifests(table.io());
-    Assert.assertEquals("Should have 2 manifests before rewrite", 2, 
manifests.size());
+    assertThat(manifests).as("Should have 2 manifests before 
rewrite").hasSize(2);
 
     SparkActions actions = SparkActions.get();
     RewriteManifests.Result result =
@@ -575,21 +571,19 @@ public class TestRewriteManifestsAction extends 
SparkTestBase {
             .rewriteManifests(table)
             .option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
             .execute();
-    Assert.assertEquals(
-        "Action should rewrite 2 manifests", 2, 
Iterables.size(result.rewrittenManifests()));
-    Assert.assertEquals(
-        "Action should add 1 manifests", 1, 
Iterables.size(result.addedManifests()));
+    assertThat(result.rewrittenManifests()).as("Action should rewrite 2 
manifests").hasSize(2);
+    assertThat(result.addedManifests()).as("Action should add 1 
manifests").hasSize(1);
     assertManifestsLocation(result.addedManifests());
 
     table.refresh();
 
     List<ManifestFile> newManifests = 
table.currentSnapshot().allManifests(table.io());
-    Assert.assertEquals("Should have 1 manifests after rewrite", 1, 
newManifests.size());
+    assertThat(newManifests).as("Should have 1 manifests after 
rewrite").hasSize(1);
 
     ManifestFile newManifest = Iterables.getOnlyElement(newManifests);
-    Assert.assertEquals(2, (long) newManifest.existingFilesCount());
-    Assert.assertFalse(newManifest.hasAddedFiles());
-    Assert.assertFalse(newManifest.hasDeletedFiles());
+    assertThat(newManifest.existingFilesCount()).isEqualTo(2);
+    assertThat(newManifest.hasAddedFiles()).isFalse();
+    assertThat(newManifest.hasDeletedFiles()).isFalse();
 
     validateDataManifest(
         table,
@@ -607,10 +601,10 @@ public class TestRewriteManifestsAction extends 
SparkTestBase {
     List<ThreeColumnRecord> actualRecords =
         resultDF.sort("c1", 
"c2").as(Encoders.bean(ThreeColumnRecord.class)).collectAsList();
 
-    Assert.assertEquals("Rows must match", expectedRecords, actualRecords);
+    assertThat(actualRecords).as("Rows must match").isEqualTo(expectedRecords);
   }
 
-  @Test
+  @TestTemplate
   public void testRewriteLargeManifestsEvolvedUnpartitionedV1Table() throws 
IOException {
     assumeThat(formatVersion).isEqualTo(1);
 
@@ -659,9 +653,9 @@ public class TestRewriteManifestsAction extends 
SparkTestBase {
     assertThat(manifests).hasSizeGreaterThanOrEqualTo(2);
   }
 
-  @Test
+  @TestTemplate
   public void testRewriteSmallDeleteManifestsNonPartitionedTable() throws 
IOException {
-    assumeThat(formatVersion).isGreaterThan(1);
+    assumeThat(formatVersion).isEqualTo(2);
 
     PartitionSpec spec = PartitionSpec.unpartitioned();
     Map<String, String> options = Maps.newHashMap();
@@ -732,9 +726,9 @@ public class TestRewriteManifestsAction extends 
SparkTestBase {
     assertThat(actualRecords()).isEqualTo(expectedRecords);
   }
 
-  @Test
+  @TestTemplate
   public void testRewriteSmallDeleteManifestsPartitionedTable() throws 
IOException {
-    assumeThat(formatVersion).isGreaterThan(1);
+    assumeThat(formatVersion).isEqualTo(2);
 
     PartitionSpec spec = 
PartitionSpec.builderFor(SCHEMA).identity("c3").build();
     Map<String, String> options = Maps.newHashMap();
@@ -835,9 +829,9 @@ public class TestRewriteManifestsAction extends 
SparkTestBase {
     assertThat(actualRecords()).isEqualTo(expectedRecords);
   }
 
-  @Test
+  @TestTemplate
   public void testRewriteLargeDeleteManifestsPartitionedTable() throws 
IOException {
-    assumeThat(formatVersion).isGreaterThan(1);
+    assumeThat(formatVersion).isEqualTo(2);
 
     PartitionSpec spec = 
PartitionSpec.builderFor(SCHEMA).identity("c3").build();
     Map<String, String> options = Maps.newHashMap();
@@ -874,7 +868,7 @@ public class TestRewriteManifestsAction extends 
SparkTestBase {
 
     SparkActions actions = SparkActions.get();
 
-    String stagingLocation = temp.newFolder().toString();
+    String stagingLocation = java.nio.file.Files.createTempDirectory(temp, 
"junit").toString();
 
     RewriteManifests.Result result =
         actions
@@ -948,8 +942,8 @@ public class TestRewriteManifestsAction extends 
SparkTestBase {
   }
 
   private ManifestFile writeManifest(Table table, List<DataFile> files) throws 
IOException {
-    File manifestFile = temp.newFile("generated-manifest.avro");
-    Assert.assertTrue(manifestFile.delete());
+    File manifestFile = File.createTempFile("generated-manifest", ".avro", 
temp.toFile());
+    assertThat(manifestFile.delete()).isTrue();
     OutputFile outputFile = 
table.io().newOutputFile(manifestFile.getCanonicalPath());
 
     ManifestWriter<DataFile> writer =
@@ -1018,7 +1012,7 @@ public class TestRewriteManifestsAction extends 
SparkTestBase {
   private Pair<DeleteFile, CharSequenceSet> writePosDeletes(
       Table table, StructLike partition, List<Pair<CharSequence, Long>> 
deletes)
       throws IOException {
-    OutputFile outputFile = Files.localOutput(temp.newFile());
+    OutputFile outputFile = Files.localOutput(File.createTempFile("junit", 
null, temp.toFile()));
     return FileHelpers.writeDeleteFile(table, outputFile, partition, deletes);
   }
 
@@ -1036,7 +1030,7 @@ public class TestRewriteManifestsAction extends 
SparkTestBase {
       deletes.add(delete.copy(key, value));
     }
 
-    OutputFile outputFile = Files.localOutput(temp.newFile());
+    OutputFile outputFile = Files.localOutput(File.createTempFile("junit", 
null, temp.toFile()));
     return FileHelpers.writeDeleteFile(table, outputFile, partition, deletes, 
deleteSchema);
   }
 }
diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
index 022dfa1592..d3508283dd 100644
--- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
+++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
@@ -39,7 +39,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
-import java.util.stream.StreamSupport;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -69,10 +68,8 @@ import org.apache.iceberg.puffin.Puffin;
 import org.apache.iceberg.puffin.PuffinWriter;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
-import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.spark.SparkSQLProperties;
 import org.apache.iceberg.spark.TestBase;
 import 
org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction.StringToFileURI;
@@ -317,10 +314,8 @@ public abstract class TestRemoveOrphanFilesAction extends 
TestBase {
     // Verifies that the delete methods ran in the threads created by the 
provided ExecutorService
     // ThreadFactory
     assertThat(deleteThreads)
-        .isEqualTo(
-            Sets.newHashSet(
-                "remove-orphan-0", "remove-orphan-1", "remove-orphan-2", 
"remove-orphan-3"));
-
+        .containsExactlyInAnyOrder(
+            "remove-orphan-0", "remove-orphan-1", "remove-orphan-2", 
"remove-orphan-3");
     assertThat(deletedFiles).hasSize(4);
   }
 
@@ -446,10 +441,8 @@ public abstract class TestRemoveOrphanFilesAction extends 
TestBase {
     DeleteOrphanFiles.Result result =
         
actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).execute();
 
-    assertThat(result.orphanFileLocations()).as("Should delete 1 
file").hasSize(1);
-    
assertThat(StreamSupport.stream(result.orphanFileLocations().spliterator(), 
false))
-        .as("Should remove v1 file")
-        .anyMatch(file -> file.contains("v1.metadata.json"));
+    assertThat(result.orphanFileLocations())
+        .containsExactly(tableLocation + "metadata/v1.metadata.json");
 
     List<ThreeColumnRecord> expectedRecords = Lists.newArrayList();
     expectedRecords.addAll(records);
@@ -748,9 +741,9 @@ public abstract class TestRemoveOrphanFilesAction extends 
TestBase {
             .deleteOrphanFiles(table)
             .olderThan(System.currentTimeMillis() + 1000)
             .execute();
-    
assertThat(StreamSupport.stream(result.orphanFileLocations().spliterator(), 
false))
+    assertThat(result.orphanFileLocations())
         .as("trash file should be removed")
-        .anyMatch(file -> file.contains("file:" + location + 
"/data/trashfile"));
+        .contains("file:" + location + "/data/trashfile");
   }
 
   @TestTemplate
@@ -967,11 +960,8 @@ public abstract class TestRemoveOrphanFilesAction extends 
TestBase {
             .olderThan(System.currentTimeMillis() + 1000)
             .execute();
     Iterable<String> orphanFileLocations = result.orphanFileLocations();
-    assertThat(orphanFileLocations).as("Should be orphan file").hasSize(1);
-    assertThat(Iterables.getOnlyElement(orphanFileLocations))
-        .as("Deleted file")
-        .isEqualTo(statsLocation.toURI().toString());
-    assertThat(statsLocation.exists()).as("stats file should be 
deleted").isFalse();
+    
assertThat(orphanFileLocations).hasSize(1).containsExactly(statsLocation.toURI().toString());
+    assertThat(statsLocation).as("stats file should be 
deleted").doesNotExist();
   }
 
   @TestTemplate
diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction3.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction3.java
index 35d86b0a44..5f98287951 100644
--- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction3.java
+++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction3.java
@@ -57,9 +57,9 @@ public class TestRemoveOrphanFilesAction3 extends 
TestRemoveOrphanFilesAction {
             .deleteOrphanFiles(table.table())
             .olderThan(System.currentTimeMillis() + 1000)
             .execute();
-    
assertThat(StreamSupport.stream(results.orphanFileLocations().spliterator(), 
false))
+    assertThat(results.orphanFileLocations())
         .as("trash file should be removed")
-        .anyMatch(file -> file.contains("file:" + location + trashFile));
+        .contains("file:" + location + trashFile);
   }
 
   @TestTemplate
@@ -86,9 +86,9 @@ public class TestRemoveOrphanFilesAction3 extends 
TestRemoveOrphanFilesAction {
             .deleteOrphanFiles(table.table())
             .olderThan(System.currentTimeMillis() + 1000)
             .execute();
-    
assertThat(StreamSupport.stream(results.orphanFileLocations().spliterator(), 
false))
+    assertThat(results.orphanFileLocations())
         .as("trash file should be removed")
-        .anyMatch(file -> file.contains("file:" + location + trashFile));
+        .contains("file:" + location + trashFile);
   }
 
   @TestTemplate
@@ -148,9 +148,9 @@ public class TestRemoveOrphanFilesAction3 extends 
TestRemoveOrphanFilesAction {
             .deleteOrphanFiles(table.table())
             .olderThan(System.currentTimeMillis() + 1000)
             .execute();
-    
assertThat(StreamSupport.stream(results.orphanFileLocations().spliterator(), 
false))
+    assertThat(results.orphanFileLocations())
         .as("trash file should be removed")
-        .anyMatch(file -> file.contains("file:" + location + trashFile));
+        .contains("file:" + location + trashFile);
   }
 
   @TestTemplate
@@ -169,7 +169,7 @@ public class TestRemoveOrphanFilesAction3 extends 
TestRemoveOrphanFilesAction {
     cat.createTable(id, SparkSchemaUtil.convert(SCHEMA), transforms, 
properties);
     SparkTable table = (SparkTable) cat.loadTable(id);
 
-    spark.sql("INSERT INTO default.sessioncattest VALUES (1,1,1)");
+    sql("INSERT INTO default.sessioncattest VALUES (1,1,1)");
 
     String location = table.table().location().replaceFirst("file:", "");
     String trashFile = randomName("/data/trashfile");
@@ -180,13 +180,13 @@ public class TestRemoveOrphanFilesAction3 extends 
TestRemoveOrphanFilesAction {
             .deleteOrphanFiles(table.table())
             .olderThan(System.currentTimeMillis() + 1000)
             .execute();
-    
assertThat(StreamSupport.stream(results.orphanFileLocations().spliterator(), 
false))
+    assertThat(results.orphanFileLocations())
         .as("trash file should be removed")
-        .anyMatch(file -> file.contains("file:" + location + trashFile));
+        .contains("file:" + location + trashFile);
   }
 
   @AfterEach
-  public void resetSparkSessionCatalog() throws Exception {
+  public void resetSparkSessionCatalog() {
     spark.conf().unset("spark.sql.catalog.spark_catalog");
     spark.conf().unset("spark.sql.catalog.spark_catalog.type");
     spark.conf().unset("spark.sql.catalog.spark_catalog.warehouse");
diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index bdbb8c1768..b3af2b2b21 100644
--- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -658,7 +658,7 @@ public class TestRewriteDataFilesAction extends TestBase {
     shouldHaveMinSequenceNumberInPartition(table, "data_file.partition.c1 == 
1", 3);
 
     shouldHaveSnapshots(table, 5);
-    
assertThat(table.currentSnapshot().summary().get("total-position-deletes")).isEqualTo("0");
+    
assertThat(table.currentSnapshot().summary()).containsEntry("total-position-deletes",
 "0");
     assertEquals("Rows must match", expectedRecords, currentData());
   }
 
@@ -1894,7 +1894,7 @@ public class TestRewriteDataFilesAction extends TestBase {
             .execute();
 
     assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
-    assertThat(currentData().size()).isEqualTo(count);
+    assertThat(currentData()).hasSize((int) count);
     shouldRewriteDataFilesWithPartitionSpec(table, outputSpecId);
   }
 
@@ -1917,7 +1917,7 @@ public class TestRewriteDataFilesAction extends TestBase {
             .execute();
 
     assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
-    assertThat(currentData().size()).isEqualTo(count);
+    assertThat(currentData()).hasSize((int) count);
     shouldRewriteDataFilesWithPartitionSpec(table, outputSpecId);
   }
 
@@ -1956,7 +1956,7 @@ public class TestRewriteDataFilesAction extends TestBase {
             .execute();
 
     assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
-    assertThat(currentData().size()).isEqualTo(count);
+    assertThat(currentData()).hasSize((int) count);
     shouldRewriteDataFilesWithPartitionSpec(table, outputSpecId);
   }
 
@@ -1979,7 +1979,7 @@ public class TestRewriteDataFilesAction extends TestBase {
             .execute();
 
     assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
-    assertThat(currentData().size()).isEqualTo(count);
+    assertThat(currentData()).hasSize((int) count);
     shouldRewriteDataFilesWithPartitionSpec(table, outputSpecId);
   }
 
@@ -2049,10 +2049,9 @@ public class TestRewriteDataFilesAction extends TestBase {
 
   protected void shouldHaveSnapshots(Table table, int expectedSnapshots) {
     table.refresh();
-    int actualSnapshots = Iterables.size(table.snapshots());
-    assertThat(actualSnapshots)
+    assertThat(table.snapshots())
         .as("Table did not have the expected number of snapshots")
-        .isEqualTo(expectedSnapshots);
+        .hasSize(expectedSnapshots);
   }
 
   protected void shouldHaveNoOrphans(Table table) {
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
index 4497184354..76b201aa56 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
@@ -400,7 +400,6 @@ public class TestRewriteManifestsAction extends TestBase {
     table.refresh();
 
     List<ManifestFile> newManifests = table.currentSnapshot().allManifests(table.io());
-
     assertThat(newManifests).as("Should have 2 manifests after rewrite").hasSize(2);
 
     assertThat(newManifests.get(0).existingFilesCount()).isEqualTo(4);

Reply via email to