This is an automated email from the ASF dual-hosted git repository.

etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 809a2327f1 Spark 3.4: Migrate SparkCatalogTestBase related tests to 
JUnit5 (#13007)
809a2327f1 is described below

commit 809a2327f15dd0f5f9a20f43b3dbb1632bb00828
Author: Tom Tanaka <[email protected]>
AuthorDate: Thu May 8 18:16:45 2025 +0900

    Spark 3.4: Migrate SparkCatalogTestBase related tests to JUnit5 (#13007)
---
 .../apache/iceberg/spark/SparkCatalogTestBase.java |  72 ------------
 .../spark/source/TestCompressionSettings.java      | 114 ++++++++++++------
 .../TestRequiredDistributionAndOrdering.java       |  35 +++---
 .../iceberg/spark/source/TestRuntimeFiltering.java |  78 +++++++------
 .../source/TestSparkCatalogHadoopOverrides.java    |  66 +++++------
 .../iceberg/spark/source/TestSparkStagedScan.java  |  39 ++++---
 .../iceberg/spark/source/TestSparkTable.java       |  24 ++--
 .../spark/source/TestStructuredStreamingRead3.java | 129 ++++++++++-----------
 .../TestRequiredDistributionAndOrdering.java       |   3 +
 .../source/TestSparkCatalogHadoopOverrides.java    |   3 +
 .../iceberg/spark/source/TestSparkStagedScan.java  |   3 +
 .../iceberg/spark/source/TestSparkTable.java       |   3 +
 .../TestRequiredDistributionAndOrdering.java       |   3 +
 .../source/TestSparkCatalogHadoopOverrides.java    |   3 +
 .../iceberg/spark/source/TestSparkStagedScan.java  |   3 +
 .../iceberg/spark/source/TestSparkTable.java       |   3 +
 16 files changed, 289 insertions(+), 292 deletions(-)

diff --git 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/SparkCatalogTestBase.java
 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/SparkCatalogTestBase.java
deleted file mode 100644
index 6b2b9a1b80..0000000000
--- 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/SparkCatalogTestBase.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iceberg.spark;
-
-import java.util.Map;
-import org.apache.iceberg.CatalogProperties;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
-import org.junit.Rule;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public abstract class SparkCatalogTestBase extends SparkTestBaseWithCatalog {
-
-  // these parameters are broken out to avoid changes that need to modify lots 
of test suites
-  @Parameterized.Parameters(name = "catalogName = {0}, implementation = {1}, 
config = {2}")
-  public static Object[][] parameters() {
-    return new Object[][] {
-      {
-        SparkCatalogConfig.HIVE.catalogName(),
-        SparkCatalogConfig.HIVE.implementation(),
-        SparkCatalogConfig.HIVE.properties()
-      },
-      {
-        SparkCatalogConfig.HADOOP.catalogName(),
-        SparkCatalogConfig.HADOOP.implementation(),
-        SparkCatalogConfig.HADOOP.properties()
-      },
-      {
-        SparkCatalogConfig.SPARK.catalogName(),
-        SparkCatalogConfig.SPARK.implementation(),
-        SparkCatalogConfig.SPARK.properties()
-      },
-      {
-        SparkCatalogConfig.REST.catalogName(),
-        SparkCatalogConfig.REST.implementation(),
-        ImmutableMap.builder()
-            .putAll(SparkCatalogConfig.REST.properties())
-            .put(CatalogProperties.URI, REST_SERVER_RULE.uri())
-            .build()
-      }
-    };
-  }
-
-  @Rule public TemporaryFolder temp = new TemporaryFolder();
-
-  public SparkCatalogTestBase(SparkCatalogConfig config) {
-    super(config);
-  }
-
-  public SparkCatalogTestBase(
-      String catalogName, String implementation, Map<String, String> config) {
-    super(catalogName, implementation, config);
-  }
-}
diff --git 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java
 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java
index 14e8fc34c3..24a14bb64d 100644
--- 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java
+++ 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java
@@ -18,6 +18,9 @@
  */
 package org.apache.iceberg.spark.source;
 
+import static org.apache.iceberg.FileFormat.AVRO;
+import static org.apache.iceberg.FileFormat.ORC;
+import static org.apache.iceberg.FileFormat.PARQUET;
 import static org.apache.iceberg.RowLevelOperationMode.MERGE_ON_READ;
 import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION;
 import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
@@ -50,6 +53,9 @@ import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.ManifestFiles;
 import org.apache.iceberg.ManifestReader;
+import org.apache.iceberg.Parameter;
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.Parameters;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.actions.SizeBasedFileRewritePlanner;
@@ -58,8 +64,8 @@ import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.CatalogTestBase;
 import org.apache.iceberg.spark.SparkCatalogConfig;
-import org.apache.iceberg.spark.SparkCatalogTestBase;
 import org.apache.iceberg.spark.SparkWriteOptions;
 import org.apache.iceberg.spark.actions.SparkActions;
 import org.apache.orc.OrcFile;
@@ -69,73 +75,98 @@ import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class TestCompressionSettings extends SparkCatalogTestBase {
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+
+@ExtendWith(ParameterizedTestExtension.class)
+public class TestCompressionSettings extends CatalogTestBase {
 
   private static final Configuration CONF = new Configuration();
   private static final String TABLE_NAME = "testWriteData";
 
   private static SparkSession spark = null;
 
-  private final FileFormat format;
-  private final ImmutableMap<String, String> properties;
+  @Parameter(index = 3)
+  private FileFormat format;
+
+  @Parameter(index = 4)
+  private Map<String, String> properties;
 
-  @Rule public TemporaryFolder temp = new TemporaryFolder();
+  @TempDir private java.nio.file.Path temp;
 
-  @Parameterized.Parameters(name = "format = {0}, properties = {1}")
+  @Parameters(
+      name =
+          "catalogName = {0}, implementation = {1}, config = {2}, format = 
{3}, properties = {4}")
   public static Object[][] parameters() {
     return new Object[][] {
-      {"parquet", ImmutableMap.of(COMPRESSION_CODEC, "zstd", 
COMPRESSION_LEVEL, "1")},
-      {"parquet", ImmutableMap.of(COMPRESSION_CODEC, "gzip")},
-      {"orc", ImmutableMap.of(COMPRESSION_CODEC, "zstd", COMPRESSION_STRATEGY, 
"speed")},
-      {"orc", ImmutableMap.of(COMPRESSION_CODEC, "zstd", COMPRESSION_STRATEGY, 
"compression")},
-      {"avro", ImmutableMap.of(COMPRESSION_CODEC, "snappy", COMPRESSION_LEVEL, 
"3")}
+      {
+        SparkCatalogConfig.SPARK.catalogName(),
+        SparkCatalogConfig.SPARK.implementation(),
+        SparkCatalogConfig.SPARK.properties(),
+        PARQUET,
+        ImmutableMap.of(COMPRESSION_CODEC, "zstd", COMPRESSION_LEVEL, "1")
+      },
+      {
+        SparkCatalogConfig.SPARK.catalogName(),
+        SparkCatalogConfig.SPARK.implementation(),
+        SparkCatalogConfig.SPARK.properties(),
+        PARQUET,
+        ImmutableMap.of(COMPRESSION_CODEC, "gzip")
+      },
+      {
+        SparkCatalogConfig.SPARK.catalogName(),
+        SparkCatalogConfig.SPARK.implementation(),
+        SparkCatalogConfig.SPARK.properties(),
+        ORC,
+        ImmutableMap.of(COMPRESSION_CODEC, "zstd", COMPRESSION_STRATEGY, 
"speed")
+      },
+      {
+        SparkCatalogConfig.SPARK.catalogName(),
+        SparkCatalogConfig.SPARK.implementation(),
+        SparkCatalogConfig.SPARK.properties(),
+        ORC,
+        ImmutableMap.of(COMPRESSION_CODEC, "zstd", COMPRESSION_STRATEGY, 
"compression")
+      },
+      {
+        SparkCatalogConfig.SPARK.catalogName(),
+        SparkCatalogConfig.SPARK.implementation(),
+        SparkCatalogConfig.SPARK.properties(),
+        AVRO,
+        ImmutableMap.of(COMPRESSION_CODEC, "snappy", COMPRESSION_LEVEL, "3")
+      }
     };
   }
 
-  @BeforeClass
+  @BeforeAll
   public static void startSpark() {
     TestCompressionSettings.spark = 
SparkSession.builder().master("local[2]").getOrCreate();
   }
 
-  @Before
+  @BeforeEach
   public void resetSpecificConfigurations() {
     spark.conf().unset(COMPRESSION_CODEC);
     spark.conf().unset(COMPRESSION_LEVEL);
     spark.conf().unset(COMPRESSION_STRATEGY);
   }
 
-  @Parameterized.AfterParam
-  public static void clearSourceCache() {
+  @AfterEach
+  public void afterEach() {
     spark.sql(String.format("DROP TABLE IF EXISTS %s", TABLE_NAME));
   }
 
-  @AfterClass
+  @AfterAll
   public static void stopSpark() {
     SparkSession currentSpark = TestCompressionSettings.spark;
     TestCompressionSettings.spark = null;
     currentSpark.stop();
   }
 
-  public TestCompressionSettings(String format, ImmutableMap properties) {
-    super(
-        SparkCatalogConfig.SPARK.catalogName(),
-        SparkCatalogConfig.SPARK.implementation(),
-        SparkCatalogConfig.SPARK.properties());
-    this.format = FileFormat.fromString(format);
-    this.properties = properties;
-  }
-
-  @Test
+  @TestTemplate
   public void testWriteDataWithDifferentSetting() throws Exception {
     sql("CREATE TABLE %s (id int, data string) USING iceberg", TABLE_NAME);
     Map<String, String> tableProperties = Maps.newHashMap();
@@ -168,6 +199,8 @@ public class TestCompressionSettings extends 
SparkCatalogTestBase {
       spark.conf().set(entry.getKey(), entry.getValue());
     }
 
+    assertSparkConf();
+
     df.select("id", "data")
         .writeTo(TABLE_NAME)
         .option(SparkWriteOptions.WRITE_FORMAT, format.toString())
@@ -230,4 +263,13 @@ public class TestCompressionSettings extends 
SparkCatalogTestBase {
         return fileReader.getMetaString(DataFileConstants.CODEC);
     }
   }
+
+  private void assertSparkConf() {
+    String[] propertiesToCheck = {COMPRESSION_CODEC, COMPRESSION_LEVEL, 
COMPRESSION_STRATEGY};
+    for (String prop : propertiesToCheck) {
+      String expected = properties.getOrDefault(prop, null);
+      String actual = spark.conf().get(prop, null);
+      assertThat(actual).isEqualToIgnoringCase(expected);
+    }
+  }
 }
diff --git 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestRequiredDistributionAndOrdering.java
 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestRequiredDistributionAndOrdering.java
index b669c91313..dc7d87b903 100644
--- 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestRequiredDistributionAndOrdering.java
+++ 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestRequiredDistributionAndOrdering.java
@@ -21,31 +21,28 @@ package org.apache.iceberg.spark.source;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 import java.util.List;
-import java.util.Map;
+import org.apache.iceberg.ParameterizedTestExtension;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
-import org.apache.iceberg.spark.SparkCatalogTestBase;
+import org.apache.iceberg.spark.CatalogTestBase;
 import org.apache.iceberg.spark.SparkWriteOptions;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
-import org.junit.After;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
 
-public class TestRequiredDistributionAndOrdering extends SparkCatalogTestBase {
+@ExtendWith(ParameterizedTestExtension.class)
+public class TestRequiredDistributionAndOrdering extends CatalogTestBase {
 
-  public TestRequiredDistributionAndOrdering(
-      String catalogName, String implementation, Map<String, String> config) {
-    super(catalogName, implementation, config);
-  }
-
-  @After
+  @AfterEach
   public void dropTestTable() {
     sql("DROP TABLE IF EXISTS %s", tableName);
   }
 
-  @Test
+  @TestTemplate
   public void testDefaultLocalSort() throws NoSuchTableException {
     sql(
         "CREATE TABLE %s (c1 INT, c2 STRING, c3 STRING) "
@@ -74,7 +71,7 @@ public class TestRequiredDistributionAndOrdering extends 
SparkCatalogTestBase {
         sql("SELECT count(*) FROM %s", tableName));
   }
 
-  @Test
+  @TestTemplate
   public void testPartitionColumnsArePrependedForRangeDistribution() throws 
NoSuchTableException {
     sql(
         "CREATE TABLE %s (c1 INT, c2 STRING, c3 STRING) "
@@ -110,7 +107,7 @@ public class TestRequiredDistributionAndOrdering extends 
SparkCatalogTestBase {
         sql("SELECT count(*) FROM %s", tableName));
   }
 
-  @Test
+  @TestTemplate
   public void testSortOrderIncludesPartitionColumns() throws 
NoSuchTableException {
     sql(
         "CREATE TABLE %s (c1 INT, c2 STRING, c3 STRING) "
@@ -142,7 +139,7 @@ public class TestRequiredDistributionAndOrdering extends 
SparkCatalogTestBase {
         sql("SELECT count(*) FROM %s", tableName));
   }
 
-  @Test
+  @TestTemplate
   public void testDisabledDistributionAndOrdering() {
     sql(
         "CREATE TABLE %s (c1 INT, c2 STRING, c3 STRING) "
@@ -176,7 +173,7 @@ public class TestRequiredDistributionAndOrdering extends 
SparkCatalogTestBase {
                 + "and by partition within each spec. Either cluster the 
incoming records or switch to fanout writers.");
   }
 
-  @Test
+  @TestTemplate
   public void testHashDistribution() throws NoSuchTableException {
     sql(
         "CREATE TABLE %s (c1 INT, c2 STRING, c3 STRING) "
@@ -212,7 +209,7 @@ public class TestRequiredDistributionAndOrdering extends 
SparkCatalogTestBase {
         sql("SELECT count(*) FROM %s", tableName));
   }
 
-  @Test
+  @TestTemplate
   public void testSortBucketTransformsWithoutExtensions() throws 
NoSuchTableException {
     sql(
         "CREATE TABLE %s (c1 INT, c2 STRING, c3 STRING) "
@@ -238,7 +235,7 @@ public class TestRequiredDistributionAndOrdering extends 
SparkCatalogTestBase {
     assertEquals("Rows must match", expected, sql("SELECT * FROM %s ORDER BY 
c1", tableName));
   }
 
-  @Test
+  @TestTemplate
   public void testRangeDistributionWithQuotedColumnsNames() throws 
NoSuchTableException {
     sql(
         "CREATE TABLE %s (c1 INT, c2 STRING, `c.3` STRING) "
@@ -274,7 +271,7 @@ public class TestRequiredDistributionAndOrdering extends 
SparkCatalogTestBase {
         sql("SELECT count(*) FROM %s", tableName));
   }
 
-  @Test
+  @TestTemplate
   public void testHashDistributionWithQuotedColumnsNames() throws 
NoSuchTableException {
     sql(
         "CREATE TABLE %s (c1 INT, c2 STRING, `c``3` STRING) "
diff --git 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestRuntimeFiltering.java
 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestRuntimeFiltering.java
index b09c995b30..e7346e270f 100644
--- 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestRuntimeFiltering.java
+++ 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestRuntimeFiltering.java
@@ -22,6 +22,7 @@ import static org.apache.iceberg.PlanningMode.DISTRIBUTED;
 import static org.apache.iceberg.PlanningMode.LOCAL;
 import static org.apache.spark.sql.functions.date_add;
 import static org.apache.spark.sql.functions.expr;
+import static org.assertj.core.api.Assertions.assertThat;
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
@@ -29,6 +30,9 @@ import java.util.List;
 import java.util.Set;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Parameter;
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.Parameters;
 import org.apache.iceberg.PlanningMode;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.expressions.Expression;
@@ -36,38 +40,47 @@ import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-import org.apache.iceberg.spark.SparkTestBaseWithCatalog;
+import org.apache.iceberg.spark.SparkCatalogConfig;
 import org.apache.iceberg.spark.SparkWriteOptions;
+import org.apache.iceberg.spark.TestBaseWithCatalog;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class TestRuntimeFiltering extends SparkTestBaseWithCatalog {
-
-  @Parameterized.Parameters(name = "planningMode = {0}")
-  public static Object[] parameters() {
-    return new Object[] {LOCAL, DISTRIBUTED};
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+@ExtendWith(ParameterizedTestExtension.class)
+public class TestRuntimeFiltering extends TestBaseWithCatalog {
+
+  @Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}, 
planningMode = {3}")
+  public static Object[][] parameters() {
+    return new Object[][] {
+      {
+        SparkCatalogConfig.HADOOP.catalogName(),
+        SparkCatalogConfig.HADOOP.implementation(),
+        SparkCatalogConfig.HADOOP.properties(),
+        LOCAL
+      },
+      {
+        SparkCatalogConfig.HADOOP.catalogName(),
+        SparkCatalogConfig.HADOOP.implementation(),
+        SparkCatalogConfig.HADOOP.properties(),
+        DISTRIBUTED
+      }
+    };
   }
 
-  private final PlanningMode planningMode;
-
-  public TestRuntimeFiltering(PlanningMode planningMode) {
-    this.planningMode = planningMode;
-  }
+  @Parameter(index = 3)
+  private PlanningMode planningMode;
 
-  @After
+  @AfterEach
   public void removeTables() {
     sql("DROP TABLE IF EXISTS %s", tableName);
     sql("DROP TABLE IF EXISTS dim");
   }
 
-  @Test
+  @TestTemplate
   public void testIdentityPartitionedTable() throws NoSuchTableException {
     sql(
         "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) "
@@ -106,7 +119,7 @@ public class TestRuntimeFiltering extends 
SparkTestBaseWithCatalog {
         sql(query));
   }
 
-  @Test
+  @TestTemplate
   public void testBucketedTable() throws NoSuchTableException {
     sql(
         "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) "
@@ -145,7 +158,7 @@ public class TestRuntimeFiltering extends 
SparkTestBaseWithCatalog {
         sql(query));
   }
 
-  @Test
+  @TestTemplate
   public void testRenamedSourceColumnTable() throws NoSuchTableException {
     sql(
         "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) "
@@ -186,7 +199,7 @@ public class TestRuntimeFiltering extends 
SparkTestBaseWithCatalog {
         sql(query));
   }
 
-  @Test
+  @TestTemplate
   public void testMultipleRuntimeFilters() throws NoSuchTableException {
     sql(
         "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) "
@@ -229,7 +242,7 @@ public class TestRuntimeFiltering extends 
SparkTestBaseWithCatalog {
         sql(query));
   }
 
-  @Test
+  @TestTemplate
   public void testCaseSensitivityOfRuntimeFilters() throws 
NoSuchTableException {
     sql(
         "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) "
@@ -273,7 +286,7 @@ public class TestRuntimeFiltering extends 
SparkTestBaseWithCatalog {
         sql(caseInsensitiveQuery));
   }
 
-  @Test
+  @TestTemplate
   public void testBucketedTableWithMultipleSpecs() throws NoSuchTableException 
{
     sql(
         "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) 
USING iceberg",
@@ -325,7 +338,7 @@ public class TestRuntimeFiltering extends 
SparkTestBaseWithCatalog {
         sql(query));
   }
 
-  @Test
+  @TestTemplate
   public void testSourceColumnWithDots() throws NoSuchTableException {
     sql(
         "CREATE TABLE %s (`i.d` BIGINT, data STRING, date DATE, ts TIMESTAMP) "
@@ -369,7 +382,7 @@ public class TestRuntimeFiltering extends 
SparkTestBaseWithCatalog {
         sql(query));
   }
 
-  @Test
+  @TestTemplate
   public void testSourceColumnWithBackticks() throws NoSuchTableException {
     sql(
         "CREATE TABLE %s (`i``d` BIGINT, data STRING, date DATE, ts TIMESTAMP) 
"
@@ -410,7 +423,7 @@ public class TestRuntimeFiltering extends 
SparkTestBaseWithCatalog {
         sql(query));
   }
 
-  @Test
+  @TestTemplate
   public void testUnpartitionedTable() throws NoSuchTableException {
     sql(
         "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) 
USING iceberg",
@@ -458,7 +471,7 @@ public class TestRuntimeFiltering extends 
SparkTestBaseWithCatalog {
     List<Row> output = spark.sql("EXPLAIN EXTENDED " + query).collectAsList();
     String plan = output.get(0).getString(0);
     int actualFilterCount = StringUtils.countMatches(plan, 
"dynamicpruningexpression");
-    Assert.assertEquals(errorMessage, expectedFilterCount, actualFilterCount);
+    
assertThat(actualFilterCount).as(errorMessage).isEqualTo(expectedFilterCount);
   }
 
   // delete files that don't match the filter to ensure dynamic filtering 
works and only required
@@ -490,9 +503,8 @@ public class TestRuntimeFiltering extends 
SparkTestBaseWithCatalog {
       throw new UncheckedIOException(e);
     }
 
-    Assert.assertEquals(
-        "Deleted unexpected number of files",
-        expectedDeletedFileCount,
-        deletedFileLocations.size());
+    assertThat(deletedFileLocations)
+        .as("Deleted unexpected number of files")
+        .hasSize(expectedDeletedFileCount);
   }
 }
diff --git 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalogHadoopOverrides.java
 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalogHadoopOverrides.java
index c276713113..fd155a6bca 100644
--- 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalogHadoopOverrides.java
+++ 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalogHadoopOverrides.java
@@ -18,25 +18,28 @@
  */
 package org.apache.iceberg.spark.source;
 
-import java.util.Map;
+import static org.assertj.core.api.Assertions.assertThat;
+
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.KryoHelpers;
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.Parameters;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TestHelpers;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.spark.CatalogTestBase;
 import org.apache.iceberg.spark.SparkCatalog;
-import org.apache.iceberg.spark.SparkCatalogTestBase;
 import org.apache.iceberg.spark.SparkSessionCatalog;
 import org.apache.spark.sql.connector.catalog.Identifier;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runners.Parameterized;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
 
-public class TestSparkCatalogHadoopOverrides extends SparkCatalogTestBase {
+@ExtendWith(ParameterizedTestExtension.class)
+public class TestSparkCatalogHadoopOverrides extends CatalogTestBase {
 
   private static final String CONFIG_TO_OVERRIDE = "fs.s3a.buffer.dir";
   // prepend "hadoop." so that the test base formats SQLConf correctly
@@ -44,7 +47,7 @@ public class TestSparkCatalogHadoopOverrides extends 
SparkCatalogTestBase {
   private static final String HADOOP_PREFIXED_CONFIG_TO_OVERRIDE = "hadoop." + 
CONFIG_TO_OVERRIDE;
   private static final String CONFIG_OVERRIDE_VALUE = "/tmp-overridden";
 
-  @Parameterized.Parameters(name = "catalogName = {0}, implementation = {1}, 
config = {2}")
+  @Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}")
   public static Object[][] parameters() {
     return new Object[][] {
       {
@@ -77,41 +80,36 @@ public class TestSparkCatalogHadoopOverrides extends 
SparkCatalogTestBase {
     };
   }
 
-  public TestSparkCatalogHadoopOverrides(
-      String catalogName, String implementation, Map<String, String> config) {
-    super(catalogName, implementation, config);
-  }
-
-  @Before
+  @BeforeEach
   public void createTable() {
     sql("CREATE TABLE IF NOT EXISTS %s (id bigint) USING iceberg", 
tableName(tableIdent.name()));
   }
 
-  @After
+  @AfterEach
   public void dropTable() {
     sql("DROP TABLE IF EXISTS %s", tableName(tableIdent.name()));
   }
 
-  @Test
+  @TestTemplate
   public void testTableFromCatalogHasOverrides() throws Exception {
     Table table = getIcebergTableFromSparkCatalog();
     Configuration conf = ((Configurable) table.io()).getConf();
     String actualCatalogOverride = conf.get(CONFIG_TO_OVERRIDE, "/whammies");
-    Assert.assertEquals(
-        "Iceberg tables from spark should have the overridden hadoop 
configurations from the spark config",
-        CONFIG_OVERRIDE_VALUE,
-        actualCatalogOverride);
+    assertThat(actualCatalogOverride)
+        .as(
+            "Iceberg tables from spark should have the overridden hadoop 
configurations from the spark config")
+        .isEqualTo(CONFIG_OVERRIDE_VALUE);
   }
 
-  @Test
+  @TestTemplate
   public void ensureRoundTripSerializedTableRetainsHadoopConfig() throws 
Exception {
     Table table = getIcebergTableFromSparkCatalog();
     Configuration originalConf = ((Configurable) table.io()).getConf();
     String actualCatalogOverride = originalConf.get(CONFIG_TO_OVERRIDE, 
"/whammies");
-    Assert.assertEquals(
-        "Iceberg tables from spark should have the overridden hadoop 
configurations from the spark config",
-        CONFIG_OVERRIDE_VALUE,
-        actualCatalogOverride);
+    assertThat(actualCatalogOverride)
+        .as(
+            "Iceberg tables from spark should have the overridden hadoop 
configurations from the spark config")
+        .isEqualTo(CONFIG_OVERRIDE_VALUE);
 
     // Now convert to SerializableTable and ensure overridden property is 
still present.
     Table serializableTable = SerializableTableWithSize.copyOf(table);
@@ -119,19 +117,19 @@ public class TestSparkCatalogHadoopOverrides extends 
SparkCatalogTestBase {
         
KryoHelpers.roundTripSerialize(SerializableTableWithSize.copyOf(table));
     Configuration configFromKryoSerde = ((Configurable) 
kryoSerializedTable.io()).getConf();
     String kryoSerializedCatalogOverride = 
configFromKryoSerde.get(CONFIG_TO_OVERRIDE, "/whammies");
-    Assert.assertEquals(
-        "Tables serialized with Kryo serialization should retain overridden 
hadoop configuration properties",
-        CONFIG_OVERRIDE_VALUE,
-        kryoSerializedCatalogOverride);
+    assertThat(kryoSerializedCatalogOverride)
+        .as(
+            "Tables serialized with Kryo serialization should retain 
overridden hadoop configuration properties")
+        .isEqualTo(CONFIG_OVERRIDE_VALUE);
 
     // Do the same for Java based serde
     Table javaSerializedTable = 
TestHelpers.roundTripSerialize(serializableTable);
     Configuration configFromJavaSerde = ((Configurable) 
javaSerializedTable.io()).getConf();
     String javaSerializedCatalogOverride = 
configFromJavaSerde.get(CONFIG_TO_OVERRIDE, "/whammies");
-    Assert.assertEquals(
-        "Tables serialized with Java serialization should retain overridden 
hadoop configuration properties",
-        CONFIG_OVERRIDE_VALUE,
-        javaSerializedCatalogOverride);
+    assertThat(javaSerializedCatalogOverride)
+        .as(
+            "Tables serialized with Java serialization should retain 
overridden hadoop configuration properties")
+        .isEqualTo(CONFIG_OVERRIDE_VALUE);
   }
 
   @SuppressWarnings("ThrowSpecificity")
diff --git 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkStagedScan.java
 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkStagedScan.java
index 241293f367..b0029c09ab 100644
--- 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkStagedScan.java
+++ 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkStagedScan.java
@@ -18,38 +18,35 @@
  */
 package org.apache.iceberg.spark.source;
 
+import static org.assertj.core.api.Assertions.assertThat;
+
 import java.io.IOException;
 import java.util.List;
-import java.util.Map;
 import java.util.UUID;
 import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.ParameterizedTestExtension;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
-import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.spark.CatalogTestBase;
 import org.apache.iceberg.spark.ScanTaskSetManager;
-import org.apache.iceberg.spark.SparkCatalogTestBase;
 import org.apache.iceberg.spark.SparkReadOptions;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class TestSparkStagedScan extends SparkCatalogTestBase {
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
 
-  public TestSparkStagedScan(
-      String catalogName, String implementation, Map<String, String> config) {
-    super(catalogName, implementation, config);
-  }
+@ExtendWith(ParameterizedTestExtension.class)
+public class TestSparkStagedScan extends CatalogTestBase {
 
-  @After
+  @AfterEach
   public void removeTables() {
     sql("DROP TABLE IF EXISTS %s", tableName);
   }
 
-  @Test
+  @TestTemplate
   public void testTaskSetLoading() throws NoSuchTableException, IOException {
     sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName);
 
@@ -59,7 +56,7 @@ public class TestSparkStagedScan extends SparkCatalogTestBase 
{
     df.writeTo(tableName).append();
 
     Table table = validationCatalog.loadTable(tableIdent);
-    Assert.assertEquals("Should produce 1 snapshot", 1, 
Iterables.size(table.snapshots()));
+    assertThat(table.snapshots()).as("Should produce 1 snapshot").hasSize(1);
 
     try (CloseableIterable<FileScanTask> fileScanTasks = 
table.newScan().planFiles()) {
       ScanTaskSetManager taskSetManager = ScanTaskSetManager.get();
@@ -84,7 +81,7 @@ public class TestSparkStagedScan extends SparkCatalogTestBase 
{
         sql("SELECT * FROM %s ORDER BY id", tableName));
   }
 
-  @Test
+  @TestTemplate
   public void testTaskSetPlanning() throws NoSuchTableException, IOException {
     sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName);
 
@@ -95,7 +92,7 @@ public class TestSparkStagedScan extends SparkCatalogTestBase 
{
     df.coalesce(1).writeTo(tableName).append();
 
     Table table = validationCatalog.loadTable(tableIdent);
-    Assert.assertEquals("Should produce 2 snapshots", 2, 
Iterables.size(table.snapshots()));
+    assertThat(table.snapshots()).as("Should produce 1 snapshot").hasSize(2);
 
     try (CloseableIterable<FileScanTask> fileScanTasks = 
table.newScan().planFiles()) {
       ScanTaskSetManager taskSetManager = ScanTaskSetManager.get();
@@ -111,7 +108,9 @@ public class TestSparkStagedScan extends 
SparkCatalogTestBase {
               .option(SparkReadOptions.SCAN_TASK_SET_ID, setID)
               .option(SparkReadOptions.SPLIT_SIZE, 
tasks.get(0).file().fileSizeInBytes())
               .load(tableName);
-      Assert.assertEquals("Num partitions should match", 2, 
scanDF.javaRDD().getNumPartitions());
+      assertThat(scanDF.javaRDD().getNumPartitions())
+          .as("Num partitions should match")
+          .isEqualTo(2);
 
       // load the staged file set and make sure we combine both files into a 
single split
       scanDF =
@@ -121,7 +120,9 @@ public class TestSparkStagedScan extends 
SparkCatalogTestBase {
               .option(SparkReadOptions.SCAN_TASK_SET_ID, setID)
               .option(SparkReadOptions.SPLIT_SIZE, Long.MAX_VALUE)
               .load(tableName);
-      Assert.assertEquals("Num partitions should match", 1, 
scanDF.javaRDD().getNumPartitions());
+      assertThat(scanDF.javaRDD().getNumPartitions())
+          .as("Num partitions should match")
+          .isEqualTo(1);
     }
   }
 }
diff --git 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTable.java
 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTable.java
index 616a196872..4a386ee861 100644
--- 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTable.java
+++ 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTable.java
@@ -18,34 +18,32 @@
  */
 package org.apache.iceberg.spark.source;
 
-import java.util.Map;
-import org.apache.iceberg.spark.SparkCatalogTestBase;
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.spark.CatalogTestBase;
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
 import org.apache.spark.sql.connector.catalog.CatalogManager;
 import org.apache.spark.sql.connector.catalog.Identifier;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
-import org.junit.After;
 import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
 
-public class TestSparkTable extends SparkCatalogTestBase {
+@ExtendWith(ParameterizedTestExtension.class)
+public class TestSparkTable extends CatalogTestBase {
 
-  public TestSparkTable(String catalogName, String implementation, Map<String, 
String> config) {
-    super(catalogName, implementation, config);
-  }
-
-  @Before
+  @BeforeEach
   public void createTable() {
     sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", 
tableName);
   }
 
-  @After
+  @AfterEach
   public void removeTable() {
     sql("DROP TABLE IF EXISTS %s", tableName);
   }
 
-  @Test
+  @TestTemplate
   public void testTableEquality() throws NoSuchTableException {
     CatalogManager catalogManager = spark.sessionState().catalogManager();
     TableCatalog catalog = (TableCatalog) catalogManager.catalog(catalogName);
diff --git 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
index b7d415de34..2544a8f739 100644
--- 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
+++ 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
@@ -37,6 +37,7 @@ import org.apache.iceberg.DataOperations;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Files;
+import org.apache.iceberg.ParameterizedTestExtension;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
@@ -49,7 +50,7 @@ import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.spark.SparkCatalogTestBase;
+import org.apache.iceberg.spark.CatalogTestBase;
 import org.apache.iceberg.spark.SparkReadOptions;
 import org.apache.spark.api.java.function.VoidFunction2;
 import org.apache.spark.sql.Dataset;
@@ -59,20 +60,14 @@ import org.apache.spark.sql.internal.SQLConf;
 import org.apache.spark.sql.streaming.DataStreamWriter;
 import org.apache.spark.sql.streaming.OutputMode;
 import org.apache.spark.sql.streaming.StreamingQuery;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
 
-@RunWith(Parameterized.class)
-public final class TestStructuredStreamingRead3 extends SparkCatalogTestBase {
-  public TestStructuredStreamingRead3(
-      String catalogName, String implementation, Map<String, String> config) {
-    super(catalogName, implementation, config);
-  }
+@ExtendWith(ParameterizedTestExtension.class)
+public final class TestStructuredStreamingRead3 extends CatalogTestBase {
 
   private Table table;
 
@@ -114,13 +109,13 @@ public final class TestStructuredStreamingRead3 extends 
SparkCatalogTestBase {
               Lists.newArrayList(
                   new SimpleRecord(15, "fifteen"), new SimpleRecord(16, 
"sixteen"))));
 
-  @BeforeClass
+  @BeforeAll
   public static void setupSpark() {
     // disable AQE as tests assume that writes generate a particular number of 
files
     spark.conf().set(SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), "false");
   }
 
-  @Before
+  @BeforeEach
   public void setupTable() {
     sql(
         "CREATE TABLE %s "
@@ -132,19 +127,19 @@ public final class TestStructuredStreamingRead3 extends 
SparkCatalogTestBase {
     microBatches.set(0);
   }
 
-  @After
+  @AfterEach
   public void stopStreams() throws TimeoutException {
     for (StreamingQuery query : spark.streams().active()) {
       query.stop();
     }
   }
 
-  @After
+  @AfterEach
   public void removeTables() {
     sql("DROP TABLE IF EXISTS %s", tableName);
   }
 
-  @Test
+  @TestTemplate
   public void testReadStreamOnIcebergTableWithMultipleSnapshots() throws 
Exception {
     List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
     appendDataAsMultipleSnapshots(expected);
@@ -155,37 +150,38 @@ public final class TestStructuredStreamingRead3 extends 
SparkCatalogTestBase {
     
assertThat(actual).containsExactlyInAnyOrderElementsOf(Iterables.concat(expected));
   }
 
-  @Test
+  @TestTemplate
   public void 
testReadStreamOnIcebergTableWithMultipleSnapshots_WithNumberOfFiles_1()
       throws Exception {
     appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
 
-    Assert.assertEquals(
-        6,
-        microBatchCount(
-            
ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1")));
+    assertThat(
+            microBatchCount(
+                
ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1")))
+        .isEqualTo(6);
   }
 
-  @Test
+  @TestTemplate
   public void 
testReadStreamOnIcebergTableWithMultipleSnapshots_WithNumberOfFiles_2()
       throws Exception {
     appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
 
-    Assert.assertEquals(
-        3,
-        microBatchCount(
-            
ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "2")));
+    assertThat(
+            microBatchCount(
+                
ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "2")))
+        .isEqualTo(3);
   }
 
-  @Test
+  @TestTemplate
   public void 
testReadStreamOnIcebergTableWithMultipleSnapshots_WithNumberOfRows_1()
       throws Exception {
     appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
 
     // only 1 micro-batch will be formed and we will read data partially
-    Assert.assertEquals(
-        1,
-        
microBatchCount(ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH,
 "1")));
+    assertThat(
+            microBatchCount(
+                
ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "1")))
+        .isEqualTo(1);
 
     StreamingQuery query = 
startStream(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "1");
 
@@ -196,17 +192,18 @@ public final class TestStructuredStreamingRead3 extends 
SparkCatalogTestBase {
             Lists.newArrayList(TEST_DATA_MULTIPLE_SNAPSHOTS.get(0).get(0)));
   }
 
-  @Test
+  @TestTemplate
   public void 
testReadStreamOnIcebergTableWithMultipleSnapshots_WithNumberOfRows_4()
       throws Exception {
     appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
 
-    Assert.assertEquals(
-        2,
-        
microBatchCount(ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH,
 "4")));
+    assertThat(
+            microBatchCount(
+                
ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "4")))
+        .isEqualTo(2);
   }
 
-  @Test
+  @TestTemplate
   public void testReadStreamOnIcebergThenAddData() throws Exception {
     List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
 
@@ -218,7 +215,7 @@ public final class TestStructuredStreamingRead3 extends 
SparkCatalogTestBase {
     
assertThat(actual).containsExactlyInAnyOrderElementsOf(Iterables.concat(expected));
   }
 
-  @Test
+  @TestTemplate
   public void testReadingStreamFromTimestamp() throws Exception {
     List<SimpleRecord> dataBeforeTimestamp =
         Lists.newArrayList(
@@ -245,7 +242,7 @@ public final class TestStructuredStreamingRead3 extends 
SparkCatalogTestBase {
     
assertThat(actual).containsExactlyInAnyOrderElementsOf(Iterables.concat(expected));
   }
 
-  @Test
+  @TestTemplate
   public void testReadingStreamFromFutureTimetsamp() throws Exception {
     long futureTimestamp = System.currentTimeMillis() + 10000;
 
@@ -277,7 +274,7 @@ public final class TestStructuredStreamingRead3 extends 
SparkCatalogTestBase {
     assertThat(actual).containsExactlyInAnyOrderElementsOf(data);
   }
 
-  @Test
+  @TestTemplate
   public void testReadingStreamFromTimestampFutureWithExistingSnapshots() 
throws Exception {
     List<SimpleRecord> dataBeforeTimestamp =
         Lists.newArrayList(
@@ -290,7 +287,7 @@ public final class TestStructuredStreamingRead3 extends 
SparkCatalogTestBase {
     StreamingQuery query =
         startStream(SparkReadOptions.STREAM_FROM_TIMESTAMP, 
Long.toString(streamStartTimestamp));
     List<SimpleRecord> actual = rowsAvailable(query);
-    Assert.assertEquals(Collections.emptyList(), actual);
+    assertThat(actual).isEmpty();
 
     // Stream should contain data added after the timestamp elapses
     waitUntilAfter(streamStartTimestamp);
@@ -300,7 +297,7 @@ public final class TestStructuredStreamingRead3 extends 
SparkCatalogTestBase {
         .containsExactlyInAnyOrderElementsOf(Iterables.concat(expected));
   }
 
-  @Test
+  @TestTemplate
   public void testReadingStreamFromTimestampOfExistingSnapshot() throws 
Exception {
     List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
 
@@ -322,7 +319,7 @@ public final class TestStructuredStreamingRead3 extends 
SparkCatalogTestBase {
     
assertThat(actual).containsExactlyInAnyOrderElementsOf(Iterables.concat(expected));
   }
 
-  @Test
+  @TestTemplate
   public void testReadingStreamWithExpiredSnapshotFromTimestamp() throws 
TimeoutException {
     List<SimpleRecord> firstSnapshotRecordList = Lists.newArrayList(new 
SimpleRecord(1, "one"));
 
@@ -351,11 +348,11 @@ public final class TestStructuredStreamingRead3 extends 
SparkCatalogTestBase {
     assertThat(actual).containsExactlyInAnyOrderElementsOf(expectedRecordList);
   }
 
-  @Test
+  @TestTemplate
   public void testResumingStreamReadFromCheckpoint() throws Exception {
-    File writerCheckpointFolder = temp.newFolder("writer-checkpoint-folder");
+    File writerCheckpointFolder = 
temp.resolve("writer-checkpoint-folder").toFile();
     File writerCheckpoint = new File(writerCheckpointFolder, 
"writer-checkpoint");
-    File output = temp.newFolder();
+    File output = temp.resolve("junit").toFile();
 
     DataStreamWriter querySource =
         spark
@@ -391,11 +388,11 @@ public final class TestStructuredStreamingRead3 extends 
SparkCatalogTestBase {
     }
   }
 
-  @Test
+  @TestTemplate
   public void testFailReadingCheckpointInvalidSnapshot() throws IOException, 
TimeoutException {
-    File writerCheckpointFolder = temp.newFolder("writer-checkpoint-folder");
+    File writerCheckpointFolder = 
temp.resolve("writer-checkpoint-folder").toFile();
     File writerCheckpoint = new File(writerCheckpointFolder, 
"writer-checkpoint");
-    File output = temp.newFolder();
+    File output = temp.resolve("junit").toFile();
 
     DataStreamWriter querySource =
         spark
@@ -431,7 +428,7 @@ public final class TestStructuredStreamingRead3 extends 
SparkCatalogTestBase {
                 firstSnapshotid));
   }
 
-  @Test
+  @TestTemplate
   public void testParquetOrcAvroDataInOneTable() throws Exception {
     List<SimpleRecord> parquetFileRecords =
         Lists.newArrayList(
@@ -453,14 +450,14 @@ public final class TestStructuredStreamingRead3 extends 
SparkCatalogTestBase {
             Iterables.concat(parquetFileRecords, orcFileRecords, 
avroFileRecords));
   }
 
-  @Test
+  @TestTemplate
   public void testReadStreamFromEmptyTable() throws Exception {
     StreamingQuery stream = startStream();
     List<SimpleRecord> actual = rowsAvailable(stream);
-    Assert.assertEquals(Collections.emptyList(), actual);
+    assertThat(actual).isEmpty();
   }
 
-  @Test
+  @TestTemplate
   public void testReadStreamWithSnapshotTypeOverwriteErrorsOut() throws 
Exception {
     // upgrade table to version 2 - to facilitate creation of Snapshot of type 
OVERWRITE.
     TableOperations ops = ((BaseTable) table).operations();
@@ -481,14 +478,14 @@ public final class TestStructuredStreamingRead3 extends 
SparkCatalogTestBase {
     DeleteFile eqDeletes =
         FileHelpers.writeDeleteFile(
             table,
-            Files.localOutput(temp.newFile()),
+            Files.localOutput(File.createTempFile("junit", null, 
temp.toFile())),
             TestHelpers.Row.of(0),
             dataDeletes,
             deleteRowSchema);
 
     DataFile dataFile =
         DataFiles.builder(table.spec())
-            .withPath(temp.newFile().toString())
+            .withPath(File.createTempFile("junit", null, 
temp.toFile()).getPath())
             .withFileSizeInBytes(10)
             .withRecordCount(1)
             .withFormat(FileFormat.PARQUET)
@@ -498,7 +495,7 @@ public final class TestStructuredStreamingRead3 extends 
SparkCatalogTestBase {
 
     // check pre-condition - that the above Delete file write - actually 
resulted in snapshot of
     // type OVERWRITE
-    Assert.assertEquals(DataOperations.OVERWRITE, 
table.currentSnapshot().operation());
+    
assertThat(table.currentSnapshot().operation()).isEqualTo(DataOperations.OVERWRITE);
 
     StreamingQuery query = startStream();
 
@@ -508,7 +505,7 @@ public final class TestStructuredStreamingRead3 extends 
SparkCatalogTestBase {
         .hasMessageStartingWith("Cannot process overwrite snapshot");
   }
 
-  @Test
+  @TestTemplate
   public void testReadStreamWithSnapshotTypeReplaceIgnoresReplace() throws 
Exception {
     // fill table with some data
     List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
@@ -518,14 +515,14 @@ public final class TestStructuredStreamingRead3 extends 
SparkCatalogTestBase {
     table.rewriteManifests().clusterBy(f -> 1).commit();
 
     // check pre-condition
-    Assert.assertEquals(DataOperations.REPLACE, 
table.currentSnapshot().operation());
+    
assertThat(table.currentSnapshot().operation()).isEqualTo(DataOperations.REPLACE);
 
     StreamingQuery query = startStream();
     List<SimpleRecord> actual = rowsAvailable(query);
     
assertThat(actual).containsExactlyInAnyOrderElementsOf(Iterables.concat(expected));
   }
 
-  @Test
+  @TestTemplate
   public void testReadStreamWithSnapshotTypeDeleteErrorsOut() throws Exception 
{
     table.updateSpec().removeField("id_bucket").addField(ref("id")).commit();
 
@@ -538,7 +535,7 @@ public final class TestStructuredStreamingRead3 extends 
SparkCatalogTestBase {
 
     // check pre-condition - that the above delete operation on table resulted 
in Snapshot of Type
     // DELETE.
-    Assert.assertEquals(DataOperations.DELETE, 
table.currentSnapshot().operation());
+    
assertThat(table.currentSnapshot().operation()).isEqualTo(DataOperations.DELETE);
 
     StreamingQuery query = startStream();
 
@@ -548,7 +545,7 @@ public final class TestStructuredStreamingRead3 extends 
SparkCatalogTestBase {
         .hasMessageStartingWith("Cannot process delete snapshot");
   }
 
-  @Test
+  @TestTemplate
   public void testReadStreamWithSnapshotTypeDeleteAndSkipDeleteOption() throws 
Exception {
     table.updateSpec().removeField("id_bucket").addField(ref("id")).commit();
 
@@ -561,14 +558,14 @@ public final class TestStructuredStreamingRead3 extends 
SparkCatalogTestBase {
 
     // check pre-condition - that the above delete operation on table resulted 
in Snapshot of Type
     // DELETE.
-    Assert.assertEquals(DataOperations.DELETE, 
table.currentSnapshot().operation());
+    
assertThat(table.currentSnapshot().operation()).isEqualTo(DataOperations.DELETE);
 
     StreamingQuery query = 
startStream(SparkReadOptions.STREAMING_SKIP_DELETE_SNAPSHOTS, "true");
     assertThat(rowsAvailable(query))
         
.containsExactlyInAnyOrderElementsOf(Iterables.concat(dataAcrossSnapshots));
   }
 
-  @Test
+  @TestTemplate
   public void testReadStreamWithSnapshotTypeDeleteAndSkipOverwriteOption() 
throws Exception {
     table.updateSpec().removeField("id_bucket").addField(ref("id")).commit();
 
@@ -578,7 +575,7 @@ public final class TestStructuredStreamingRead3 extends 
SparkCatalogTestBase {
 
     DataFile dataFile =
         DataFiles.builder(table.spec())
-            .withPath(temp.newFile().toString())
+            .withPath(File.createTempFile("junit", null, 
temp.toFile()).getPath())
             .withFileSizeInBytes(10)
             .withRecordCount(1)
             .withFormat(FileFormat.PARQUET)
@@ -593,7 +590,7 @@ public final class TestStructuredStreamingRead3 extends 
SparkCatalogTestBase {
 
     // check pre-condition - that the above delete operation on table resulted 
in Snapshot of Type
     // OVERWRITE.
-    Assert.assertEquals(DataOperations.OVERWRITE, 
table.currentSnapshot().operation());
+    
assertThat(table.currentSnapshot().operation()).isEqualTo(DataOperations.OVERWRITE);
 
     StreamingQuery query = 
startStream(SparkReadOptions.STREAMING_SKIP_OVERWRITE_SNAPSHOTS, "true");
     assertThat(rowsAvailable(query))
diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestRequiredDistributionAndOrdering.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestRequiredDistributionAndOrdering.java
index 55fd2cefe2..5dbfc7fa6c 100644
--- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestRequiredDistributionAndOrdering.java
+++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestRequiredDistributionAndOrdering.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.spark.source;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 import java.util.List;
+import org.apache.iceberg.ParameterizedTestExtension;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -31,7 +32,9 @@ import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
 
+@ExtendWith(ParameterizedTestExtension.class)
 public class TestRequiredDistributionAndOrdering extends CatalogTestBase {
 
   @AfterEach
diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalogHadoopOverrides.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalogHadoopOverrides.java
index c031f2991f..fd155a6bca 100644
--- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalogHadoopOverrides.java
+++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalogHadoopOverrides.java
@@ -23,6 +23,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.KryoHelpers;
+import org.apache.iceberg.ParameterizedTestExtension;
 import org.apache.iceberg.Parameters;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TestHelpers;
@@ -35,7 +36,9 @@ import org.apache.spark.sql.connector.catalog.TableCatalog;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
 
+@ExtendWith(ParameterizedTestExtension.class)
 public class TestSparkCatalogHadoopOverrides extends CatalogTestBase {
 
   private static final String CONFIG_TO_OVERRIDE = "fs.s3a.buffer.dir";
diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkStagedScan.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkStagedScan.java
index 6ce2ce6238..e444b7cb1f 100644
--- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkStagedScan.java
+++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkStagedScan.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.UUID;
 import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.ParameterizedTestExtension;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -35,7 +36,9 @@ import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
 
+@ExtendWith(ParameterizedTestExtension.class)
 public class TestSparkStagedScan extends CatalogTestBase {
 
   @AfterEach
diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTable.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTable.java
index 46ee484b39..d14b1a52cf 100644
--- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTable.java
+++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTable.java
@@ -20,6 +20,7 @@ package org.apache.iceberg.spark.source;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
+import org.apache.iceberg.ParameterizedTestExtension;
 import org.apache.iceberg.spark.CatalogTestBase;
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
 import org.apache.spark.sql.connector.catalog.CatalogManager;
@@ -28,7 +29,9 @@ import org.apache.spark.sql.connector.catalog.TableCatalog;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
 
+@ExtendWith(ParameterizedTestExtension.class)
 public class TestSparkTable extends CatalogTestBase {
 
   @BeforeEach
diff --git 
a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestRequiredDistributionAndOrdering.java
 
b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestRequiredDistributionAndOrdering.java
index 55fd2cefe2..5dbfc7fa6c 100644
--- 
a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestRequiredDistributionAndOrdering.java
+++ 
b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestRequiredDistributionAndOrdering.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.spark.source;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 import java.util.List;
+import org.apache.iceberg.ParameterizedTestExtension;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -31,7 +32,9 @@ import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
 
+@ExtendWith(ParameterizedTestExtension.class)
 public class TestRequiredDistributionAndOrdering extends CatalogTestBase {
 
   @AfterEach
diff --git 
a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalogHadoopOverrides.java
 
b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalogHadoopOverrides.java
index c031f2991f..fd155a6bca 100644
--- 
a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalogHadoopOverrides.java
+++ 
b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalogHadoopOverrides.java
@@ -23,6 +23,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.KryoHelpers;
+import org.apache.iceberg.ParameterizedTestExtension;
 import org.apache.iceberg.Parameters;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TestHelpers;
@@ -35,7 +36,9 @@ import org.apache.spark.sql.connector.catalog.TableCatalog;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
 
+@ExtendWith(ParameterizedTestExtension.class)
 public class TestSparkCatalogHadoopOverrides extends CatalogTestBase {
 
   private static final String CONFIG_TO_OVERRIDE = "fs.s3a.buffer.dir";
diff --git 
a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkStagedScan.java
 
b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkStagedScan.java
index 6ce2ce6238..e444b7cb1f 100644
--- 
a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkStagedScan.java
+++ 
b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkStagedScan.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.UUID;
 import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.ParameterizedTestExtension;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -35,7 +36,9 @@ import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
 
+@ExtendWith(ParameterizedTestExtension.class)
 public class TestSparkStagedScan extends CatalogTestBase {
 
   @AfterEach
diff --git 
a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTable.java
 
b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTable.java
index 46ee484b39..d14b1a52cf 100644
--- 
a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTable.java
+++ 
b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTable.java
@@ -20,6 +20,7 @@ package org.apache.iceberg.spark.source;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
+import org.apache.iceberg.ParameterizedTestExtension;
 import org.apache.iceberg.spark.CatalogTestBase;
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
 import org.apache.spark.sql.connector.catalog.CatalogManager;
@@ -28,7 +29,9 @@ import org.apache.spark.sql.connector.catalog.TableCatalog;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
 
+@ExtendWith(ParameterizedTestExtension.class)
 public class TestSparkTable extends CatalogTestBase {
 
   @BeforeEach

Reply via email to