This is an automated email from the ASF dual-hosted git repository.
etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 2c157cbaf5 Spark 3.4: Migrate TestBase-related remaining tests in
actions (#12579)
2c157cbaf5 is described below
commit 2c157cbaf552f48a18d959f0c6b95be36b9dd9e3
Author: Tom Tanaka <[email protected]>
AuthorDate: Fri Mar 21 18:50:50 2025 +0900
Spark 3.4: Migrate TestBase-related remaining tests in actions (#12579)
---
.../spark/actions/TestRemoveOrphanFilesAction.java | 268 +++++++-------
.../actions/TestRemoveOrphanFilesAction3.java | 107 +++---
.../spark/actions/TestRewriteDataFilesAction.java | 387 +++++++++++----------
.../spark/actions/TestRewriteManifestsAction.java | 228 ++++++------
.../spark/actions/TestRemoveOrphanFilesAction.java | 26 +-
.../actions/TestRemoveOrphanFilesAction3.java | 20 +-
.../spark/actions/TestRewriteDataFilesAction.java | 15 +-
.../spark/actions/TestRewriteManifestsAction.java | 1 -
8 files changed, 533 insertions(+), 519 deletions(-)
diff --git
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
index d322f1d67b..8788940b09 100644
---
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
+++
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.spark.actions;
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assumptions.assumeThat;
import java.io.File;
import java.io.IOException;
@@ -32,12 +33,12 @@ import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
-import java.util.stream.StreamSupport;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -45,6 +46,9 @@ import org.apache.hadoop.fs.Path;
import org.apache.iceberg.Files;
import org.apache.iceberg.GenericBlobMetadata;
import org.apache.iceberg.GenericStatisticsFile;
+import org.apache.iceberg.Parameter;
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.Parameters;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
@@ -64,12 +68,10 @@ import org.apache.iceberg.puffin.Puffin;
import org.apache.iceberg.puffin.PuffinWriter;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
-import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.SparkSQLProperties;
-import org.apache.iceberg.spark.SparkTestBase;
+import org.apache.iceberg.spark.TestBase;
import
org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction.StringToFileURI;
import org.apache.iceberg.spark.source.FilePathLastModifiedRecord;
import org.apache.iceberg.spark.source.ThreeColumnRecord;
@@ -80,13 +82,13 @@ import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
-public abstract class TestRemoveOrphanFilesAction extends SparkTestBase {
+@ExtendWith(ParameterizedTestExtension.class)
+public abstract class TestRemoveOrphanFilesAction extends TestBase {
private static final HadoopTables TABLES = new HadoopTables(new
Configuration());
protected static final Schema SCHEMA =
@@ -97,17 +99,23 @@ public abstract class TestRemoveOrphanFilesAction extends
SparkTestBase {
protected static final PartitionSpec SPEC =
PartitionSpec.builderFor(SCHEMA).truncate("c2",
2).identity("c3").build();
- @Rule public TemporaryFolder temp = new TemporaryFolder();
- private File tableDir = null;
+ @TempDir private File tableDir = null;
protected String tableLocation = null;
+ protected Map<String, String> properties;
+ @Parameter private int formatVersion;
- @Before
+ @Parameters(name = "formatVersion = {0}")
+ protected static List<Object> parameters() {
+ return Arrays.asList(2, 3);
+ }
+
+ @BeforeEach
public void setupTableLocation() throws Exception {
- this.tableDir = temp.newFolder();
this.tableLocation = tableDir.toURI().toString();
+ properties = ImmutableMap.of(TableProperties.FORMAT_VERSION,
String.valueOf(formatVersion));
}
- @Test
+ @TestTemplate
public void testDryRun() throws IOException, InterruptedException {
Table table =
TABLES.create(SCHEMA, PartitionSpec.unpartitioned(),
Maps.newHashMap(), tableLocation);
@@ -129,7 +137,7 @@ public abstract class TestRemoveOrphanFilesAction extends
SparkTestBase {
.select("file_path")
.as(Encoders.STRING())
.collectAsList();
- Assert.assertEquals("Should be 2 valid files", 2, validFiles.size());
+ assertThat(validFiles).as("Should be 2 valid files").hasSize(2);
df.write().mode("append").parquet(tableLocation + "/data");
@@ -140,11 +148,11 @@ public abstract class TestRemoveOrphanFilesAction extends
SparkTestBase {
.filter(FileStatus::isFile)
.map(file -> file.getPath().toString())
.collect(Collectors.toList());
- Assert.assertEquals("Should be 3 files", 3, allFiles.size());
+ assertThat(allFiles).as("Should be 3 valid files").hasSize(3);
List<String> invalidFiles = Lists.newArrayList(allFiles);
invalidFiles.removeAll(validFiles);
- Assert.assertEquals("Should be 1 invalid file", 1, invalidFiles.size());
+ assertThat(invalidFiles).as("Should be 1 invalid file").hasSize(1);
waitUntilAfter(System.currentTimeMillis());
@@ -152,9 +160,9 @@ public abstract class TestRemoveOrphanFilesAction extends
SparkTestBase {
DeleteOrphanFiles.Result result1 =
actions.deleteOrphanFiles(table).deleteWith(s -> {}).execute();
- Assert.assertTrue(
- "Default olderThan interval should be safe",
- Iterables.isEmpty(result1.orphanFileLocations()));
+ assertThat(result1.orphanFileLocations())
+ .as("Default olderThan interval should be safe")
+ .isEmpty();
DeleteOrphanFiles.Result result2 =
actions
@@ -162,14 +170,21 @@ public abstract class TestRemoveOrphanFilesAction extends
SparkTestBase {
.olderThan(System.currentTimeMillis())
.deleteWith(s -> {})
.execute();
- Assert.assertEquals("Action should find 1 file", invalidFiles,
result2.orphanFileLocations());
- Assert.assertTrue("Invalid file should be present", fs.exists(new
Path(invalidFiles.get(0))));
+ assertThat(result2.orphanFileLocations())
+ .as("Action should find 1 file")
+ .isEqualTo(invalidFiles);
+ assertThat(fs.exists(new Path(invalidFiles.get(0))))
+ .as("Invalid file should be present")
+ .isTrue();
DeleteOrphanFiles.Result result3 =
actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).execute();
- Assert.assertEquals("Action should delete 1 file", invalidFiles,
result3.orphanFileLocations());
- Assert.assertFalse(
- "Invalid file should not be present", fs.exists(new
Path(invalidFiles.get(0))));
+ assertThat(result3.orphanFileLocations())
+ .as("Action should delete 1 file")
+ .isEqualTo(invalidFiles);
+ assertThat(fs.exists(new Path(invalidFiles.get(0))))
+ .as("Invalid file should not be present")
+ .isFalse();
List<ThreeColumnRecord> expectedRecords = Lists.newArrayList();
expectedRecords.addAll(records);
@@ -178,10 +193,10 @@ public abstract class TestRemoveOrphanFilesAction extends
SparkTestBase {
Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
List<ThreeColumnRecord> actualRecords =
resultDF.as(Encoders.bean(ThreeColumnRecord.class)).collectAsList();
- Assert.assertEquals("Rows must match", expectedRecords, actualRecords);
+ assertThat(actualRecords).isEqualTo(expectedRecords);
}
- @Test
+ @TestTemplate
public void testAllValidFilesAreKept() throws IOException,
InterruptedException {
Table table = TABLES.create(SCHEMA, SPEC, Maps.newHashMap(),
tableLocation);
@@ -205,13 +220,13 @@ public abstract class TestRemoveOrphanFilesAction extends
SparkTestBase {
List<Snapshot> snapshots = Lists.newArrayList(table.snapshots());
List<String> snapshotFiles1 = snapshotFiles(snapshots.get(0).snapshotId());
- Assert.assertEquals(1, snapshotFiles1.size());
+ assertThat(snapshotFiles1).hasSize(1);
List<String> snapshotFiles2 = snapshotFiles(snapshots.get(1).snapshotId());
- Assert.assertEquals(1, snapshotFiles2.size());
+ assertThat(snapshotFiles2).hasSize(1);
List<String> snapshotFiles3 = snapshotFiles(snapshots.get(2).snapshotId());
- Assert.assertEquals(2, snapshotFiles3.size());
+ assertThat(snapshotFiles3).hasSize(2);
df2.coalesce(1).write().mode("append").parquet(tableLocation + "/data");
df2.coalesce(1).write().mode("append").parquet(tableLocation +
"/data/c2_trunc=AA");
@@ -225,25 +240,25 @@ public abstract class TestRemoveOrphanFilesAction extends
SparkTestBase {
DeleteOrphanFiles.Result result =
actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).execute();
- Assert.assertEquals("Should delete 4 files", 4,
Iterables.size(result.orphanFileLocations()));
+ assertThat(result.orphanFileLocations()).as("Should delete 4
files").hasSize(4);
Path dataPath = new Path(tableLocation + "/data");
FileSystem fs =
dataPath.getFileSystem(spark.sessionState().newHadoopConf());
for (String fileLocation : snapshotFiles1) {
- Assert.assertTrue("All snapshot files must remain", fs.exists(new
Path(fileLocation)));
+ assertThat(fs.exists(new Path(fileLocation))).as("All snapshot files
must remain").isTrue();
}
for (String fileLocation : snapshotFiles2) {
- Assert.assertTrue("All snapshot files must remain", fs.exists(new
Path(fileLocation)));
+ assertThat(fs.exists(new Path(fileLocation))).as("All snapshot files
must remain").isTrue();
}
for (String fileLocation : snapshotFiles3) {
- Assert.assertTrue("All snapshot files must remain", fs.exists(new
Path(fileLocation)));
+ assertThat(fs.exists(new Path(fileLocation))).as("All snapshot files
must remain").isTrue();
}
}
- @Test
+ @TestTemplate
public void orphanedFileRemovedWithParallelTasks() throws
InterruptedException, IOException {
Table table = TABLES.create(SCHEMA, SPEC, Maps.newHashMap(),
tableLocation);
@@ -299,16 +314,15 @@ public abstract class TestRemoveOrphanFilesAction extends
SparkTestBase {
// Verifies that the delete methods ran in the threads created by the
provided ExecutorService
// ThreadFactory
- Assert.assertEquals(
- deleteThreads,
- Sets.newHashSet(
- "remove-orphan-0", "remove-orphan-1", "remove-orphan-2",
"remove-orphan-3"));
-
- Assert.assertEquals("Should delete 4 files", 4, deletedFiles.size());
+ assertThat(deleteThreads)
+ .containsExactlyInAnyOrder(
+ "remove-orphan-0", "remove-orphan-1", "remove-orphan-2",
"remove-orphan-3");
+ assertThat(deletedFiles).hasSize(4);
}
- @Test
+ @TestTemplate
public void testWapFilesAreKept() throws InterruptedException {
+ assumeThat(formatVersion).as("currently fails with DVs").isEqualTo(2);
Map<String, String> props = Maps.newHashMap();
props.put(TableProperties.WRITE_AUDIT_PUBLISH_ENABLED, "true");
Table table = TABLES.create(SCHEMA, SPEC, props, tableLocation);
@@ -328,7 +342,9 @@ public abstract class TestRemoveOrphanFilesAction extends
SparkTestBase {
Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
List<ThreeColumnRecord> actualRecords =
resultDF.as(Encoders.bean(ThreeColumnRecord.class)).collectAsList();
- Assert.assertEquals("Should not return data from the staged snapshot",
records, actualRecords);
+ assertThat(actualRecords)
+ .as("Should not return data from the staged snapshot")
+ .isEqualTo(records);
waitUntilAfter(System.currentTimeMillis());
@@ -337,11 +353,10 @@ public abstract class TestRemoveOrphanFilesAction extends
SparkTestBase {
DeleteOrphanFiles.Result result =
actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).execute();
- Assert.assertTrue(
- "Should not delete any files",
Iterables.isEmpty(result.orphanFileLocations()));
+ assertThat(result.orphanFileLocations()).as("Should not delete any
files").isEmpty();
}
- @Test
+ @TestTemplate
public void testMetadataFolderIsIntact() throws InterruptedException {
// write data directly to the table location
Map<String, String> props = Maps.newHashMap();
@@ -363,15 +378,15 @@ public abstract class TestRemoveOrphanFilesAction extends
SparkTestBase {
DeleteOrphanFiles.Result result =
actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).execute();
- Assert.assertEquals("Should delete 1 file", 1,
Iterables.size(result.orphanFileLocations()));
+ assertThat(result.orphanFileLocations()).as("Should delete 1
file").hasSize(1);
Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
List<ThreeColumnRecord> actualRecords =
resultDF.as(Encoders.bean(ThreeColumnRecord.class)).collectAsList();
- Assert.assertEquals("Rows must match", records, actualRecords);
+ assertThat(actualRecords).as("Rows must match").isEqualTo(records);
}
- @Test
+ @TestTemplate
public void testOlderThanTimestamp() throws InterruptedException {
Table table = TABLES.create(SCHEMA, SPEC, Maps.newHashMap(),
tableLocation);
@@ -397,11 +412,10 @@ public abstract class TestRemoveOrphanFilesAction extends
SparkTestBase {
DeleteOrphanFiles.Result result =
actions.deleteOrphanFiles(table).olderThan(timestamp).execute();
- Assert.assertEquals(
- "Should delete only 2 files", 2,
Iterables.size(result.orphanFileLocations()));
+ assertThat(result.orphanFileLocations()).as("Should delete only 2
files").hasSize(2);
}
- @Test
+ @TestTemplate
public void testRemoveUnreachableMetadataVersionFiles() throws
InterruptedException {
Map<String, String> props = Maps.newHashMap();
props.put(TableProperties.WRITE_DATA_LOCATION, tableLocation);
@@ -423,11 +437,8 @@ public abstract class TestRemoveOrphanFilesAction extends
SparkTestBase {
DeleteOrphanFiles.Result result =
actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).execute();
- Assert.assertEquals("Should delete 1 file", 1,
Iterables.size(result.orphanFileLocations()));
- Assert.assertTrue(
- "Should remove v1 file",
- StreamSupport.stream(result.orphanFileLocations().spliterator(), false)
- .anyMatch(file -> file.contains("v1.metadata.json")));
+ assertThat(result.orphanFileLocations())
+ .containsExactly(tableLocation + "metadata/v1.metadata.json");
List<ThreeColumnRecord> expectedRecords = Lists.newArrayList();
expectedRecords.addAll(records);
@@ -436,10 +447,10 @@ public abstract class TestRemoveOrphanFilesAction extends
SparkTestBase {
Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
List<ThreeColumnRecord> actualRecords =
resultDF.as(Encoders.bean(ThreeColumnRecord.class)).collectAsList();
- Assert.assertEquals("Rows must match", expectedRecords, actualRecords);
+ assertThat(actualRecords).as("Rows must match").isEqualTo(expectedRecords);
}
- @Test
+ @TestTemplate
public void testManyTopLevelPartitions() throws InterruptedException {
Table table = TABLES.create(SCHEMA, SPEC, Maps.newHashMap(),
tableLocation);
@@ -459,14 +470,13 @@ public abstract class TestRemoveOrphanFilesAction extends
SparkTestBase {
DeleteOrphanFiles.Result result =
actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).execute();
- Assert.assertTrue(
- "Should not delete any files",
Iterables.isEmpty(result.orphanFileLocations()));
+ assertThat(result.orphanFileLocations()).as("Should not delete any
files").isEmpty();
Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
- Assert.assertEquals("Rows count must match", records.size(),
resultDF.count());
+ assertThat(resultDF.count()).as("Rows count must
match").isEqualTo(records.size());
}
- @Test
+ @TestTemplate
public void testManyLeafPartitions() throws InterruptedException {
Table table = TABLES.create(SCHEMA, SPEC, Maps.newHashMap(),
tableLocation);
@@ -486,14 +496,13 @@ public abstract class TestRemoveOrphanFilesAction extends
SparkTestBase {
DeleteOrphanFiles.Result result =
actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).execute();
- Assert.assertTrue(
- "Should not delete any files",
Iterables.isEmpty(result.orphanFileLocations()));
+ assertThat(result.orphanFileLocations()).as("Should not delete any
files").isEmpty();
Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
- Assert.assertEquals("Row count must match", records.size(),
resultDF.count());
+ assertThat(resultDF.count()).as("Row count must
match").isEqualTo(records.size());
}
- @Test
+ @TestTemplate
public void testHiddenPartitionPaths() throws InterruptedException {
Schema schema =
new Schema(
@@ -523,10 +532,10 @@ public abstract class TestRemoveOrphanFilesAction extends
SparkTestBase {
DeleteOrphanFiles.Result result =
actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).execute();
- Assert.assertEquals("Should delete 2 files", 2,
Iterables.size(result.orphanFileLocations()));
+ assertThat(result.orphanFileLocations()).as("Should delete 2
files").hasSize(2);
}
- @Test
+ @TestTemplate
public void testHiddenPartitionPathsWithPartitionEvolution() throws
InterruptedException {
Schema schema =
new Schema(
@@ -559,10 +568,10 @@ public abstract class TestRemoveOrphanFilesAction extends
SparkTestBase {
DeleteOrphanFiles.Result result =
actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).execute();
- Assert.assertEquals("Should delete 2 files", 2,
Iterables.size(result.orphanFileLocations()));
+ assertThat(result.orphanFileLocations()).as("Should delete 2
files").hasSize(2);
}
- @Test
+ @TestTemplate
public void testHiddenPathsStartingWithPartitionNamesAreIgnored()
throws InterruptedException, IOException {
Schema schema =
@@ -595,8 +604,8 @@ public abstract class TestRemoveOrphanFilesAction extends
SparkTestBase {
DeleteOrphanFiles.Result result =
actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).execute();
- Assert.assertEquals("Should delete 0 files", 0,
Iterables.size(result.orphanFileLocations()));
- Assert.assertTrue(fs.exists(pathToFileInHiddenFolder));
+ assertThat(result.orphanFileLocations()).as("Should delete 0
files").isEmpty();
+ assertThat(fs.exists(pathToFileInHiddenFolder)).isTrue();
}
private List<String> snapshotFiles(long snapshotId) {
@@ -610,7 +619,7 @@ public abstract class TestRemoveOrphanFilesAction extends
SparkTestBase {
.collectAsList();
}
- @Test
+ @TestTemplate
public void testRemoveOrphanFilesWithRelativeFilePath() throws IOException,
InterruptedException {
Table table =
TABLES.create(
@@ -635,7 +644,7 @@ public abstract class TestRemoveOrphanFilesAction extends
SparkTestBase {
.select("file_path")
.as(Encoders.STRING())
.collectAsList();
- Assert.assertEquals("Should be 1 valid files", 1, validFiles.size());
+ assertThat(validFiles).as("Should be 1 valid file").hasSize(1);
String validFile = validFiles.get(0);
df.write().mode("append").parquet(tableLocation + "/data");
@@ -647,11 +656,11 @@ public abstract class TestRemoveOrphanFilesAction extends
SparkTestBase {
.filter(FileStatus::isFile)
.map(file -> file.getPath().toString())
.collect(Collectors.toList());
- Assert.assertEquals("Should be 2 files", 2, allFiles.size());
+ assertThat(allFiles).as("Should be 2 files").hasSize(2);
List<String> invalidFiles = Lists.newArrayList(allFiles);
invalidFiles.removeIf(file -> file.contains(validFile));
- Assert.assertEquals("Should be 1 invalid file", 1, invalidFiles.size());
+ assertThat(invalidFiles).as("Should be 1 invalid file").hasSize(1);
waitUntilAfter(System.currentTimeMillis());
@@ -662,11 +671,15 @@ public abstract class TestRemoveOrphanFilesAction extends
SparkTestBase {
.olderThan(System.currentTimeMillis())
.deleteWith(s -> {})
.execute();
- Assert.assertEquals("Action should find 1 file", invalidFiles,
result.orphanFileLocations());
- Assert.assertTrue("Invalid file should be present", fs.exists(new
Path(invalidFiles.get(0))));
+ assertThat(result.orphanFileLocations())
+ .as("Action should find 1 file")
+ .isEqualTo(invalidFiles);
+ assertThat(fs.exists(new Path(invalidFiles.get(0))))
+ .as("Invalid file should be present")
+ .isTrue();
}
- @Test
+ @TestTemplate
public void testRemoveOrphanFilesWithHadoopCatalog() throws
InterruptedException {
HadoopCatalog catalog = new HadoopCatalog(new Configuration(),
tableLocation);
String namespaceName = "testDb";
@@ -693,24 +706,18 @@ public abstract class TestRemoveOrphanFilesAction extends
SparkTestBase {
DeleteOrphanFiles.Result result =
SparkActions.get().deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).execute();
- Assert.assertEquals(
- "Should delete only 1 files", 1,
Iterables.size(result.orphanFileLocations()));
+ assertThat(result.orphanFileLocations()).as("Should delete only 1
file").hasSize(1);
Dataset<Row> resultDF =
spark.read().format("iceberg").load(table.location());
List<ThreeColumnRecord> actualRecords =
resultDF.as(Encoders.bean(ThreeColumnRecord.class)).collectAsList();
- Assert.assertEquals("Rows must match", records, actualRecords);
+ assertThat(actualRecords).as("Rows must match").isEqualTo(records);
}
- @Test
+ @TestTemplate
public void testHiveCatalogTable() throws IOException {
- Table table =
- catalog.createTable(
- TableIdentifier.of("default", "hivetestorphan"),
- SCHEMA,
- SPEC,
- tableLocation,
- Maps.newHashMap());
+ TableIdentifier identifier = TableIdentifier.of("default",
randomName("hivetestorphan"));
+ Table table = catalog.createTable(identifier, SCHEMA, SPEC, tableLocation,
properties);
List<ThreeColumnRecord> records =
Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));
@@ -721,7 +728,7 @@ public abstract class TestRemoveOrphanFilesAction extends
SparkTestBase {
.write()
.format("iceberg")
.mode("append")
- .save("default.hivetestorphan");
+ .save(identifier.toString());
String location = table.location().replaceFirst("file:", "");
new File(location + "/data/trashfile").createNewFile();
@@ -731,13 +738,12 @@ public abstract class TestRemoveOrphanFilesAction extends
SparkTestBase {
.deleteOrphanFiles(table)
.olderThan(System.currentTimeMillis() + 1000)
.execute();
- Assert.assertTrue(
- "trash file should be removed",
- StreamSupport.stream(result.orphanFileLocations().spliterator(), false)
- .anyMatch(file -> file.contains("file:" + location +
"/data/trashfile")));
+ assertThat(result.orphanFileLocations())
+ .as("trash file should be removed")
+ .contains("file:" + location + "/data/trashfile");
}
- @Test
+ @TestTemplate
public void testGarbageCollectionDisabled() {
Table table =
TABLES.create(SCHEMA, PartitionSpec.unpartitioned(),
Maps.newHashMap(), tableLocation);
@@ -757,7 +763,7 @@ public abstract class TestRemoveOrphanFilesAction extends
SparkTestBase {
"Cannot delete orphan files: GC is disabled (deleting files may
corrupt other tables)");
}
- @Test
+ @TestTemplate
public void testCompareToFileList() throws IOException, InterruptedException
{
Table table =
TABLES.create(SCHEMA, PartitionSpec.unpartitioned(),
Maps.newHashMap(), tableLocation);
@@ -782,7 +788,7 @@ public abstract class TestRemoveOrphanFilesAction extends
SparkTestBase {
file.getPath().toString(), new
Timestamp(file.getModificationTime())))
.collect(Collectors.toList());
- Assert.assertEquals("Should be 2 valid files", 2, validFiles.size());
+ assertThat(validFiles).as("Should be 2 valid files").hasSize(2);
df.write().mode("append").parquet(tableLocation + "/data");
@@ -795,7 +801,7 @@ public abstract class TestRemoveOrphanFilesAction extends
SparkTestBase {
file.getPath().toString(), new
Timestamp(file.getModificationTime())))
.collect(Collectors.toList());
- Assert.assertEquals("Should be 3 files", 3, allFiles.size());
+ assertThat(allFiles).as("Should be 3 files").hasSize(3);
List<FilePathLastModifiedRecord> invalidFiles =
Lists.newArrayList(allFiles);
invalidFiles.removeAll(validFiles);
@@ -803,7 +809,7 @@ public abstract class TestRemoveOrphanFilesAction extends
SparkTestBase {
invalidFiles.stream()
.map(FilePathLastModifiedRecord::getFilePath)
.collect(Collectors.toList());
- Assert.assertEquals("Should be 1 invalid file", 1, invalidFiles.size());
+ assertThat(invalidFiles).as("Should be 1 invalid file").hasSize(1);
// sleep for 1 second to ensure files will be old enough
waitUntilAfter(System.currentTimeMillis());
@@ -822,9 +828,9 @@ public abstract class TestRemoveOrphanFilesAction extends
SparkTestBase {
.compareToFileList(compareToFileList)
.deleteWith(s -> {})
.execute();
- Assert.assertTrue(
- "Default olderThan interval should be safe",
- Iterables.isEmpty(result1.orphanFileLocations()));
+ assertThat(result1.orphanFileLocations())
+ .as("Default olderThan interval should be safe")
+ .isEmpty();
DeleteOrphanFiles.Result result2 =
actions
@@ -833,10 +839,12 @@ public abstract class TestRemoveOrphanFilesAction extends
SparkTestBase {
.olderThan(System.currentTimeMillis())
.deleteWith(s -> {})
.execute();
- Assert.assertEquals(
- "Action should find 1 file", invalidFilePaths,
result2.orphanFileLocations());
- Assert.assertTrue(
- "Invalid file should be present", fs.exists(new
Path(invalidFilePaths.get(0))));
+ assertThat(result2.orphanFileLocations())
+ .as("Action should find 1 file")
+ .isEqualTo(invalidFilePaths);
+ assertThat(fs.exists(new Path(invalidFilePaths.get(0))))
+ .as("Invalid file should be present")
+ .isTrue();
DeleteOrphanFiles.Result result3 =
actions
@@ -844,10 +852,12 @@ public abstract class TestRemoveOrphanFilesAction extends
SparkTestBase {
.compareToFileList(compareToFileList)
.olderThan(System.currentTimeMillis())
.execute();
- Assert.assertEquals(
- "Action should delete 1 file", invalidFilePaths,
result3.orphanFileLocations());
- Assert.assertFalse(
- "Invalid file should not be present", fs.exists(new
Path(invalidFilePaths.get(0))));
+ assertThat(result3.orphanFileLocations())
+ .as("Action should delete 1 file")
+ .isEqualTo(invalidFilePaths);
+ assertThat(fs.exists(new Path(invalidFilePaths.get(0))))
+ .as("Invalid file should not be present")
+ .isFalse();
List<ThreeColumnRecord> expectedRecords = Lists.newArrayList();
expectedRecords.addAll(records);
@@ -856,7 +866,7 @@ public abstract class TestRemoveOrphanFilesAction extends
SparkTestBase {
Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
List<ThreeColumnRecord> actualRecords =
resultDF.as(Encoders.bean(ThreeColumnRecord.class)).collectAsList();
- Assert.assertEquals("Rows must match", expectedRecords, actualRecords);
+ assertThat(actualRecords).as("Rows must match").isEqualTo(expectedRecords);
List<FilePathLastModifiedRecord> outsideLocationMockFiles =
Lists.newArrayList(new FilePathLastModifiedRecord("/tmp/mock1", new
Timestamp(0L)));
@@ -873,8 +883,7 @@ public abstract class TestRemoveOrphanFilesAction extends
SparkTestBase {
.compareToFileList(compareToFileListWithOutsideLocation)
.deleteWith(s -> {})
.execute();
- Assert.assertEquals(
- "Action should find nothing", Lists.newArrayList(),
result4.orphanFileLocations());
+ assertThat(result4.orphanFileLocations()).as("Action should find
nothing").isEmpty();
}
protected long waitUntilAfter(long timestampMillis) {
@@ -885,7 +894,7 @@ public abstract class TestRemoveOrphanFilesAction extends
SparkTestBase {
return current;
}
- @Test
+ @TestTemplate
public void testRemoveOrphanFilesWithStatisticFiles() throws Exception {
Table table =
TABLES.create(
@@ -954,35 +963,32 @@ public abstract class TestRemoveOrphanFilesAction extends
SparkTestBase {
.olderThan(System.currentTimeMillis() + 1000)
.execute();
Iterable<String> orphanFileLocations = result.orphanFileLocations();
- assertThat(orphanFileLocations).as("Should be orphan files").hasSize(1);
- assertThat(Iterables.getOnlyElement(orphanFileLocations))
- .as("Deleted file")
- .isEqualTo(statsLocation.toURI().toString());
- assertThat(statsLocation.exists()).as("stats file should be
deleted").isFalse();
+
assertThat(orphanFileLocations).hasSize(1).containsExactly(statsLocation.toURI().toString());
+ assertThat(statsLocation).as("stats file should be
deleted").doesNotExist();
}
- @Test
+ @TestTemplate
public void testPathsWithExtraSlashes() {
List<String> validFiles = Lists.newArrayList("file:///dir1/dir2/file1");
List<String> actualFiles =
Lists.newArrayList("file:///dir1/////dir2///file1");
executeTest(validFiles, actualFiles, Lists.newArrayList());
}
- @Test
+ @TestTemplate
public void testPathsWithValidFileHavingNoAuthority() {
List<String> validFiles = Lists.newArrayList("hdfs:///dir1/dir2/file1");
List<String> actualFiles =
Lists.newArrayList("hdfs://servicename/dir1/dir2/file1");
executeTest(validFiles, actualFiles, Lists.newArrayList());
}
- @Test
+ @TestTemplate
public void testPathsWithActualFileHavingNoAuthority() {
List<String> validFiles =
Lists.newArrayList("hdfs://servicename/dir1/dir2/file1");
List<String> actualFiles = Lists.newArrayList("hdfs:///dir1/dir2/file1");
executeTest(validFiles, actualFiles, Lists.newArrayList());
}
- @Test
+ @TestTemplate
public void testPathsWithEqualSchemes() {
List<String> validFiles =
Lists.newArrayList("scheme1://bucket1/dir1/dir2/file1");
List<String> actualFiles =
Lists.newArrayList("scheme2://bucket1/dir1/dir2/file1");
@@ -1011,7 +1017,7 @@ public abstract class TestRemoveOrphanFilesAction extends
SparkTestBase {
DeleteOrphanFiles.PrefixMismatchMode.ERROR);
}
- @Test
+ @TestTemplate
public void testPathsWithEqualAuthorities() {
List<String> validFiles =
Lists.newArrayList("hdfs://servicename1/dir1/dir2/file1");
List<String> actualFiles =
Lists.newArrayList("hdfs://servicename2/dir1/dir2/file1");
@@ -1040,7 +1046,7 @@ public abstract class TestRemoveOrphanFilesAction extends
SparkTestBase {
DeleteOrphanFiles.PrefixMismatchMode.ERROR);
}
- @Test
+ @TestTemplate
public void testRemoveOrphanFileActionWithDeleteMode() {
List<String> validFiles =
Lists.newArrayList("hdfs://servicename1/dir1/dir2/file1");
List<String> actualFiles =
Lists.newArrayList("hdfs://servicename2/dir1/dir2/file1");
@@ -1054,6 +1060,10 @@ public abstract class TestRemoveOrphanFilesAction
extends SparkTestBase {
DeleteOrphanFiles.PrefixMismatchMode.DELETE);
}
+ protected String randomName(String prefix) {
+ return prefix + UUID.randomUUID().toString().replace("-", "");
+ }
+
private void executeTest(
List<String> validFiles, List<String> actualFiles, List<String>
expectedOrphanFiles) {
executeTest(
@@ -1081,6 +1091,6 @@ public abstract class TestRemoveOrphanFilesAction extends
SparkTestBase {
List<String> orphanFiles =
DeleteOrphanFilesSparkAction.findOrphanFiles(
spark, toFileUri.apply(actualFileDS),
toFileUri.apply(validFileDS), mode);
- Assert.assertEquals(expectedOrphanFiles, orphanFiles);
+ assertThat(orphanFiles).isEqualTo(expectedOrphanFiles);
}
}
diff --git
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction3.java
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction3.java
index 0abfd79d5d..646e5f8e70 100644
---
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction3.java
+++
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction3.java
@@ -18,23 +18,21 @@
*/
package org.apache.iceberg.spark.actions;
+import static org.assertj.core.api.Assertions.assertThat;
+
import java.io.File;
-import java.util.Map;
-import java.util.stream.StreamSupport;
import org.apache.iceberg.actions.DeleteOrphanFiles;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.spark.SparkCatalog;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.SparkSessionCatalog;
import org.apache.iceberg.spark.source.SparkTable;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.expressions.Transform;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.TestTemplate;
public class TestRemoveOrphanFilesAction3 extends TestRemoveOrphanFilesAction {
- @Test
+ @TestTemplate
public void testSparkCatalogTable() throws Exception {
spark.conf().set("spark.sql.catalog.mycat",
"org.apache.iceberg.spark.SparkCatalog");
spark.conf().set("spark.sql.catalog.mycat.type", "hadoop");
@@ -42,29 +40,28 @@ public class TestRemoveOrphanFilesAction3 extends
TestRemoveOrphanFilesAction {
SparkCatalog cat = (SparkCatalog)
spark.sessionState().catalogManager().catalog("mycat");
String[] database = {"default"};
- Identifier id = Identifier.of(database, "table");
- Map<String, String> options = Maps.newHashMap();
+ Identifier id = Identifier.of(database, randomName("table"));
Transform[] transforms = {};
- cat.createTable(id, SparkSchemaUtil.convert(SCHEMA), transforms, options);
+ cat.createTable(id, SparkSchemaUtil.convert(SCHEMA), transforms,
properties);
SparkTable table = (SparkTable) cat.loadTable(id);
- spark.sql("INSERT INTO mycat.default.table VALUES (1,1,1)");
+ sql("INSERT INTO mycat.default.%s VALUES (1,1,1)", id.name());
String location = table.table().location().replaceFirst("file:", "");
- new File(location + "/data/trashfile").createNewFile();
+ String trashFile = randomName("/data/trashfile");
+ new File(location + trashFile).createNewFile();
DeleteOrphanFiles.Result results =
SparkActions.get()
.deleteOrphanFiles(table.table())
.olderThan(System.currentTimeMillis() + 1000)
.execute();
- Assert.assertTrue(
- "trash file should be removed",
- StreamSupport.stream(results.orphanFileLocations().spliterator(),
false)
- .anyMatch(file -> file.contains("file:" + location +
"/data/trashfile")));
+ assertThat(results.orphanFileLocations())
+ .as("trash file should be removed")
+ .contains("file:" + location + trashFile);
}
- @Test
+ @TestTemplate
public void testSparkCatalogNamedHadoopTable() throws Exception {
spark.conf().set("spark.sql.catalog.hadoop",
"org.apache.iceberg.spark.SparkCatalog");
spark.conf().set("spark.sql.catalog.hadoop.type", "hadoop");
@@ -72,29 +69,28 @@ public class TestRemoveOrphanFilesAction3 extends
TestRemoveOrphanFilesAction {
SparkCatalog cat = (SparkCatalog)
spark.sessionState().catalogManager().catalog("hadoop");
String[] database = {"default"};
- Identifier id = Identifier.of(database, "table");
- Map<String, String> options = Maps.newHashMap();
+ Identifier id = Identifier.of(database, randomName("table"));
Transform[] transforms = {};
- cat.createTable(id, SparkSchemaUtil.convert(SCHEMA), transforms, options);
+ cat.createTable(id, SparkSchemaUtil.convert(SCHEMA), transforms,
properties);
SparkTable table = (SparkTable) cat.loadTable(id);
- spark.sql("INSERT INTO hadoop.default.table VALUES (1,1,1)");
+ sql("INSERT INTO hadoop.default.%s VALUES (1,1,1)", id.name());
String location = table.table().location().replaceFirst("file:", "");
- new File(location + "/data/trashfile").createNewFile();
+ String trashFile = randomName("/data/trashfile");
+ new File(location + trashFile).createNewFile();
DeleteOrphanFiles.Result results =
SparkActions.get()
.deleteOrphanFiles(table.table())
.olderThan(System.currentTimeMillis() + 1000)
.execute();
- Assert.assertTrue(
- "trash file should be removed",
- StreamSupport.stream(results.orphanFileLocations().spliterator(),
false)
- .anyMatch(file -> file.contains("file:" + location +
"/data/trashfile")));
+ assertThat(results.orphanFileLocations())
+ .as("trash file should be removed")
+ .contains("file:" + location + trashFile);
}
- @Test
+ @TestTemplate
public void testSparkCatalogNamedHiveTable() throws Exception {
spark.conf().set("spark.sql.catalog.hive",
"org.apache.iceberg.spark.SparkCatalog");
spark.conf().set("spark.sql.catalog.hive.type", "hadoop");
@@ -102,29 +98,28 @@ public class TestRemoveOrphanFilesAction3 extends
TestRemoveOrphanFilesAction {
SparkCatalog cat = (SparkCatalog)
spark.sessionState().catalogManager().catalog("hive");
String[] database = {"default"};
- Identifier id = Identifier.of(database, "table");
- Map<String, String> options = Maps.newHashMap();
+ Identifier id = Identifier.of(database, randomName("table"));
Transform[] transforms = {};
- cat.createTable(id, SparkSchemaUtil.convert(SCHEMA), transforms, options);
+ cat.createTable(id, SparkSchemaUtil.convert(SCHEMA), transforms,
properties);
SparkTable table = (SparkTable) cat.loadTable(id);
- spark.sql("INSERT INTO hive.default.table VALUES (1,1,1)");
+ sql("INSERT INTO hive.default.%s VALUES (1,1,1)", id.name());
String location = table.table().location().replaceFirst("file:", "");
- new File(location + "/data/trashfile").createNewFile();
+ String trashFile = randomName("/data/trashfile");
+ new File(location + trashFile).createNewFile();
DeleteOrphanFiles.Result results =
SparkActions.get()
.deleteOrphanFiles(table.table())
.olderThan(System.currentTimeMillis() + 1000)
.execute();
- Assert.assertTrue(
- "trash file should be removed",
- StreamSupport.stream(results.orphanFileLocations().spliterator(),
false)
- .anyMatch(file -> file.contains("file:" + location +
"/data/trashfile")));
+ assertThat(results.orphanFileLocations())
+ .as("trash file should be removed")
+ .contains("file:" + location + trashFile);
}
- @Test
+ @TestTemplate
public void testSparkSessionCatalogHadoopTable() throws Exception {
spark
.conf()
@@ -135,29 +130,28 @@ public class TestRemoveOrphanFilesAction3 extends
TestRemoveOrphanFilesAction {
(SparkSessionCatalog)
spark.sessionState().catalogManager().v2SessionCatalog();
String[] database = {"default"};
- Identifier id = Identifier.of(database, "table");
- Map<String, String> options = Maps.newHashMap();
+ Identifier id = Identifier.of(database, randomName("table"));
Transform[] transforms = {};
- cat.createTable(id, SparkSchemaUtil.convert(SCHEMA), transforms, options);
+ cat.createTable(id, SparkSchemaUtil.convert(SCHEMA), transforms,
properties);
SparkTable table = (SparkTable) cat.loadTable(id);
- spark.sql("INSERT INTO default.table VALUES (1,1,1)");
+ sql("INSERT INTO default.%s VALUES (1,1,1)", id.name());
String location = table.table().location().replaceFirst("file:", "");
- new File(location + "/data/trashfile").createNewFile();
+ String trashFile = randomName("/data/trashfile");
+ new File(location + trashFile).createNewFile();
DeleteOrphanFiles.Result results =
SparkActions.get()
.deleteOrphanFiles(table.table())
.olderThan(System.currentTimeMillis() + 1000)
.execute();
- Assert.assertTrue(
- "trash file should be removed",
- StreamSupport.stream(results.orphanFileLocations().spliterator(),
false)
- .anyMatch(file -> file.contains("file:" + location +
"/data/trashfile")));
+ assertThat(results.orphanFileLocations())
+ .as("trash file should be removed")
+ .contains("file:" + location + trashFile);
}
- @Test
+ @TestTemplate
public void testSparkSessionCatalogHiveTable() throws Exception {
spark
.conf()
@@ -168,30 +162,29 @@ public class TestRemoveOrphanFilesAction3 extends
TestRemoveOrphanFilesAction {
String[] database = {"default"};
Identifier id = Identifier.of(database, "sessioncattest");
- Map<String, String> options = Maps.newHashMap();
Transform[] transforms = {};
cat.dropTable(id);
- cat.createTable(id, SparkSchemaUtil.convert(SCHEMA), transforms, options);
+ cat.createTable(id, SparkSchemaUtil.convert(SCHEMA), transforms,
properties);
SparkTable table = (SparkTable) cat.loadTable(id);
- spark.sql("INSERT INTO default.sessioncattest VALUES (1,1,1)");
+ sql("INSERT INTO default.sessioncattest VALUES (1,1,1)");
String location = table.table().location().replaceFirst("file:", "");
- new File(location + "/data/trashfile").createNewFile();
+ String trashFile = randomName("/data/trashfile");
+ new File(location + trashFile).createNewFile();
DeleteOrphanFiles.Result results =
SparkActions.get()
.deleteOrphanFiles(table.table())
.olderThan(System.currentTimeMillis() + 1000)
.execute();
- Assert.assertTrue(
- "trash file should be removed",
- StreamSupport.stream(results.orphanFileLocations().spliterator(),
false)
- .anyMatch(file -> file.contains("file:" + location +
"/data/trashfile")));
+ assertThat(results.orphanFileLocations())
+ .as("trash file should be removed")
+ .contains("file:" + location + trashFile);
}
- @After
- public void resetSparkSessionCatalog() throws Exception {
+ @AfterEach
+ public void resetSparkSessionCatalog() {
spark.conf().unset("spark.sql.catalog.spark_catalog");
spark.conf().unset("spark.sql.catalog.spark_catalog.type");
spark.conf().unset("spark.sql.catalog.spark_catalog.warehouse");
diff --git
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index 490c711930..43e5cb37da 100644
---
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -38,6 +38,7 @@ import static org.mockito.Mockito.spy;
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
+import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
@@ -55,6 +56,9 @@ import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.Parameter;
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.Parameters;
import org.apache.iceberg.PartitionData;
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
@@ -90,7 +94,6 @@ import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
@@ -100,8 +103,8 @@ import
org.apache.iceberg.relocated.com.google.common.collect.Streams;
import org.apache.iceberg.spark.FileRewriteCoordinator;
import org.apache.iceberg.spark.ScanTaskSetManager;
import org.apache.iceberg.spark.SparkTableUtil;
-import org.apache.iceberg.spark.SparkTestBase;
import org.apache.iceberg.spark.SparkWriteOptions;
+import org.apache.iceberg.spark.TestBase;
import
org.apache.iceberg.spark.actions.RewriteDataFilesSparkAction.RewriteExecutionContext;
import org.apache.iceberg.spark.data.TestHelpers;
import org.apache.iceberg.spark.source.ThreeColumnRecord;
@@ -116,17 +119,18 @@ import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.internal.SQLConf;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
import org.mockito.ArgumentMatcher;
import org.mockito.Mockito;
-public class TestRewriteDataFilesAction extends SparkTestBase {
+@ExtendWith(ParameterizedTestExtension.class)
+public class TestRewriteDataFilesAction extends TestBase {
+ @TempDir private File tableDir;
private static final int SCALE = 400000;
private static final HadoopTables TABLES = new HadoopTables(new
Configuration());
@@ -138,21 +142,25 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
private static final PartitionSpec SPEC =
PartitionSpec.builderFor(SCHEMA).identity("c1").build();
- @Rule public TemporaryFolder temp = new TemporaryFolder();
+ @Parameter private int formatVersion;
+
+ @Parameters(name = "formatVersion = {0}")
+ protected static List<Object> parameters() {
+ return Arrays.asList(2, 3);
+ }
private final FileRewriteCoordinator coordinator =
FileRewriteCoordinator.get();
private final ScanTaskSetManager manager = ScanTaskSetManager.get();
private String tableLocation = null;
- @BeforeClass
+ @BeforeAll
public static void setupSpark() {
// disable AQE as tests assume that writes generate a particular number of
files
spark.conf().set(SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), "false");
}
- @Before
- public void setupTableLocation() throws Exception {
- File tableDir = temp.newFolder();
+ @BeforeEach
+ public void setupTableLocation() {
this.tableLocation = tableDir.toURI().toString();
}
@@ -162,20 +170,20 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
return
actions().rewriteDataFiles(table).option(SizeBasedFileRewriter.MIN_INPUT_FILES,
"1");
}
- @Test
+ @TestTemplate
public void testEmptyTable() {
PartitionSpec spec = PartitionSpec.unpartitioned();
Map<String, String> options = Maps.newHashMap();
Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
- Assert.assertNull("Table must be empty", table.currentSnapshot());
+ assertThat(table.currentSnapshot()).as("Table must be empty").isNull();
basicRewrite(table).execute();
- Assert.assertNull("Table must stay empty", table.currentSnapshot());
+ assertThat(table.currentSnapshot()).as("Table must stay empty").isNull();
}
- @Test
+ @TestTemplate
public void testBinPackUnpartitionedTable() {
Table table = createTable(4);
shouldHaveFiles(table, 4);
@@ -183,8 +191,10 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
long dataSizeBefore = testDataSize(table);
Result result = basicRewrite(table).execute();
- Assert.assertEquals("Action should rewrite 4 data files", 4,
result.rewrittenDataFilesCount());
- Assert.assertEquals("Action should add 1 data file", 1,
result.addedDataFilesCount());
+ assertThat(result.rewrittenDataFilesCount())
+ .as("Action should rewrite 4 data files")
+ .isEqualTo(4);
+ assertThat(result.addedDataFilesCount()).as("Action should add 1 data
file").isOne();
assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
shouldHaveFiles(table, 1);
@@ -193,7 +203,7 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
assertEquals("Rows must match", expectedRecords, actual);
}
- @Test
+ @TestTemplate
public void testBinPackPartitionedTable() {
Table table = createTablePartitioned(4, 2);
shouldHaveFiles(table, 8);
@@ -201,8 +211,10 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
long dataSizeBefore = testDataSize(table);
Result result = basicRewrite(table).execute();
- Assert.assertEquals("Action should rewrite 8 data files", 8,
result.rewrittenDataFilesCount());
- Assert.assertEquals("Action should add 4 data file", 4,
result.addedDataFilesCount());
+ assertThat(result.rewrittenDataFilesCount())
+ .as("Action should rewrite 8 data files")
+ .isEqualTo(8);
+ assertThat(result.addedDataFilesCount()).as("Action should add 4 data
file").isEqualTo(4);
assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
shouldHaveFiles(table, 4);
@@ -211,7 +223,7 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
assertEquals("Rows must match", expectedRecords, actualRecords);
}
- @Test
+ @TestTemplate
public void testBinPackWithFilter() {
Table table = createTablePartitioned(4, 2);
shouldHaveFiles(table, 8);
@@ -224,8 +236,10 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
.filter(Expressions.startsWith("c2", "foo"))
.execute();
- Assert.assertEquals("Action should rewrite 2 data files", 2,
result.rewrittenDataFilesCount());
- Assert.assertEquals("Action should add 1 data file", 1,
result.addedDataFilesCount());
+ assertThat(result.rewrittenDataFilesCount())
+ .as("Action should rewrite 2 data files")
+ .isEqualTo(2);
+ assertThat(result.addedDataFilesCount()).as("Action should add 1 data
file").isOne();
assertThat(result.rewrittenBytesCount()).isGreaterThan(0L).isLessThan(dataSizeBefore);
shouldHaveFiles(table, 7);
@@ -234,7 +248,7 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
assertEquals("Rows must match", expectedRecords, actualRecords);
}
- @Test
+ @TestTemplate
public void testBinPackWithFilterOnBucketExpression() {
Table table = createTablePartitioned(4, 2);
@@ -260,7 +274,7 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
assertEquals("Rows must match", expectedRecords, actualRecords);
}
- @Test
+ @TestTemplate
public void testBinPackAfterPartitionChange() {
Table table = createTable();
@@ -282,10 +296,9 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
Integer.toString(averageFileSize(table) + 1001))
.execute();
- Assert.assertEquals(
- "Should have 1 fileGroup because all files were not correctly
partitioned",
- 1,
- result.rewriteResults().size());
+ assertThat(result.rewriteResults())
+ .as("Should have 1 fileGroup because all files were not correctly
partitioned")
+ .hasSize(1);
assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
List<Object[]> postRewriteData = currentData();
@@ -296,7 +309,7 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
shouldHaveFiles(table, 20);
}
- @Test
+ @TestTemplate
public void testBinPackWithDeletes() {
Table table = createTablePartitioned(4, 2);
table.updateProperties().set(TableProperties.FORMAT_VERSION, "2").commit();
@@ -331,15 +344,15 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
.option(SizeBasedFileRewriter.MAX_FILE_SIZE_BYTES,
Long.toString(Long.MAX_VALUE))
.option(SizeBasedDataRewriter.DELETE_FILE_THRESHOLD, "2")
.execute();
- Assert.assertEquals("Action should rewrite 2 data files", 2,
result.rewrittenDataFilesCount());
+ assertThat(result.rewrittenDataFilesCount()).isEqualTo(2);
assertThat(result.rewrittenBytesCount()).isGreaterThan(0L).isLessThan(dataSizeBefore);
List<Object[]> actualRecords = currentData();
assertEquals("Rows must match", expectedRecords, actualRecords);
- Assert.assertEquals("7 rows are removed", total - 7, actualRecords.size());
+ assertThat(actualRecords).hasSize(total - 7);
}
- @Test
+ @TestTemplate
public void testRemoveDangledEqualityDeletesPartitionEvolution() {
Table table =
TABLES.create(
@@ -398,7 +411,7 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
shouldHaveFiles(table, 5);
}
- @Test
+ @TestTemplate
public void testRemoveDangledPositionDeletesPartitionEvolution() {
Table table =
TABLES.create(
@@ -437,11 +450,11 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
.containsExactly(1, 2, 1);
shouldHaveMinSequenceNumberInPartition(table, "data_file.partition.c1 ==
1", 3);
shouldHaveSnapshots(table, 5);
-
assertThat(table.currentSnapshot().summary().get("total-position-deletes")).isEqualTo("0");
+
assertThat(table.currentSnapshot().summary()).containsEntry("total-position-deletes",
"0");
assertEquals("Rows must match", expectedRecords, currentData());
}
- @Test
+ @TestTemplate
public void testBinPackWithDeleteAllData() {
Map<String, String> options = Maps.newHashMap();
options.put(TableProperties.FORMAT_VERSION, "2");
@@ -466,26 +479,25 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
.rewriteDataFiles(table)
.option(SizeBasedDataRewriter.DELETE_FILE_THRESHOLD, "1")
.execute();
- Assert.assertEquals("Action should rewrite 1 data files", 1,
result.rewrittenDataFilesCount());
+ assertThat(result.rewrittenDataFilesCount()).as("Action should rewrite 1
data files").isOne();
assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
List<Object[]> actualRecords = currentData();
assertEquals("Rows must match", expectedRecords, actualRecords);
- Assert.assertEquals(
- "Data manifest should not have existing data file",
- 0,
- (long)
table.currentSnapshot().dataManifests(table.io()).get(0).existingFilesCount());
- Assert.assertEquals(
- "Data manifest should have 1 delete data file",
- 1L,
- (long)
table.currentSnapshot().dataManifests(table.io()).get(0).deletedFilesCount());
- Assert.assertEquals(
- "Delete manifest added row count should equal total count",
- total,
- (long)
table.currentSnapshot().deleteManifests(table.io()).get(0).addedRowsCount());
- }
-
- @Test
+
assertThat(table.currentSnapshot().dataManifests(table.io()).get(0).existingFilesCount())
+ .as("Data manifest should not have existing data file")
+ .isZero();
+
+ assertThat((long)
table.currentSnapshot().dataManifests(table.io()).get(0).deletedFilesCount())
+ .as("Data manifest should have 1 delete data file")
+ .isEqualTo(1L);
+
+
assertThat(table.currentSnapshot().deleteManifests(table.io()).get(0).addedRowsCount())
+ .as("Delete manifest added row count should equal total count")
+ .isEqualTo(total);
+ }
+
+ @TestTemplate
public void testBinPackWithStartingSequenceNumber() {
Table table = createTablePartitioned(4, 2);
shouldHaveFiles(table, 8);
@@ -497,8 +509,10 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
Result result =
basicRewrite(table).option(RewriteDataFiles.USE_STARTING_SEQUENCE_NUMBER,
"true").execute();
- Assert.assertEquals("Action should rewrite 8 data files", 8,
result.rewrittenDataFilesCount());
- Assert.assertEquals("Action should add 4 data file", 4,
result.addedDataFilesCount());
+ assertThat(result.rewrittenDataFilesCount())
+ .as("Action should rewrite 8 data files")
+ .isEqualTo(8);
+ assertThat(result.addedDataFilesCount()).as("Action should add 4 data
files").isEqualTo(4);
assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
shouldHaveFiles(table, 4);
@@ -506,20 +520,21 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
assertEquals("Rows must match", expectedRecords, actualRecords);
table.refresh();
- Assert.assertTrue(
- "Table sequence number should be incremented",
- oldSequenceNumber < table.currentSnapshot().sequenceNumber());
+ assertThat(table.currentSnapshot().sequenceNumber())
+ .as("Table sequence number should be incremented")
+ .isGreaterThan(oldSequenceNumber);
Dataset<Row> rows = SparkTableUtil.loadMetadataTable(spark, table,
MetadataTableType.ENTRIES);
for (Row row : rows.collectAsList()) {
if (row.getInt(0) == 1) {
- Assert.assertEquals(
- "Expect old sequence number for added entries", oldSequenceNumber,
row.getLong(2));
+ assertThat(row.getLong(2))
+ .as("Expect old sequence number for added entries")
+ .isEqualTo(oldSequenceNumber);
}
}
}
- @Test
+ @TestTemplate
public void testBinPackWithStartingSequenceNumberV1Compatibility() {
Map<String, String> properties =
ImmutableMap.of(TableProperties.FORMAT_VERSION, "1");
Table table = createTablePartitioned(4, 2, SCALE, properties);
@@ -527,13 +542,15 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
List<Object[]> expectedRecords = currentData();
table.refresh();
long oldSequenceNumber = table.currentSnapshot().sequenceNumber();
- Assert.assertEquals("Table sequence number should be 0", 0,
oldSequenceNumber);
+ assertThat(oldSequenceNumber).as("Table sequence number should be
0").isZero();
long dataSizeBefore = testDataSize(table);
Result result =
basicRewrite(table).option(RewriteDataFiles.USE_STARTING_SEQUENCE_NUMBER,
"true").execute();
- Assert.assertEquals("Action should rewrite 8 data files", 8,
result.rewrittenDataFilesCount());
- Assert.assertEquals("Action should add 4 data file", 4,
result.addedDataFilesCount());
+ assertThat(result.rewrittenDataFilesCount())
+ .as("Action should rewrite 8 data files")
+ .isEqualTo(8);
+ assertThat(result.addedDataFilesCount()).as("Action should add 4 data
files").isEqualTo(4);
assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
shouldHaveFiles(table, 4);
@@ -541,19 +558,19 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
assertEquals("Rows must match", expectedRecords, actualRecords);
table.refresh();
- Assert.assertEquals(
- "Table sequence number should still be 0",
- oldSequenceNumber,
- table.currentSnapshot().sequenceNumber());
+ assertThat(table.currentSnapshot().sequenceNumber())
+ .as("Table sequence number should still be 0")
+ .isEqualTo(oldSequenceNumber);
Dataset<Row> rows = SparkTableUtil.loadMetadataTable(spark, table,
MetadataTableType.ENTRIES);
for (Row row : rows.collectAsList()) {
- Assert.assertEquals(
- "Expect sequence number 0 for all entries", oldSequenceNumber,
row.getLong(2));
+ assertThat(row.getLong(2))
+ .as("Expect sequence number 0 for all entries")
+ .isEqualTo(oldSequenceNumber);
}
}
- @Test
+ @TestTemplate
public void testRewriteLargeTableHasResiduals() {
PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).build();
Map<String, String> options = Maps.newHashMap();
@@ -575,15 +592,19 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
CloseableIterable<FileScanTask> tasks =
table.newScan().ignoreResiduals().filter(Expressions.equal("c3",
"0")).planFiles();
for (FileScanTask task : tasks) {
- Assert.assertEquals("Residuals must be ignored",
Expressions.alwaysTrue(), task.residual());
+ assertThat(task.residual())
+ .as("Residuals must be ignored")
+ .isEqualTo(Expressions.alwaysTrue());
}
shouldHaveFiles(table, 2);
long dataSizeBefore = testDataSize(table);
Result result = basicRewrite(table).filter(Expressions.equal("c3",
"0")).execute();
- Assert.assertEquals("Action should rewrite 2 data files", 2,
result.rewrittenDataFilesCount());
- Assert.assertEquals("Action should add 1 data file", 1,
result.addedDataFilesCount());
+ assertThat(result.rewrittenDataFilesCount())
+ .as("Action should rewrite 2 data files")
+ .isEqualTo(2);
+ assertThat(result.addedDataFilesCount()).as("Action should add 1 data
file").isOne();
assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
List<Object[]> actualRecords = currentData();
@@ -591,7 +612,7 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
assertEquals("Rows must match", expectedRecords, actualRecords);
}
- @Test
+ @TestTemplate
public void testBinPackSplitLargeFile() {
Table table = createTable(1);
shouldHaveFiles(table, 1);
@@ -606,8 +627,8 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
.option(SizeBasedFileRewriter.MAX_FILE_SIZE_BYTES,
Long.toString(targetSize * 2 - 2000))
.execute();
- Assert.assertEquals("Action should delete 1 data files", 1,
result.rewrittenDataFilesCount());
- Assert.assertEquals("Action should add 2 data files", 2,
result.addedDataFilesCount());
+ assertThat(result.rewrittenDataFilesCount()).as("Action should delete 1
data files").isOne();
+ assertThat(result.addedDataFilesCount()).as("Action should add 2 data
files").isEqualTo(2);
assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
shouldHaveFiles(table, 2);
@@ -616,7 +637,7 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
assertEquals("Rows must match", expectedRecords, actualRecords);
}
- @Test
+ @TestTemplate
public void testBinPackCombineMixedFiles() {
Table table = createTable(1); // 400000
shouldHaveFiles(table, 1);
@@ -638,10 +659,12 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
.option(SizeBasedFileRewriter.MIN_FILE_SIZE_BYTES,
Integer.toString(targetSize - 1000))
.execute();
- Assert.assertEquals("Action should delete 3 data files", 3,
result.rewrittenDataFilesCount());
+ assertThat(result.rewrittenDataFilesCount())
+ .as("Action should delete 3 data files")
+ .isEqualTo(3);
// Should Split the big files into 3 pieces, one of which should be
combined with the two
// smaller files
- Assert.assertEquals("Action should add 3 data files", 3,
result.addedDataFilesCount());
+ assertThat(result.addedDataFilesCount()).as("Action should add 3 data
files").isEqualTo(3);
assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
shouldHaveFiles(table, 3);
@@ -650,7 +673,7 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
assertEquals("Rows must match", expectedRecords, actualRecords);
}
- @Test
+ @TestTemplate
public void testBinPackCombineMediumFiles() {
Table table = createTable(4);
shouldHaveFiles(table, 4);
@@ -671,8 +694,10 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
Integer.toString(targetSize - 100)) // All files too small
.execute();
- Assert.assertEquals("Action should delete 4 data files", 4,
result.rewrittenDataFilesCount());
- Assert.assertEquals("Action should add 3 data files", 3,
result.addedDataFilesCount());
+ assertThat(result.rewrittenDataFilesCount())
+ .as("Action should delete 4 data files")
+ .isEqualTo(4);
+ assertThat(result.addedDataFilesCount()).as("Action should add 3 data
files").isEqualTo(3);
assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
shouldHaveFiles(table, 3);
@@ -681,7 +706,7 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
assertEquals("Rows must match", expectedRecords, actualRecords);
}
- @Test
+ @TestTemplate
public void testPartialProgressEnabled() {
Table table = createTable(20);
int fileSize = averageFileSize(table);
@@ -700,7 +725,7 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
.option(RewriteDataFiles.PARTIAL_PROGRESS_MAX_COMMITS, "10")
.execute();
- Assert.assertEquals("Should have 10 fileGroups",
result.rewriteResults().size(), 10);
+ assertThat(result.rewriteResults()).as("Should have 10
fileGroups").hasSize(10);
assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
table.refresh();
@@ -712,7 +737,7 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
assertEquals("We shouldn't have changed the data", originalData,
postRewriteData);
}
- @Test
+ @TestTemplate
public void testMultipleGroups() {
Table table = createTable(20);
int fileSize = averageFileSize(table);
@@ -728,7 +753,7 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
.option(SizeBasedFileRewriter.MIN_INPUT_FILES, "1")
.execute();
- Assert.assertEquals("Should have 10 fileGroups",
result.rewriteResults().size(), 10);
+ assertThat(result.rewriteResults()).as("Should have 10
fileGroups").hasSize(10);
assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
table.refresh();
@@ -740,7 +765,7 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
shouldHaveACleanCache(table);
}
- @Test
+ @TestTemplate
public void testPartialProgressMaxCommits() {
Table table = createTable(20);
int fileSize = averageFileSize(table);
@@ -757,7 +782,7 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
.option(RewriteDataFiles.PARTIAL_PROGRESS_MAX_COMMITS, "3")
.execute();
- Assert.assertEquals("Should have 10 fileGroups",
result.rewriteResults().size(), 10);
+ assertThat(result.rewriteResults()).as("Should have 10
fileGroups").hasSize(10);
assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
table.refresh();
@@ -769,7 +794,7 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
shouldHaveACleanCache(table);
}
- @Test
+ @TestTemplate
public void testSingleCommitWithRewriteFailure() {
Table table = createTable(20);
int fileSize = averageFileSize(table);
@@ -803,7 +828,7 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
shouldHaveACleanCache(table);
}
- @Test
+ @TestTemplate
public void testSingleCommitWithCommitFailure() {
Table table = createTable(20);
int fileSize = averageFileSize(table);
@@ -837,7 +862,7 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
shouldHaveACleanCache(table);
}
- @Test
+ @TestTemplate
public void testCommitFailsWithUncleanableFailure() {
Table table = createTable(20);
int fileSize = averageFileSize(table);
@@ -871,7 +896,7 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
shouldHaveACleanCache(table);
}
- @Test
+ @TestTemplate
public void testParallelSingleCommitWithRewriteFailure() {
Table table = createTable(20);
int fileSize = averageFileSize(table);
@@ -906,7 +931,7 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
shouldHaveACleanCache(table);
}
- @Test
+ @TestTemplate
public void testPartialProgressWithRewriteFailure() {
Table table = createTable(20);
int fileSize = averageFileSize(table);
@@ -948,7 +973,7 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
shouldHaveACleanCache(table);
}
- @Test
+ @TestTemplate
public void testParallelPartialProgressWithRewriteFailure() {
Table table = createTable(20);
int fileSize = averageFileSize(table);
@@ -991,7 +1016,7 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
shouldHaveACleanCache(table);
}
- @Test
+ @TestTemplate
public void testParallelPartialProgressWithCommitFailure() {
Table table = createTable(20);
int fileSize = averageFileSize(table);
@@ -1021,8 +1046,8 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
RewriteDataFiles.Result result = spyRewrite.execute();
- // Commit 1: 4/4 + Commit 2 failed 0/4 + Commit 3: 2/2 == 6 out of 10
total groups comitted
- Assert.assertEquals("Should have 6 fileGroups", 6,
result.rewriteResults().size());
+ // Commit 1: 4/4 + Commit 2 failed 0/4 + Commit 3: 2/2 == 6 out of 10
total groups committed
+ assertThat(result.rewriteResults()).as("Should have 6
fileGroups").hasSize(6);
assertThat(result.rewrittenBytesCount()).isGreaterThan(0L).isLessThan(dataSizeBefore);
table.refresh();
@@ -1036,7 +1061,7 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
shouldHaveACleanCache(table);
}
- @Test
+ @TestTemplate
public void testInvalidOptions() {
Table table = createTable(20);
@@ -1080,7 +1105,7 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
.hasMessageContaining("requires enabling Iceberg Spark session
extensions");
}
- @Test
+ @TestTemplate
public void testSortMultipleGroups() {
Table table = createTable(20);
shouldHaveFiles(table, 20);
@@ -1100,7 +1125,7 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
RewriteDataFiles.MAX_FILE_GROUP_SIZE_BYTES,
Integer.toString(fileSize * 2 + 1000))
.execute();
- Assert.assertEquals("Should have 10 fileGroups",
result.rewriteResults().size(), 10);
+ assertThat(result.rewriteResults()).as("Should have 10
fileGroups").hasSize(10);
assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
table.refresh();
@@ -1112,7 +1137,7 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
shouldHaveACleanCache(table);
}
- @Test
+ @TestTemplate
public void testSimpleSort() {
Table table = createTable(20);
shouldHaveFiles(table, 20);
@@ -1131,7 +1156,7 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
RewriteDataFiles.TARGET_FILE_SIZE_BYTES,
Integer.toString(averageFileSize(table)))
.execute();
- Assert.assertEquals("Should have 1 fileGroups",
result.rewriteResults().size(), 1);
+ assertThat(result.rewriteResults()).as("Should have 1
fileGroups").hasSize(1);
assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
table.refresh();
@@ -1145,7 +1170,7 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
shouldHaveLastCommitSorted(table, "c2");
}
- @Test
+ @TestTemplate
public void testSortAfterPartitionChange() {
Table table = createTable(20);
shouldHaveFiles(table, 20);
@@ -1165,10 +1190,9 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
RewriteDataFiles.TARGET_FILE_SIZE_BYTES,
Integer.toString(averageFileSize(table)))
.execute();
- Assert.assertEquals(
- "Should have 1 fileGroup because all files were not correctly
partitioned",
- result.rewriteResults().size(),
- 1);
+ assertThat(result.rewriteResults())
+ .as("Should have 1 fileGroups because all files were not correctly
partitioned")
+ .hasSize(1);
assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
table.refresh();
@@ -1182,7 +1206,7 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
shouldHaveLastCommitSorted(table, "c2");
}
- @Test
+ @TestTemplate
public void testSortCustomSortOrder() {
Table table = createTable(20);
shouldHaveLastCommitUnsorted(table, "c2");
@@ -1199,7 +1223,7 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
RewriteDataFiles.TARGET_FILE_SIZE_BYTES,
Integer.toString(averageFileSize(table)))
.execute();
- Assert.assertEquals("Should have 1 fileGroups",
result.rewriteResults().size(), 1);
+ assertThat(result.rewriteResults()).as("Should have 1
fileGroups").hasSize(1);
assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
table.refresh();
@@ -1213,7 +1237,7 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
shouldHaveLastCommitSorted(table, "c2");
}
- @Test
+ @TestTemplate
public void testSortCustomSortOrderRequiresRepartition() {
int partitions = 4;
Table table = createTable();
@@ -1238,7 +1262,7 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
Integer.toString(averageFileSize(table) / partitions))
.execute();
- Assert.assertEquals("Should have 1 fileGroups",
result.rewriteResults().size(), 1);
+ assertThat(result.rewriteResults()).as("Should have 1
fileGroups").hasSize(1);
assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
table.refresh();
@@ -1253,7 +1277,7 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
shouldHaveLastCommitSorted(table, "c3");
}
- @Test
+ @TestTemplate
public void testAutoSortShuffleOutput() {
Table table = createTable(20);
shouldHaveLastCommitUnsorted(table, "c2");
@@ -1275,11 +1299,12 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
.option(SizeBasedFileRewriter.MIN_INPUT_FILES, "1")
.execute();
- Assert.assertEquals("Should have 1 fileGroups",
result.rewriteResults().size(), 1);
+ assertThat(result.rewriteResults()).as("Should have 1
fileGroups").hasSize(1);
assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
- Assert.assertTrue(
- "Should have written 40+ files",
- Iterables.size(table.currentSnapshot().addedDataFiles(table.io())) >=
40);
+ assertThat(result.rewriteResults()).as("Should have 1
fileGroups").hasSize(1);
+ assertThat(table.currentSnapshot().addedDataFiles(table.io()))
+ .as("Should have written 40+ files")
+ .hasSizeGreaterThanOrEqualTo(40);
table.refresh();
@@ -1292,7 +1317,7 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
shouldHaveLastCommitSorted(table, "c2");
}
- @Test
+ @TestTemplate
public void testCommitStateUnknownException() {
Table table = createTable(20);
shouldHaveFiles(table, 20);
@@ -1324,7 +1349,7 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
shouldHaveSnapshots(table, 2); // Commit actually Succeeded
}
- @Test
+ @TestTemplate
public void testZOrderSort() {
int originalFiles = 20;
Table table = createTable(originalFiles);
@@ -1337,8 +1362,8 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
double originalFilesC2C3 =
percentFilesRequired(table, new String[] {"c2", "c3"}, new String[]
{"foo23", "bar23"});
- Assert.assertTrue("Should require all files to scan c2", originalFilesC2 >
0.99);
- Assert.assertTrue("Should require all files to scan c3", originalFilesC3 >
0.99);
+ assertThat(originalFilesC2).as("Should require all files to scan
c2").isGreaterThan(0.99);
+ assertThat(originalFilesC3).as("Should require all files to scan
c3").isGreaterThan(0.99);
long dataSizeBefore = testDataSize(table);
RewriteDataFiles.Result result =
@@ -1354,10 +1379,11 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
.option(SizeBasedFileRewriter.MIN_INPUT_FILES, "1")
.execute();
- Assert.assertEquals("Should have 1 fileGroups", 1,
result.rewriteResults().size());
+ assertThat(result.rewriteResults()).as("Should have 1
fileGroups").hasSize(1);
assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
- int zOrderedFilesTotal =
Iterables.size(table.currentSnapshot().addedDataFiles(table.io()));
- Assert.assertTrue("Should have written 40+ files", zOrderedFilesTotal >=
40);
+ assertThat(table.currentSnapshot().addedDataFiles(table.io()))
+ .as("Should have written 40+ files")
+ .hasSizeGreaterThanOrEqualTo(40);
table.refresh();
@@ -1372,18 +1398,18 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
double filesScannedC2C3 =
percentFilesRequired(table, new String[] {"c2", "c3"}, new String[]
{"foo23", "bar23"});
- Assert.assertTrue(
- "Should have reduced the number of files required for c2",
- filesScannedC2 < originalFilesC2);
- Assert.assertTrue(
- "Should have reduced the number of files required for c3",
- filesScannedC3 < originalFilesC3);
- Assert.assertTrue(
- "Should have reduced the number of files required for a c2,c3
predicate",
- filesScannedC2C3 < originalFilesC2C3);
+ assertThat(originalFilesC2)
+ .as("Should have reduced the number of files required for c2")
+ .isGreaterThan(filesScannedC2);
+ assertThat(originalFilesC3)
+ .as("Should have reduced the number of files required for c3")
+ .isGreaterThan(filesScannedC3);
+ assertThat(originalFilesC2C3)
+ .as("Should have reduced the number of files required for c2,c3
predicate")
+ .isGreaterThan(filesScannedC2C3);
}
- @Test
+ @TestTemplate
public void testZOrderAllTypesSort() {
Table table = createTypeTestTable();
shouldHaveFiles(table, 10);
@@ -1410,10 +1436,12 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
.option(SizeBasedFileRewriter.REWRITE_ALL, "true")
.execute();
- Assert.assertEquals("Should have 1 fileGroups", 1,
result.rewriteResults().size());
+ assertThat(result.rewriteResults()).as("Should have 1
fileGroups").hasSize(1);
assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
int zOrderedFilesTotal =
Iterables.size(table.currentSnapshot().addedDataFiles(table.io()));
- Assert.assertEquals("Should have written 1 file", 1, zOrderedFilesTotal);
+ assertThat(table.currentSnapshot().addedDataFiles(table.io()))
+ .as("Should have written 1 file")
+ .hasSize(1);
table.refresh();
@@ -1426,7 +1454,7 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
shouldHaveACleanCache(table);
}
- @Test
+ @TestTemplate
public void testInvalidAPIUsage() {
Table table = createTable(1);
@@ -1445,7 +1473,7 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
.hasMessage("Must use only one rewriter type (bin-pack, sort,
zorder)");
}
- @Test
+ @TestTemplate
public void testRewriteJobOrderBytesAsc() {
Table table = createTablePartitioned(4, 2);
writeRecords(1, SCALE, 1);
@@ -1472,12 +1500,12 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
.collect(Collectors.toList());
expected.sort(Comparator.naturalOrder());
- Assert.assertEquals("Size in bytes order should be ascending", actual,
expected);
+ assertThat(actual).as("Size in bytes order should be
ascending").isEqualTo(expected);
Collections.reverse(expected);
- Assert.assertNotEquals("Size in bytes order should not be descending",
actual, expected);
+ assertThat(actual).as("Size in bytes order should not be
descending").isNotEqualTo(expected);
}
- @Test
+ @TestTemplate
public void testRewriteJobOrderBytesDesc() {
Table table = createTablePartitioned(4, 2);
writeRecords(1, SCALE, 1);
@@ -1504,12 +1532,12 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
.collect(Collectors.toList());
expected.sort(Comparator.reverseOrder());
- Assert.assertEquals("Size in bytes order should be descending", actual,
expected);
+ assertThat(actual).as("Size in bytes order should be
descending").isEqualTo(expected);
Collections.reverse(expected);
- Assert.assertNotEquals("Size in bytes order should not be ascending",
actual, expected);
+ assertThat(actual).as("Size in bytes order should not be
ascending").isNotEqualTo(expected);
}
- @Test
+ @TestTemplate
public void testRewriteJobOrderFilesAsc() {
Table table = createTablePartitioned(4, 2);
writeRecords(1, SCALE, 1);
@@ -1536,12 +1564,12 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
.collect(Collectors.toList());
expected.sort(Comparator.naturalOrder());
- Assert.assertEquals("Number of files order should be ascending", actual,
expected);
+ assertThat(actual).as("Number of files order should be
ascending").isEqualTo(expected);
Collections.reverse(expected);
- Assert.assertNotEquals("Number of files order should not be descending",
actual, expected);
+ assertThat(actual).as("Number of files order should not be
descending").isNotEqualTo(expected);
}
- @Test
+ @TestTemplate
public void testRewriteJobOrderFilesDesc() {
Table table = createTablePartitioned(4, 2);
writeRecords(1, SCALE, 1);
@@ -1568,12 +1596,12 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
.collect(Collectors.toList());
expected.sort(Comparator.reverseOrder());
- Assert.assertEquals("Number of files order should be descending", actual,
expected);
+ assertThat(actual).as("Number of files order should be
descending").isEqualTo(expected);
Collections.reverse(expected);
- Assert.assertNotEquals("Number of files order should not be ascending",
actual, expected);
+ assertThat(actual).as("Number of files order should not be
ascending").isNotEqualTo(expected);
}
- @Test
+ @TestTemplate
public void testBinPackRewriterWithSpecificUnparitionedOutputSpec() {
Table table = createTable(10);
shouldHaveFiles(table, 10);
@@ -1591,11 +1619,11 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
.execute();
assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
- assertThat(currentData().size()).isEqualTo(count);
+ assertThat(currentData()).hasSize((int) count);
shouldRewriteDataFilesWithPartitionSpec(table, outputSpecId);
}
- @Test
+ @TestTemplate
public void testBinPackRewriterWithSpecificOutputSpec() {
Table table = createTable(10);
shouldHaveFiles(table, 10);
@@ -1614,11 +1642,11 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
.execute();
assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
- assertThat(currentData().size()).isEqualTo(count);
+ assertThat(currentData()).hasSize((int) count);
shouldRewriteDataFilesWithPartitionSpec(table, outputSpecId);
}
- @Test
+ @TestTemplate
public void testBinpackRewriteWithInvalidOutputSpecId() {
Table table = createTable(10);
shouldHaveFiles(table, 10);
@@ -1634,7 +1662,7 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
"Cannot use output spec id 1234 because the table does not contain
a reference to this spec-id.");
}
- @Test
+ @TestTemplate
public void testSortRewriterWithSpecificOutputSpecId() {
Table table = createTable(10);
shouldHaveFiles(table, 10);
@@ -1653,11 +1681,11 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
.execute();
assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
- assertThat(currentData().size()).isEqualTo(count);
+ assertThat(currentData()).hasSize((int) count);
shouldRewriteDataFilesWithPartitionSpec(table, outputSpecId);
}
- @Test
+ @TestTemplate
public void testZOrderRewriteWithSpecificOutputSpecId() {
Table table = createTable(10);
shouldHaveFiles(table, 10);
@@ -1676,7 +1704,7 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
.execute();
assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
- assertThat(currentData().size()).isEqualTo(count);
+ assertThat(currentData()).hasSize((int) count);
shouldRewriteDataFilesWithPartitionSpec(table, outputSpecId);
}
@@ -1718,13 +1746,15 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
protected void shouldHaveMultipleFiles(Table table) {
table.refresh();
int numFiles = Iterables.size(table.newScan().planFiles());
- Assert.assertTrue(String.format("Should have multiple files, had %d",
numFiles), numFiles > 1);
+ assertThat(numFiles)
+ .as(String.format("Should have multiple files, had %d", numFiles))
+ .isGreaterThan(1);
}
protected void shouldHaveFiles(Table table, int numExpected) {
table.refresh();
int numFiles = Iterables.size(table.newScan().planFiles());
- Assert.assertEquals("Did not have the expected number of files",
numExpected, numFiles);
+ assertThat(numFiles).as("Did not have the expected number of
files").isEqualTo(numExpected);
}
protected long shouldHaveMinSequenceNumberInPartition(
@@ -1744,20 +1774,20 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
protected void shouldHaveSnapshots(Table table, int expectedSnapshots) {
table.refresh();
- int actualSnapshots = Iterables.size(table.snapshots());
- Assert.assertEquals(
- "Table did not have the expected number of snapshots",
expectedSnapshots, actualSnapshots);
+ assertThat(table.snapshots())
+ .as("Table did not have the expected number of snapshots")
+ .hasSize(expectedSnapshots);
}
protected void shouldHaveNoOrphans(Table table) {
- Assert.assertEquals(
- "Should not have found any orphan files",
- ImmutableList.of(),
- actions()
- .deleteOrphanFiles(table)
- .olderThan(System.currentTimeMillis())
- .execute()
- .orphanFileLocations());
+ assertThat(
+ actions()
+ .deleteOrphanFiles(table)
+ .olderThan(System.currentTimeMillis())
+ .execute()
+ .orphanFileLocations())
+ .as("Should not have found any orphan files")
+ .isEmpty();
}
protected void shouldHaveOrphans(Table table) {
@@ -1772,20 +1802,19 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
}
protected void shouldHaveACleanCache(Table table) {
- Assert.assertEquals(
- "Should not have any entries in cache", ImmutableSet.of(),
cacheContents(table));
+ assertThat(cacheContents(table)).as("Should not have any entries in
cache").isEmpty();
}
protected <T> void shouldHaveLastCommitSorted(Table table, String column) {
List<Pair<Pair<T, T>, Pair<T, T>>> overlappingFiles =
checkForOverlappingFiles(table, column);
- Assert.assertEquals("Found overlapping files", Collections.emptyList(),
overlappingFiles);
+ assertThat(overlappingFiles).as("Found overlapping files").isEmpty();
}
protected <T> void shouldHaveLastCommitUnsorted(Table table, String column) {
List<Pair<Pair<T, T>, Pair<T, T>>> overlappingFiles =
checkForOverlappingFiles(table, column);
- Assert.assertNotEquals("Found no overlapping files",
Collections.emptyList(), overlappingFiles);
+ assertThat(overlappingFiles).as("Found no overlapping files").isNotEmpty();
}
private <T> Pair<T, T> boundsOf(DataFile file, NestedField field, Class<T>
javaClass) {
@@ -1864,7 +1893,7 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
.updateProperties()
.set(TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, Integer.toString(20
* 1024))
.commit();
- Assert.assertNull("Table must be empty", table.currentSnapshot());
+ assertThat(table.currentSnapshot()).as("Table must be empty").isNull();
return table;
}
@@ -1884,7 +1913,7 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
int partitions, int files, int numRecords, Map<String, String> options) {
PartitionSpec spec =
PartitionSpec.builderFor(SCHEMA).identity("c1").truncate("c2", 2).build();
Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
- Assert.assertNull("Table must be empty", table.currentSnapshot());
+ assertThat(table.currentSnapshot()).as("Table must be empty").isNull();
writeRecords(files, numRecords, partitions);
return table;
diff --git
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
index bdc830e946..8a98a11704 100644
---
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
+++
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
@@ -33,6 +33,7 @@ import static org.mockito.Mockito.when;
import java.io.File;
import java.io.IOException;
+import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -46,6 +47,9 @@ import org.apache.iceberg.ManifestContent;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.ManifestWriter;
+import org.apache.iceberg.Parameter;
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.Parameters;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.Schema;
@@ -66,8 +70,8 @@ import
org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.spark.SparkTableUtil;
-import org.apache.iceberg.spark.SparkTestBase;
import org.apache.iceberg.spark.SparkWriteOptions;
+import org.apache.iceberg.spark.TestBase;
import org.apache.iceberg.spark.source.ThreeColumnRecord;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.CharSequenceSet;
@@ -76,17 +80,13 @@ import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.TableIdentifier;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-@RunWith(Parameterized.class)
-public class TestRewriteManifestsAction extends SparkTestBase {
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+
+@ExtendWith(ParameterizedTestExtension.class)
+public class TestRewriteManifestsAction extends TestBase {
private static final HadoopTables TABLES = new HadoopTables(new
Configuration());
private static final Schema SCHEMA =
@@ -98,36 +98,37 @@ public class TestRewriteManifestsAction extends
SparkTestBase {
@Parameters(name = "snapshotIdInheritanceEnabled = {0}, useCaching = {1},
formatVersion = {2}")
public static Object[] parameters() {
return new Object[][] {
- new Object[] {"true", "true", 1},
- new Object[] {"false", "true", 1},
- new Object[] {"true", "false", 2},
- new Object[] {"false", "false", 2}
+ new Object[] {"true", "true", false, 1},
+ new Object[] {"false", "true", true, 1},
+ new Object[] {"true", "false", false, 2},
+ new Object[] {"false", "false", false, 2},
+ new Object[] {"true", "false", false, 3},
+ new Object[] {"false", "false", false, 3}
};
}
- @Rule public TemporaryFolder temp = new TemporaryFolder();
+ @Parameter private String snapshotIdInheritanceEnabled;
+
+ @Parameter(index = 1)
+ private String useCaching;
+
+ @Parameter(index = 2)
+ private boolean shouldStageManifests;
+
+ @Parameter(index = 3)
+ private int formatVersion;
- private final String snapshotIdInheritanceEnabled;
- private final String useCaching;
- private final int formatVersion;
- private final boolean shouldStageManifests;
private String tableLocation = null;
- public TestRewriteManifestsAction(
- String snapshotIdInheritanceEnabled, String useCaching, int
formatVersion) {
- this.snapshotIdInheritanceEnabled = snapshotIdInheritanceEnabled;
- this.useCaching = useCaching;
- this.formatVersion = formatVersion;
- this.shouldStageManifests = formatVersion == 1 &&
snapshotIdInheritanceEnabled.equals("false");
- }
+ @TempDir private Path temp;
+ @TempDir private File tableDir;
- @Before
+ @BeforeEach
public void setupTableLocation() throws Exception {
- File tableDir = temp.newFolder();
this.tableLocation = tableDir.toURI().toString();
}
- @Test
+ @TestTemplate
public void testRewriteManifestsEmptyTable() throws IOException {
PartitionSpec spec = PartitionSpec.unpartitioned();
Map<String, String> options = Maps.newHashMap();
@@ -135,7 +136,7 @@ public class TestRewriteManifestsAction extends
SparkTestBase {
options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED,
snapshotIdInheritanceEnabled);
Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
- Assert.assertNull("Table must be empty", table.currentSnapshot());
+ assertThat(table.currentSnapshot()).as("Table must be empty").isNull();
SparkActions actions = SparkActions.get();
@@ -143,13 +144,13 @@ public class TestRewriteManifestsAction extends
SparkTestBase {
.rewriteManifests(table)
.rewriteIf(manifest -> true)
.option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
- .stagingLocation(temp.newFolder().toString())
+ .stagingLocation(java.nio.file.Files.createTempDirectory(temp,
"junit").toString())
.execute();
- Assert.assertNull("Table must stay empty", table.currentSnapshot());
+ assertThat(table.currentSnapshot()).as("Table must stay empty").isNull();
}
- @Test
+ @TestTemplate
public void testRewriteSmallManifestsNonPartitionedTable() {
PartitionSpec spec = PartitionSpec.unpartitioned();
Map<String, String> options = Maps.newHashMap();
@@ -171,7 +172,7 @@ public class TestRewriteManifestsAction extends
SparkTestBase {
table.refresh();
List<ManifestFile> manifests =
table.currentSnapshot().allManifests(table.io());
- Assert.assertEquals("Should have 2 manifests before rewrite", 2,
manifests.size());
+ assertThat(manifests).as("Should have 2 manifests before
rewrite").hasSize(2);
SparkActions actions = SparkActions.get();
@@ -182,20 +183,18 @@ public class TestRewriteManifestsAction extends
SparkTestBase {
.option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
.execute();
- Assert.assertEquals(
- "Action should rewrite 2 manifests", 2,
Iterables.size(result.rewrittenManifests()));
- Assert.assertEquals(
- "Action should add 1 manifests", 1,
Iterables.size(result.addedManifests()));
+ assertThat(result.rewrittenManifests()).as("Action should rewrite 2
manifests").hasSize(2);
+ assertThat(result.addedManifests()).as("Action should add 1
manifests").hasSize(1);
assertManifestsLocation(result.addedManifests());
table.refresh();
List<ManifestFile> newManifests =
table.currentSnapshot().allManifests(table.io());
- Assert.assertEquals("Should have 1 manifests after rewrite", 1,
newManifests.size());
+ assertThat(newManifests).as("Should have 1 manifests after
rewrite").hasSize(1);
- Assert.assertEquals(4, (long) newManifests.get(0).existingFilesCount());
- Assert.assertFalse(newManifests.get(0).hasAddedFiles());
- Assert.assertFalse(newManifests.get(0).hasDeletedFiles());
+ assertThat(newManifests.get(0).existingFilesCount()).isEqualTo(4);
+ assertThat(newManifests.get(0).hasAddedFiles()).isFalse();
+ assertThat(newManifests.get(0).hasDeletedFiles()).isFalse();
List<ThreeColumnRecord> expectedRecords = Lists.newArrayList();
expectedRecords.addAll(records1);
@@ -205,10 +204,10 @@ public class TestRewriteManifestsAction extends
SparkTestBase {
List<ThreeColumnRecord> actualRecords =
resultDF.sort("c1",
"c2").as(Encoders.bean(ThreeColumnRecord.class)).collectAsList();
- Assert.assertEquals("Rows must match", expectedRecords, actualRecords);
+ assertThat(actualRecords).as("Rows must match").isEqualTo(expectedRecords);
}
- @Test
+ @TestTemplate
public void testRewriteManifestsWithCommitStateUnknownException() {
PartitionSpec spec = PartitionSpec.unpartitioned();
Map<String, String> options = Maps.newHashMap();
@@ -230,7 +229,7 @@ public class TestRewriteManifestsAction extends
SparkTestBase {
table.refresh();
List<ManifestFile> manifests =
table.currentSnapshot().allManifests(table.io());
- Assert.assertEquals("Should have 2 manifests before rewrite", 2,
manifests.size());
+ assertThat(manifests).as("Should have 2 manifests before
rewrite").hasSize(2);
SparkActions actions = SparkActions.get();
@@ -258,11 +257,11 @@ public class TestRewriteManifestsAction extends
SparkTestBase {
// table should reflect the changes, since the commit was successful
List<ManifestFile> newManifests =
table.currentSnapshot().allManifests(table.io());
- Assert.assertEquals("Should have 1 manifests after rewrite", 1,
newManifests.size());
+ assertThat(newManifests).as("Should have 1 manifests after
rewrite").hasSize(1);
- Assert.assertEquals(4, (long) newManifests.get(0).existingFilesCount());
- Assert.assertFalse(newManifests.get(0).hasAddedFiles());
- Assert.assertFalse(newManifests.get(0).hasDeletedFiles());
+ assertThat(newManifests.get(0).existingFilesCount()).isEqualTo(4);
+ assertThat(newManifests.get(0).hasAddedFiles()).isFalse();
+ assertThat(newManifests.get(0).hasDeletedFiles()).isFalse();
List<ThreeColumnRecord> expectedRecords = Lists.newArrayList();
expectedRecords.addAll(records1);
@@ -272,10 +271,10 @@ public class TestRewriteManifestsAction extends
SparkTestBase {
List<ThreeColumnRecord> actualRecords =
resultDF.sort("c1",
"c2").as(Encoders.bean(ThreeColumnRecord.class)).collectAsList();
- Assert.assertEquals("Rows must match", expectedRecords, actualRecords);
+ assertThat(actualRecords).as("Rows must match").isEqualTo(expectedRecords);
}
- @Test
+ @TestTemplate
public void testRewriteSmallManifestsPartitionedTable() {
PartitionSpec spec =
PartitionSpec.builderFor(SCHEMA).identity("c1").truncate("c2", 2).build();
Map<String, String> options = Maps.newHashMap();
@@ -309,7 +308,7 @@ public class TestRewriteManifestsAction extends
SparkTestBase {
table.refresh();
List<ManifestFile> manifests =
table.currentSnapshot().allManifests(table.io());
- Assert.assertEquals("Should have 4 manifests before rewrite", 4,
manifests.size());
+ assertThat(manifests).as("Should have 4 manifests before
rewrite").hasSize(4);
SparkActions actions = SparkActions.get();
@@ -329,24 +328,22 @@ public class TestRewriteManifestsAction extends
SparkTestBase {
.option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
.execute();
- Assert.assertEquals(
- "Action should rewrite 4 manifests", 4,
Iterables.size(result.rewrittenManifests()));
- Assert.assertEquals(
- "Action should add 2 manifests", 2,
Iterables.size(result.addedManifests()));
+ assertThat(result.rewrittenManifests()).as("Action should rewrite 4
manifests").hasSize(4);
+ assertThat(result.addedManifests()).as("Action should add 2
manifests").hasSize(2);
assertManifestsLocation(result.addedManifests());
table.refresh();
List<ManifestFile> newManifests =
table.currentSnapshot().allManifests(table.io());
- Assert.assertEquals("Should have 2 manifests after rewrite", 2,
newManifests.size());
+ assertThat(newManifests).as("Should have 2 manifests after
rewrite").hasSize(2);
- Assert.assertEquals(4, (long) newManifests.get(0).existingFilesCount());
- Assert.assertFalse(newManifests.get(0).hasAddedFiles());
- Assert.assertFalse(newManifests.get(0).hasDeletedFiles());
+ assertThat(newManifests.get(0).existingFilesCount()).isEqualTo(4);
+ assertThat(newManifests.get(0).hasAddedFiles()).isFalse();
+ assertThat(newManifests.get(0).hasDeletedFiles()).isFalse();
- Assert.assertEquals(4, (long) newManifests.get(1).existingFilesCount());
- Assert.assertFalse(newManifests.get(1).hasAddedFiles());
- Assert.assertFalse(newManifests.get(1).hasDeletedFiles());
+ assertThat(newManifests.get(1).existingFilesCount()).isEqualTo(4);
+ assertThat(newManifests.get(1).hasAddedFiles()).isFalse();
+ assertThat(newManifests.get(1).hasDeletedFiles()).isFalse();
List<ThreeColumnRecord> expectedRecords = Lists.newArrayList();
expectedRecords.addAll(records1);
@@ -358,10 +355,10 @@ public class TestRewriteManifestsAction extends
SparkTestBase {
List<ThreeColumnRecord> actualRecords =
resultDF.sort("c1",
"c2").as(Encoders.bean(ThreeColumnRecord.class)).collectAsList();
- Assert.assertEquals("Rows must match", expectedRecords, actualRecords);
+ assertThat(actualRecords).as("Rows must match").isEqualTo(expectedRecords);
}
- @Test
+ @TestTemplate
public void testRewriteImportedManifests() throws IOException {
PartitionSpec spec =
PartitionSpec.builderFor(SCHEMA).identity("c3").build();
Map<String, String> options = Maps.newHashMap();
@@ -372,7 +369,7 @@ public class TestRewriteManifestsAction extends
SparkTestBase {
List<ThreeColumnRecord> records =
Lists.newArrayList(
new ThreeColumnRecord(1, null, "AAAA"), new ThreeColumnRecord(1,
"BBBBBBBBBB", "BBBB"));
- File parquetTableDir = temp.newFolder("parquet_table");
+ File parquetTableDir = temp.resolve("parquet_table").toFile();
String parquetTableLocation = parquetTableDir.toURI().toString();
try {
@@ -386,7 +383,7 @@ public class TestRewriteManifestsAction extends
SparkTestBase {
.partitionBy("c3")
.saveAsTable("parquet_table");
- File stagingDir = temp.newFolder("staging-dir");
+ File stagingDir = temp.resolve("staging-dir").toFile();
SparkTableUtil.importSparkTable(
spark, new TableIdentifier("parquet_table"), table,
stagingDir.toString());
@@ -398,7 +395,8 @@ public class TestRewriteManifestsAction extends
SparkTestBase {
SparkActions actions = SparkActions.get();
- String rewriteStagingLocation = temp.newFolder().toString();
+ String rewriteStagingLocation =
+ java.nio.file.Files.createTempDirectory(temp, "junit").toString();
RewriteManifests.Result result =
actions
@@ -408,12 +406,10 @@ public class TestRewriteManifestsAction extends
SparkTestBase {
.stagingLocation(rewriteStagingLocation)
.execute();
- Assert.assertEquals(
- "Action should rewrite all manifests",
- snapshot.allManifests(table.io()),
- result.rewrittenManifests());
- Assert.assertEquals(
- "Action should add 1 manifest", 1, Iterables.size(result.addedManifests()));
+ assertThat(result.rewrittenManifests())
+ .as("Action should rewrite all manifests")
+ .isEqualTo(snapshot.allManifests(table.io()));
+ assertThat(result.addedManifests()).as("Action should add 1 manifest").hasSize(1);
assertManifestsLocation(result.addedManifests(), rewriteStagingLocation);
} finally {
@@ -421,7 +417,7 @@ public class TestRewriteManifestsAction extends
SparkTestBase {
}
}
- @Test
+ @TestTemplate
public void testRewriteLargeManifestsPartitionedTable() throws IOException {
PartitionSpec spec =
PartitionSpec.builderFor(SCHEMA).identity("c3").build();
Map<String, String> options = Maps.newHashMap();
@@ -437,7 +433,7 @@ public class TestRewriteManifestsAction extends
SparkTestBase {
table.newFastAppend().appendManifest(appendManifest).commit();
List<ManifestFile> manifests =
table.currentSnapshot().allManifests(table.io());
- Assert.assertEquals("Should have 1 manifests before rewrite", 1, manifests.size());
+ assertThat(manifests).as("Should have 1 manifests before rewrite").hasSize(1);
// set the target manifest size to a small value to force splitting
records into multiple files
table
@@ -449,7 +445,7 @@ public class TestRewriteManifestsAction extends
SparkTestBase {
SparkActions actions = SparkActions.get();
- String stagingLocation = temp.newFolder().toString();
+ String stagingLocation = java.nio.file.Files.createTempDirectory(temp, "junit").toString();
RewriteManifests.Result result =
actions
@@ -469,7 +465,7 @@ public class TestRewriteManifestsAction extends
SparkTestBase {
assertThat(newManifests).hasSizeGreaterThanOrEqualTo(2);
}
- @Test
+ @TestTemplate
public void testRewriteManifestsWithPredicate() throws IOException {
PartitionSpec spec =
PartitionSpec.builderFor(SCHEMA).identity("c1").truncate("c2", 2).build();
Map<String, String> options = Maps.newHashMap();
@@ -493,11 +489,11 @@ public class TestRewriteManifestsAction extends
SparkTestBase {
table.refresh();
List<ManifestFile> manifests =
table.currentSnapshot().allManifests(table.io());
- Assert.assertEquals("Should have 3 manifests before rewrite", 3, manifests.size());
+ assertThat(manifests).as("Should have 3 manifests before rewrite").hasSize(3);
SparkActions actions = SparkActions.get();
- String stagingLocation = temp.newFolder().toString();
+ String stagingLocation = java.nio.file.Files.createTempDirectory(temp, "junit").toString();
// rewrite only the first manifest
RewriteManifests.Result result =
@@ -511,22 +507,22 @@ public class TestRewriteManifestsAction extends
SparkTestBase {
.option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
.execute();
- Assert.assertEquals(
- "Action should rewrite 2 manifest", 2, Iterables.size(result.rewrittenManifests()));
- Assert.assertEquals(
- "Action should add 1 manifests", 1, Iterables.size(result.addedManifests()));
+ assertThat(result.rewrittenManifests()).as("Action should rewrite 2 manifest").hasSize(2);
+ assertThat(result.addedManifests()).as("Action should add 1 manifests").hasSize(1);
assertManifestsLocation(result.addedManifests(), stagingLocation);
table.refresh();
List<ManifestFile> newManifests =
table.currentSnapshot().allManifests(table.io());
- Assert.assertEquals("Should have 2 manifests after rewrite", 2, newManifests.size());
-
- Assert.assertFalse("First manifest must be rewritten", newManifests.contains(manifests.get(0)));
- Assert.assertFalse(
- "Second manifest must be rewritten", newManifests.contains(manifests.get(1)));
- Assert.assertTrue(
- "Third manifest must not be rewritten", newManifests.contains(manifests.get(2)));
+ assertThat(newManifests)
+ .as("Should have 2 manifests after rewrite")
+ .hasSize(2)
+ .as("First manifest must be rewritten")
+ .doesNotContain(manifests.get(0))
+ .as("Second manifest must be rewritten")
+ .doesNotContain(manifests.get(1))
+ .as("Third manifest must not be rewritten")
+ .contains(manifests.get(2));
List<ThreeColumnRecord> expectedRecords = Lists.newArrayList();
expectedRecords.add(records1.get(0));
@@ -539,10 +535,10 @@ public class TestRewriteManifestsAction extends
SparkTestBase {
List<ThreeColumnRecord> actualRecords =
resultDF.sort("c1",
"c2").as(Encoders.bean(ThreeColumnRecord.class)).collectAsList();
- Assert.assertEquals("Rows must match", expectedRecords, actualRecords);
+ assertThat(actualRecords).as("Rows must match").isEqualTo(expectedRecords);
}
- @Test
+ @TestTemplate
public void testRewriteSmallManifestsNonPartitionedV2Table() {
assumeThat(formatVersion).isGreaterThan(1);
@@ -567,7 +563,7 @@ public class TestRewriteManifestsAction extends
SparkTestBase {
DataFile file2 =
Iterables.getOnlyElement(snapshot2.addedDataFiles(table.io()));
List<ManifestFile> manifests =
table.currentSnapshot().allManifests(table.io());
- Assert.assertEquals("Should have 2 manifests before rewrite", 2, manifests.size());
+ assertThat(manifests).as("Should have 2 manifests before rewrite").hasSize(2);
SparkActions actions = SparkActions.get();
RewriteManifests.Result result =
@@ -575,21 +571,19 @@ public class TestRewriteManifestsAction extends
SparkTestBase {
.rewriteManifests(table)
.option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
.execute();
- Assert.assertEquals(
- "Action should rewrite 2 manifests", 2, Iterables.size(result.rewrittenManifests()));
- Assert.assertEquals(
- "Action should add 1 manifests", 1, Iterables.size(result.addedManifests()));
+ assertThat(result.rewrittenManifests()).as("Action should rewrite 2 manifests").hasSize(2);
+ assertThat(result.addedManifests()).as("Action should add 1 manifests").hasSize(1);
assertManifestsLocation(result.addedManifests());
table.refresh();
List<ManifestFile> newManifests =
table.currentSnapshot().allManifests(table.io());
- Assert.assertEquals("Should have 1 manifests after rewrite", 1, newManifests.size());
+ assertThat(newManifests).as("Should have 1 manifests after rewrite").hasSize(1);
ManifestFile newManifest = Iterables.getOnlyElement(newManifests);
- Assert.assertEquals(2, (long) newManifest.existingFilesCount());
- Assert.assertFalse(newManifest.hasAddedFiles());
- Assert.assertFalse(newManifest.hasDeletedFiles());
+ assertThat(newManifest.existingFilesCount()).isEqualTo(2);
+ assertThat(newManifest.hasAddedFiles()).isFalse();
+ assertThat(newManifest.hasDeletedFiles()).isFalse();
validateDataManifest(
table,
@@ -607,10 +601,10 @@ public class TestRewriteManifestsAction extends
SparkTestBase {
List<ThreeColumnRecord> actualRecords =
resultDF.sort("c1",
"c2").as(Encoders.bean(ThreeColumnRecord.class)).collectAsList();
- Assert.assertEquals("Rows must match", expectedRecords, actualRecords);
+ assertThat(actualRecords).as("Rows must match").isEqualTo(expectedRecords);
}
- @Test
+ @TestTemplate
public void testRewriteLargeManifestsEvolvedUnpartitionedV1Table() throws
IOException {
assumeThat(formatVersion).isEqualTo(1);
@@ -659,9 +653,9 @@ public class TestRewriteManifestsAction extends
SparkTestBase {
assertThat(manifests).hasSizeGreaterThanOrEqualTo(2);
}
- @Test
+ @TestTemplate
public void testRewriteSmallDeleteManifestsNonPartitionedTable() throws
IOException {
- assumeThat(formatVersion).isGreaterThan(1);
+ assumeThat(formatVersion).isEqualTo(2);
PartitionSpec spec = PartitionSpec.unpartitioned();
Map<String, String> options = Maps.newHashMap();
@@ -732,9 +726,9 @@ public class TestRewriteManifestsAction extends
SparkTestBase {
assertThat(actualRecords()).isEqualTo(expectedRecords);
}
- @Test
+ @TestTemplate
public void testRewriteSmallDeleteManifestsPartitionedTable() throws
IOException {
- assumeThat(formatVersion).isGreaterThan(1);
+ assumeThat(formatVersion).isEqualTo(2);
PartitionSpec spec =
PartitionSpec.builderFor(SCHEMA).identity("c3").build();
Map<String, String> options = Maps.newHashMap();
@@ -835,9 +829,9 @@ public class TestRewriteManifestsAction extends
SparkTestBase {
assertThat(actualRecords()).isEqualTo(expectedRecords);
}
- @Test
+ @TestTemplate
public void testRewriteLargeDeleteManifestsPartitionedTable() throws
IOException {
- assumeThat(formatVersion).isGreaterThan(1);
+ assumeThat(formatVersion).isEqualTo(2);
PartitionSpec spec =
PartitionSpec.builderFor(SCHEMA).identity("c3").build();
Map<String, String> options = Maps.newHashMap();
@@ -874,7 +868,7 @@ public class TestRewriteManifestsAction extends
SparkTestBase {
SparkActions actions = SparkActions.get();
- String stagingLocation = temp.newFolder().toString();
+ String stagingLocation = java.nio.file.Files.createTempDirectory(temp, "junit").toString();
RewriteManifests.Result result =
actions
@@ -948,8 +942,8 @@ public class TestRewriteManifestsAction extends
SparkTestBase {
}
private ManifestFile writeManifest(Table table, List<DataFile> files) throws
IOException {
- File manifestFile = temp.newFile("generated-manifest.avro");
- Assert.assertTrue(manifestFile.delete());
+ File manifestFile = File.createTempFile("generated-manifest", ".avro", temp.toFile());
+ assertThat(manifestFile.delete()).isTrue();
OutputFile outputFile =
table.io().newOutputFile(manifestFile.getCanonicalPath());
ManifestWriter<DataFile> writer =
@@ -1018,7 +1012,7 @@ public class TestRewriteManifestsAction extends
SparkTestBase {
private Pair<DeleteFile, CharSequenceSet> writePosDeletes(
Table table, StructLike partition, List<Pair<CharSequence, Long>>
deletes)
throws IOException {
- OutputFile outputFile = Files.localOutput(temp.newFile());
+ OutputFile outputFile = Files.localOutput(File.createTempFile("junit", null, temp.toFile()));
return FileHelpers.writeDeleteFile(table, outputFile, partition, deletes);
}
@@ -1036,7 +1030,7 @@ public class TestRewriteManifestsAction extends
SparkTestBase {
deletes.add(delete.copy(key, value));
}
- OutputFile outputFile = Files.localOutput(temp.newFile());
+ OutputFile outputFile = Files.localOutput(File.createTempFile("junit", null, temp.toFile()));
return FileHelpers.writeDeleteFile(table, outputFile, partition, deletes,
deleteSchema);
}
}
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
index 022dfa1592..d3508283dd 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
@@ -39,7 +39,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
-import java.util.stream.StreamSupport;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -69,10 +68,8 @@ import org.apache.iceberg.puffin.Puffin;
import org.apache.iceberg.puffin.PuffinWriter;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
-import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.SparkSQLProperties;
import org.apache.iceberg.spark.TestBase;
import
org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction.StringToFileURI;
@@ -317,10 +314,8 @@ public abstract class TestRemoveOrphanFilesAction extends
TestBase {
// Verifies that the delete methods ran in the threads created by the
provided ExecutorService
// ThreadFactory
assertThat(deleteThreads)
- .isEqualTo(
- Sets.newHashSet(
- "remove-orphan-0", "remove-orphan-1", "remove-orphan-2", "remove-orphan-3"));
-
+ .containsExactlyInAnyOrder(
+ "remove-orphan-0", "remove-orphan-1", "remove-orphan-2", "remove-orphan-3");
assertThat(deletedFiles).hasSize(4);
}
@@ -446,10 +441,8 @@ public abstract class TestRemoveOrphanFilesAction extends
TestBase {
DeleteOrphanFiles.Result result =
actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).execute();
- assertThat(result.orphanFileLocations()).as("Should delete 1 file").hasSize(1);
- assertThat(StreamSupport.stream(result.orphanFileLocations().spliterator(), false))
- .as("Should remove v1 file")
- .anyMatch(file -> file.contains("v1.metadata.json"));
+ assertThat(result.orphanFileLocations())
+ .containsExactly(tableLocation + "metadata/v1.metadata.json");
List<ThreeColumnRecord> expectedRecords = Lists.newArrayList();
expectedRecords.addAll(records);
@@ -748,9 +741,9 @@ public abstract class TestRemoveOrphanFilesAction extends
TestBase {
.deleteOrphanFiles(table)
.olderThan(System.currentTimeMillis() + 1000)
.execute();
- assertThat(StreamSupport.stream(result.orphanFileLocations().spliterator(), false))
+ assertThat(result.orphanFileLocations())
.as("trash file should be removed")
- .anyMatch(file -> file.contains("file:" + location + "/data/trashfile"));
+ .contains("file:" + location + "/data/trashfile");
}
@TestTemplate
@@ -967,11 +960,8 @@ public abstract class TestRemoveOrphanFilesAction extends
TestBase {
.olderThan(System.currentTimeMillis() + 1000)
.execute();
Iterable<String> orphanFileLocations = result.orphanFileLocations();
- assertThat(orphanFileLocations).as("Should be orphan file").hasSize(1);
- assertThat(Iterables.getOnlyElement(orphanFileLocations))
- .as("Deleted file")
- .isEqualTo(statsLocation.toURI().toString());
- assertThat(statsLocation.exists()).as("stats file should be deleted").isFalse();
+ assertThat(orphanFileLocations).hasSize(1).containsExactly(statsLocation.toURI().toString());
+ assertThat(statsLocation).as("stats file should be deleted").doesNotExist();
}
@TestTemplate
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction3.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction3.java
index 35d86b0a44..5f98287951 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction3.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction3.java
@@ -57,9 +57,9 @@ public class TestRemoveOrphanFilesAction3 extends
TestRemoveOrphanFilesAction {
.deleteOrphanFiles(table.table())
.olderThan(System.currentTimeMillis() + 1000)
.execute();
- assertThat(StreamSupport.stream(results.orphanFileLocations().spliterator(), false))
+ assertThat(results.orphanFileLocations())
.as("trash file should be removed")
- .anyMatch(file -> file.contains("file:" + location + trashFile));
+ .contains("file:" + location + trashFile);
}
@TestTemplate
@@ -86,9 +86,9 @@ public class TestRemoveOrphanFilesAction3 extends
TestRemoveOrphanFilesAction {
.deleteOrphanFiles(table.table())
.olderThan(System.currentTimeMillis() + 1000)
.execute();
- assertThat(StreamSupport.stream(results.orphanFileLocations().spliterator(), false))
+ assertThat(results.orphanFileLocations())
.as("trash file should be removed")
- .anyMatch(file -> file.contains("file:" + location + trashFile));
+ .contains("file:" + location + trashFile);
}
@TestTemplate
@@ -148,9 +148,9 @@ public class TestRemoveOrphanFilesAction3 extends
TestRemoveOrphanFilesAction {
.deleteOrphanFiles(table.table())
.olderThan(System.currentTimeMillis() + 1000)
.execute();
- assertThat(StreamSupport.stream(results.orphanFileLocations().spliterator(), false))
+ assertThat(results.orphanFileLocations())
.as("trash file should be removed")
- .anyMatch(file -> file.contains("file:" + location + trashFile));
+ .contains("file:" + location + trashFile);
}
@TestTemplate
@@ -169,7 +169,7 @@ public class TestRemoveOrphanFilesAction3 extends
TestRemoveOrphanFilesAction {
cat.createTable(id, SparkSchemaUtil.convert(SCHEMA), transforms,
properties);
SparkTable table = (SparkTable) cat.loadTable(id);
- spark.sql("INSERT INTO default.sessioncattest VALUES (1,1,1)");
+ sql("INSERT INTO default.sessioncattest VALUES (1,1,1)");
String location = table.table().location().replaceFirst("file:", "");
String trashFile = randomName("/data/trashfile");
@@ -180,13 +180,13 @@ public class TestRemoveOrphanFilesAction3 extends
TestRemoveOrphanFilesAction {
.deleteOrphanFiles(table.table())
.olderThan(System.currentTimeMillis() + 1000)
.execute();
- assertThat(StreamSupport.stream(results.orphanFileLocations().spliterator(), false))
+ assertThat(results.orphanFileLocations())
.as("trash file should be removed")
- .anyMatch(file -> file.contains("file:" + location + trashFile));
+ .contains("file:" + location + trashFile);
}
@AfterEach
- public void resetSparkSessionCatalog() throws Exception {
+ public void resetSparkSessionCatalog() {
spark.conf().unset("spark.sql.catalog.spark_catalog");
spark.conf().unset("spark.sql.catalog.spark_catalog.type");
spark.conf().unset("spark.sql.catalog.spark_catalog.warehouse");
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index bdbb8c1768..b3af2b2b21 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -658,7 +658,7 @@ public class TestRewriteDataFilesAction extends TestBase {
shouldHaveMinSequenceNumberInPartition(table, "data_file.partition.c1 == 1", 3);
shouldHaveSnapshots(table, 5);
- assertThat(table.currentSnapshot().summary().get("total-position-deletes")).isEqualTo("0");
+ assertThat(table.currentSnapshot().summary()).containsEntry("total-position-deletes", "0");
assertEquals("Rows must match", expectedRecords, currentData());
}
@@ -1894,7 +1894,7 @@ public class TestRewriteDataFilesAction extends TestBase {
.execute();
assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
- assertThat(currentData().size()).isEqualTo(count);
+ assertThat(currentData()).hasSize((int) count);
shouldRewriteDataFilesWithPartitionSpec(table, outputSpecId);
}
@@ -1917,7 +1917,7 @@ public class TestRewriteDataFilesAction extends TestBase {
.execute();
assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
- assertThat(currentData().size()).isEqualTo(count);
+ assertThat(currentData()).hasSize((int) count);
shouldRewriteDataFilesWithPartitionSpec(table, outputSpecId);
}
@@ -1956,7 +1956,7 @@ public class TestRewriteDataFilesAction extends TestBase {
.execute();
assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
- assertThat(currentData().size()).isEqualTo(count);
+ assertThat(currentData()).hasSize((int) count);
shouldRewriteDataFilesWithPartitionSpec(table, outputSpecId);
}
@@ -1979,7 +1979,7 @@ public class TestRewriteDataFilesAction extends TestBase {
.execute();
assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
- assertThat(currentData().size()).isEqualTo(count);
+ assertThat(currentData()).hasSize((int) count);
shouldRewriteDataFilesWithPartitionSpec(table, outputSpecId);
}
@@ -2049,10 +2049,9 @@ public class TestRewriteDataFilesAction extends TestBase
{
protected void shouldHaveSnapshots(Table table, int expectedSnapshots) {
table.refresh();
- int actualSnapshots = Iterables.size(table.snapshots());
- assertThat(actualSnapshots)
+ assertThat(table.snapshots())
.as("Table did not have the expected number of snapshots")
- .isEqualTo(expectedSnapshots);
+ .hasSize(expectedSnapshots);
}
protected void shouldHaveNoOrphans(Table table) {
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
index 4497184354..76b201aa56 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
@@ -400,7 +400,6 @@ public class TestRewriteManifestsAction extends TestBase {
table.refresh();
List<ManifestFile> newManifests =
table.currentSnapshot().allManifests(table.io());
-
assertThat(newManifests).as("Should have 2 manifests after
rewrite").hasSize(2);
assertThat(newManifests.get(0).existingFilesCount()).isEqualTo(4);