rdblue commented on a change in pull request #928: Add failure handling in 
cleanup while reading snapshot manifest-list files
URL: https://github.com/apache/incubator-iceberg/pull/928#discussion_r409188653
 
 

 ##########
 File path: core/src/main/java/org/apache/iceberg/RemoveSnapshots.java
 ##########
 @@ -201,112 +201,121 @@ private void cleanExpiredFiles(List<Snapshot> 
snapshots, Set<Long> validIds, Set
     // find manifests to clean up that are still referenced by a valid 
snapshot, but written by an expired snapshot
     Set<String> validManifests = Sets.newHashSet();
     Set<ManifestFile> manifestsToScan = Sets.newHashSet();
-    for (Snapshot snapshot : snapshots) {
-      try (CloseableIterable<ManifestFile> manifests = 
readManifestFiles(snapshot)) {
-        for (ManifestFile manifest : manifests) {
-          validManifests.add(manifest.path());
-
-          long snapshotId = manifest.snapshotId();
-          // whether the manifest was created by a valid snapshot (true) or an 
expired snapshot (false)
-          boolean fromValidSnapshots = validIds.contains(snapshotId);
-          // whether the snapshot that created the manifest was an ancestor of 
the table state
-          boolean isFromAncestor = ancestorIds.contains(snapshotId);
-          // whether the changes in this snapshot have been picked into the 
current table state
-          boolean isPicked = pickedAncestorSnapshotIds.contains(snapshotId);
-          // if the snapshot that wrote this manifest is no longer valid (has 
expired), then delete its deleted files.
-          // note that this is only for expired snapshots that are in the 
current table state
-          if (!fromValidSnapshots && (isFromAncestor || isPicked) && 
manifest.hasDeletedFiles()) {
-            manifestsToScan.add(manifest.copy());
-          }
-        }
-
-      } catch (IOException e) {
-        throw new RuntimeIOException(e,
-            "Failed to close manifest list: %s", 
snapshot.manifestListLocation());
-      }
-    }
+    Tasks.foreach(snapshots).noRetry().suppressFailureWhenFinished()
+        .onFailure((snapshot, exc) -> LOG.warn("Failed on snapshot {} while 
reading manifest list: {}",
+            snapshot.snapshotId(), snapshot.manifestListLocation(), exc))
+        .run(
+            snapshot -> {
+              try (CloseableIterable<ManifestFile> manifests = 
readManifestFiles(snapshot)) {
+                for (ManifestFile manifest : manifests) {
+                  validManifests.add(manifest.path());
+
+                  long snapshotId = manifest.snapshotId();
+                  // whether the manifest was created by a valid snapshot 
(true) or an expired snapshot (false)
+                  boolean fromValidSnapshots = validIds.contains(snapshotId);
+                  // whether the snapshot that created the manifest was an 
ancestor of the table state
+                  boolean isFromAncestor = ancestorIds.contains(snapshotId);
+                  // whether the changes in this snapshot have been picked 
into the current table state
+                  boolean isPicked = 
pickedAncestorSnapshotIds.contains(snapshotId);
+                  // if the snapshot that wrote this manifest is no longer 
valid (has expired),
+                  // then delete its deleted files. note that this is only for 
expired snapshots that are in the
+                  // current table state
+                  if (!fromValidSnapshots && (isFromAncestor || isPicked) && 
manifest.hasDeletedFiles()) {
+                    manifestsToScan.add(manifest.copy());
+                  }
+                }
+
+              } catch (IOException e) {
+                throw new RuntimeIOException(e,
+                    "Failed to close manifest list: %s", 
snapshot.manifestListLocation());
+              }
+            });
 
     // find manifests to clean up that were only referenced by snapshots that 
have expired
     Set<String> manifestListsToDelete = Sets.newHashSet();
     Set<String> manifestsToDelete = Sets.newHashSet();
     Set<ManifestFile> manifestsToRevert = Sets.newHashSet();
-    for (Snapshot snapshot : base.snapshots()) {
-      long snapshotId = snapshot.snapshotId();
-      if (!validIds.contains(snapshotId)) {
-        // determine whether the changes in this snapshot are in the current 
table state
-        if (pickedAncestorSnapshotIds.contains(snapshotId)) {
-          // this snapshot was cherry-picked into the current table state, so 
skip cleaning it up. its changes will
-          // expire when the picked snapshot expires.
-          // A -- C -- D (source=B)
-          //  `- B <-- this commit
-          continue;
-        }
-
-        long sourceSnapshotId = PropertyUtil.propertyAsLong(
-            snapshot.summary(), SnapshotSummary.SOURCE_SNAPSHOT_ID_PROP, -1);
-        if (ancestorIds.contains(sourceSnapshotId)) {
-          // this commit was cherry-picked from a commit that is in the 
current table state. do not clean up its
-          // changes because it would revert data file additions that are in 
the current table.
-          // A -- B -- C
-          //  `- D (source=B) <-- this commit
-          continue;
-        }
-
-        if (pickedAncestorSnapshotIds.contains(sourceSnapshotId)) {
-          // this commit was cherry-picked from a commit that is in the 
current table state. do not clean up its
-          // changes because it would revert data file additions that are in 
the current table.
-          // A -- C -- E (source=B)
-          //  `- B `- D (source=B) <-- this commit
-          continue;
-        }
-
-        // find any manifests that are no longer needed
-        try (CloseableIterable<ManifestFile> manifests = 
readManifestFiles(snapshot)) {
-          for (ManifestFile manifest : manifests) {
-            if (!validManifests.contains(manifest.path())) {
-              manifestsToDelete.add(manifest.path());
-
-              boolean isFromAncestor = 
ancestorIds.contains(manifest.snapshotId());
-              boolean isFromExpiringSnapshot = 
expiredIds.contains(manifest.snapshotId());
-
-              if (isFromAncestor && manifest.hasDeletedFiles()) {
-                // Only delete data files that were deleted in by an expired 
snapshot if that
-                // snapshot is an ancestor of the current table state. 
Otherwise, a snapshot that
-                // deleted files and was rolled back will delete files that 
could be in the current
-                // table state.
-                manifestsToScan.add(manifest.copy());
+    Tasks.foreach(base.snapshots()).noRetry().suppressFailureWhenFinished()
+        .onFailure((snapshot, exc) -> LOG.warn("Failed on snapshot {} while 
reading manifest list: {}",
+            snapshot.snapshotId(), snapshot.manifestListLocation(), exc))
+        .run(
+            snapshot -> {
+              long snapshotId = snapshot.snapshotId();
+              if (!validIds.contains(snapshotId)) {
+                // determine whether the changes in this snapshot are in the 
current table state
+                if (pickedAncestorSnapshotIds.contains(snapshotId)) {
+                  // this snapshot was cherry-picked into the current table 
state, so skip cleaning it up.
+                  // its changes will expire when the picked snapshot expires.
+                  // A -- C -- D (source=B)
+                  //  `- B <-- this commit
+                  return;
+                }
+
+                long sourceSnapshotId = PropertyUtil.propertyAsLong(
+                    snapshot.summary(), 
SnapshotSummary.SOURCE_SNAPSHOT_ID_PROP, -1);
+                if (ancestorIds.contains(sourceSnapshotId)) {
+                  // this commit was cherry-picked from a commit that is in 
the current table state. do not clean up its
+                  // changes because it would revert data file additions that 
are in the current table.
+                  // A -- B -- C
+                  //  `- D (source=B) <-- this commit
+                  return;
+                }
+
+                if (pickedAncestorSnapshotIds.contains(sourceSnapshotId)) {
+                  // this commit was cherry-picked from a commit that is in 
the current table state. do not clean up its
+                  // changes because it would revert data file additions that 
are in the current table.
+                  // A -- C -- E (source=B)
+                  //  `- B `- D (source=B) <-- this commit
+                  return;
+                }
+
+                // find any manifests that are no longer needed
+                try (CloseableIterable<ManifestFile> manifests = 
readManifestFiles(snapshot)) {
+                  for (ManifestFile manifest : manifests) {
+                    if (!validManifests.contains(manifest.path())) {
+                      manifestsToDelete.add(manifest.path());
+
+                      boolean isFromAncestor = 
ancestorIds.contains(manifest.snapshotId());
+                      boolean isFromExpiringSnapshot = 
expiredIds.contains(manifest.snapshotId());
+
+                      if (isFromAncestor && manifest.hasDeletedFiles()) {
+                        // Only delete data files that were deleted by an 
expired snapshot if that
+                        // snapshot is an ancestor of the current table state. 
Otherwise, a snapshot that
+                        // deleted files and was rolled back will delete files 
that could be in the current
+                        // table state.
+                        manifestsToScan.add(manifest.copy());
+                      }
+
+                      if (!isFromAncestor && isFromExpiringSnapshot && 
manifest.hasAddedFiles()) {
+                        // Because the manifest was written by a snapshot that 
is not an ancestor of the
+                        // current table state, the files added in this 
manifest can be removed. The extra
+                        // check whether the manifest was written by a known 
snapshot that was expired in
+                        // this commit ensures that the full ancestor list 
between when the snapshot was
+                        // written and this expiration is known and there is 
no missing history. If history
+                        // were missing, then the snapshot could be an 
ancestor of the table state but the
+                        // ancestor ID set would not contain it and this would 
be unsafe.
+                        manifestsToRevert.add(manifest.copy());
+                      }
+                    }
+                  }
+                } catch (IOException e) {
+                  throw new RuntimeIOException(e,
+                      "Failed to close manifest list: %s", 
snapshot.manifestListLocation());
+                }
+
+                // add the manifest list to the delete set, if present
+                if (snapshot.manifestListLocation() != null) {
+                  manifestListsToDelete.add(snapshot.manifestListLocation());
+                }
               }
-
-              if (!isFromAncestor && isFromExpiringSnapshot && 
manifest.hasAddedFiles()) {
-                // Because the manifest was written by a snapshot that is not 
an ancestor of the
-                // current table state, the files added in this manifest can 
be removed. The extra
-                // check whether the manifest was written by a known snapshot 
that was expired in
-                // this commit ensures that the full ancestor list between 
when the snapshot was
-                // written and this expiration is known and there is no 
missing history. If history
-                // were missing, then the snapshot could be an ancestor of the 
table state but the
-                // ancestor ID set would not contain it and this would be 
unsafe.
-                manifestsToRevert.add(manifest.copy());
-              }
-            }
-          }
-        } catch (IOException e) {
-          throw new RuntimeIOException(e,
-              "Failed to close manifest list: %s", 
snapshot.manifestListLocation());
-        }
-
-        // add the manifest list to the delete set, if present
-        if (snapshot.manifestListLocation() != null) {
-          manifestListsToDelete.add(snapshot.manifestListLocation());
-        }
-      }
-    }
-
+            });
     deleteDataFiles(manifestsToScan, manifestsToRevert, validIds);
     deleteMetadataFiles(manifestsToDelete, manifestListsToDelete);
   }
 
   private void deleteMetadataFiles(Set<String> manifestsToDelete, Set<String> 
manifestListsToDelete) {
     LOG.warn("Manifests to delete: {}", Joiner.on(", 
").join(manifestsToDelete));
+    LOG.warn("Manifests Lists to delete: {}", Joiner.on(", 
").join(manifestListsToDelete));
 
 Review comment:
   +1

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to