Copilot commented on code in PR #15154:
URL: https://github.com/apache/iceberg/pull/15154#discussion_r2732849260
##########
spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java:
##########
@@ -174,14 +182,71 @@ public Dataset<FileInfo> expireFiles() {
// fetch valid files after expiration
TableMetadata updatedMetadata = ops.refresh();
- Dataset<FileInfo> validFileDS = fileDS(updatedMetadata);
- // fetch files referenced by expired snapshots
+ // find IDs of expired snapshots
Set<Long> deletedSnapshotIds = findExpiredSnapshotIds(originalMetadata, updatedMetadata);
- Dataset<FileInfo> deleteCandidateFileDS = fileDS(originalMetadata, deletedSnapshotIds);
+ if (deletedSnapshotIds.isEmpty()) {
+ this.expiredFileDS = emptyFileInfoDS();
+ return expiredFileDS;
+ }
+
+ Table originalTable = newStaticTable(originalMetadata, table.io());
+ Table updatedTable = newStaticTable(updatedMetadata, table.io());
+
+ Dataset<Row> expiredManifestDF =
+ loadMetadataTable(originalTable, MetadataTableType.ALL_MANIFESTS)
+ .filter(
+ col(AllManifestsTable.REF_SNAPSHOT_ID.name()).isInCollection(deletedSnapshotIds));
+
+ Dataset<Row> liveManifestDF =
+ loadMetadataTable(updatedTable, MetadataTableType.ALL_MANIFESTS);
+
+ Dataset<String> expiredManifestPaths =
+ expiredManifestDF.select(col("path")).distinct().as(Encoders.STRING());
+
+ Dataset<String> liveManifestPaths =
+ liveManifestDF.select(col("path")).distinct().as(Encoders.STRING());
+
+ Dataset<String> orphanedManifestPaths = expiredManifestPaths.except(liveManifestPaths);
+
+ Dataset<FileInfo> expiredManifestLists = manifestListDS(originalTable, deletedSnapshotIds);
+ Dataset<FileInfo> liveManifestLists = manifestListDS(updatedTable, null);
+ Dataset<FileInfo> orphanedManifestLists = expiredManifestLists.except(liveManifestLists);
+
+ Dataset<FileInfo> expiredStats = statisticsFileDS(originalTable, deletedSnapshotIds);
+ Dataset<FileInfo> liveStats = statisticsFileDS(updatedTable, null);
+ Dataset<FileInfo> orphanedStats = expiredStats.except(liveStats);
+
+ if (orphanedManifestPaths.isEmpty()) {
Review Comment:
`isEmpty()` on a Dataset is a Spark action: it launches a job that executes
the `except` plan, although it brings at most one row back to the driver.
Consider making that cost explicit at the call site, for example with
`takeAsList(1).isEmpty()` or the `limit(1)` check suggested below, so the
emptiness test is clearly visible as a separate job.
```suggestion
boolean hasOrphanedManifestPaths =
orphanedManifestPaths.limit(1).toLocalIterator().hasNext();
if (!hasOrphanedManifestPaths) {
```
##########
spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java:
##########
@@ -1203,7 +1203,7 @@ public void testUseLocalIterator() {
assertThat(jobsRunDuringStreamResults)
.as(
"Expected total number of jobs with stream-results should match the expected number")
- .isEqualTo(4L);
+ .isEqualTo(12L);
Review Comment:
The expected job count rose from 4 to 12 because the new code path runs
additional Spark jobs. Consider adding a comment that explains how the 12
jobs break down, or add a test case that validates the optimization logic
(e.g., verifying the early exit when no orphaned manifests exist).
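Before writing the Spark test, the early-exit cases can be pinned down on
plain collections. The sketch below models the orphan computation from the
first hunk as a set difference (paths referenced by expired snapshots minus
paths still live after expiration). It is only an illustration: the class
`OrphanedPathsSketch` and the method `orphaned` are hypothetical names, the
paths are made up, and it uses in-memory sets rather than Spark Datasets or
Iceberg APIs.

```java
import java.util.LinkedHashSet;
import java.util.Set;

/** Hypothetical helper: models the orphan computation on in-memory sets, not Datasets. */
public class OrphanedPathsSketch {

  /**
   * Returns the paths referenced only by expired snapshots.
   * Mirrors the intent of expiredManifestPaths.except(liveManifestPaths) in the diff.
   */
  public static Set<String> orphaned(Set<String> expired, Set<String> live) {
    // Early exit: if nothing was expired, nothing can be orphaned.
    if (expired.isEmpty()) {
      return Set.of();
    }
    Set<String> result = new LinkedHashSet<>(expired);
    result.removeAll(live); // set difference: expired minus live
    return result;
  }

  public static void main(String[] args) {
    // Illustrative paths only; m2.avro is still referenced after expiration.
    Set<String> expired = Set.of("manifests/m1.avro", "manifests/m2.avro");
    Set<String> live = Set.of("manifests/m2.avro");
    System.out.println(orphaned(expired, live));
  }
}
```

The three cases worth asserting in the real test are the same ones this
sketch exposes: no expired snapshots, expired manifests that are all still
live, and expired manifests of which some are no longer referenced.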
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.