rdblue commented on a change in pull request #928: Add failure handling in
cleanup while reading snapshot manifest-list files
URL: https://github.com/apache/incubator-iceberg/pull/928#discussion_r409188653
##########
File path: core/src/main/java/org/apache/iceberg/RemoveSnapshots.java
##########
@@ -201,112 +201,121 @@ private void cleanExpiredFiles(List<Snapshot>
snapshots, Set<Long> validIds, Set
// find manifests to clean up that are still referenced by a valid
snapshot, but written by an expired snapshot
Set<String> validManifests = Sets.newHashSet();
Set<ManifestFile> manifestsToScan = Sets.newHashSet();
- for (Snapshot snapshot : snapshots) {
- try (CloseableIterable<ManifestFile> manifests =
readManifestFiles(snapshot)) {
- for (ManifestFile manifest : manifests) {
- validManifests.add(manifest.path());
-
- long snapshotId = manifest.snapshotId();
- // whether the manifest was created by a valid snapshot (true) or an
expired snapshot (false)
- boolean fromValidSnapshots = validIds.contains(snapshotId);
- // whether the snapshot that created the manifest was an ancestor of
the table state
- boolean isFromAncestor = ancestorIds.contains(snapshotId);
- // whether the changes in this snapshot have been picked into the
current table state
- boolean isPicked = pickedAncestorSnapshotIds.contains(snapshotId);
- // if the snapshot that wrote this manifest is no longer valid (has
expired), then delete its deleted files.
- // note that this is only for expired snapshots that are in the
current table state
- if (!fromValidSnapshots && (isFromAncestor || isPicked) &&
manifest.hasDeletedFiles()) {
- manifestsToScan.add(manifest.copy());
- }
- }
-
- } catch (IOException e) {
- throw new RuntimeIOException(e,
- "Failed to close manifest list: %s",
snapshot.manifestListLocation());
- }
- }
+ Tasks.foreach(snapshots).noRetry().suppressFailureWhenFinished()
+ .onFailure((snapshot, exc) -> LOG.warn("Failed on snapshot {} while
reading manifest list: {}",
+ snapshot.snapshotId(), snapshot.manifestListLocation(), exc))
+ .run(
+ snapshot -> {
+ try (CloseableIterable<ManifestFile> manifests =
readManifestFiles(snapshot)) {
+ for (ManifestFile manifest : manifests) {
+ validManifests.add(manifest.path());
+
+ long snapshotId = manifest.snapshotId();
+ // whether the manifest was created by a valid snapshot
(true) or an expired snapshot (false)
+ boolean fromValidSnapshots = validIds.contains(snapshotId);
+ // whether the snapshot that created the manifest was an
ancestor of the table state
+ boolean isFromAncestor = ancestorIds.contains(snapshotId);
+ // whether the changes in this snapshot have been picked
into the current table state
+ boolean isPicked =
pickedAncestorSnapshotIds.contains(snapshotId);
+ // if the snapshot that wrote this manifest is no longer
valid (has expired),
+ // then delete its deleted files. note that this is only for
expired snapshots that are in the
+ // current table state
+ if (!fromValidSnapshots && (isFromAncestor || isPicked) &&
manifest.hasDeletedFiles()) {
+ manifestsToScan.add(manifest.copy());
+ }
+ }
+
+ } catch (IOException e) {
+ throw new RuntimeIOException(e,
+ "Failed to close manifest list: %s",
snapshot.manifestListLocation());
+ }
+ });
// find manifests to clean up that were only referenced by snapshots that
have expired
Set<String> manifestListsToDelete = Sets.newHashSet();
Set<String> manifestsToDelete = Sets.newHashSet();
Set<ManifestFile> manifestsToRevert = Sets.newHashSet();
- for (Snapshot snapshot : base.snapshots()) {
- long snapshotId = snapshot.snapshotId();
- if (!validIds.contains(snapshotId)) {
- // determine whether the changes in this snapshot are in the current
table state
- if (pickedAncestorSnapshotIds.contains(snapshotId)) {
- // this snapshot was cherry-picked into the current table state, so
skip cleaning it up. its changes will
- // expire when the picked snapshot expires.
- // A -- C -- D (source=B)
- // `- B <-- this commit
- continue;
- }
-
- long sourceSnapshotId = PropertyUtil.propertyAsLong(
- snapshot.summary(), SnapshotSummary.SOURCE_SNAPSHOT_ID_PROP, -1);
- if (ancestorIds.contains(sourceSnapshotId)) {
- // this commit was cherry-picked from a commit that is in the
current table state. do not clean up its
- // changes because it would revert data file additions that are in
the current table.
- // A -- B -- C
- // `- D (source=B) <-- this commit
- continue;
- }
-
- if (pickedAncestorSnapshotIds.contains(sourceSnapshotId)) {
- // this commit was cherry-picked from a commit that is in the
current table state. do not clean up its
- // changes because it would revert data file additions that are in
the current table.
- // A -- C -- E (source=B)
- // `- B `- D (source=B) <-- this commit
- continue;
- }
-
- // find any manifests that are no longer needed
- try (CloseableIterable<ManifestFile> manifests =
readManifestFiles(snapshot)) {
- for (ManifestFile manifest : manifests) {
- if (!validManifests.contains(manifest.path())) {
- manifestsToDelete.add(manifest.path());
-
- boolean isFromAncestor =
ancestorIds.contains(manifest.snapshotId());
- boolean isFromExpiringSnapshot =
expiredIds.contains(manifest.snapshotId());
-
- if (isFromAncestor && manifest.hasDeletedFiles()) {
- // Only delete data files that were deleted in by an expired
snapshot if that
- // snapshot is an ancestor of the current table state.
Otherwise, a snapshot that
- // deleted files and was rolled back will delete files that
could be in the current
- // table state.
- manifestsToScan.add(manifest.copy());
+ Tasks.foreach(base.snapshots()).noRetry().suppressFailureWhenFinished()
+ .onFailure((snapshot, exc) -> LOG.warn("Failed on snapshot {} while
reading manifest list: {}",
+ snapshot.snapshotId(), snapshot.manifestListLocation(), exc))
+ .run(
+ snapshot -> {
+ long snapshotId = snapshot.snapshotId();
+ if (!validIds.contains(snapshotId)) {
+ // determine whether the changes in this snapshot are in the
current table state
+ if (pickedAncestorSnapshotIds.contains(snapshotId)) {
+ // this snapshot was cherry-picked into the current table
state, so skip cleaning it up.
+ // its changes will expire when the picked snapshot expires.
+ // A -- C -- D (source=B)
+ // `- B <-- this commit
+ return;
+ }
+
+ long sourceSnapshotId = PropertyUtil.propertyAsLong(
+ snapshot.summary(),
SnapshotSummary.SOURCE_SNAPSHOT_ID_PROP, -1);
+ if (ancestorIds.contains(sourceSnapshotId)) {
+ // this commit was cherry-picked from a commit that is in
the current table state. do not clean up its
+ // changes because it would revert data file additions that
are in the current table.
+ // A -- B -- C
+ // `- D (source=B) <-- this commit
+ return;
+ }
+
+ if (pickedAncestorSnapshotIds.contains(sourceSnapshotId)) {
+ // this commit was cherry-picked from a commit that is in
the current table state. do not clean up its
+ // changes because it would revert data file additions that
are in the current table.
+ // A -- C -- E (source=B)
+ // `- B `- D (source=B) <-- this commit
+ return;
+ }
+
+ // find any manifests that are no longer needed
+ try (CloseableIterable<ManifestFile> manifests =
readManifestFiles(snapshot)) {
+ for (ManifestFile manifest : manifests) {
+ if (!validManifests.contains(manifest.path())) {
+ manifestsToDelete.add(manifest.path());
+
+ boolean isFromAncestor =
ancestorIds.contains(manifest.snapshotId());
+ boolean isFromExpiringSnapshot =
expiredIds.contains(manifest.snapshotId());
+
+ if (isFromAncestor && manifest.hasDeletedFiles()) {
+ // Only delete data files that were deleted in by an
expired snapshot if that
+ // snapshot is an ancestor of the current table state.
Otherwise, a snapshot that
+ // deleted files and was rolled back will delete files
that could be in the current
+ // table state.
+ manifestsToScan.add(manifest.copy());
+ }
+
+ if (!isFromAncestor && isFromExpiringSnapshot &&
manifest.hasAddedFiles()) {
+ // Because the manifest was written by a snapshot that
is not an ancestor of the
+ // current table state, the files added in this
manifest can be removed. The extra
+ // check whether the manifest was written by a known
snapshot that was expired in
+ // this commit ensures that the full ancestor list
between when the snapshot was
+ // written and this expiration is known and there is
no missing history. If history
+ // were missing, then the snapshot could be an
ancestor of the table state but the
+ // ancestor ID set would not contain it and this would
be unsafe.
+ manifestsToRevert.add(manifest.copy());
+ }
+ }
+ }
+ } catch (IOException e) {
+ throw new RuntimeIOException(e,
+ "Failed to close manifest list: %s",
snapshot.manifestListLocation());
+ }
+
+ // add the manifest list to the delete set, if present
+ if (snapshot.manifestListLocation() != null) {
+ manifestListsToDelete.add(snapshot.manifestListLocation());
+ }
}
-
- if (!isFromAncestor && isFromExpiringSnapshot &&
manifest.hasAddedFiles()) {
- // Because the manifest was written by a snapshot that is not
an ancestor of the
- // current table state, the files added in this manifest can
be removed. The extra
- // check whether the manifest was written by a known snapshot
that was expired in
- // this commit ensures that the full ancestor list between
when the snapshot was
- // written and this expiration is known and there is no
missing history. If history
- // were missing, then the snapshot could be an ancestor of the
table state but the
- // ancestor ID set would not contain it and this would be
unsafe.
- manifestsToRevert.add(manifest.copy());
- }
- }
- }
- } catch (IOException e) {
- throw new RuntimeIOException(e,
- "Failed to close manifest list: %s",
snapshot.manifestListLocation());
- }
-
- // add the manifest list to the delete set, if present
- if (snapshot.manifestListLocation() != null) {
- manifestListsToDelete.add(snapshot.manifestListLocation());
- }
- }
- }
-
+ });
deleteDataFiles(manifestsToScan, manifestsToRevert, validIds);
deleteMetadataFiles(manifestsToDelete, manifestListsToDelete);
}
private void deleteMetadataFiles(Set<String> manifestsToDelete, Set<String> manifestListsToDelete) {
LOG.warn("Manifests to delete: {}", Joiner.on(", ").join(manifestsToDelete));
+ LOG.warn("Manifests Lists to delete: {}", Joiner.on(", ").join(manifestListsToDelete));
Review comment:
+1
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org