dimas-b commented on code in PR #1285:
URL: https://github.com/apache/polaris/pull/1285#discussion_r2153189948


##########
service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/CatalogHandlerUtils.java:
##########
@@ -607,8 +829,15 @@ protected ViewMetadata commit(ViewOperations ops, 
UpdateTableRequest request) {
     return ops.current();
   }
 
-  private int maxCommitRetries() {
+  @VisibleForTesting
+  public int maxCommitRetries() {
     return configurationStore.getConfiguration(
         realmContext, FeatureConfiguration.ICEBERG_COMMIT_MAX_RETRIES);
   }
+
+  @VisibleForTesting
+  public boolean isRollbackCompactionEnabled() {

Review Comment:
   could be package-private



##########
service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/CatalogHandlerUtils.java:
##########
@@ -433,39 +461,233 @@ protected TableMetadata commit(TableOperations ops, 
UpdateTableRequest request)
               2.0 /* exponential */)
           .onlyRetryOn(CommitFailedException.class)
           .run(
-              taskOps -> {
+              (taskOps) -> {
                 TableMetadata base = isRetry.get() ? taskOps.refresh() : 
taskOps.current();
-                isRetry.set(true);
 
-                // validate requirements
+                TableMetadata.Builder metadataBuilder = 
TableMetadata.buildFrom(base);
+                TableMetadata newBase = base;
                 try {
-                  request.requirements().forEach(requirement -> 
requirement.validate(base));
+                  request.requirements().forEach((requirement) -> 
requirement.validate(base));
                 } catch (CommitFailedException e) {
-                  // wrap and rethrow outside of tasks to avoid unnecessary 
retry
-                  throw new ValidationFailureException(e);
+                  if (!isRollbackCompactionEnabled()) {
+                    throw new ValidationFailureException(e);
+                  }
+                  LOGGER.debug(
+                      "Attempting to Rollback replace operations for table={}, 
with current-snapshot-id={}",
+                      base.uuid(),
+                      base.currentSnapshot().snapshotId());
+                  UpdateRequirement.AssertRefSnapshotID assertRefSnapshotId =
+                      findAssertRefSnapshotID(request);
+                  MetadataUpdate.SetSnapshotRef setSnapshotRef = 
findSetSnapshotRefUpdate(request);
+
+                  if (assertRefSnapshotId == null || setSnapshotRef == null) {
+                    // This implies the request was not trying to add a 
snapshot.
+                    LOGGER.debug(
+                        "Giving up on Rollback replace operations for 
table={}, with current-snapshot-id={}, as operation doesn't attempts to add a 
single snapshot",
+                        base.uuid(),
+                        base.currentSnapshot().snapshotId());
+                    throw new ValidationFailureException(e);
+                  }
+
+                  // snapshot-id the client expects the table 
current_snapshot_id
+                  long expectedCurrentSnapshotId = 
assertRefSnapshotId.snapshotId();
+
+                  MetadataUpdate.AddSnapshot snapshotToBeAdded = 
findAddSnapshotUpdate(request);
+                  if (snapshotToBeAdded == null) {
+                    // Re-throw if, there's no snapshot data to be added.
+                    throw new ValidationFailureException(e);
+                  }
+
+                  LOGGER.info(
+                      "Attempting to Rollback replace operation for table={}, 
with current-snapshot-id={}, to snapshot={}",
+                      base.uuid(),
+                      base.currentSnapshot().snapshotId(),
+                      snapshotToBeAdded.snapshot().snapshotId());
+
+                  List<MetadataUpdate> metadataUpdates =
+                      generateUpdatesToRemoveNoopSnapshot(
+                          base, expectedCurrentSnapshotId, 
setSnapshotRef.name());
+
+                  if (metadataUpdates == null || metadataUpdates.isEmpty()) {
+                    // Nothing can be done as this implies that there were not 
all
+                    // No-op snapshots (REPLACE) between 
expectedCurrentSnapshotId and
+                    // currentSnapshotId. hence re-throw the exception caught.
+                    throw new ValidationFailureException(e);
+                  }
+                  // Set back the ref we wanted to set, back to the snapshot-id
+                  // the client is expecting the table to be at.
+                  metadataBuilder.setBranchSnapshot(
+                      expectedCurrentSnapshotId, setSnapshotRef.name());
+
+                  // apply the remove snapshots update in the current metadata.
+                  // NOTE: we need to setRef to expectedCurrentSnapshotId 
first and then apply
+                  // remove, as otherwise the remove will drop the reference.
+                  // NOTE: we can skip removing the now orphan base. Its not a 
hard requirement.
+                  // just something good to do, and not leave for Remove 
Orphans.
+                  // Ref rolled back update correctly to snapshot to be 
committed parent now.
+                  metadataUpdates.forEach((update -> 
update.applyTo(metadataBuilder)));
+                  newBase = 
setAppropriateLastSeqNumber(metadataBuilder.build());
+                  LOGGER.info(
+                      "Successfully roll-backed replace operation for 
table={}, with current-snapshot-id={}, to snapshot={}",
+                      base.uuid(),
+                      base.currentSnapshot().snapshotId(),
+                      newBase.currentSnapshot().snapshotId());
                 }
-
-                // apply changes
-                TableMetadata.Builder metadataBuilder = 
TableMetadata.buildFrom(base);
-                request.updates().forEach(update -> 
update.applyTo(metadataBuilder));
-
-                TableMetadata updated = metadataBuilder.build();
-                if (updated.changes().isEmpty()) {
-                  // do not commit if the metadata has not changed
-                  return;
+                // double check if the requirements passes now.
+                try {
+                  TableMetadata baseWithRemovedSnaps = newBase;
+                  request
+                      .requirements()
+                      .forEach((requirement) -> 
requirement.validate(baseWithRemovedSnaps));
+                } catch (CommitFailedException e) {
+                  throw new ValidationFailureException(e);
                 }
 
-                // commit
+                TableMetadata.Builder newMetadataBuilder = 
TableMetadata.buildFrom(newBase);
+                request.updates().forEach((update) -> 
update.applyTo(newMetadataBuilder));
+                TableMetadata updated = newMetadataBuilder.build();
+                // always commit this
                 taskOps.commit(base, updated);
               });
-
     } catch (ValidationFailureException e) {
       throw e.wrapped();
     }
 
     return ops.current();
   }
 
+  private UpdateRequirement.AssertRefSnapshotID findAssertRefSnapshotID(
+      UpdateTableRequest request) {
+    UpdateRequirement.AssertRefSnapshotID assertRefSnapshotID = null;
+    int total = 0;
+    for (UpdateRequirement requirement : request.requirements()) {
+      if (requirement instanceof UpdateRequirement.AssertRefSnapshotID) {
+        ++total;
+        assertRefSnapshotID = (UpdateRequirement.AssertRefSnapshotID) 
requirement;
+      }
+    }
+
+    // if > 1 assertion for refs, then it's not safe to roll back, make this 
Noop.
+    return total != 1 ? null : assertRefSnapshotID;
+  }
+
+  private List<MetadataUpdate> generateUpdatesToRemoveNoopSnapshot(
+      TableMetadata base, long expectedCurrentSnapshotId, String 
updateRefName) {
+    // find the all the snapshots we want to retain which are not the part of 
current branch.
+    Set<Long> idsToRetain = Sets.newHashSet();
+    for (Map.Entry<String, SnapshotRef> ref : base.refs().entrySet()) {
+      String refName = ref.getKey();
+      SnapshotRef snapshotRef = ref.getValue();
+      if (refName.equals(updateRefName)) {
+        continue;
+      }
+      idsToRetain.add(ref.getValue().snapshotId());
+      // Always check the ancestry for both branch and tags
+      // mostly for case where a branch was created and then was dropped
+      // then a tag was created and then rollback happened post that tag
+      // was dropped and branch was re-created on it.
+      for (Snapshot ancestor : 
SnapshotUtil.ancestorsOf(snapshotRef.snapshotId(), base::snapshot)) {
+        idsToRetain.add(ancestor.snapshotId());
+      }
+    }
+
+    List<MetadataUpdate> updateToRemoveSnapshot = new ArrayList<>();
+    Long snapshotId = base.ref(updateRefName).snapshotId(); // current tip of 
the given branch
+    // ensure this branch has the latest sequence number.
+    long expectedSequenceNumber = base.lastSequenceNumber();
+    Set<Long> snapshotsToRemove = new LinkedHashSet<>();
+    while (snapshotId != null && !Objects.equals(snapshotId, 
expectedCurrentSnapshotId)) {
+      Snapshot snap = base.snapshot(snapshotId);
+      // catch un-expected state the commit sequence number are
+      // not continuous can happen for a table with multiple branches.
+      if (expectedSequenceNumber != snap.sequenceNumber()) {
+        LOGGER.debug(
+            "Giving up rolling back table {} to snapshot {}, Sequence Number 
are not continuous from {}",
+            base.uuid(),
+            snapshotId,
+            expectedSequenceNumber);
+        break;
+      }
+      if (!isRollbackSnapshot(snap) || idsToRetain.contains(snapshotId)) {
+        // Either encountered a non no-op snapshot or the snapshot is being 
referenced by any other
+        // reference either by branch or a tag.
+        LOGGER.debug(
+            "Giving up rolling back table {} to snapshot {}, snapshot to be 
removed referenced by another branch or tag ancestor",
+            base.uuid(),
+            snapshotId);
+        break;
+      }
+      snapshotsToRemove.add(snap.snapshotId());
+      snapshotId = snap.parentId();
+      // we need continuous sequence number to correctly rollback
+      expectedSequenceNumber--;
+    }
+
+    boolean wasExpectedSnapshotReached = Objects.equals(snapshotId, 
expectedCurrentSnapshotId);
+    updateToRemoveSnapshot.add(new 
MetadataUpdate.RemoveSnapshots(snapshotsToRemove));
+    return wasExpectedSnapshotReached ? updateToRemoveSnapshot : null;
+  }
+
+  private boolean isRollbackSnapshot(Snapshot snapshot) {
+    // Only Snapshots with {@ROLLBACKABLE_REPLACE_SNAPSHOT} are allowed to be 
rollback.
+    return DataOperations.REPLACE.equals(snapshot.operation())
+        && PropertyUtil.propertyAsString(snapshot.summary(), 
CONFLICT_RESOLUTION_ACTION, "")
+            .equalsIgnoreCase("rollback");
+  }
+
+  private MetadataUpdate.SetSnapshotRef 
findSetSnapshotRefUpdate(UpdateTableRequest request) {
+    int total = 0;
+    MetadataUpdate.SetSnapshotRef setSnapshotRefUpdate = null;
+    // find the SetRefName snapshot update
+    for (MetadataUpdate update : request.updates()) {
+      if (update instanceof MetadataUpdate.SetSnapshotRef) {
+        total++;
+        setSnapshotRefUpdate = (MetadataUpdate.SetSnapshotRef) update;
+      }
+    }
+
+    // if > 1 assertion for refs, then it's not safe to rollback, make this 
Noop.
+    return total != 1 ? null : setSnapshotRefUpdate;
+  }
+
+  private MetadataUpdate.AddSnapshot findAddSnapshotUpdate(UpdateTableRequest 
request) {
+    int total = 0;
+    MetadataUpdate.AddSnapshot addSnapshot = null;
+    // find the SetRefName snapshot update
+    for (MetadataUpdate update : request.updates()) {
+      if (update instanceof MetadataUpdate.AddSnapshot) {
+        total++;
+        addSnapshot = (MetadataUpdate.AddSnapshot) update;
+      }
+    }
+
+    // if > 1 assertion for addSnapshot, then it's not safe to rollback, make 
this Noop.
+    return total != 1 ? null : addSnapshot;
+  }
+
+  private TableMetadata setAppropriateLastSeqNumber(TableMetadata newBase) {
+    // TODO: Get rid of the reflection call once TableMetadata have API for it.
+    // move the lastSequenceNumber back, to apply snapshot properly on the
+    // current-metadata Seq number are considered increasing monotonically
+    // snapshot over snapshot, the client generates the manifest list and hence
+    // the sequence number can't be changed for a snapshot the only possible 
option
+    // then is to change the sequenceNumber tracked by metadata.json
+    try {
+      long lastSeqNumber = newBase.lastSequenceNumber();
+      // this should point to the sequence number that current tip of the
+      // branch belongs to, as the new commit will be applied on top of this.
+      LAST_SEQUENCE_NUMBER_FIELD.set(newBase, 
newBase.currentSnapshot().sequenceNumber());
+      LOGGER.info(
+          "Setting table :{} last sequence number from {} to {}",
+          newBase.uuid(),
+          lastSeqNumber,
+          newBase.lastSequenceNumber());

Review Comment:
   For humans to read this log message, we probably need the table's name too (not just its UUID). WDYT?



##########
service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/CatalogHandlerUtils.java:
##########
@@ -433,39 +461,233 @@ protected TableMetadata commit(TableOperations ops, 
UpdateTableRequest request)
               2.0 /* exponential */)
           .onlyRetryOn(CommitFailedException.class)
           .run(
-              taskOps -> {
+              (taskOps) -> {
                 TableMetadata base = isRetry.get() ? taskOps.refresh() : 
taskOps.current();
-                isRetry.set(true);
 
-                // validate requirements
+                TableMetadata.Builder metadataBuilder = 
TableMetadata.buildFrom(base);
+                TableMetadata newBase = base;
                 try {
-                  request.requirements().forEach(requirement -> 
requirement.validate(base));
+                  request.requirements().forEach((requirement) -> 
requirement.validate(base));
                 } catch (CommitFailedException e) {
-                  // wrap and rethrow outside of tasks to avoid unnecessary 
retry
-                  throw new ValidationFailureException(e);
+                  if (!isRollbackCompactionEnabled()) {
+                    throw new ValidationFailureException(e);
+                  }
+                  LOGGER.debug(
+                      "Attempting to Rollback replace operations for table={}, 
with current-snapshot-id={}",
+                      base.uuid(),
+                      base.currentSnapshot().snapshotId());
+                  UpdateRequirement.AssertRefSnapshotID assertRefSnapshotId =
+                      findAssertRefSnapshotID(request);
+                  MetadataUpdate.SetSnapshotRef setSnapshotRef = 
findSetSnapshotRefUpdate(request);
+
+                  if (assertRefSnapshotId == null || setSnapshotRef == null) {
+                    // This implies the request was not trying to add a 
snapshot.
+                    LOGGER.debug(
+                        "Giving up on Rollback replace operations for 
table={}, with current-snapshot-id={}, as operation doesn't attempts to add a 
single snapshot",
+                        base.uuid(),
+                        base.currentSnapshot().snapshotId());
+                    throw new ValidationFailureException(e);
+                  }
+
+                  // snapshot-id the client expects the table 
current_snapshot_id
+                  long expectedCurrentSnapshotId = 
assertRefSnapshotId.snapshotId();
+
+                  MetadataUpdate.AddSnapshot snapshotToBeAdded = 
findAddSnapshotUpdate(request);
+                  if (snapshotToBeAdded == null) {
+                    // Re-throw if, there's no snapshot data to be added.
+                    throw new ValidationFailureException(e);
+                  }
+
+                  LOGGER.info(
+                      "Attempting to Rollback replace operation for table={}, 
with current-snapshot-id={}, to snapshot={}",
+                      base.uuid(),
+                      base.currentSnapshot().snapshotId(),
+                      snapshotToBeAdded.snapshot().snapshotId());
+
+                  List<MetadataUpdate> metadataUpdates =
+                      generateUpdatesToRemoveNoopSnapshot(
+                          base, expectedCurrentSnapshotId, 
setSnapshotRef.name());
+
+                  if (metadataUpdates == null || metadataUpdates.isEmpty()) {
+                    // Nothing can be done as this implies that there were not 
all
+                    // No-op snapshots (REPLACE) between 
expectedCurrentSnapshotId and
+                    // currentSnapshotId. hence re-throw the exception caught.
+                    throw new ValidationFailureException(e);
+                  }
+                  // Set back the ref we wanted to set, back to the snapshot-id
+                  // the client is expecting the table to be at.
+                  metadataBuilder.setBranchSnapshot(
+                      expectedCurrentSnapshotId, setSnapshotRef.name());
+
+                  // apply the remove snapshots update in the current metadata.
+                  // NOTE: we need to setRef to expectedCurrentSnapshotId 
first and then apply
+                  // remove, as otherwise the remove will drop the reference.
+                  // NOTE: we can skip removing the now orphan base. Its not a 
hard requirement.
+                  // just something good to do, and not leave for Remove 
Orphans.
+                  // Ref rolled back update correctly to snapshot to be 
committed parent now.
+                  metadataUpdates.forEach((update -> 
update.applyTo(metadataBuilder)));
+                  newBase = 
setAppropriateLastSeqNumber(metadataBuilder.build());
+                  LOGGER.info(
+                      "Successfully roll-backed replace operation for 
table={}, with current-snapshot-id={}, to snapshot={}",
+                      base.uuid(),
+                      base.currentSnapshot().snapshotId(),
+                      newBase.currentSnapshot().snapshotId());
                 }
-
-                // apply changes
-                TableMetadata.Builder metadataBuilder = 
TableMetadata.buildFrom(base);
-                request.updates().forEach(update -> 
update.applyTo(metadataBuilder));
-
-                TableMetadata updated = metadataBuilder.build();
-                if (updated.changes().isEmpty()) {
-                  // do not commit if the metadata has not changed
-                  return;
+                // double check if the requirements passes now.
+                try {
+                  TableMetadata baseWithRemovedSnaps = newBase;
+                  request
+                      .requirements()
+                      .forEach((requirement) -> 
requirement.validate(baseWithRemovedSnaps));
+                } catch (CommitFailedException e) {
+                  throw new ValidationFailureException(e);
                 }
 
-                // commit
+                TableMetadata.Builder newMetadataBuilder = 
TableMetadata.buildFrom(newBase);
+                request.updates().forEach((update) -> 
update.applyTo(newMetadataBuilder));
+                TableMetadata updated = newMetadataBuilder.build();
+                // always commit this

Review Comment:
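   Always? The previous code skipped the commit when the metadata had not changed (`updated.changes().isEmpty()`). Is dropping that no-op check intentional?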



##########
service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/CatalogHandlerUtils.java:
##########
@@ -433,39 +461,233 @@ protected TableMetadata commit(TableOperations ops, 
UpdateTableRequest request)
               2.0 /* exponential */)
           .onlyRetryOn(CommitFailedException.class)
           .run(
-              taskOps -> {
+              (taskOps) -> {

Review Comment:
   Just wondering: why the parentheses around the single lambda parameter?



##########
service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/CatalogHandlerUtils.java:
##########
@@ -433,39 +461,233 @@ protected TableMetadata commit(TableOperations ops, 
UpdateTableRequest request)
               2.0 /* exponential */)
           .onlyRetryOn(CommitFailedException.class)
           .run(
-              taskOps -> {
+              (taskOps) -> {
                 TableMetadata base = isRetry.get() ? taskOps.refresh() : 
taskOps.current();
-                isRetry.set(true);
 
-                // validate requirements
+                TableMetadata.Builder metadataBuilder = 
TableMetadata.buildFrom(base);
+                TableMetadata newBase = base;
                 try {
-                  request.requirements().forEach(requirement -> 
requirement.validate(base));
+                  request.requirements().forEach((requirement) -> 
requirement.validate(base));
                 } catch (CommitFailedException e) {
-                  // wrap and rethrow outside of tasks to avoid unnecessary 
retry
-                  throw new ValidationFailureException(e);
+                  if (!isRollbackCompactionEnabled()) {
+                    throw new ValidationFailureException(e);
+                  }
+                  LOGGER.debug(
+                      "Attempting to Rollback replace operations for table={}, 
with current-snapshot-id={}",
+                      base.uuid(),
+                      base.currentSnapshot().snapshotId());
+                  UpdateRequirement.AssertRefSnapshotID assertRefSnapshotId =
+                      findAssertRefSnapshotID(request);
+                  MetadataUpdate.SetSnapshotRef setSnapshotRef = 
findSetSnapshotRefUpdate(request);
+
+                  if (assertRefSnapshotId == null || setSnapshotRef == null) {
+                    // This implies the request was not trying to add a 
snapshot.
+                    LOGGER.debug(
+                        "Giving up on Rollback replace operations for 
table={}, with current-snapshot-id={}, as operation doesn't attempts to add a 
single snapshot",
+                        base.uuid(),
+                        base.currentSnapshot().snapshotId());
+                    throw new ValidationFailureException(e);
+                  }
+
+                  // snapshot-id the client expects the table 
current_snapshot_id
+                  long expectedCurrentSnapshotId = 
assertRefSnapshotId.snapshotId();
+
+                  MetadataUpdate.AddSnapshot snapshotToBeAdded = 
findAddSnapshotUpdate(request);
+                  if (snapshotToBeAdded == null) {
+                    // Re-throw if, there's no snapshot data to be added.
+                    throw new ValidationFailureException(e);
+                  }
+
+                  LOGGER.info(
+                      "Attempting to Rollback replace operation for table={}, 
with current-snapshot-id={}, to snapshot={}",
+                      base.uuid(),
+                      base.currentSnapshot().snapshotId(),
+                      snapshotToBeAdded.snapshot().snapshotId());
+
+                  List<MetadataUpdate> metadataUpdates =
+                      generateUpdatesToRemoveNoopSnapshot(
+                          base, expectedCurrentSnapshotId, 
setSnapshotRef.name());
+
+                  if (metadataUpdates == null || metadataUpdates.isEmpty()) {
+                    // Nothing can be done as this implies that there were not 
all
+                    // No-op snapshots (REPLACE) between 
expectedCurrentSnapshotId and
+                    // currentSnapshotId. hence re-throw the exception caught.
+                    throw new ValidationFailureException(e);
+                  }
+                  // Set back the ref we wanted to set, back to the snapshot-id
+                  // the client is expecting the table to be at.
+                  metadataBuilder.setBranchSnapshot(
+                      expectedCurrentSnapshotId, setSnapshotRef.name());
+
+                  // apply the remove snapshots update in the current metadata.
+                  // NOTE: we need to setRef to expectedCurrentSnapshotId 
first and then apply
+                  // remove, as otherwise the remove will drop the reference.
+                  // NOTE: we can skip removing the now orphan base. Its not a 
hard requirement.
+                  // just something good to do, and not leave for Remove 
Orphans.
+                  // Ref rolled back update correctly to snapshot to be 
committed parent now.
+                  metadataUpdates.forEach((update -> 
update.applyTo(metadataBuilder)));
+                  newBase = 
setAppropriateLastSeqNumber(metadataBuilder.build());
+                  LOGGER.info(
+                      "Successfully roll-backed replace operation for 
table={}, with current-snapshot-id={}, to snapshot={}",
+                      base.uuid(),
+                      base.currentSnapshot().snapshotId(),
+                      newBase.currentSnapshot().snapshotId());
                 }
-
-                // apply changes
-                TableMetadata.Builder metadataBuilder = 
TableMetadata.buildFrom(base);
-                request.updates().forEach(update -> 
update.applyTo(metadataBuilder));
-
-                TableMetadata updated = metadataBuilder.build();
-                if (updated.changes().isEmpty()) {
-                  // do not commit if the metadata has not changed
-                  return;
+                // double check if the requirements passes now.
+                try {
+                  TableMetadata baseWithRemovedSnaps = newBase;
+                  request
+                      .requirements()
+                      .forEach((requirement) -> 
requirement.validate(baseWithRemovedSnaps));
+                } catch (CommitFailedException e) {
+                  throw new ValidationFailureException(e);
                 }
 
-                // commit
+                TableMetadata.Builder newMetadataBuilder = 
TableMetadata.buildFrom(newBase);
+                request.updates().forEach((update) -> 
update.applyTo(newMetadataBuilder));
+                TableMetadata updated = newMetadataBuilder.build();
+                // always commit this
                 taskOps.commit(base, updated);
               });
-
     } catch (ValidationFailureException e) {
       throw e.wrapped();
     }
 
     return ops.current();
   }
 
+  private UpdateRequirement.AssertRefSnapshotID findAssertRefSnapshotID(
+      UpdateTableRequest request) {
+    UpdateRequirement.AssertRefSnapshotID assertRefSnapshotID = null;
+    int total = 0;
+    for (UpdateRequirement requirement : request.requirements()) {
+      if (requirement instanceof UpdateRequirement.AssertRefSnapshotID) {
+        ++total;
+        assertRefSnapshotID = (UpdateRequirement.AssertRefSnapshotID) 
requirement;
+      }
+    }
+
+    // if > 1 assertion for refs, then it's not safe to roll back, make this 
Noop.
+    return total != 1 ? null : assertRefSnapshotID;
+  }
+
+  private List<MetadataUpdate> generateUpdatesToRemoveNoopSnapshot(
+      TableMetadata base, long expectedCurrentSnapshotId, String 
updateRefName) {
+    // find the all the snapshots we want to retain which are not the part of 
current branch.
+    Set<Long> idsToRetain = Sets.newHashSet();
+    for (Map.Entry<String, SnapshotRef> ref : base.refs().entrySet()) {
+      String refName = ref.getKey();
+      SnapshotRef snapshotRef = ref.getValue();
+      if (refName.equals(updateRefName)) {
+        continue;
+      }
+      idsToRetain.add(ref.getValue().snapshotId());
+      // Always check the ancestry for both branch and tags
+      // mostly for case where a branch was created and then was dropped
+      // then a tag was created and then rollback happened post that tag
+      // was dropped and branch was re-created on it.
+      for (Snapshot ancestor : 
SnapshotUtil.ancestorsOf(snapshotRef.snapshotId(), base::snapshot)) {
+        idsToRetain.add(ancestor.snapshotId());
+      }
+    }
+
+    List<MetadataUpdate> updateToRemoveSnapshot = new ArrayList<>();
+    Long snapshotId = base.ref(updateRefName).snapshotId(); // current tip of 
the given branch
+    // ensure this branch has the latest sequence number.
+    long expectedSequenceNumber = base.lastSequenceNumber();
+    Set<Long> snapshotsToRemove = new LinkedHashSet<>();
+    while (snapshotId != null && !Objects.equals(snapshotId, 
expectedCurrentSnapshotId)) {
+      Snapshot snap = base.snapshot(snapshotId);
+      // catch un-expected state the commit sequence number are
+      // not continuous can happen for a table with multiple branches.
+      if (expectedSequenceNumber != snap.sequenceNumber()) {
+        LOGGER.debug(
+            "Giving up rolling back table {} to snapshot {}, Sequence Number 
are not continuous from {}",
+            base.uuid(),
+            snapshotId,
+            expectedSequenceNumber);
+        break;
+      }
+      if (!isRollbackSnapshot(snap) || idsToRetain.contains(snapshotId)) {
+        // Either encountered a non no-op snapshot or the snapshot is being 
referenced by any other
+        // reference either by branch or a tag.
+        LOGGER.debug(
+            "Giving up rolling back table {} to snapshot {}, snapshot to be 
removed referenced by another branch or tag ancestor",
+            base.uuid(),
+            snapshotId);
+        break;
+      }
+      snapshotsToRemove.add(snap.snapshotId());
+      snapshotId = snap.parentId();
+      // we need continuous sequence number to correctly rollback
+      expectedSequenceNumber--;
+    }
+
+    boolean wasExpectedSnapshotReached = Objects.equals(snapshotId, 
expectedCurrentSnapshotId);
+    updateToRemoveSnapshot.add(new 
MetadataUpdate.RemoveSnapshots(snapshotsToRemove));
+    return wasExpectedSnapshotReached ? updateToRemoveSnapshot : null;
+  }
+
+  private boolean isRollbackSnapshot(Snapshot snapshot) {
+    // Only Snapshots with {@ROLLBACKABLE_REPLACE_SNAPSHOT} are allowed to be 
rollback.
+    return DataOperations.REPLACE.equals(snapshot.operation())
+        && PropertyUtil.propertyAsString(snapshot.summary(), 
CONFLICT_RESOLUTION_ACTION, "")
+            .equalsIgnoreCase("rollback");
+  }
+
+  private MetadataUpdate.SetSnapshotRef 
findSetSnapshotRefUpdate(UpdateTableRequest request) {
+    int total = 0;
+    MetadataUpdate.SetSnapshotRef setSnapshotRefUpdate = null;
+    // find the SetRefName snapshot update
+    for (MetadataUpdate update : request.updates()) {
+      if (update instanceof MetadataUpdate.SetSnapshotRef) {
+        total++;
+        setSnapshotRefUpdate = (MetadataUpdate.SetSnapshotRef) update;
+      }
+    }
+
+    // if > 1 assertion for refs, then it's not safe to rollback, make this 
Noop.
+    return total != 1 ? null : setSnapshotRefUpdate;
+  }
+
+  private MetadataUpdate.AddSnapshot findAddSnapshotUpdate(UpdateTableRequest 
request) {
+    int total = 0;
+    MetadataUpdate.AddSnapshot addSnapshot = null;
+    // find the SetRefName snapshot update
+    for (MetadataUpdate update : request.updates()) {
+      if (update instanceof MetadataUpdate.AddSnapshot) {
+        total++;
+        addSnapshot = (MetadataUpdate.AddSnapshot) update;
+      }
+    }
+
+    // if > 1 assertion for addSnapshot, then it's not safe to rollback, make 
this Noop.
+    return total != 1 ? null : addSnapshot;
+  }
+
+  private TableMetadata setAppropriateLastSeqNumber(TableMetadata newBase) {
+    // TODO: Get rid of the reflection call once TableMetadata have API for it.
+    // move the lastSequenceNumber back, to apply snapshot properly on the
+    // current-metadata Seq number are considered increasing monotonically
+    // snapshot over snapshot, the client generates the manifest list and hence
+    // the sequence number can't be changed for a snapshot the only possible 
option
+    // then is to change the sequenceNumber tracked by metadata.json
+    try {
+      long lastSeqNumber = newBase.lastSequenceNumber();
+      // this should point to the sequence number that current tip of the
+      // branch belongs to, as the new commit will be applied on top of this.
+      LAST_SEQUENCE_NUMBER_FIELD.set(newBase, 
newBase.currentSnapshot().sequenceNumber());
+      LOGGER.info(
+          "Setting table :{} last sequence number from {} to {}",

Review Comment:
   nit: put the space after `:`, not before (e.g. `table: {}`).



##########
service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/CatalogHandlerUtils.java:
##########
@@ -433,39 +461,233 @@ protected TableMetadata commit(TableOperations ops, 
UpdateTableRequest request)
               2.0 /* exponential */)
           .onlyRetryOn(CommitFailedException.class)
           .run(
-              taskOps -> {
+              (taskOps) -> {
                 TableMetadata base = isRetry.get() ? taskOps.refresh() : 
taskOps.current();
-                isRetry.set(true);
 
-                // validate requirements
+                TableMetadata.Builder metadataBuilder = 
TableMetadata.buildFrom(base);
+                TableMetadata newBase = base;
                 try {
-                  request.requirements().forEach(requirement -> 
requirement.validate(base));
+                  request.requirements().forEach((requirement) -> 
requirement.validate(base));
                 } catch (CommitFailedException e) {
-                  // wrap and rethrow outside of tasks to avoid unnecessary 
retry

Review Comment:
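   Don't you think this comment is still relevant? The `CommitFailedException` is still wrapped in `ValidationFailureException` and rethrown outside the retried task, precisely to avoid an unnecessary retry.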



##########
service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/CatalogHandlerUtils.java:
##########
@@ -433,39 +461,233 @@ protected TableMetadata commit(TableOperations ops, 
UpdateTableRequest request)
               2.0 /* exponential */)
           .onlyRetryOn(CommitFailedException.class)
           .run(
-              taskOps -> {
+              (taskOps) -> {
                 TableMetadata base = isRetry.get() ? taskOps.refresh() : 
taskOps.current();
-                isRetry.set(true);
 
-                // validate requirements
+                TableMetadata.Builder metadataBuilder = 
TableMetadata.buildFrom(base);
+                TableMetadata newBase = base;
                 try {
-                  request.requirements().forEach(requirement -> 
requirement.validate(base));
+                  request.requirements().forEach((requirement) -> 
requirement.validate(base));
                 } catch (CommitFailedException e) {
-                  // wrap and rethrow outside of tasks to avoid unnecessary 
retry
-                  throw new ValidationFailureException(e);
+                  if (!isRollbackCompactionEnabled()) {
+                    throw new ValidationFailureException(e);
+                  }
+                  LOGGER.debug(
+                      "Attempting to Rollback replace operations for table={}, 
with current-snapshot-id={}",
+                      base.uuid(),
+                      base.currentSnapshot().snapshotId());
+                  UpdateRequirement.AssertRefSnapshotID assertRefSnapshotId =
+                      findAssertRefSnapshotID(request);
+                  MetadataUpdate.SetSnapshotRef setSnapshotRef = 
findSetSnapshotRefUpdate(request);
+
+                  if (assertRefSnapshotId == null || setSnapshotRef == null) {
+                    // This implies the request was not trying to add a 
snapshot.
+                    LOGGER.debug(
+                        "Giving up on Rollback replace operations for 
table={}, with current-snapshot-id={}, as operation doesn't attempts to add a 
single snapshot",
+                        base.uuid(),
+                        base.currentSnapshot().snapshotId());
+                    throw new ValidationFailureException(e);
+                  }
+
+                  // snapshot-id the client expects the table 
current_snapshot_id
+                  long expectedCurrentSnapshotId = 
assertRefSnapshotId.snapshotId();
+
+                  MetadataUpdate.AddSnapshot snapshotToBeAdded = 
findAddSnapshotUpdate(request);
+                  if (snapshotToBeAdded == null) {
+                    // Re-throw if, there's no snapshot data to be added.
+                    throw new ValidationFailureException(e);
+                  }
+
+                  LOGGER.info(
+                      "Attempting to Rollback replace operation for table={}, 
with current-snapshot-id={}, to snapshot={}",
+                      base.uuid(),
+                      base.currentSnapshot().snapshotId(),
+                      snapshotToBeAdded.snapshot().snapshotId());
+
+                  List<MetadataUpdate> metadataUpdates =
+                      generateUpdatesToRemoveNoopSnapshot(
+                          base, expectedCurrentSnapshotId, 
setSnapshotRef.name());
+
+                  if (metadataUpdates == null || metadataUpdates.isEmpty()) {
+                    // Nothing can be done as this implies that there were not 
all
+                    // No-op snapshots (REPLACE) between 
expectedCurrentSnapshotId and
+                    // currentSnapshotId. hence re-throw the exception caught.
+                    throw new ValidationFailureException(e);
+                  }
+                  // Set back the ref we wanted to set, back to the snapshot-id
+                  // the client is expecting the table to be at.
+                  metadataBuilder.setBranchSnapshot(
+                      expectedCurrentSnapshotId, setSnapshotRef.name());
+
+                  // apply the remove snapshots update in the current metadata.
+                  // NOTE: we need to setRef to expectedCurrentSnapshotId 
first and then apply
+                  // remove, as otherwise the remove will drop the reference.
+                  // NOTE: we can skip removing the now orphan base. Its not a 
hard requirement.
+                  // just something good to do, and not leave for Remove 
Orphans.
+                  // Ref rolled back update correctly to snapshot to be 
committed parent now.
+                  metadataUpdates.forEach((update -> 
update.applyTo(metadataBuilder)));
+                  newBase = 
setAppropriateLastSeqNumber(metadataBuilder.build());

Review Comment:
   could we set `lastSequenceNumber` in the _builder_ instead, rather than patching the built `TableMetadata` via reflection? I think that
would be preferable from the Java language perspective, because the builder's field
is not `final`. Also, exposing a setter in the builder upstream would be more natural.



##########
service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/CatalogHandlerUtils.java:
##########
@@ -433,39 +461,233 @@ protected TableMetadata commit(TableOperations ops, 
UpdateTableRequest request)
               2.0 /* exponential */)
           .onlyRetryOn(CommitFailedException.class)
           .run(
-              taskOps -> {
+              (taskOps) -> {
                 TableMetadata base = isRetry.get() ? taskOps.refresh() : 
taskOps.current();
-                isRetry.set(true);
 
-                // validate requirements
+                TableMetadata.Builder metadataBuilder = 
TableMetadata.buildFrom(base);
+                TableMetadata newBase = base;
                 try {
-                  request.requirements().forEach(requirement -> 
requirement.validate(base));
+                  request.requirements().forEach((requirement) -> 
requirement.validate(base));
                 } catch (CommitFailedException e) {
-                  // wrap and rethrow outside of tasks to avoid unnecessary 
retry
-                  throw new ValidationFailureException(e);
+                  if (!isRollbackCompactionEnabled()) {
+                    throw new ValidationFailureException(e);
+                  }
+                  LOGGER.debug(
+                      "Attempting to Rollback replace operations for table={}, 
with current-snapshot-id={}",
+                      base.uuid(),
+                      base.currentSnapshot().snapshotId());
+                  UpdateRequirement.AssertRefSnapshotID assertRefSnapshotId =
+                      findAssertRefSnapshotID(request);
+                  MetadataUpdate.SetSnapshotRef setSnapshotRef = 
findSetSnapshotRefUpdate(request);
+
+                  if (assertRefSnapshotId == null || setSnapshotRef == null) {
+                    // This implies the request was not trying to add a 
snapshot.
+                    LOGGER.debug(
+                        "Giving up on Rollback replace operations for 
table={}, with current-snapshot-id={}, as operation doesn't attempts to add a 
single snapshot",
+                        base.uuid(),
+                        base.currentSnapshot().snapshotId());
+                    throw new ValidationFailureException(e);
+                  }
+
+                  // snapshot-id the client expects the table 
current_snapshot_id
+                  long expectedCurrentSnapshotId = 
assertRefSnapshotId.snapshotId();
+
+                  MetadataUpdate.AddSnapshot snapshotToBeAdded = 
findAddSnapshotUpdate(request);
+                  if (snapshotToBeAdded == null) {
+                    // Re-throw if, there's no snapshot data to be added.
+                    throw new ValidationFailureException(e);
+                  }
+
+                  LOGGER.info(
+                      "Attempting to Rollback replace operation for table={}, 
with current-snapshot-id={}, to snapshot={}",
+                      base.uuid(),
+                      base.currentSnapshot().snapshotId(),
+                      snapshotToBeAdded.snapshot().snapshotId());
+
+                  List<MetadataUpdate> metadataUpdates =
+                      generateUpdatesToRemoveNoopSnapshot(
+                          base, expectedCurrentSnapshotId, 
setSnapshotRef.name());
+
+                  if (metadataUpdates == null || metadataUpdates.isEmpty()) {
+                    // Nothing can be done as this implies that there were not 
all
+                    // No-op snapshots (REPLACE) between 
expectedCurrentSnapshotId and
+                    // currentSnapshotId. hence re-throw the exception caught.
+                    throw new ValidationFailureException(e);
+                  }
+                  // Set back the ref we wanted to set, back to the snapshot-id
+                  // the client is expecting the table to be at.
+                  metadataBuilder.setBranchSnapshot(
+                      expectedCurrentSnapshotId, setSnapshotRef.name());
+
+                  // apply the remove snapshots update in the current metadata.
+                  // NOTE: we need to setRef to expectedCurrentSnapshotId 
first and then apply
+                  // remove, as otherwise the remove will drop the reference.
+                  // NOTE: we can skip removing the now orphan base. Its not a 
hard requirement.
+                  // just something good to do, and not leave for Remove 
Orphans.
+                  // Ref rolled back update correctly to snapshot to be 
committed parent now.
+                  metadataUpdates.forEach((update -> 
update.applyTo(metadataBuilder)));
+                  newBase = 
setAppropriateLastSeqNumber(metadataBuilder.build());
+                  LOGGER.info(
+                      "Successfully roll-backed replace operation for 
table={}, with current-snapshot-id={}, to snapshot={}",
+                      base.uuid(),
+                      base.currentSnapshot().snapshotId(),
+                      newBase.currentSnapshot().snapshotId());
                 }
-
-                // apply changes
-                TableMetadata.Builder metadataBuilder = 
TableMetadata.buildFrom(base);
-                request.updates().forEach(update -> 
update.applyTo(metadataBuilder));
-
-                TableMetadata updated = metadataBuilder.build();
-                if (updated.changes().isEmpty()) {
-                  // do not commit if the metadata has not changed
-                  return;
+                // double check if the requirements passes now.
+                try {
+                  TableMetadata baseWithRemovedSnaps = newBase;
+                  request
+                      .requirements()
+                      .forEach((requirement) -> 
requirement.validate(baseWithRemovedSnaps));
+                } catch (CommitFailedException e) {
+                  throw new ValidationFailureException(e);
                 }
 
-                // commit
+                TableMetadata.Builder newMetadataBuilder = 
TableMetadata.buildFrom(newBase);
+                request.updates().forEach((update) -> 
update.applyTo(newMetadataBuilder));
+                TableMetadata updated = newMetadataBuilder.build();
+                // always commit this
                 taskOps.commit(base, updated);
               });
-
     } catch (ValidationFailureException e) {
       throw e.wrapped();
     }
 
     return ops.current();
   }
 
+  private UpdateRequirement.AssertRefSnapshotID findAssertRefSnapshotID(
+      UpdateTableRequest request) {
+    UpdateRequirement.AssertRefSnapshotID assertRefSnapshotID = null;
+    int total = 0;
+    for (UpdateRequirement requirement : request.requirements()) {
+      if (requirement instanceof UpdateRequirement.AssertRefSnapshotID) {
+        ++total;
+        assertRefSnapshotID = (UpdateRequirement.AssertRefSnapshotID) 
requirement;
+      }
+    }
+
+    // if > 1 assertion for refs, then it's not safe to roll back, make this 
Noop.
+    return total != 1 ? null : assertRefSnapshotID;
+  }
+
+  private List<MetadataUpdate> generateUpdatesToRemoveNoopSnapshot(
+      TableMetadata base, long expectedCurrentSnapshotId, String 
updateRefName) {
+    // find the all the snapshots we want to retain which are not the part of 
current branch.
+    Set<Long> idsToRetain = Sets.newHashSet();
+    for (Map.Entry<String, SnapshotRef> ref : base.refs().entrySet()) {
+      String refName = ref.getKey();
+      SnapshotRef snapshotRef = ref.getValue();
+      if (refName.equals(updateRefName)) {
+        continue;
+      }
+      idsToRetain.add(ref.getValue().snapshotId());
+      // Always check the ancestry for both branch and tags
+      // mostly for case where a branch was created and then was dropped
+      // then a tag was created and then rollback happened post that tag
+      // was dropped and branch was re-created on it.
+      for (Snapshot ancestor : 
SnapshotUtil.ancestorsOf(snapshotRef.snapshotId(), base::snapshot)) {
+        idsToRetain.add(ancestor.snapshotId());
+      }
+    }
+
+    List<MetadataUpdate> updateToRemoveSnapshot = new ArrayList<>();
+    Long snapshotId = base.ref(updateRefName).snapshotId(); // current tip of 
the given branch
+    // ensure this branch has the latest sequence number.
+    long expectedSequenceNumber = base.lastSequenceNumber();
+    Set<Long> snapshotsToRemove = new LinkedHashSet<>();
+    while (snapshotId != null && !Objects.equals(snapshotId, 
expectedCurrentSnapshotId)) {
+      Snapshot snap = base.snapshot(snapshotId);
+      // catch un-expected state the commit sequence number are
+      // not continuous can happen for a table with multiple branches.
+      if (expectedSequenceNumber != snap.sequenceNumber()) {
+        LOGGER.debug(
+            "Giving up rolling back table {} to snapshot {}, Sequence Number 
are not continuous from {}",
+            base.uuid(),
+            snapshotId,
+            expectedSequenceNumber);
+        break;
+      }
+      if (!isRollbackSnapshot(snap) || idsToRetain.contains(snapshotId)) {
+        // Either encountered a non no-op snapshot or the snapshot is being 
referenced by any other
+        // reference either by branch or a tag.
+        LOGGER.debug(
+            "Giving up rolling back table {} to snapshot {}, snapshot to be 
removed referenced by another branch or tag ancestor",
+            base.uuid(),
+            snapshotId);
+        break;
+      }
+      snapshotsToRemove.add(snap.snapshotId());
+      snapshotId = snap.parentId();
+      // we need continuous sequence number to correctly rollback
+      expectedSequenceNumber--;

Review Comment:
   I'm not sure this assumption holds :thinking: What if we have two branches,
`main` and `dev`, that receive commits in an alternating manner but share only one
(very old) commit? If we're rolling back commits from `main`, they will
not have contiguous sequence numbers, yet logically they can still be rolled
back. WDYT?
   
   Do we even need to worry about `expectedSequenceNumber` after we validate 
the first snapshot to roll back?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@polaris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to