This is an automated email from the ASF dual-hosted git repository.
etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 809a2327f1 Spark 3.4: Migrate SparkCatalogTestBase related tests to JUnit5 (#13007)
809a2327f1 is described below
commit 809a2327f15dd0f5f9a20f43b3dbb1632bb00828
Author: Tom Tanaka <[email protected]>
AuthorDate: Thu May 8 18:16:45 2025 +0900
Spark 3.4: Migrate SparkCatalogTestBase related tests to JUnit5 (#13007)
---
.../apache/iceberg/spark/SparkCatalogTestBase.java | 72 ------------
.../spark/source/TestCompressionSettings.java | 114 ++++++++++++------
.../TestRequiredDistributionAndOrdering.java | 35 +++---
.../iceberg/spark/source/TestRuntimeFiltering.java | 78 +++++++------
.../source/TestSparkCatalogHadoopOverrides.java | 66 +++++------
.../iceberg/spark/source/TestSparkStagedScan.java | 39 ++++---
.../iceberg/spark/source/TestSparkTable.java | 24 ++--
.../spark/source/TestStructuredStreamingRead3.java | 129 ++++++++++-----------
.../TestRequiredDistributionAndOrdering.java | 3 +
.../source/TestSparkCatalogHadoopOverrides.java | 3 +
.../iceberg/spark/source/TestSparkStagedScan.java | 3 +
.../iceberg/spark/source/TestSparkTable.java | 3 +
.../TestRequiredDistributionAndOrdering.java | 3 +
.../source/TestSparkCatalogHadoopOverrides.java | 3 +
.../iceberg/spark/source/TestSparkStagedScan.java | 3 +
.../iceberg/spark/source/TestSparkTable.java | 3 +
16 files changed, 289 insertions(+), 292 deletions(-)
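
All of the migrations below follow the same pattern: the JUnit 4 Parameterized runner becomes Iceberg's ParameterizedTestExtension, constructor-injected parameters become @Parameter fields (indices {0}-{2} are the catalog name, implementation, and config consumed by the base class), the parameter rows are declared with @Parameters, and @Test becomes @TestTemplate. A minimal sketch of that shape, using a hypothetical ExampleTest with one extra parameter (illustrative only, not part of this commit):

    import org.apache.iceberg.Parameter;
    import org.apache.iceberg.ParameterizedTestExtension;
    import org.apache.iceberg.Parameters;
    import org.apache.iceberg.spark.CatalogTestBase;
    import org.apache.iceberg.spark.SparkCatalogConfig;
    import org.junit.jupiter.api.TestTemplate;
    import org.junit.jupiter.api.extension.ExtendWith;

    @ExtendWith(ParameterizedTestExtension.class)
    public class ExampleTest extends CatalogTestBase {

      // extra parameters start after the catalog triple ({0}-{2}) handled by the base class
      @Parameter(index = 3)
      private String extraParam;

      @Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}, extraParam = {3}")
      public static Object[][] parameters() {
        return new Object[][] {
          {
            SparkCatalogConfig.HADOOP.catalogName(),
            SparkCatalogConfig.HADOOP.implementation(),
            SparkCatalogConfig.HADOOP.properties(),
            "some-value"
          }
        };
      }

      @TestTemplate // replaces JUnit 4's @Test so the extension runs the method once per parameter row
      public void testSomething() {
        // the body can use extraParam exactly like the old constructor-injected field
      }
    }
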
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/SparkCatalogTestBase.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/SparkCatalogTestBase.java
deleted file mode 100644
index 6b2b9a1b80..0000000000
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/SparkCatalogTestBase.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iceberg.spark;
-
-import java.util.Map;
-import org.apache.iceberg.CatalogProperties;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
-import org.junit.Rule;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public abstract class SparkCatalogTestBase extends SparkTestBaseWithCatalog {
-
- // these parameters are broken out to avoid changes that need to modify lots of test suites
- @Parameterized.Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}")
- public static Object[][] parameters() {
- return new Object[][] {
- {
- SparkCatalogConfig.HIVE.catalogName(),
- SparkCatalogConfig.HIVE.implementation(),
- SparkCatalogConfig.HIVE.properties()
- },
- {
- SparkCatalogConfig.HADOOP.catalogName(),
- SparkCatalogConfig.HADOOP.implementation(),
- SparkCatalogConfig.HADOOP.properties()
- },
- {
- SparkCatalogConfig.SPARK.catalogName(),
- SparkCatalogConfig.SPARK.implementation(),
- SparkCatalogConfig.SPARK.properties()
- },
- {
- SparkCatalogConfig.REST.catalogName(),
- SparkCatalogConfig.REST.implementation(),
- ImmutableMap.builder()
- .putAll(SparkCatalogConfig.REST.properties())
- .put(CatalogProperties.URI, REST_SERVER_RULE.uri())
- .build()
- }
- };
- }
-
- @Rule public TemporaryFolder temp = new TemporaryFolder();
-
- public SparkCatalogTestBase(SparkCatalogConfig config) {
- super(config);
- }
-
- public SparkCatalogTestBase(
- String catalogName, String implementation, Map<String, String> config) {
- super(catalogName, implementation, config);
- }
-}
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java
index 14e8fc34c3..24a14bb64d 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java
@@ -18,6 +18,9 @@
*/
package org.apache.iceberg.spark.source;
+import static org.apache.iceberg.FileFormat.AVRO;
+import static org.apache.iceberg.FileFormat.ORC;
+import static org.apache.iceberg.FileFormat.PARQUET;
import static org.apache.iceberg.RowLevelOperationMode.MERGE_ON_READ;
import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
@@ -50,6 +53,9 @@ import org.apache.iceberg.FileFormat;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.ManifestReader;
+import org.apache.iceberg.Parameter;
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.Parameters;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.SizeBasedFileRewritePlanner;
@@ -58,8 +64,8 @@ import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.CatalogTestBase;
import org.apache.iceberg.spark.SparkCatalogConfig;
-import org.apache.iceberg.spark.SparkCatalogTestBase;
import org.apache.iceberg.spark.SparkWriteOptions;
import org.apache.iceberg.spark.actions.SparkActions;
import org.apache.orc.OrcFile;
@@ -69,73 +75,98 @@ import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class TestCompressionSettings extends SparkCatalogTestBase {
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+
+@ExtendWith(ParameterizedTestExtension.class)
+public class TestCompressionSettings extends CatalogTestBase {
private static final Configuration CONF = new Configuration();
private static final String TABLE_NAME = "testWriteData";
private static SparkSession spark = null;
- private final FileFormat format;
- private final ImmutableMap<String, String> properties;
+ @Parameter(index = 3)
+ private FileFormat format;
+
+ @Parameter(index = 4)
+ private Map<String, String> properties;
- @Rule public TemporaryFolder temp = new TemporaryFolder();
+ @TempDir private java.nio.file.Path temp;
- @Parameterized.Parameters(name = "format = {0}, properties = {1}")
+ @Parameters(
+ name =
+ "catalogName = {0}, implementation = {1}, config = {2}, format =
{3}, properties = {4}")
public static Object[][] parameters() {
return new Object[][] {
- {"parquet", ImmutableMap.of(COMPRESSION_CODEC, "zstd",
COMPRESSION_LEVEL, "1")},
- {"parquet", ImmutableMap.of(COMPRESSION_CODEC, "gzip")},
- {"orc", ImmutableMap.of(COMPRESSION_CODEC, "zstd", COMPRESSION_STRATEGY,
"speed")},
- {"orc", ImmutableMap.of(COMPRESSION_CODEC, "zstd", COMPRESSION_STRATEGY,
"compression")},
- {"avro", ImmutableMap.of(COMPRESSION_CODEC, "snappy", COMPRESSION_LEVEL,
"3")}
+ {
+ SparkCatalogConfig.SPARK.catalogName(),
+ SparkCatalogConfig.SPARK.implementation(),
+ SparkCatalogConfig.SPARK.properties(),
+ PARQUET,
+ ImmutableMap.of(COMPRESSION_CODEC, "zstd", COMPRESSION_LEVEL, "1")
+ },
+ {
+ SparkCatalogConfig.SPARK.catalogName(),
+ SparkCatalogConfig.SPARK.implementation(),
+ SparkCatalogConfig.SPARK.properties(),
+ PARQUET,
+ ImmutableMap.of(COMPRESSION_CODEC, "gzip")
+ },
+ {
+ SparkCatalogConfig.SPARK.catalogName(),
+ SparkCatalogConfig.SPARK.implementation(),
+ SparkCatalogConfig.SPARK.properties(),
+ ORC,
+ ImmutableMap.of(COMPRESSION_CODEC, "zstd", COMPRESSION_STRATEGY, "speed")
+ },
+ {
+ SparkCatalogConfig.SPARK.catalogName(),
+ SparkCatalogConfig.SPARK.implementation(),
+ SparkCatalogConfig.SPARK.properties(),
+ ORC,
+ ImmutableMap.of(COMPRESSION_CODEC, "zstd", COMPRESSION_STRATEGY, "compression")
+ },
+ {
+ SparkCatalogConfig.SPARK.catalogName(),
+ SparkCatalogConfig.SPARK.implementation(),
+ SparkCatalogConfig.SPARK.properties(),
+ AVRO,
+ ImmutableMap.of(COMPRESSION_CODEC, "snappy", COMPRESSION_LEVEL, "3")
+ }
};
}
- @BeforeClass
+ @BeforeAll
public static void startSpark() {
TestCompressionSettings.spark =
SparkSession.builder().master("local[2]").getOrCreate();
}
- @Before
+ @BeforeEach
public void resetSpecificConfigurations() {
spark.conf().unset(COMPRESSION_CODEC);
spark.conf().unset(COMPRESSION_LEVEL);
spark.conf().unset(COMPRESSION_STRATEGY);
}
- @Parameterized.AfterParam
- public static void clearSourceCache() {
+ @AfterEach
+ public void afterEach() {
spark.sql(String.format("DROP TABLE IF EXISTS %s", TABLE_NAME));
}
- @AfterClass
+ @AfterAll
public static void stopSpark() {
SparkSession currentSpark = TestCompressionSettings.spark;
TestCompressionSettings.spark = null;
currentSpark.stop();
}
- public TestCompressionSettings(String format, ImmutableMap properties) {
- super(
- SparkCatalogConfig.SPARK.catalogName(),
- SparkCatalogConfig.SPARK.implementation(),
- SparkCatalogConfig.SPARK.properties());
- this.format = FileFormat.fromString(format);
- this.properties = properties;
- }
-
- @Test
+ @TestTemplate
public void testWriteDataWithDifferentSetting() throws Exception {
sql("CREATE TABLE %s (id int, data string) USING iceberg", TABLE_NAME);
Map<String, String> tableProperties = Maps.newHashMap();
@@ -168,6 +199,8 @@ public class TestCompressionSettings extends SparkCatalogTestBase {
spark.conf().set(entry.getKey(), entry.getValue());
}
+ assertSparkConf();
+
df.select("id", "data")
.writeTo(TABLE_NAME)
.option(SparkWriteOptions.WRITE_FORMAT, format.toString())
@@ -230,4 +263,13 @@ public class TestCompressionSettings extends SparkCatalogTestBase {
return fileReader.getMetaString(DataFileConstants.CODEC);
}
}
+
+ private void assertSparkConf() {
+ String[] propertiesToCheck = {COMPRESSION_CODEC, COMPRESSION_LEVEL, COMPRESSION_STRATEGY};
+ for (String prop : propertiesToCheck) {
+ String expected = properties.getOrDefault(prop, null);
+ String actual = spark.conf().get(prop, null);
+ assertThat(actual).isEqualToIgnoringCase(expected);
+ }
+ }
}
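
The lifecycle and temp-directory changes in this file are a direct JUnit 4 to JUnit 5 mapping: @BeforeClass/@AfterClass become @BeforeAll/@AfterAll, @Before becomes @BeforeEach, the per-parameter @Parameterized.AfterParam hook becomes a plain @AfterEach, and the @Rule TemporaryFolder becomes an injected @TempDir path. A short hedged sketch of those equivalences (hypothetical LifecycleExample, illustrative only):

    import org.junit.jupiter.api.AfterAll;
    import org.junit.jupiter.api.AfterEach;
    import org.junit.jupiter.api.BeforeAll;
    import org.junit.jupiter.api.BeforeEach;
    import org.junit.jupiter.api.io.TempDir;

    public class LifecycleExample {

      // was: @Rule public TemporaryFolder temp = new TemporaryFolder();
      @TempDir java.nio.file.Path temp;

      @BeforeAll // was @BeforeClass
      public static void startOnce() {}

      @BeforeEach // was @Before
      public void resetState() {}

      @AfterEach // was @Parameterized.AfterParam; now runs after every templated invocation
      public void cleanUp() {}

      @AfterAll // was @AfterClass
      public static void stopOnce() {}
    }
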
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestRequiredDistributionAndOrdering.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestRequiredDistributionAndOrdering.java
index b669c91313..dc7d87b903 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestRequiredDistributionAndOrdering.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestRequiredDistributionAndOrdering.java
@@ -21,31 +21,28 @@ package org.apache.iceberg.spark.source;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import java.util.List;
-import java.util.Map;
+import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
-import org.apache.iceberg.spark.SparkCatalogTestBase;
+import org.apache.iceberg.spark.CatalogTestBase;
import org.apache.iceberg.spark.SparkWriteOptions;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
-import org.junit.After;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
-public class TestRequiredDistributionAndOrdering extends SparkCatalogTestBase {
+@ExtendWith(ParameterizedTestExtension.class)
+public class TestRequiredDistributionAndOrdering extends CatalogTestBase {
- public TestRequiredDistributionAndOrdering(
- String catalogName, String implementation, Map<String, String> config) {
- super(catalogName, implementation, config);
- }
-
- @After
+ @AfterEach
public void dropTestTable() {
sql("DROP TABLE IF EXISTS %s", tableName);
}
- @Test
+ @TestTemplate
public void testDefaultLocalSort() throws NoSuchTableException {
sql(
"CREATE TABLE %s (c1 INT, c2 STRING, c3 STRING) "
@@ -74,7 +71,7 @@ public class TestRequiredDistributionAndOrdering extends
SparkCatalogTestBase {
sql("SELECT count(*) FROM %s", tableName));
}
- @Test
+ @TestTemplate
public void testPartitionColumnsArePrependedForRangeDistribution() throws
NoSuchTableException {
sql(
"CREATE TABLE %s (c1 INT, c2 STRING, c3 STRING) "
@@ -110,7 +107,7 @@ public class TestRequiredDistributionAndOrdering extends
SparkCatalogTestBase {
sql("SELECT count(*) FROM %s", tableName));
}
- @Test
+ @TestTemplate
public void testSortOrderIncludesPartitionColumns() throws
NoSuchTableException {
sql(
"CREATE TABLE %s (c1 INT, c2 STRING, c3 STRING) "
@@ -142,7 +139,7 @@ public class TestRequiredDistributionAndOrdering extends
SparkCatalogTestBase {
sql("SELECT count(*) FROM %s", tableName));
}
- @Test
+ @TestTemplate
public void testDisabledDistributionAndOrdering() {
sql(
"CREATE TABLE %s (c1 INT, c2 STRING, c3 STRING) "
@@ -176,7 +173,7 @@ public class TestRequiredDistributionAndOrdering extends
SparkCatalogTestBase {
+ "and by partition within each spec. Either cluster the
incoming records or switch to fanout writers.");
}
- @Test
+ @TestTemplate
public void testHashDistribution() throws NoSuchTableException {
sql(
"CREATE TABLE %s (c1 INT, c2 STRING, c3 STRING) "
@@ -212,7 +209,7 @@ public class TestRequiredDistributionAndOrdering extends
SparkCatalogTestBase {
sql("SELECT count(*) FROM %s", tableName));
}
- @Test
+ @TestTemplate
public void testSortBucketTransformsWithoutExtensions() throws
NoSuchTableException {
sql(
"CREATE TABLE %s (c1 INT, c2 STRING, c3 STRING) "
@@ -238,7 +235,7 @@ public class TestRequiredDistributionAndOrdering extends
SparkCatalogTestBase {
assertEquals("Rows must match", expected, sql("SELECT * FROM %s ORDER BY
c1", tableName));
}
- @Test
+ @TestTemplate
public void testRangeDistributionWithQuotedColumnsNames() throws
NoSuchTableException {
sql(
"CREATE TABLE %s (c1 INT, c2 STRING, `c.3` STRING) "
@@ -274,7 +271,7 @@ public class TestRequiredDistributionAndOrdering extends
SparkCatalogTestBase {
sql("SELECT count(*) FROM %s", tableName));
}
- @Test
+ @TestTemplate
public void testHashDistributionWithQuotedColumnsNames() throws
NoSuchTableException {
sql(
"CREATE TABLE %s (c1 INT, c2 STRING, `c``3` STRING) "
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestRuntimeFiltering.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestRuntimeFiltering.java
index b09c995b30..e7346e270f 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestRuntimeFiltering.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestRuntimeFiltering.java
@@ -22,6 +22,7 @@ import static org.apache.iceberg.PlanningMode.DISTRIBUTED;
import static org.apache.iceberg.PlanningMode.LOCAL;
import static org.apache.spark.sql.functions.date_add;
import static org.apache.spark.sql.functions.expr;
+import static org.assertj.core.api.Assertions.assertThat;
import java.io.IOException;
import java.io.UncheckedIOException;
@@ -29,6 +30,9 @@ import java.util.List;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Parameter;
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.Parameters;
import org.apache.iceberg.PlanningMode;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expression;
@@ -36,38 +40,47 @@ import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-import org.apache.iceberg.spark.SparkTestBaseWithCatalog;
+import org.apache.iceberg.spark.SparkCatalogConfig;
import org.apache.iceberg.spark.SparkWriteOptions;
+import org.apache.iceberg.spark.TestBaseWithCatalog;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class TestRuntimeFiltering extends SparkTestBaseWithCatalog {
-
- @Parameterized.Parameters(name = "planningMode = {0}")
- public static Object[] parameters() {
- return new Object[] {LOCAL, DISTRIBUTED};
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+@ExtendWith(ParameterizedTestExtension.class)
+public class TestRuntimeFiltering extends TestBaseWithCatalog {
+
+ @Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}, planningMode = {3}")
+ public static Object[][] parameters() {
+ return new Object[][] {
+ {
+ SparkCatalogConfig.HADOOP.catalogName(),
+ SparkCatalogConfig.HADOOP.implementation(),
+ SparkCatalogConfig.HADOOP.properties(),
+ LOCAL
+ },
+ {
+ SparkCatalogConfig.HADOOP.catalogName(),
+ SparkCatalogConfig.HADOOP.implementation(),
+ SparkCatalogConfig.HADOOP.properties(),
+ DISTRIBUTED
+ }
+ };
}
- private final PlanningMode planningMode;
-
- public TestRuntimeFiltering(PlanningMode planningMode) {
- this.planningMode = planningMode;
- }
+ @Parameter(index = 3)
+ private PlanningMode planningMode;
- @After
+ @AfterEach
public void removeTables() {
sql("DROP TABLE IF EXISTS %s", tableName);
sql("DROP TABLE IF EXISTS dim");
}
- @Test
+ @TestTemplate
public void testIdentityPartitionedTable() throws NoSuchTableException {
sql(
"CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) "
@@ -106,7 +119,7 @@ public class TestRuntimeFiltering extends
SparkTestBaseWithCatalog {
sql(query));
}
- @Test
+ @TestTemplate
public void testBucketedTable() throws NoSuchTableException {
sql(
"CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) "
@@ -145,7 +158,7 @@ public class TestRuntimeFiltering extends
SparkTestBaseWithCatalog {
sql(query));
}
- @Test
+ @TestTemplate
public void testRenamedSourceColumnTable() throws NoSuchTableException {
sql(
"CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) "
@@ -186,7 +199,7 @@ public class TestRuntimeFiltering extends
SparkTestBaseWithCatalog {
sql(query));
}
- @Test
+ @TestTemplate
public void testMultipleRuntimeFilters() throws NoSuchTableException {
sql(
"CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) "
@@ -229,7 +242,7 @@ public class TestRuntimeFiltering extends
SparkTestBaseWithCatalog {
sql(query));
}
- @Test
+ @TestTemplate
public void testCaseSensitivityOfRuntimeFilters() throws
NoSuchTableException {
sql(
"CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) "
@@ -273,7 +286,7 @@ public class TestRuntimeFiltering extends
SparkTestBaseWithCatalog {
sql(caseInsensitiveQuery));
}
- @Test
+ @TestTemplate
public void testBucketedTableWithMultipleSpecs() throws NoSuchTableException
{
sql(
"CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP)
USING iceberg",
@@ -325,7 +338,7 @@ public class TestRuntimeFiltering extends
SparkTestBaseWithCatalog {
sql(query));
}
- @Test
+ @TestTemplate
public void testSourceColumnWithDots() throws NoSuchTableException {
sql(
"CREATE TABLE %s (`i.d` BIGINT, data STRING, date DATE, ts TIMESTAMP) "
@@ -369,7 +382,7 @@ public class TestRuntimeFiltering extends
SparkTestBaseWithCatalog {
sql(query));
}
- @Test
+ @TestTemplate
public void testSourceColumnWithBackticks() throws NoSuchTableException {
sql(
"CREATE TABLE %s (`i``d` BIGINT, data STRING, date DATE, ts TIMESTAMP)
"
@@ -410,7 +423,7 @@ public class TestRuntimeFiltering extends
SparkTestBaseWithCatalog {
sql(query));
}
- @Test
+ @TestTemplate
public void testUnpartitionedTable() throws NoSuchTableException {
sql(
"CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP)
USING iceberg",
@@ -458,7 +471,7 @@ public class TestRuntimeFiltering extends
SparkTestBaseWithCatalog {
List<Row> output = spark.sql("EXPLAIN EXTENDED " + query).collectAsList();
String plan = output.get(0).getString(0);
int actualFilterCount = StringUtils.countMatches(plan,
"dynamicpruningexpression");
- Assert.assertEquals(errorMessage, expectedFilterCount, actualFilterCount);
+ assertThat(actualFilterCount).as(errorMessage).isEqualTo(expectedFilterCount);
}
// delete files that don't match the filter to ensure dynamic filtering
works and only required
@@ -490,9 +503,8 @@ public class TestRuntimeFiltering extends
SparkTestBaseWithCatalog {
throw new UncheckedIOException(e);
}
- Assert.assertEquals(
- "Deleted unexpected number of files",
- expectedDeletedFileCount,
- deletedFileLocations.size());
+ assertThat(deletedFileLocations)
+ .as("Deleted unexpected number of files")
+ .hasSize(expectedDeletedFileCount);
}
}
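
The assertion changes in this file (and in the files below) replace org.junit.Assert calls with AssertJ: the message moves from the first argument into .as(), and collection-size checks use hasSize() instead of comparing sizes by hand. A small self-contained sketch of those equivalences, using made-up values rather than anything from the tests:

    import static org.assertj.core.api.Assertions.assertThat;

    import java.util.List;

    public class AssertionStyleExample {
      public static void main(String[] args) {
        int expectedFilterCount = 1;
        int actualFilterCount = 1;
        // was: Assert.assertEquals(errorMessage, expectedFilterCount, actualFilterCount);
        assertThat(actualFilterCount).as("Should have 1 runtime filter").isEqualTo(expectedFilterCount);

        List<String> deletedFiles = List.of("file-a.parquet", "file-b.parquet");
        // was: Assert.assertEquals("Deleted unexpected number of files", 2, deletedFiles.size());
        assertThat(deletedFiles).as("Deleted unexpected number of files").hasSize(2);
      }
    }
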
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalogHadoopOverrides.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalogHadoopOverrides.java
index c276713113..fd155a6bca 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalogHadoopOverrides.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalogHadoopOverrides.java
@@ -18,25 +18,28 @@
*/
package org.apache.iceberg.spark.source;
-import java.util.Map;
+import static org.assertj.core.api.Assertions.assertThat;
+
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.KryoHelpers;
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.Parameters;
import org.apache.iceberg.Table;
import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.spark.CatalogTestBase;
import org.apache.iceberg.spark.SparkCatalog;
-import org.apache.iceberg.spark.SparkCatalogTestBase;
import org.apache.iceberg.spark.SparkSessionCatalog;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.TableCatalog;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runners.Parameterized;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
-public class TestSparkCatalogHadoopOverrides extends SparkCatalogTestBase {
+@ExtendWith(ParameterizedTestExtension.class)
+public class TestSparkCatalogHadoopOverrides extends CatalogTestBase {
private static final String CONFIG_TO_OVERRIDE = "fs.s3a.buffer.dir";
// prepend "hadoop." so that the test base formats SQLConf correctly
@@ -44,7 +47,7 @@ public class TestSparkCatalogHadoopOverrides extends
SparkCatalogTestBase {
private static final String HADOOP_PREFIXED_CONFIG_TO_OVERRIDE = "hadoop." +
CONFIG_TO_OVERRIDE;
private static final String CONFIG_OVERRIDE_VALUE = "/tmp-overridden";
- @Parameterized.Parameters(name = "catalogName = {0}, implementation = {1},
config = {2}")
+ @Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}")
public static Object[][] parameters() {
return new Object[][] {
{
@@ -77,41 +80,36 @@ public class TestSparkCatalogHadoopOverrides extends
SparkCatalogTestBase {
};
}
- public TestSparkCatalogHadoopOverrides(
- String catalogName, String implementation, Map<String, String> config) {
- super(catalogName, implementation, config);
- }
-
- @Before
+ @BeforeEach
public void createTable() {
sql("CREATE TABLE IF NOT EXISTS %s (id bigint) USING iceberg",
tableName(tableIdent.name()));
}
- @After
+ @AfterEach
public void dropTable() {
sql("DROP TABLE IF EXISTS %s", tableName(tableIdent.name()));
}
- @Test
+ @TestTemplate
public void testTableFromCatalogHasOverrides() throws Exception {
Table table = getIcebergTableFromSparkCatalog();
Configuration conf = ((Configurable) table.io()).getConf();
String actualCatalogOverride = conf.get(CONFIG_TO_OVERRIDE, "/whammies");
- Assert.assertEquals(
- "Iceberg tables from spark should have the overridden hadoop
configurations from the spark config",
- CONFIG_OVERRIDE_VALUE,
- actualCatalogOverride);
+ assertThat(actualCatalogOverride)
+ .as(
+ "Iceberg tables from spark should have the overridden hadoop
configurations from the spark config")
+ .isEqualTo(CONFIG_OVERRIDE_VALUE);
}
- @Test
+ @TestTemplate
public void ensureRoundTripSerializedTableRetainsHadoopConfig() throws
Exception {
Table table = getIcebergTableFromSparkCatalog();
Configuration originalConf = ((Configurable) table.io()).getConf();
String actualCatalogOverride = originalConf.get(CONFIG_TO_OVERRIDE,
"/whammies");
- Assert.assertEquals(
- "Iceberg tables from spark should have the overridden hadoop
configurations from the spark config",
- CONFIG_OVERRIDE_VALUE,
- actualCatalogOverride);
+ assertThat(actualCatalogOverride)
+ .as(
+ "Iceberg tables from spark should have the overridden hadoop
configurations from the spark config")
+ .isEqualTo(CONFIG_OVERRIDE_VALUE);
// Now convert to SerializableTable and ensure overridden property is
still present.
Table serializableTable = SerializableTableWithSize.copyOf(table);
@@ -119,19 +117,19 @@ public class TestSparkCatalogHadoopOverrides extends
SparkCatalogTestBase {
KryoHelpers.roundTripSerialize(SerializableTableWithSize.copyOf(table));
Configuration configFromKryoSerde = ((Configurable)
kryoSerializedTable.io()).getConf();
String kryoSerializedCatalogOverride =
configFromKryoSerde.get(CONFIG_TO_OVERRIDE, "/whammies");
- Assert.assertEquals(
- "Tables serialized with Kryo serialization should retain overridden
hadoop configuration properties",
- CONFIG_OVERRIDE_VALUE,
- kryoSerializedCatalogOverride);
+ assertThat(kryoSerializedCatalogOverride)
+ .as(
+ "Tables serialized with Kryo serialization should retain
overridden hadoop configuration properties")
+ .isEqualTo(CONFIG_OVERRIDE_VALUE);
// Do the same for Java based serde
Table javaSerializedTable =
TestHelpers.roundTripSerialize(serializableTable);
Configuration configFromJavaSerde = ((Configurable)
javaSerializedTable.io()).getConf();
String javaSerializedCatalogOverride =
configFromJavaSerde.get(CONFIG_TO_OVERRIDE, "/whammies");
- Assert.assertEquals(
- "Tables serialized with Java serialization should retain overridden
hadoop configuration properties",
- CONFIG_OVERRIDE_VALUE,
- javaSerializedCatalogOverride);
+ assertThat(javaSerializedCatalogOverride)
+ .as(
+ "Tables serialized with Java serialization should retain
overridden hadoop configuration properties")
+ .isEqualTo(CONFIG_OVERRIDE_VALUE);
}
@SuppressWarnings("ThrowSpecificity")
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkStagedScan.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkStagedScan.java
index 241293f367..b0029c09ab 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkStagedScan.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkStagedScan.java
@@ -18,38 +18,35 @@
*/
package org.apache.iceberg.spark.source;
+import static org.assertj.core.api.Assertions.assertThat;
+
import java.io.IOException;
import java.util.List;
-import java.util.Map;
import java.util.UUID;
import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
-import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.spark.CatalogTestBase;
import org.apache.iceberg.spark.ScanTaskSetManager;
-import org.apache.iceberg.spark.SparkCatalogTestBase;
import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class TestSparkStagedScan extends SparkCatalogTestBase {
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
- public TestSparkStagedScan(
- String catalogName, String implementation, Map<String, String> config) {
- super(catalogName, implementation, config);
- }
+@ExtendWith(ParameterizedTestExtension.class)
+public class TestSparkStagedScan extends CatalogTestBase {
- @After
+ @AfterEach
public void removeTables() {
sql("DROP TABLE IF EXISTS %s", tableName);
}
- @Test
+ @TestTemplate
public void testTaskSetLoading() throws NoSuchTableException, IOException {
sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName);
@@ -59,7 +56,7 @@ public class TestSparkStagedScan extends SparkCatalogTestBase
{
df.writeTo(tableName).append();
Table table = validationCatalog.loadTable(tableIdent);
- Assert.assertEquals("Should produce 1 snapshot", 1,
Iterables.size(table.snapshots()));
+ assertThat(table.snapshots()).as("Should produce 1 snapshot").hasSize(1);
try (CloseableIterable<FileScanTask> fileScanTasks =
table.newScan().planFiles()) {
ScanTaskSetManager taskSetManager = ScanTaskSetManager.get();
@@ -84,7 +81,7 @@ public class TestSparkStagedScan extends SparkCatalogTestBase
{
sql("SELECT * FROM %s ORDER BY id", tableName));
}
- @Test
+ @TestTemplate
public void testTaskSetPlanning() throws NoSuchTableException, IOException {
sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName);
@@ -95,7 +92,7 @@ public class TestSparkStagedScan extends SparkCatalogTestBase
{
df.coalesce(1).writeTo(tableName).append();
Table table = validationCatalog.loadTable(tableIdent);
- Assert.assertEquals("Should produce 2 snapshots", 2,
Iterables.size(table.snapshots()));
+ assertThat(table.snapshots()).as("Should produce 1 snapshot").hasSize(2);
try (CloseableIterable<FileScanTask> fileScanTasks =
table.newScan().planFiles()) {
ScanTaskSetManager taskSetManager = ScanTaskSetManager.get();
@@ -111,7 +108,9 @@ public class TestSparkStagedScan extends
SparkCatalogTestBase {
.option(SparkReadOptions.SCAN_TASK_SET_ID, setID)
.option(SparkReadOptions.SPLIT_SIZE,
tasks.get(0).file().fileSizeInBytes())
.load(tableName);
- Assert.assertEquals("Num partitions should match", 2,
scanDF.javaRDD().getNumPartitions());
+ assertThat(scanDF.javaRDD().getNumPartitions())
+ .as("Num partitions should match")
+ .isEqualTo(2);
// load the staged file set and make sure we combine both files into a
single split
scanDF =
@@ -121,7 +120,9 @@ public class TestSparkStagedScan extends
SparkCatalogTestBase {
.option(SparkReadOptions.SCAN_TASK_SET_ID, setID)
.option(SparkReadOptions.SPLIT_SIZE, Long.MAX_VALUE)
.load(tableName);
- Assert.assertEquals("Num partitions should match", 1,
scanDF.javaRDD().getNumPartitions());
+ assertThat(scanDF.javaRDD().getNumPartitions())
+ .as("Num partitions should match")
+ .isEqualTo(1);
}
}
}
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTable.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTable.java
index 616a196872..4a386ee861 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTable.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTable.java
@@ -18,34 +18,32 @@
*/
package org.apache.iceberg.spark.source;
-import java.util.Map;
-import org.apache.iceberg.spark.SparkCatalogTestBase;
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.spark.CatalogTestBase;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.connector.catalog.CatalogManager;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.TableCatalog;
-import org.junit.After;
import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
-public class TestSparkTable extends SparkCatalogTestBase {
+@ExtendWith(ParameterizedTestExtension.class)
+public class TestSparkTable extends CatalogTestBase {
- public TestSparkTable(String catalogName, String implementation, Map<String,
String> config) {
- super(catalogName, implementation, config);
- }
-
- @Before
+ @BeforeEach
public void createTable() {
sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg",
tableName);
}
- @After
+ @AfterEach
public void removeTable() {
sql("DROP TABLE IF EXISTS %s", tableName);
}
- @Test
+ @TestTemplate
public void testTableEquality() throws NoSuchTableException {
CatalogManager catalogManager = spark.sessionState().catalogManager();
TableCatalog catalog = (TableCatalog) catalogManager.catalog(catalogName);
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
index b7d415de34..2544a8f739 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
@@ -37,6 +37,7 @@ import org.apache.iceberg.DataOperations;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Files;
+import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
@@ -49,7 +50,7 @@ import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.spark.SparkCatalogTestBase;
+import org.apache.iceberg.spark.CatalogTestBase;
import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.sql.Dataset;
@@ -59,20 +60,14 @@ import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.streaming.DataStreamWriter;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
-@RunWith(Parameterized.class)
-public final class TestStructuredStreamingRead3 extends SparkCatalogTestBase {
- public TestStructuredStreamingRead3(
- String catalogName, String implementation, Map<String, String> config) {
- super(catalogName, implementation, config);
- }
+@ExtendWith(ParameterizedTestExtension.class)
+public final class TestStructuredStreamingRead3 extends CatalogTestBase {
private Table table;
@@ -114,13 +109,13 @@ public final class TestStructuredStreamingRead3 extends
SparkCatalogTestBase {
Lists.newArrayList(
new SimpleRecord(15, "fifteen"), new SimpleRecord(16,
"sixteen"))));
- @BeforeClass
+ @BeforeAll
public static void setupSpark() {
// disable AQE as tests assume that writes generate a particular number of
files
spark.conf().set(SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), "false");
}
- @Before
+ @BeforeEach
public void setupTable() {
sql(
"CREATE TABLE %s "
@@ -132,19 +127,19 @@ public final class TestStructuredStreamingRead3 extends
SparkCatalogTestBase {
microBatches.set(0);
}
- @After
+ @AfterEach
public void stopStreams() throws TimeoutException {
for (StreamingQuery query : spark.streams().active()) {
query.stop();
}
}
- @After
+ @AfterEach
public void removeTables() {
sql("DROP TABLE IF EXISTS %s", tableName);
}
- @Test
+ @TestTemplate
public void testReadStreamOnIcebergTableWithMultipleSnapshots() throws
Exception {
List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
appendDataAsMultipleSnapshots(expected);
@@ -155,37 +150,38 @@ public final class TestStructuredStreamingRead3 extends
SparkCatalogTestBase {
assertThat(actual).containsExactlyInAnyOrderElementsOf(Iterables.concat(expected));
}
- @Test
+ @TestTemplate
public void
testReadStreamOnIcebergTableWithMultipleSnapshots_WithNumberOfFiles_1()
throws Exception {
appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
- Assert.assertEquals(
- 6,
- microBatchCount(
-
ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1")));
+ assertThat(
+ microBatchCount(
+
ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1")))
+ .isEqualTo(6);
}
- @Test
+ @TestTemplate
public void
testReadStreamOnIcebergTableWithMultipleSnapshots_WithNumberOfFiles_2()
throws Exception {
appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
- Assert.assertEquals(
- 3,
- microBatchCount(
-
ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "2")));
+ assertThat(
+ microBatchCount(
+
ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "2")))
+ .isEqualTo(3);
}
- @Test
+ @TestTemplate
public void
testReadStreamOnIcebergTableWithMultipleSnapshots_WithNumberOfRows_1()
throws Exception {
appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
// only 1 micro-batch will be formed and we will read data partially
- Assert.assertEquals(
- 1,
-
microBatchCount(ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH,
"1")));
+ assertThat(
+ microBatchCount(
+
ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "1")))
+ .isEqualTo(1);
StreamingQuery query =
startStream(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "1");
@@ -196,17 +192,18 @@ public final class TestStructuredStreamingRead3 extends
SparkCatalogTestBase {
Lists.newArrayList(TEST_DATA_MULTIPLE_SNAPSHOTS.get(0).get(0)));
}
- @Test
+ @TestTemplate
public void
testReadStreamOnIcebergTableWithMultipleSnapshots_WithNumberOfRows_4()
throws Exception {
appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
- Assert.assertEquals(
- 2,
-
microBatchCount(ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH,
"4")));
+ assertThat(
+ microBatchCount(
+
ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "4")))
+ .isEqualTo(2);
}
- @Test
+ @TestTemplate
public void testReadStreamOnIcebergThenAddData() throws Exception {
List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
@@ -218,7 +215,7 @@ public final class TestStructuredStreamingRead3 extends
SparkCatalogTestBase {
assertThat(actual).containsExactlyInAnyOrderElementsOf(Iterables.concat(expected));
}
- @Test
+ @TestTemplate
public void testReadingStreamFromTimestamp() throws Exception {
List<SimpleRecord> dataBeforeTimestamp =
Lists.newArrayList(
@@ -245,7 +242,7 @@ public final class TestStructuredStreamingRead3 extends
SparkCatalogTestBase {
assertThat(actual).containsExactlyInAnyOrderElementsOf(Iterables.concat(expected));
}
- @Test
+ @TestTemplate
public void testReadingStreamFromFutureTimetsamp() throws Exception {
long futureTimestamp = System.currentTimeMillis() + 10000;
@@ -277,7 +274,7 @@ public final class TestStructuredStreamingRead3 extends
SparkCatalogTestBase {
assertThat(actual).containsExactlyInAnyOrderElementsOf(data);
}
- @Test
+ @TestTemplate
public void testReadingStreamFromTimestampFutureWithExistingSnapshots()
throws Exception {
List<SimpleRecord> dataBeforeTimestamp =
Lists.newArrayList(
@@ -290,7 +287,7 @@ public final class TestStructuredStreamingRead3 extends
SparkCatalogTestBase {
StreamingQuery query =
startStream(SparkReadOptions.STREAM_FROM_TIMESTAMP,
Long.toString(streamStartTimestamp));
List<SimpleRecord> actual = rowsAvailable(query);
- Assert.assertEquals(Collections.emptyList(), actual);
+ assertThat(actual).isEmpty();
// Stream should contain data added after the timestamp elapses
waitUntilAfter(streamStartTimestamp);
@@ -300,7 +297,7 @@ public final class TestStructuredStreamingRead3 extends
SparkCatalogTestBase {
.containsExactlyInAnyOrderElementsOf(Iterables.concat(expected));
}
- @Test
+ @TestTemplate
public void testReadingStreamFromTimestampOfExistingSnapshot() throws
Exception {
List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
@@ -322,7 +319,7 @@ public final class TestStructuredStreamingRead3 extends
SparkCatalogTestBase {
assertThat(actual).containsExactlyInAnyOrderElementsOf(Iterables.concat(expected));
}
- @Test
+ @TestTemplate
public void testReadingStreamWithExpiredSnapshotFromTimestamp() throws
TimeoutException {
List<SimpleRecord> firstSnapshotRecordList = Lists.newArrayList(new
SimpleRecord(1, "one"));
@@ -351,11 +348,11 @@ public final class TestStructuredStreamingRead3 extends
SparkCatalogTestBase {
assertThat(actual).containsExactlyInAnyOrderElementsOf(expectedRecordList);
}
- @Test
+ @TestTemplate
public void testResumingStreamReadFromCheckpoint() throws Exception {
- File writerCheckpointFolder = temp.newFolder("writer-checkpoint-folder");
+ File writerCheckpointFolder =
temp.resolve("writer-checkpoint-folder").toFile();
File writerCheckpoint = new File(writerCheckpointFolder,
"writer-checkpoint");
- File output = temp.newFolder();
+ File output = temp.resolve("junit").toFile();
DataStreamWriter querySource =
spark
@@ -391,11 +388,11 @@ public final class TestStructuredStreamingRead3 extends
SparkCatalogTestBase {
}
}
- @Test
+ @TestTemplate
public void testFailReadingCheckpointInvalidSnapshot() throws IOException,
TimeoutException {
- File writerCheckpointFolder = temp.newFolder("writer-checkpoint-folder");
+ File writerCheckpointFolder =
temp.resolve("writer-checkpoint-folder").toFile();
File writerCheckpoint = new File(writerCheckpointFolder,
"writer-checkpoint");
- File output = temp.newFolder();
+ File output = temp.resolve("junit").toFile();
DataStreamWriter querySource =
spark
@@ -431,7 +428,7 @@ public final class TestStructuredStreamingRead3 extends
SparkCatalogTestBase {
firstSnapshotid));
}
- @Test
+ @TestTemplate
public void testParquetOrcAvroDataInOneTable() throws Exception {
List<SimpleRecord> parquetFileRecords =
Lists.newArrayList(
@@ -453,14 +450,14 @@ public final class TestStructuredStreamingRead3 extends
SparkCatalogTestBase {
Iterables.concat(parquetFileRecords, orcFileRecords,
avroFileRecords));
}
- @Test
+ @TestTemplate
public void testReadStreamFromEmptyTable() throws Exception {
StreamingQuery stream = startStream();
List<SimpleRecord> actual = rowsAvailable(stream);
- Assert.assertEquals(Collections.emptyList(), actual);
+ assertThat(actual).isEmpty();
}
- @Test
+ @TestTemplate
public void testReadStreamWithSnapshotTypeOverwriteErrorsOut() throws
Exception {
// upgrade table to version 2 - to facilitate creation of Snapshot of type
OVERWRITE.
TableOperations ops = ((BaseTable) table).operations();
@@ -481,14 +478,14 @@ public final class TestStructuredStreamingRead3 extends
SparkCatalogTestBase {
DeleteFile eqDeletes =
FileHelpers.writeDeleteFile(
table,
- Files.localOutput(temp.newFile()),
+ Files.localOutput(File.createTempFile("junit", null,
temp.toFile())),
TestHelpers.Row.of(0),
dataDeletes,
deleteRowSchema);
DataFile dataFile =
DataFiles.builder(table.spec())
- .withPath(temp.newFile().toString())
+ .withPath(File.createTempFile("junit", null,
temp.toFile()).getPath())
.withFileSizeInBytes(10)
.withRecordCount(1)
.withFormat(FileFormat.PARQUET)
@@ -498,7 +495,7 @@ public final class TestStructuredStreamingRead3 extends
SparkCatalogTestBase {
// check pre-condition - that the above Delete file write - actually
resulted in snapshot of
// type OVERWRITE
- Assert.assertEquals(DataOperations.OVERWRITE,
table.currentSnapshot().operation());
+
assertThat(table.currentSnapshot().operation()).isEqualTo(DataOperations.OVERWRITE);
StreamingQuery query = startStream();
@@ -508,7 +505,7 @@ public final class TestStructuredStreamingRead3 extends
SparkCatalogTestBase {
.hasMessageStartingWith("Cannot process overwrite snapshot");
}
- @Test
+ @TestTemplate
public void testReadStreamWithSnapshotTypeReplaceIgnoresReplace() throws
Exception {
// fill table with some data
List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
@@ -518,14 +515,14 @@ public final class TestStructuredStreamingRead3 extends
SparkCatalogTestBase {
table.rewriteManifests().clusterBy(f -> 1).commit();
// check pre-condition
- Assert.assertEquals(DataOperations.REPLACE,
table.currentSnapshot().operation());
+
assertThat(table.currentSnapshot().operation()).isEqualTo(DataOperations.REPLACE);
StreamingQuery query = startStream();
List<SimpleRecord> actual = rowsAvailable(query);
assertThat(actual).containsExactlyInAnyOrderElementsOf(Iterables.concat(expected));
}
- @Test
+ @TestTemplate
public void testReadStreamWithSnapshotTypeDeleteErrorsOut() throws Exception
{
table.updateSpec().removeField("id_bucket").addField(ref("id")).commit();
@@ -538,7 +535,7 @@ public final class TestStructuredStreamingRead3 extends
SparkCatalogTestBase {
// check pre-condition - that the above delete operation on table resulted
in Snapshot of Type
// DELETE.
- Assert.assertEquals(DataOperations.DELETE,
table.currentSnapshot().operation());
+
assertThat(table.currentSnapshot().operation()).isEqualTo(DataOperations.DELETE);
StreamingQuery query = startStream();
@@ -548,7 +545,7 @@ public final class TestStructuredStreamingRead3 extends
SparkCatalogTestBase {
.hasMessageStartingWith("Cannot process delete snapshot");
}
- @Test
+ @TestTemplate
public void testReadStreamWithSnapshotTypeDeleteAndSkipDeleteOption() throws
Exception {
table.updateSpec().removeField("id_bucket").addField(ref("id")).commit();
@@ -561,14 +558,14 @@ public final class TestStructuredStreamingRead3 extends
SparkCatalogTestBase {
// check pre-condition - that the above delete operation on table resulted
in Snapshot of Type
// DELETE.
- Assert.assertEquals(DataOperations.DELETE,
table.currentSnapshot().operation());
+
assertThat(table.currentSnapshot().operation()).isEqualTo(DataOperations.DELETE);
StreamingQuery query =
startStream(SparkReadOptions.STREAMING_SKIP_DELETE_SNAPSHOTS, "true");
assertThat(rowsAvailable(query))
.containsExactlyInAnyOrderElementsOf(Iterables.concat(dataAcrossSnapshots));
}
- @Test
+ @TestTemplate
public void testReadStreamWithSnapshotTypeDeleteAndSkipOverwriteOption()
throws Exception {
table.updateSpec().removeField("id_bucket").addField(ref("id")).commit();
@@ -578,7 +575,7 @@ public final class TestStructuredStreamingRead3 extends
SparkCatalogTestBase {
DataFile dataFile =
DataFiles.builder(table.spec())
- .withPath(temp.newFile().toString())
+ .withPath(File.createTempFile("junit", null,
temp.toFile()).getPath())
.withFileSizeInBytes(10)
.withRecordCount(1)
.withFormat(FileFormat.PARQUET)
@@ -593,7 +590,7 @@ public final class TestStructuredStreamingRead3 extends
SparkCatalogTestBase {
// check pre-condition - that the above delete operation on table resulted
in Snapshot of Type
// OVERWRITE.
- Assert.assertEquals(DataOperations.OVERWRITE,
table.currentSnapshot().operation());
+
assertThat(table.currentSnapshot().operation()).isEqualTo(DataOperations.OVERWRITE);
StreamingQuery query =
startStream(SparkReadOptions.STREAMING_SKIP_OVERWRITE_SNAPSHOTS, "true");
assertThat(rowsAvailable(query))
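
The temp-file changes in this test drop TemporaryFolder's newFolder()/newFile() helpers in favor of plain java.io/java.nio calls against the injected @TempDir path. A hedged sketch of that mapping (hypothetical TempDirExample; note that, unlike newFolder(), Path.resolve() only builds a path, so the directory is created explicitly here):

    import java.io.File;
    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import org.junit.jupiter.api.Test;
    import org.junit.jupiter.api.io.TempDir;

    public class TempDirExample {

      @TempDir Path temp;

      @Test
      public void usesTempDir() throws IOException {
        // was: temp.newFolder("writer-checkpoint-folder")
        File checkpointFolder = Files.createDirectories(temp.resolve("writer-checkpoint-folder")).toFile();

        // was: temp.newFile(); creates a unique scratch file under the per-test directory
        File scratchFile = File.createTempFile("junit", null, temp.toFile());

        // both locations live under the directory JUnit manages and deletes after the test
        assert checkpointFolder.isDirectory() && scratchFile.isFile();
      }
    }
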
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestRequiredDistributionAndOrdering.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestRequiredDistributionAndOrdering.java
index 55fd2cefe2..5dbfc7fa6c 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestRequiredDistributionAndOrdering.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestRequiredDistributionAndOrdering.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.spark.source;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import java.util.List;
+import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -31,7 +32,9 @@ import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+@ExtendWith(ParameterizedTestExtension.class)
public class TestRequiredDistributionAndOrdering extends CatalogTestBase {
@AfterEach
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalogHadoopOverrides.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalogHadoopOverrides.java
index c031f2991f..fd155a6bca 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalogHadoopOverrides.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalogHadoopOverrides.java
@@ -23,6 +23,7 @@ import static org.assertj.core.api.Assertions.assertThat;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.KryoHelpers;
+import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.Parameters;
import org.apache.iceberg.Table;
import org.apache.iceberg.TestHelpers;
@@ -35,7 +36,9 @@ import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+@ExtendWith(ParameterizedTestExtension.class)
public class TestSparkCatalogHadoopOverrides extends CatalogTestBase {
private static final String CONFIG_TO_OVERRIDE = "fs.s3a.buffer.dir";
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkStagedScan.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkStagedScan.java
index 6ce2ce6238..e444b7cb1f 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkStagedScan.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkStagedScan.java
@@ -24,6 +24,7 @@ import java.io.IOException;
import java.util.List;
import java.util.UUID;
import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -35,7 +36,9 @@ import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+@ExtendWith(ParameterizedTestExtension.class)
public class TestSparkStagedScan extends CatalogTestBase {
@AfterEach
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTable.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTable.java
index 46ee484b39..d14b1a52cf 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTable.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTable.java
@@ -20,6 +20,7 @@ package org.apache.iceberg.spark.source;
import static org.assertj.core.api.Assertions.assertThat;
+import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.spark.CatalogTestBase;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.connector.catalog.CatalogManager;
@@ -28,7 +29,9 @@ import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+@ExtendWith(ParameterizedTestExtension.class)
public class TestSparkTable extends CatalogTestBase {
@BeforeEach
diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestRequiredDistributionAndOrdering.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestRequiredDistributionAndOrdering.java
index 55fd2cefe2..5dbfc7fa6c 100644
--- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestRequiredDistributionAndOrdering.java
+++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestRequiredDistributionAndOrdering.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.spark.source;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import java.util.List;
+import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -31,7 +32,9 @@ import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+@ExtendWith(ParameterizedTestExtension.class)
public class TestRequiredDistributionAndOrdering extends CatalogTestBase {
@AfterEach
diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalogHadoopOverrides.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalogHadoopOverrides.java
index c031f2991f..fd155a6bca 100644
--- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalogHadoopOverrides.java
+++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalogHadoopOverrides.java
@@ -23,6 +23,7 @@ import static org.assertj.core.api.Assertions.assertThat;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.KryoHelpers;
+import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.Parameters;
import org.apache.iceberg.Table;
import org.apache.iceberg.TestHelpers;
@@ -35,7 +36,9 @@ import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+@ExtendWith(ParameterizedTestExtension.class)
public class TestSparkCatalogHadoopOverrides extends CatalogTestBase {
private static final String CONFIG_TO_OVERRIDE = "fs.s3a.buffer.dir";
diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkStagedScan.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkStagedScan.java
index 6ce2ce6238..e444b7cb1f 100644
--- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkStagedScan.java
+++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkStagedScan.java
@@ -24,6 +24,7 @@ import java.io.IOException;
import java.util.List;
import java.util.UUID;
import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -35,7 +36,9 @@ import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+@ExtendWith(ParameterizedTestExtension.class)
public class TestSparkStagedScan extends CatalogTestBase {
@AfterEach
diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTable.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTable.java
index 46ee484b39..d14b1a52cf 100644
--- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTable.java
+++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTable.java
@@ -20,6 +20,7 @@ package org.apache.iceberg.spark.source;
import static org.assertj.core.api.Assertions.assertThat;
+import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.spark.CatalogTestBase;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.connector.catalog.CatalogManager;
@@ -28,7 +29,9 @@ import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+@ExtendWith(ParameterizedTestExtension.class)
public class TestSparkTable extends CatalogTestBase {
@BeforeEach