Copilot commented on code in PR #15154:
URL: https://github.com/apache/iceberg/pull/15154#discussion_r2732849260


##########
spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java:
##########
@@ -174,14 +182,71 @@ public Dataset<FileInfo> expireFiles() {
 
       // fetch valid files after expiration
       TableMetadata updatedMetadata = ops.refresh();
-      Dataset<FileInfo> validFileDS = fileDS(updatedMetadata);
 
-      // fetch files referenced by expired snapshots
+      // find IDs of expired snapshots
       Set<Long> deletedSnapshotIds = findExpiredSnapshotIds(originalMetadata, updatedMetadata);
-      Dataset<FileInfo> deleteCandidateFileDS = fileDS(originalMetadata, deletedSnapshotIds);
+      if (deletedSnapshotIds.isEmpty()) {
+        this.expiredFileDS = emptyFileInfoDS();
+        return expiredFileDS;
+      }
+
+      Table originalTable = newStaticTable(originalMetadata, table.io());
+      Table updatedTable = newStaticTable(updatedMetadata, table.io());
+
+      Dataset<Row> expiredManifestDF =
+          loadMetadataTable(originalTable, MetadataTableType.ALL_MANIFESTS)
+              .filter(
+                  col(AllManifestsTable.REF_SNAPSHOT_ID.name()).isInCollection(deletedSnapshotIds));
+
+      Dataset<Row> liveManifestDF =
+          loadMetadataTable(updatedTable, MetadataTableType.ALL_MANIFESTS);
+
+      Dataset<String> expiredManifestPaths =
+          expiredManifestDF.select(col("path")).distinct().as(Encoders.STRING());
+
+      Dataset<String> liveManifestPaths =
+          liveManifestDF.select(col("path")).distinct().as(Encoders.STRING());
+
+      Dataset<String> orphanedManifestPaths = expiredManifestPaths.except(liveManifestPaths);
+
+      Dataset<FileInfo> expiredManifestLists = manifestListDS(originalTable, deletedSnapshotIds);
+      Dataset<FileInfo> liveManifestLists = manifestListDS(updatedTable, null);
+      Dataset<FileInfo> orphanedManifestLists = expiredManifestLists.except(liveManifestLists);
+
+      Dataset<FileInfo> expiredStats = statisticsFileDS(originalTable, deletedSnapshotIds);
+      Dataset<FileInfo> liveStats = statisticsFileDS(updatedTable, null);
+      Dataset<FileInfo> orphanedStats = expiredStats.except(liveStats);
+
+      if (orphanedManifestPaths.isEmpty()) {

Review Comment:
   Calling `isEmpty()` on a `Dataset` triggers a Spark action (a job that pulls up to one row to the driver), which can be expensive when the upstream plan is costly to evaluate. Consider checking emptiness by taking at most one row explicitly, e.g. `take(1).length == 0` or the local-iterator variant below, so that no more than a single row is ever materialized.
   ```suggestion
         boolean hasOrphanedManifestPaths = orphanedManifestPaths.limit(1).toLocalIterator().hasNext();
         if (!hasOrphanedManifestPaths) {
   ```
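
   For illustration, the trade-off can be mimicked in plain Java, without a Spark dependency: a lazy `Stream` stands in for the `Dataset`, and emptiness is checked by pulling at most one element, analogous to `limit(1).toLocalIterator().hasNext()` in the suggestion. All names here (`EmptinessCheck`, `lazySource`, `hasAny`) are illustrative, not part of the PR.

   ```java
   import java.util.Iterator;
   import java.util.concurrent.atomic.AtomicInteger;
   import java.util.stream.IntStream;
   import java.util.stream.Stream;

   public class EmptinessCheck {
       // Counts how many source elements are actually consumed.
       static final AtomicInteger consumed = new AtomicInteger();

       // A lazy "dataset" of a million rows; peek() records every element pulled.
       static Stream<Integer> lazySource() {
           return IntStream.range(0, 1_000_000).boxed().peek(i -> consumed.incrementAndGet());
       }

       // Analogue of limit(1).toLocalIterator().hasNext(): pull at most one element.
       static boolean hasAny(Stream<Integer> rows) {
           Iterator<Integer> it = rows.limit(1).iterator();
           return it.hasNext();
       }

       public static void main(String[] args) {
           System.out.println(hasAny(lazySource())); // true
           System.out.println(consumed.get());       // 1: only one element was pulled
       }
   }
   ```

   The point of the sketch: the check touches a single element regardless of how large the lazy source is, which is the behavior the suggested `limit(1)` pattern aims for on the Spark side.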



##########
spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java:
##########
@@ -1203,7 +1203,7 @@ public void testUseLocalIterator() {
           assertThat(jobsRunDuringStreamResults)
               .as(
                   "Expected total number of jobs with stream-results should match the expected number")
-              .isEqualTo(4L);
+              .isEqualTo(12L);

Review Comment:
   The expected job count increased from 4 to 12 because of the newly added distributed operations. Consider adding a comment explaining why this specific count is expected, or adding a test case that validates the optimization logic (e.g., verifying the early exit when no orphaned manifests exist).
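
   To illustrate what such a test might assert, here is a plain-Java sketch (no Spark dependency) of the early-exit behavior the comment asks to validate. It mirrors the `deletedSnapshotIds.isEmpty()` short-circuit from the diff, with a counter standing in for "Spark jobs launched"; `expireFiles` and `jobsRun` are illustrative stand-ins, not the PR's actual API.

   ```java
   import java.util.Collections;
   import java.util.List;
   import java.util.Set;
   import java.util.concurrent.atomic.AtomicInteger;

   public class EarlyExitSketch {
       // Stand-in for "Spark jobs launched"; only the expensive path increments it.
       static final AtomicInteger jobsRun = new AtomicInteger();

       // Mirrors the deletedSnapshotIds.isEmpty() short-circuit in the diff:
       // when nothing expired, return immediately without any distributed work.
       static List<String> expireFiles(Set<Long> deletedSnapshotIds) {
           if (deletedSnapshotIds.isEmpty()) {
               return Collections.emptyList();
           }
           jobsRun.incrementAndGet(); // pretend the expensive path runs a job
           return List.of("orphaned-manifest-list");
       }

       public static void main(String[] args) {
           // Early-exit path: no expired snapshots means no jobs at all.
           List<String> result = expireFiles(Set.of());
           System.out.println(result.isEmpty() && jobsRun.get() == 0); // true
       }
   }
   ```

   A real test along these lines would assert both that the result is empty and that no extra Spark jobs ran, which documents the optimization rather than just the raw job count.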



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

