This is an automated email from the ASF dual-hosted git repository.
singhpk234 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 707be1a6a6 Spark: backport `#14886` to other Spark versions (#15178)
707be1a6a6 is described below
commit 707be1a6a69d555e6601ba5b3a0acca86cc225e2
Author: Alessandro Nori <[email protected]>
AuthorDate: Thu Jan 29 17:56:47 2026 +0100
Spark: backport `#14886` to other Spark versions (#15178)
---
.../actions/DeleteOrphanFilesSparkAction.java | 5 ++-
.../spark/actions/TestRemoveOrphanFilesAction.java | 45 ++++++++++++++++++++++
.../actions/TestRemoveOrphanFilesAction3.java | 11 +++++-
.../actions/DeleteOrphanFilesSparkAction.java | 5 ++-
.../spark/actions/TestRemoveOrphanFilesAction.java | 45 ++++++++++++++++++++++
.../actions/TestRemoveOrphanFilesAction3.java | 5 +++
.../actions/DeleteOrphanFilesSparkAction.java | 5 ++-
.../spark/actions/TestRemoveOrphanFilesAction.java | 45 ++++++++++++++++++++++
.../actions/TestRemoveOrphanFilesAction3.java | 5 +++
9 files changed, 166 insertions(+), 5 deletions(-)
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java
index 78662159b0..92bfc880ad 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java
@@ -300,7 +300,10 @@ public class DeleteOrphanFilesSparkAction extends
BaseSparkAction<DeleteOrphanFi
LOG.info("Deleted {} orphan files", filesCount);
- return ImmutableDeleteOrphanFiles.Result.builder().orphanFileLocations(orphanFileList).build();
+ return ImmutableDeleteOrphanFiles.Result.builder()
+ .orphanFileLocations(orphanFileList)
+ .orphanFilesCount(filesCount)
+ .build();
}
private void collectPathsForOutput(
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
index 40505b8567..0d2a5c0a4d 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
@@ -184,6 +184,9 @@ public abstract class TestRemoveOrphanFilesAction extends
TestBase {
assertThat(result1.orphanFileLocations())
.as("Default olderThan interval should be safe")
.isEmpty();
+ assertThat(result1.orphanFilesCount())
+ .as("Should not find any orphan file using default olderThan interval")
+ .isEqualTo(0L);
DeleteOrphanFiles.Result result2 =
actions
@@ -195,6 +198,9 @@ public abstract class TestRemoveOrphanFilesAction extends
TestBase {
assertThat(result2.orphanFileLocations())
.as("Action should find 1 file")
.isEqualTo(invalidFiles);
+ assertThat(result2.orphanFilesCount())
+ .as("Action should find 1 file")
+ .isEqualTo((long) invalidFiles.size());
assertThat(fs.exists(new Path(invalidFiles.get(0))))
.as("Invalid file should be present")
.isTrue();
@@ -210,6 +216,9 @@ public abstract class TestRemoveOrphanFilesAction extends
TestBase {
assertThat(result3.orphanFileLocations())
.as("Streaming dry run should find 1 file")
.isEqualTo(invalidFiles);
+ assertThat(result3.orphanFilesCount())
+ .as("Streaming dry run should find 1 file")
+ .isEqualTo((long) invalidFiles.size());
assertThat(fs.exists(new Path(invalidFiles.get(0))))
.as("Invalid file should be present after streaming dry run")
.isTrue();
@@ -223,6 +232,9 @@ public abstract class TestRemoveOrphanFilesAction extends
TestBase {
assertThat(result4.orphanFileLocations())
.as("Action should delete 1 file")
.isEqualTo(invalidFiles);
+ assertThat(result4.orphanFilesCount())
+ .as("Action should delete 1 file")
+ .isEqualTo((long) invalidFiles.size());
assertThat(fs.exists(new Path(invalidFiles.get(0))))
.as("Invalid file should not be present")
.isFalse();
@@ -286,6 +298,7 @@ public abstract class TestRemoveOrphanFilesAction extends
TestBase {
.execute();
assertThat(result.orphanFileLocations()).as("Should delete 4
files").hasSize(4);
+ assertThat(result.orphanFilesCount()).as("Should delete 4
files").isEqualTo(4L);
Path dataPath = new Path(tableLocation + "/data");
FileSystem fs =
dataPath.getFileSystem(spark.sessionState().newHadoopConf());
@@ -366,6 +379,7 @@ public abstract class TestRemoveOrphanFilesAction extends
TestBase {
.containsExactlyInAnyOrder(
"remove-orphan-0", "remove-orphan-1", "remove-orphan-2",
"remove-orphan-3");
assertThat(deletedFiles).hasSize(4);
+ assertThat(result.orphanFilesCount()).as("Should delete 4
files").isEqualTo(4L);
}
@TestTemplate
@@ -410,6 +424,7 @@ public abstract class TestRemoveOrphanFilesAction extends
TestBase {
actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).execute();
assertThat(result.orphanFileLocations()).as("Should not delete any
files").isEmpty();
+ assertThat(result.orphanFilesCount()).as("Should not delete any
files").isEqualTo(0L);
}
@TestTemplate
@@ -440,6 +455,7 @@ public abstract class TestRemoveOrphanFilesAction extends
TestBase {
.execute();
assertThat(result.orphanFileLocations()).as("Should delete 1
file").hasSize(1);
+ assertThat(result.orphanFilesCount()).as("Should delete 1
file").isEqualTo(1L);
Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
List<ThreeColumnRecord> actualRecords =
@@ -478,6 +494,7 @@ public abstract class TestRemoveOrphanFilesAction extends
TestBase {
.execute();
assertThat(result.orphanFileLocations()).as("Should delete only 2
files").hasSize(2);
+ assertThat(result.orphanFilesCount()).as("Should delete only 2
files").isEqualTo(2L);
}
@TestTemplate
@@ -509,6 +526,7 @@ public abstract class TestRemoveOrphanFilesAction extends
TestBase {
assertThat(result.orphanFileLocations())
.containsExactly(tableLocation + "metadata/v1.metadata.json");
+ assertThat(result.orphanFilesCount()).as("Should delete 1
file").isEqualTo(1L);
List<ThreeColumnRecord> expectedRecords = Lists.newArrayList();
expectedRecords.addAll(records);
@@ -545,6 +563,7 @@ public abstract class TestRemoveOrphanFilesAction extends
TestBase {
.execute();
assertThat(result.orphanFileLocations()).as("Should not delete any
files").isEmpty();
+ assertThat(result.orphanFilesCount()).as("Should not delete any
files").isEqualTo(0L);
Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
assertThat(resultDF.count()).as("Rows count must
match").isEqualTo(records.size());
@@ -575,6 +594,7 @@ public abstract class TestRemoveOrphanFilesAction extends
TestBase {
.execute();
assertThat(result.orphanFileLocations()).as("Should not delete any
files").isEmpty();
+ assertThat(result.orphanFilesCount()).as("Should not delete any
files").isEqualTo(0L);
Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
assertThat(resultDF.count()).as("Row count must
match").isEqualTo(records.size());
@@ -615,6 +635,7 @@ public abstract class TestRemoveOrphanFilesAction extends
TestBase {
.execute();
assertThat(result.orphanFileLocations()).as("Should delete 2
files").hasSize(2);
+ assertThat(result.orphanFilesCount()).as("Should delete 2
files").isEqualTo(2L);
}
@TestTemplate
@@ -655,6 +676,7 @@ public abstract class TestRemoveOrphanFilesAction extends
TestBase {
.execute();
assertThat(result.orphanFileLocations()).as("Should delete 2
files").hasSize(2);
+ assertThat(result.orphanFilesCount()).as("Should delete 2
files").isEqualTo(2L);
}
@TestTemplate
@@ -694,6 +716,7 @@ public abstract class TestRemoveOrphanFilesAction extends
TestBase {
.execute();
assertThat(result.orphanFileLocations()).as("Should delete 0
files").isEmpty();
+ assertThat(result.orphanFilesCount()).as("Should delete 0
files").isEqualTo(0L);
assertThat(fs.exists(pathToFileInHiddenFolder)).isTrue();
}
@@ -766,6 +789,9 @@ public abstract class TestRemoveOrphanFilesAction extends
TestBase {
assertThat(result.orphanFileLocations())
.as("Action should find 1 file")
.isEqualTo(invalidFiles);
+ assertThat(result.orphanFilesCount())
+ .as("Action should find 1 file")
+ .isEqualTo((long) invalidFiles.size());
assertThat(fs.exists(new Path(invalidFiles.get(0))))
.as("Invalid file should be present")
.isTrue();
@@ -803,6 +829,7 @@ public abstract class TestRemoveOrphanFilesAction extends
TestBase {
.execute();
assertThat(result.orphanFileLocations()).as("Should delete only 1
file").hasSize(1);
+ assertThat(result.orphanFilesCount()).as("Should delete only 1
file").isEqualTo(1L);
Dataset<Row> resultDF =
spark.read().format("iceberg").load(table.location());
List<ThreeColumnRecord> actualRecords =
@@ -838,6 +865,7 @@ public abstract class TestRemoveOrphanFilesAction extends
TestBase {
assertThat(result.orphanFileLocations())
.as("trash file should be removed")
.contains("file:" + location + "/data/trashfile");
+ assertThat(result.orphanFilesCount()).as("trash file should be
removed").isEqualTo(1L);
}
@TestTemplate
@@ -932,6 +960,9 @@ public abstract class TestRemoveOrphanFilesAction extends
TestBase {
assertThat(result1.orphanFileLocations())
.as("Default olderThan interval should be safe")
.isEmpty();
+ assertThat(result1.orphanFilesCount())
+ .as("Should not find any orphan file using default olderThan interval")
+ .isEqualTo(0L);
DeleteOrphanFiles.Result result2 =
actions
@@ -943,6 +974,9 @@ public abstract class TestRemoveOrphanFilesAction extends
TestBase {
assertThat(result2.orphanFileLocations())
.as("Action should find 1 file")
.isEqualTo(invalidFilePaths);
+ assertThat(result2.orphanFilesCount())
+ .as("Action should find 1 file")
+ .isEqualTo((long) invalidFilePaths.size());
assertThat(fs.exists(new Path(invalidFilePaths.get(0))))
.as("Invalid file should be present")
.isTrue();
@@ -956,6 +990,9 @@ public abstract class TestRemoveOrphanFilesAction extends
TestBase {
assertThat(result3.orphanFileLocations())
.as("Action should delete 1 file")
.isEqualTo(invalidFilePaths);
+ assertThat(result3.orphanFilesCount())
+ .as("Action should delete 1 file")
+ .isEqualTo((long) invalidFilePaths.size());
assertThat(fs.exists(new Path(invalidFilePaths.get(0))))
.as("Invalid file should not be present")
.isFalse();
@@ -985,6 +1022,7 @@ public abstract class TestRemoveOrphanFilesAction extends
TestBase {
.deleteWith(s -> {})
.execute();
assertThat(result4.orphanFileLocations()).as("Action should find
nothing").isEmpty();
+ assertThat(result4.orphanFilesCount()).as("Action should find
nothing").isEqualTo(0L);
}
protected long waitUntilAfter(long timestampMillis) {
@@ -1064,6 +1102,7 @@ public abstract class TestRemoveOrphanFilesAction extends
TestBase {
.execute();
Iterable<String> orphanFileLocations = result.orphanFileLocations();
assertThat(orphanFileLocations).hasSize(1).containsExactly(statsLocation.toURI().toString());
+ assertThat(result.orphanFilesCount()).as("Should delete 1
file").isEqualTo(1L);
assertThat(statsLocation).as("stats file should be
deleted").doesNotExist();
}
@@ -1282,6 +1321,9 @@ public abstract class TestRemoveOrphanFilesAction extends
TestBase {
.as("Non-streaming dry-run should return all 10 orphan files")
.hasSize(10)
.containsExactlyInAnyOrderElementsOf(invalidFiles);
+ assertThat(nonStreamingResult.orphanFilesCount())
+ .as("Non-streaming dry-run should return all 10 orphan files")
+ .isEqualTo((long) invalidFiles.size());
DeleteOrphanFiles.Result streamingResult =
SparkActions.get()
@@ -1295,6 +1337,9 @@ public abstract class TestRemoveOrphanFilesAction extends
TestBase {
assertThat(streamingResult.orphanFileLocations())
.as("Streaming with sample size 5 should return only 5 orphan files")
.hasSize(5);
+ assertThat(streamingResult.orphanFilesCount())
+ .as("Deleted 10 files")
+ .isEqualTo((long) invalidFiles.size());
for (String invalidFile : invalidFiles) {
assertThat(fs.exists(new Path(invalidFile))).as("Orphan file should be
deleted").isFalse();
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction3.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction3.java
index 646e5f8e70..88ac800b15 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction3.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction3.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.spark.actions;
import static org.assertj.core.api.Assertions.assertThat;
import java.io.File;
+import java.util.stream.StreamSupport;
import org.apache.iceberg.actions.DeleteOrphanFiles;
import org.apache.iceberg.spark.SparkCatalog;
import org.apache.iceberg.spark.SparkSchemaUtil;
@@ -59,6 +60,7 @@ public class TestRemoveOrphanFilesAction3 extends
TestRemoveOrphanFilesAction {
assertThat(results.orphanFileLocations())
.as("trash file should be removed")
.contains("file:" + location + trashFile);
+ assertThat(results.orphanFilesCount()).as("trash file should be
removed").isEqualTo(1L);
}
@TestTemplate
@@ -88,6 +90,7 @@ public class TestRemoveOrphanFilesAction3 extends
TestRemoveOrphanFilesAction {
assertThat(results.orphanFileLocations())
.as("trash file should be removed")
.contains("file:" + location + trashFile);
+ assertThat(results.orphanFilesCount()).as("trash file should be
removed").isEqualTo(1L);
}
@TestTemplate
@@ -114,9 +117,11 @@ public class TestRemoveOrphanFilesAction3 extends
TestRemoveOrphanFilesAction {
.deleteOrphanFiles(table.table())
.olderThan(System.currentTimeMillis() + 1000)
.execute();
- assertThat(results.orphanFileLocations())
+
+ assertThat(StreamSupport.stream(results.orphanFileLocations().spliterator(), false))
.as("trash file should be removed")
- .contains("file:" + location + trashFile);
+ .anyMatch(file -> file.contains("file:" + location + trashFile));
+ assertThat(results.orphanFilesCount()).as("trash file should be
removed").isEqualTo(1L);
}
@TestTemplate
@@ -149,6 +154,7 @@ public class TestRemoveOrphanFilesAction3 extends
TestRemoveOrphanFilesAction {
assertThat(results.orphanFileLocations())
.as("trash file should be removed")
.contains("file:" + location + trashFile);
+ assertThat(results.orphanFilesCount()).as("trash file should be
removed").isEqualTo(1L);
}
@TestTemplate
@@ -181,6 +187,7 @@ public class TestRemoveOrphanFilesAction3 extends
TestRemoveOrphanFilesAction {
assertThat(results.orphanFileLocations())
.as("trash file should be removed")
.contains("file:" + location + trashFile);
+ assertThat(results.orphanFilesCount()).as("trash file should be
removed").isEqualTo(1L);
}
@AfterEach
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java
index 78662159b0..92bfc880ad 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java
@@ -300,7 +300,10 @@ public class DeleteOrphanFilesSparkAction extends
BaseSparkAction<DeleteOrphanFi
LOG.info("Deleted {} orphan files", filesCount);
- return ImmutableDeleteOrphanFiles.Result.builder().orphanFileLocations(orphanFileList).build();
+ return ImmutableDeleteOrphanFiles.Result.builder()
+ .orphanFileLocations(orphanFileList)
+ .orphanFilesCount(filesCount)
+ .build();
}
private void collectPathsForOutput(
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
index 40505b8567..0d2a5c0a4d 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
@@ -184,6 +184,9 @@ public abstract class TestRemoveOrphanFilesAction extends
TestBase {
assertThat(result1.orphanFileLocations())
.as("Default olderThan interval should be safe")
.isEmpty();
+ assertThat(result1.orphanFilesCount())
+ .as("Should not find any orphan file using default olderThan interval")
+ .isEqualTo(0L);
DeleteOrphanFiles.Result result2 =
actions
@@ -195,6 +198,9 @@ public abstract class TestRemoveOrphanFilesAction extends
TestBase {
assertThat(result2.orphanFileLocations())
.as("Action should find 1 file")
.isEqualTo(invalidFiles);
+ assertThat(result2.orphanFilesCount())
+ .as("Action should find 1 file")
+ .isEqualTo((long) invalidFiles.size());
assertThat(fs.exists(new Path(invalidFiles.get(0))))
.as("Invalid file should be present")
.isTrue();
@@ -210,6 +216,9 @@ public abstract class TestRemoveOrphanFilesAction extends
TestBase {
assertThat(result3.orphanFileLocations())
.as("Streaming dry run should find 1 file")
.isEqualTo(invalidFiles);
+ assertThat(result3.orphanFilesCount())
+ .as("Streaming dry run should find 1 file")
+ .isEqualTo((long) invalidFiles.size());
assertThat(fs.exists(new Path(invalidFiles.get(0))))
.as("Invalid file should be present after streaming dry run")
.isTrue();
@@ -223,6 +232,9 @@ public abstract class TestRemoveOrphanFilesAction extends
TestBase {
assertThat(result4.orphanFileLocations())
.as("Action should delete 1 file")
.isEqualTo(invalidFiles);
+ assertThat(result4.orphanFilesCount())
+ .as("Action should delete 1 file")
+ .isEqualTo((long) invalidFiles.size());
assertThat(fs.exists(new Path(invalidFiles.get(0))))
.as("Invalid file should not be present")
.isFalse();
@@ -286,6 +298,7 @@ public abstract class TestRemoveOrphanFilesAction extends
TestBase {
.execute();
assertThat(result.orphanFileLocations()).as("Should delete 4
files").hasSize(4);
+ assertThat(result.orphanFilesCount()).as("Should delete 4
files").isEqualTo(4L);
Path dataPath = new Path(tableLocation + "/data");
FileSystem fs =
dataPath.getFileSystem(spark.sessionState().newHadoopConf());
@@ -366,6 +379,7 @@ public abstract class TestRemoveOrphanFilesAction extends
TestBase {
.containsExactlyInAnyOrder(
"remove-orphan-0", "remove-orphan-1", "remove-orphan-2",
"remove-orphan-3");
assertThat(deletedFiles).hasSize(4);
+ assertThat(result.orphanFilesCount()).as("Should delete 4
files").isEqualTo(4L);
}
@TestTemplate
@@ -410,6 +424,7 @@ public abstract class TestRemoveOrphanFilesAction extends
TestBase {
actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).execute();
assertThat(result.orphanFileLocations()).as("Should not delete any
files").isEmpty();
+ assertThat(result.orphanFilesCount()).as("Should not delete any
files").isEqualTo(0L);
}
@TestTemplate
@@ -440,6 +455,7 @@ public abstract class TestRemoveOrphanFilesAction extends
TestBase {
.execute();
assertThat(result.orphanFileLocations()).as("Should delete 1
file").hasSize(1);
+ assertThat(result.orphanFilesCount()).as("Should delete 1
file").isEqualTo(1L);
Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
List<ThreeColumnRecord> actualRecords =
@@ -478,6 +494,7 @@ public abstract class TestRemoveOrphanFilesAction extends
TestBase {
.execute();
assertThat(result.orphanFileLocations()).as("Should delete only 2
files").hasSize(2);
+ assertThat(result.orphanFilesCount()).as("Should delete only 2
files").isEqualTo(2L);
}
@TestTemplate
@@ -509,6 +526,7 @@ public abstract class TestRemoveOrphanFilesAction extends
TestBase {
assertThat(result.orphanFileLocations())
.containsExactly(tableLocation + "metadata/v1.metadata.json");
+ assertThat(result.orphanFilesCount()).as("Should delete 1
file").isEqualTo(1L);
List<ThreeColumnRecord> expectedRecords = Lists.newArrayList();
expectedRecords.addAll(records);
@@ -545,6 +563,7 @@ public abstract class TestRemoveOrphanFilesAction extends
TestBase {
.execute();
assertThat(result.orphanFileLocations()).as("Should not delete any
files").isEmpty();
+ assertThat(result.orphanFilesCount()).as("Should not delete any
files").isEqualTo(0L);
Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
assertThat(resultDF.count()).as("Rows count must
match").isEqualTo(records.size());
@@ -575,6 +594,7 @@ public abstract class TestRemoveOrphanFilesAction extends
TestBase {
.execute();
assertThat(result.orphanFileLocations()).as("Should not delete any
files").isEmpty();
+ assertThat(result.orphanFilesCount()).as("Should not delete any
files").isEqualTo(0L);
Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
assertThat(resultDF.count()).as("Row count must
match").isEqualTo(records.size());
@@ -615,6 +635,7 @@ public abstract class TestRemoveOrphanFilesAction extends
TestBase {
.execute();
assertThat(result.orphanFileLocations()).as("Should delete 2
files").hasSize(2);
+ assertThat(result.orphanFilesCount()).as("Should delete 2
files").isEqualTo(2L);
}
@TestTemplate
@@ -655,6 +676,7 @@ public abstract class TestRemoveOrphanFilesAction extends
TestBase {
.execute();
assertThat(result.orphanFileLocations()).as("Should delete 2
files").hasSize(2);
+ assertThat(result.orphanFilesCount()).as("Should delete 2
files").isEqualTo(2L);
}
@TestTemplate
@@ -694,6 +716,7 @@ public abstract class TestRemoveOrphanFilesAction extends
TestBase {
.execute();
assertThat(result.orphanFileLocations()).as("Should delete 0
files").isEmpty();
+ assertThat(result.orphanFilesCount()).as("Should delete 0
files").isEqualTo(0L);
assertThat(fs.exists(pathToFileInHiddenFolder)).isTrue();
}
@@ -766,6 +789,9 @@ public abstract class TestRemoveOrphanFilesAction extends
TestBase {
assertThat(result.orphanFileLocations())
.as("Action should find 1 file")
.isEqualTo(invalidFiles);
+ assertThat(result.orphanFilesCount())
+ .as("Action should find 1 file")
+ .isEqualTo((long) invalidFiles.size());
assertThat(fs.exists(new Path(invalidFiles.get(0))))
.as("Invalid file should be present")
.isTrue();
@@ -803,6 +829,7 @@ public abstract class TestRemoveOrphanFilesAction extends
TestBase {
.execute();
assertThat(result.orphanFileLocations()).as("Should delete only 1
file").hasSize(1);
+ assertThat(result.orphanFilesCount()).as("Should delete only 1
file").isEqualTo(1L);
Dataset<Row> resultDF =
spark.read().format("iceberg").load(table.location());
List<ThreeColumnRecord> actualRecords =
@@ -838,6 +865,7 @@ public abstract class TestRemoveOrphanFilesAction extends
TestBase {
assertThat(result.orphanFileLocations())
.as("trash file should be removed")
.contains("file:" + location + "/data/trashfile");
+ assertThat(result.orphanFilesCount()).as("trash file should be
removed").isEqualTo(1L);
}
@TestTemplate
@@ -932,6 +960,9 @@ public abstract class TestRemoveOrphanFilesAction extends
TestBase {
assertThat(result1.orphanFileLocations())
.as("Default olderThan interval should be safe")
.isEmpty();
+ assertThat(result1.orphanFilesCount())
+ .as("Should not find any orphan file using default olderThan interval")
+ .isEqualTo(0L);
DeleteOrphanFiles.Result result2 =
actions
@@ -943,6 +974,9 @@ public abstract class TestRemoveOrphanFilesAction extends
TestBase {
assertThat(result2.orphanFileLocations())
.as("Action should find 1 file")
.isEqualTo(invalidFilePaths);
+ assertThat(result2.orphanFilesCount())
+ .as("Action should find 1 file")
+ .isEqualTo((long) invalidFilePaths.size());
assertThat(fs.exists(new Path(invalidFilePaths.get(0))))
.as("Invalid file should be present")
.isTrue();
@@ -956,6 +990,9 @@ public abstract class TestRemoveOrphanFilesAction extends
TestBase {
assertThat(result3.orphanFileLocations())
.as("Action should delete 1 file")
.isEqualTo(invalidFilePaths);
+ assertThat(result3.orphanFilesCount())
+ .as("Action should delete 1 file")
+ .isEqualTo((long) invalidFilePaths.size());
assertThat(fs.exists(new Path(invalidFilePaths.get(0))))
.as("Invalid file should not be present")
.isFalse();
@@ -985,6 +1022,7 @@ public abstract class TestRemoveOrphanFilesAction extends
TestBase {
.deleteWith(s -> {})
.execute();
assertThat(result4.orphanFileLocations()).as("Action should find
nothing").isEmpty();
+ assertThat(result4.orphanFilesCount()).as("Action should find
nothing").isEqualTo(0L);
}
protected long waitUntilAfter(long timestampMillis) {
@@ -1064,6 +1102,7 @@ public abstract class TestRemoveOrphanFilesAction extends
TestBase {
.execute();
Iterable<String> orphanFileLocations = result.orphanFileLocations();
assertThat(orphanFileLocations).hasSize(1).containsExactly(statsLocation.toURI().toString());
+ assertThat(result.orphanFilesCount()).as("Should delete 1
file").isEqualTo(1L);
assertThat(statsLocation).as("stats file should be
deleted").doesNotExist();
}
@@ -1282,6 +1321,9 @@ public abstract class TestRemoveOrphanFilesAction extends
TestBase {
.as("Non-streaming dry-run should return all 10 orphan files")
.hasSize(10)
.containsExactlyInAnyOrderElementsOf(invalidFiles);
+ assertThat(nonStreamingResult.orphanFilesCount())
+ .as("Non-streaming dry-run should return all 10 orphan files")
+ .isEqualTo((long) invalidFiles.size());
DeleteOrphanFiles.Result streamingResult =
SparkActions.get()
@@ -1295,6 +1337,9 @@ public abstract class TestRemoveOrphanFilesAction extends
TestBase {
assertThat(streamingResult.orphanFileLocations())
.as("Streaming with sample size 5 should return only 5 orphan files")
.hasSize(5);
+ assertThat(streamingResult.orphanFilesCount())
+ .as("Deleted 10 files")
+ .isEqualTo((long) invalidFiles.size());
for (String invalidFile : invalidFiles) {
assertThat(fs.exists(new Path(invalidFile))).as("Orphan file should be
deleted").isFalse();
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction3.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction3.java
index 5f98287951..88ac800b15 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction3.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction3.java
@@ -60,6 +60,7 @@ public class TestRemoveOrphanFilesAction3 extends
TestRemoveOrphanFilesAction {
assertThat(results.orphanFileLocations())
.as("trash file should be removed")
.contains("file:" + location + trashFile);
+ assertThat(results.orphanFilesCount()).as("trash file should be
removed").isEqualTo(1L);
}
@TestTemplate
@@ -89,6 +90,7 @@ public class TestRemoveOrphanFilesAction3 extends
TestRemoveOrphanFilesAction {
assertThat(results.orphanFileLocations())
.as("trash file should be removed")
.contains("file:" + location + trashFile);
+ assertThat(results.orphanFilesCount()).as("trash file should be
removed").isEqualTo(1L);
}
@TestTemplate
@@ -119,6 +121,7 @@ public class TestRemoveOrphanFilesAction3 extends
TestRemoveOrphanFilesAction {
assertThat(StreamSupport.stream(results.orphanFileLocations().spliterator(),
false))
.as("trash file should be removed")
.anyMatch(file -> file.contains("file:" + location + trashFile));
+ assertThat(results.orphanFilesCount()).as("trash file should be
removed").isEqualTo(1L);
}
@TestTemplate
@@ -151,6 +154,7 @@ public class TestRemoveOrphanFilesAction3 extends
TestRemoveOrphanFilesAction {
assertThat(results.orphanFileLocations())
.as("trash file should be removed")
.contains("file:" + location + trashFile);
+ assertThat(results.orphanFilesCount()).as("trash file should be
removed").isEqualTo(1L);
}
@TestTemplate
@@ -183,6 +187,7 @@ public class TestRemoveOrphanFilesAction3 extends
TestRemoveOrphanFilesAction {
assertThat(results.orphanFileLocations())
.as("trash file should be removed")
.contains("file:" + location + trashFile);
+ assertThat(results.orphanFilesCount()).as("trash file should be
removed").isEqualTo(1L);
}
@AfterEach
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java
index 78662159b0..92bfc880ad 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java
@@ -300,7 +300,10 @@ public class DeleteOrphanFilesSparkAction extends
BaseSparkAction<DeleteOrphanFi
LOG.info("Deleted {} orphan files", filesCount);
- return ImmutableDeleteOrphanFiles.Result.builder().orphanFileLocations(orphanFileList).build();
+ return ImmutableDeleteOrphanFiles.Result.builder()
+ .orphanFileLocations(orphanFileList)
+ .orphanFilesCount(filesCount)
+ .build();
}
private void collectPathsForOutput(
diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
index 40505b8567..0d2a5c0a4d 100644
--- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
+++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
@@ -184,6 +184,9 @@ public abstract class TestRemoveOrphanFilesAction extends
TestBase {
assertThat(result1.orphanFileLocations())
.as("Default olderThan interval should be safe")
.isEmpty();
+ assertThat(result1.orphanFilesCount())
+ .as("Should not find any orphan file using default olderThan interval")
+ .isEqualTo(0L);
DeleteOrphanFiles.Result result2 =
actions
@@ -195,6 +198,9 @@ public abstract class TestRemoveOrphanFilesAction extends
TestBase {
assertThat(result2.orphanFileLocations())
.as("Action should find 1 file")
.isEqualTo(invalidFiles);
+ assertThat(result2.orphanFilesCount())
+ .as("Action should find 1 file")
+ .isEqualTo((long) invalidFiles.size());
assertThat(fs.exists(new Path(invalidFiles.get(0))))
.as("Invalid file should be present")
.isTrue();
@@ -210,6 +216,9 @@ public abstract class TestRemoveOrphanFilesAction extends
TestBase {
assertThat(result3.orphanFileLocations())
.as("Streaming dry run should find 1 file")
.isEqualTo(invalidFiles);
+ assertThat(result3.orphanFilesCount())
+ .as("Streaming dry run should find 1 file")
+ .isEqualTo((long) invalidFiles.size());
assertThat(fs.exists(new Path(invalidFiles.get(0))))
.as("Invalid file should be present after streaming dry run")
.isTrue();
@@ -223,6 +232,9 @@ public abstract class TestRemoveOrphanFilesAction extends
TestBase {
assertThat(result4.orphanFileLocations())
.as("Action should delete 1 file")
.isEqualTo(invalidFiles);
+ assertThat(result4.orphanFilesCount())
+ .as("Action should delete 1 file")
+ .isEqualTo((long) invalidFiles.size());
assertThat(fs.exists(new Path(invalidFiles.get(0))))
.as("Invalid file should not be present")
.isFalse();
@@ -286,6 +298,7 @@ public abstract class TestRemoveOrphanFilesAction extends
TestBase {
.execute();
assertThat(result.orphanFileLocations()).as("Should delete 4
files").hasSize(4);
+ assertThat(result.orphanFilesCount()).as("Should delete 4
files").isEqualTo(4L);
Path dataPath = new Path(tableLocation + "/data");
FileSystem fs =
dataPath.getFileSystem(spark.sessionState().newHadoopConf());
@@ -366,6 +379,7 @@ public abstract class TestRemoveOrphanFilesAction extends
TestBase {
.containsExactlyInAnyOrder(
"remove-orphan-0", "remove-orphan-1", "remove-orphan-2",
"remove-orphan-3");
assertThat(deletedFiles).hasSize(4);
+ assertThat(result.orphanFilesCount()).as("Should delete 4
files").isEqualTo(4L);
}
@TestTemplate
@@ -410,6 +424,7 @@ public abstract class TestRemoveOrphanFilesAction extends
TestBase {
actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).execute();
assertThat(result.orphanFileLocations()).as("Should not delete any
files").isEmpty();
+ assertThat(result.orphanFilesCount()).as("Should not delete any
files").isEqualTo(0L);
}
@TestTemplate
@@ -440,6 +455,7 @@ public abstract class TestRemoveOrphanFilesAction extends TestBase {
.execute();
assertThat(result.orphanFileLocations()).as("Should delete 1 file").hasSize(1);
+ assertThat(result.orphanFilesCount()).as("Should delete 1 file").isEqualTo(1L);
Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
List<ThreeColumnRecord> actualRecords =
@@ -478,6 +494,7 @@ public abstract class TestRemoveOrphanFilesAction extends TestBase {
.execute();
assertThat(result.orphanFileLocations()).as("Should delete only 2 files").hasSize(2);
+ assertThat(result.orphanFilesCount()).as("Should delete only 2 files").isEqualTo(2L);
}
@TestTemplate
@@ -509,6 +526,7 @@ public abstract class TestRemoveOrphanFilesAction extends TestBase {
assertThat(result.orphanFileLocations())
.containsExactly(tableLocation + "metadata/v1.metadata.json");
+ assertThat(result.orphanFilesCount()).as("Should delete 1 file").isEqualTo(1L);
List<ThreeColumnRecord> expectedRecords = Lists.newArrayList();
expectedRecords.addAll(records);
@@ -545,6 +563,7 @@ public abstract class TestRemoveOrphanFilesAction extends TestBase {
.execute();
assertThat(result.orphanFileLocations()).as("Should not delete any files").isEmpty();
+ assertThat(result.orphanFilesCount()).as("Should not delete any files").isEqualTo(0L);
Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
assertThat(resultDF.count()).as("Rows count must match").isEqualTo(records.size());
@@ -575,6 +594,7 @@ public abstract class TestRemoveOrphanFilesAction extends TestBase {
.execute();
assertThat(result.orphanFileLocations()).as("Should not delete any files").isEmpty();
+ assertThat(result.orphanFilesCount()).as("Should not delete any files").isEqualTo(0L);
Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
assertThat(resultDF.count()).as("Row count must match").isEqualTo(records.size());
@@ -615,6 +635,7 @@ public abstract class TestRemoveOrphanFilesAction extends TestBase {
.execute();
assertThat(result.orphanFileLocations()).as("Should delete 2 files").hasSize(2);
+ assertThat(result.orphanFilesCount()).as("Should delete 2 files").isEqualTo(2L);
}
@TestTemplate
@@ -655,6 +676,7 @@ public abstract class TestRemoveOrphanFilesAction extends TestBase {
.execute();
assertThat(result.orphanFileLocations()).as("Should delete 2 files").hasSize(2);
+ assertThat(result.orphanFilesCount()).as("Should delete 2 files").isEqualTo(2L);
}
@TestTemplate
@@ -694,6 +716,7 @@ public abstract class TestRemoveOrphanFilesAction extends TestBase {
.execute();
assertThat(result.orphanFileLocations()).as("Should delete 0 files").isEmpty();
+ assertThat(result.orphanFilesCount()).as("Should delete 0 files").isEqualTo(0L);
assertThat(fs.exists(pathToFileInHiddenFolder)).isTrue();
}
@@ -766,6 +789,9 @@ public abstract class TestRemoveOrphanFilesAction extends TestBase {
assertThat(result.orphanFileLocations())
.as("Action should find 1 file")
.isEqualTo(invalidFiles);
+ assertThat(result.orphanFilesCount())
+ .as("Action should find 1 file")
+ .isEqualTo((long) invalidFiles.size());
assertThat(fs.exists(new Path(invalidFiles.get(0))))
.as("Invalid file should be present")
.isTrue();
@@ -803,6 +829,7 @@ public abstract class TestRemoveOrphanFilesAction extends TestBase {
.execute();
assertThat(result.orphanFileLocations()).as("Should delete only 1 file").hasSize(1);
+ assertThat(result.orphanFilesCount()).as("Should delete only 1 file").isEqualTo(1L);
Dataset<Row> resultDF =
spark.read().format("iceberg").load(table.location());
List<ThreeColumnRecord> actualRecords =
@@ -838,6 +865,7 @@ public abstract class TestRemoveOrphanFilesAction extends TestBase {
assertThat(result.orphanFileLocations())
.as("trash file should be removed")
.contains("file:" + location + "/data/trashfile");
+ assertThat(result.orphanFilesCount()).as("trash file should be removed").isEqualTo(1L);
}
@TestTemplate
@@ -932,6 +960,9 @@ public abstract class TestRemoveOrphanFilesAction extends TestBase {
assertThat(result1.orphanFileLocations())
.as("Default olderThan interval should be safe")
.isEmpty();
+ assertThat(result1.orphanFilesCount())
+ .as("Should not find any orphan file using default olderThan interval")
+ .isEqualTo(0L);
DeleteOrphanFiles.Result result2 =
actions
@@ -943,6 +974,9 @@ public abstract class TestRemoveOrphanFilesAction extends TestBase {
assertThat(result2.orphanFileLocations())
.as("Action should find 1 file")
.isEqualTo(invalidFilePaths);
+ assertThat(result2.orphanFilesCount())
+ .as("Action should find 1 file")
+ .isEqualTo((long) invalidFilePaths.size());
assertThat(fs.exists(new Path(invalidFilePaths.get(0))))
.as("Invalid file should be present")
.isTrue();
@@ -956,6 +990,9 @@ public abstract class TestRemoveOrphanFilesAction extends TestBase {
assertThat(result3.orphanFileLocations())
.as("Action should delete 1 file")
.isEqualTo(invalidFilePaths);
+ assertThat(result3.orphanFilesCount())
+ .as("Action should delete 1 file")
+ .isEqualTo((long) invalidFilePaths.size());
assertThat(fs.exists(new Path(invalidFilePaths.get(0))))
.as("Invalid file should not be present")
.isFalse();
@@ -985,6 +1022,7 @@ public abstract class TestRemoveOrphanFilesAction extends TestBase {
.deleteWith(s -> {})
.execute();
assertThat(result4.orphanFileLocations()).as("Action should find nothing").isEmpty();
+ assertThat(result4.orphanFilesCount()).as("Action should find nothing").isEqualTo(0L);
}
protected long waitUntilAfter(long timestampMillis) {
@@ -1064,6 +1102,7 @@ public abstract class TestRemoveOrphanFilesAction extends TestBase {
.execute();
Iterable<String> orphanFileLocations = result.orphanFileLocations();
assertThat(orphanFileLocations).hasSize(1).containsExactly(statsLocation.toURI().toString());
+ assertThat(result.orphanFilesCount()).as("Should delete 1 file").isEqualTo(1L);
assertThat(statsLocation).as("stats file should be deleted").doesNotExist();
}
@@ -1282,6 +1321,9 @@ public abstract class TestRemoveOrphanFilesAction extends TestBase {
.as("Non-streaming dry-run should return all 10 orphan files")
.hasSize(10)
.containsExactlyInAnyOrderElementsOf(invalidFiles);
+ assertThat(nonStreamingResult.orphanFilesCount())
+ .as("Non-streaming dry-run should return all 10 orphan files")
+ .isEqualTo((long) invalidFiles.size());
DeleteOrphanFiles.Result streamingResult =
SparkActions.get()
@@ -1295,6 +1337,9 @@ public abstract class TestRemoveOrphanFilesAction extends TestBase {
assertThat(streamingResult.orphanFileLocations())
.as("Streaming with sample size 5 should return only 5 orphan files")
.hasSize(5);
+ assertThat(streamingResult.orphanFilesCount())
+ .as("Deleted 10 files")
+ .isEqualTo((long) invalidFiles.size());
for (String invalidFile : invalidFiles) {
assertThat(fs.exists(new Path(invalidFile))).as("Orphan file should be deleted").isFalse();
diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction3.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction3.java
index 5f98287951..88ac800b15 100644
--- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction3.java
+++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction3.java
@@ -60,6 +60,7 @@ public class TestRemoveOrphanFilesAction3 extends TestRemoveOrphanFilesAction {
assertThat(results.orphanFileLocations())
.as("trash file should be removed")
.contains("file:" + location + trashFile);
+ assertThat(results.orphanFilesCount()).as("trash file should be removed").isEqualTo(1L);
}
@TestTemplate
@@ -89,6 +90,7 @@ public class TestRemoveOrphanFilesAction3 extends TestRemoveOrphanFilesAction {
assertThat(results.orphanFileLocations())
.as("trash file should be removed")
.contains("file:" + location + trashFile);
+ assertThat(results.orphanFilesCount()).as("trash file should be removed").isEqualTo(1L);
}
@TestTemplate
@@ -119,6 +121,7 @@ public class TestRemoveOrphanFilesAction3 extends TestRemoveOrphanFilesAction {
assertThat(StreamSupport.stream(results.orphanFileLocations().spliterator(),
false))
.as("trash file should be removed")
.anyMatch(file -> file.contains("file:" + location + trashFile));
+ assertThat(results.orphanFilesCount()).as("trash file should be removed").isEqualTo(1L);
}
@TestTemplate
@@ -151,6 +154,7 @@ public class TestRemoveOrphanFilesAction3 extends TestRemoveOrphanFilesAction {
assertThat(results.orphanFileLocations())
.as("trash file should be removed")
.contains("file:" + location + trashFile);
+ assertThat(results.orphanFilesCount()).as("trash file should be removed").isEqualTo(1L);
}
@TestTemplate
@@ -183,6 +187,7 @@ public class TestRemoveOrphanFilesAction3 extends TestRemoveOrphanFilesAction {
assertThat(results.orphanFileLocations())
.as("trash file should be removed")
.contains("file:" + location + trashFile);
+ assertThat(results.orphanFilesCount()).as("trash file should be removed").isEqualTo(1L);
}
@AfterEach