prodeezy commented on a change in pull request #928: Add failure handling in 
cleanup while reading snapshot manifest-list files
URL: https://github.com/apache/incubator-iceberg/pull/928#discussion_r409704944
 
 

 ##########
 File path: core/src/main/java/org/apache/iceberg/RemoveSnapshots.java
 ##########
 @@ -201,112 +201,121 @@ private void cleanExpiredFiles(List<Snapshot> 
snapshots, Set<Long> validIds, Set
     // find manifests to clean up that are still referenced by a valid 
snapshot, but written by an expired snapshot
     Set<String> validManifests = Sets.newHashSet();
     Set<ManifestFile> manifestsToScan = Sets.newHashSet();
-    for (Snapshot snapshot : snapshots) {
-      try (CloseableIterable<ManifestFile> manifests = 
readManifestFiles(snapshot)) {
-        for (ManifestFile manifest : manifests) {
-          validManifests.add(manifest.path());
-
-          long snapshotId = manifest.snapshotId();
-          // whether the manifest was created by a valid snapshot (true) or an 
expired snapshot (false)
-          boolean fromValidSnapshots = validIds.contains(snapshotId);
-          // whether the snapshot that created the manifest was an ancestor of 
the table state
-          boolean isFromAncestor = ancestorIds.contains(snapshotId);
-          // whether the changes in this snapshot have been picked into the 
current table state
-          boolean isPicked = pickedAncestorSnapshotIds.contains(snapshotId);
-          // if the snapshot that wrote this manifest is no longer valid (has 
expired), then delete its deleted files.
-          // note that this is only for expired snapshots that are in the 
current table state
-          if (!fromValidSnapshots && (isFromAncestor || isPicked) && 
manifest.hasDeletedFiles()) {
-            manifestsToScan.add(manifest.copy());
-          }
-        }
-
-      } catch (IOException e) {
-        throw new RuntimeIOException(e,
-            "Failed to close manifest list: %s", 
snapshot.manifestListLocation());
-      }
-    }
+    Tasks.foreach(snapshots).noRetry().suppressFailureWhenFinished()
+        .onFailure((snapshot, exc) -> LOG.warn("Failed on snapshot {} while 
reading manifest list: {}",
+            snapshot.snapshotId(), snapshot.manifestListLocation(), exc))
+        .run(
+            snapshot -> {
+              try (CloseableIterable<ManifestFile> manifests = 
readManifestFiles(snapshot)) {
+                for (ManifestFile manifest : manifests) {
+                  validManifests.add(manifest.path());
+
+                  long snapshotId = manifest.snapshotId();
+                  // whether the manifest was created by a valid snapshot 
(true) or an expired snapshot (false)
+                  boolean fromValidSnapshots = validIds.contains(snapshotId);
+                  // whether the snapshot that created the manifest was an 
ancestor of the table state
+                  boolean isFromAncestor = ancestorIds.contains(snapshotId);
+                  // whether the changes in this snapshot have been picked 
into the current table state
+                  boolean isPicked = 
pickedAncestorSnapshotIds.contains(snapshotId);
+                  // if the snapshot that wrote this manifest is no longer 
valid (has expired),
+                  // then delete its deleted files. note that this is only for 
expired snapshots that are in the
+                  // current table state
+                  if (!fromValidSnapshots && (isFromAncestor || isPicked) && 
manifest.hasDeletedFiles()) {
+                    manifestsToScan.add(manifest.copy());
+                  }
+                }
+
+              } catch (IOException e) {
+                throw new RuntimeIOException(e,
+                    "Failed to close manifest list: %s", 
snapshot.manifestListLocation());
+              }
+            });
 
     // find manifests to clean up that were only referenced by snapshots that 
have expired
     Set<String> manifestListsToDelete = Sets.newHashSet();
     Set<String> manifestsToDelete = Sets.newHashSet();
     Set<ManifestFile> manifestsToRevert = Sets.newHashSet();
-    for (Snapshot snapshot : base.snapshots()) {
-      long snapshotId = snapshot.snapshotId();
-      if (!validIds.contains(snapshotId)) {
-        // determine whether the changes in this snapshot are in the current 
table state
-        if (pickedAncestorSnapshotIds.contains(snapshotId)) {
-          // this snapshot was cherry-picked into the current table state, so 
skip cleaning it up. its changes will
-          // expire when the picked snapshot expires.
-          // A -- C -- D (source=B)
-          //  `- B <-- this commit
-          continue;
-        }
-
-        long sourceSnapshotId = PropertyUtil.propertyAsLong(
-            snapshot.summary(), SnapshotSummary.SOURCE_SNAPSHOT_ID_PROP, -1);
-        if (ancestorIds.contains(sourceSnapshotId)) {
-          // this commit was cherry-picked from a commit that is in the 
current table state. do not clean up its
-          // changes because it would revert data file additions that are in 
the current table.
-          // A -- B -- C
-          //  `- D (source=B) <-- this commit
-          continue;
-        }
-
-        if (pickedAncestorSnapshotIds.contains(sourceSnapshotId)) {
-          // this commit was cherry-picked from a commit that is in the 
current table state. do not clean up its
-          // changes because it would revert data file additions that are in 
the current table.
-          // A -- C -- E (source=B)
-          //  `- B `- D (source=B) <-- this commit
-          continue;
-        }
-
-        // find any manifests that are no longer needed
-        try (CloseableIterable<ManifestFile> manifests = 
readManifestFiles(snapshot)) {
-          for (ManifestFile manifest : manifests) {
-            if (!validManifests.contains(manifest.path())) {
-              manifestsToDelete.add(manifest.path());
-
-              boolean isFromAncestor = 
ancestorIds.contains(manifest.snapshotId());
-              boolean isFromExpiringSnapshot = 
expiredIds.contains(manifest.snapshotId());
-
-              if (isFromAncestor && manifest.hasDeletedFiles()) {
-                // Only delete data files that were deleted in by an expired 
snapshot if that
-                // snapshot is an ancestor of the current table state. 
Otherwise, a snapshot that
-                // deleted files and was rolled back will delete files that 
could be in the current
-                // table state.
-                manifestsToScan.add(manifest.copy());
+    Tasks.foreach(base.snapshots()).noRetry().suppressFailureWhenFinished()
 
 Review comment:
   Done.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to