This is an automated email from the ASF dual-hosted git repository.

singhpk234 pushed a commit to branch release/1.0.x
in repository https://gitbox.apache.org/repos/asf/polaris.git


The following commit(s) were added to refs/heads/release/1.0.x by this push:
     new 9d1514b49 Feature: Rollback compaction on conflict (#1285)
9d1514b49 is described below

commit 9d1514b4906a6a66c1ca2e45aa4e72ff848a044c
Author: Prashant Singh <35593236+singhpk...@users.noreply.github.com>
AuthorDate: Fri Jun 20 07:59:34 2025 -0700

    Feature: Rollback compaction on conflict (#1285)
    
    The intention is to make the catalog smarter: on a commit conflict, revert
    the compaction commits so that writers that actually add or remove data in
    the table can succeed. In effect, compaction is always treated as a
    lower-priority process.
    
    Presently, the REST catalog client creates the snapshot and asks the REST
    server to apply it, sending a combination of requirements and updates.
    
    When the feature is enabled (see below), Polaris can apply some basic
    inference and generate metadata updates that revert the commit created by
    compaction, letting the write succeed.
    I had an earlier PR in OSS that did essentially this on the client side, but
    we think it is best done on the server side so that more clients can benefit.
    
    How to use this:
    Enable the catalog-level configuration
    polaris.config.rollback.compaction.on-conflicts.enabled. When it is enabled,
    Polaris resolves conflicts on the server side by rolling back REPLACE
    snapshots whose snapshot summary sets
    polaris.internal.conflict-resolution.by-operation-type.replace to "rollback".
    A sample use case: a Polaris deployment has this config enabled, and an
    auto-compaction (maintenance) job updates the table state, adding
    polaris.internal.conflict-resolution.by-operation-type.replace=rollback to
    its snapshot summary. A backfill process that has been running for 8 hours
    now wants to commit but can't, because the compaction job committed first.
    In this case the commit reaches Polaris, and Polaris will see if the
    snapshot of compaction aka replace snapshot [...]
    
    Devlist: https://lists.apache.org/thread/8k8t77dgk1vc124fnb61932bdp9kf1lc
---
 .../polaris/core/config/FeatureConfiguration.java  |  11 +
 .../quarkus/catalog/IcebergCatalogTest.java        | 359 +++++++++++++++++++++
 .../catalog/iceberg/CatalogHandlerUtils.java       | 289 +++++++++++++++--
 3 files changed, 636 insertions(+), 23 deletions(-)

diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java
 
b/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java
index 1e0e96329..fe265c307 100644
--- 
a/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java
+++ 
b/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java
@@ -287,4 +287,15 @@ public class FeatureConfiguration<T> extends 
PolarisConfiguration<T> {
                   + "This should only be set to 'true' for tests!")
           .defaultValue(false)
           .buildFeatureConfiguration();
+
+  public static final FeatureConfiguration<Boolean> 
ICEBERG_ROLLBACK_COMPACTION_ON_CONFLICTS =
+      PolarisConfiguration.<Boolean>builder()
+          .key("ICEBERG_ROLLBACK_COMPACTION_ON_CONFLICTS")
+          
.catalogConfig("polaris.config.rollback.compaction.on-conflicts.enabled")
+          .description(
+              "Rollback replace snapshots created by compaction which have "
+                  + 
"polaris.internal.conflict-resolution.by-operation-type.replace property set to 
rollback "
+                  + "in their snapshot summary")
+          .defaultValue(false)
+          .buildFeatureConfiguration();
 }
diff --git 
a/runtime/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java
 
b/runtime/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java
index 2b4c4205c..becd3dcfe 100644
--- 
a/runtime/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java
+++ 
b/runtime/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java
@@ -19,6 +19,8 @@
 package org.apache.polaris.service.quarkus.catalog;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Fail.fail;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.isA;
 import static org.mockito.Mockito.doReturn;
@@ -29,6 +31,8 @@ import static org.mockito.Mockito.when;
 import com.azure.core.exception.HttpResponseException;
 import com.google.cloud.storage.StorageException;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Streams;
 import io.quarkus.test.junit.QuarkusMock;
 import io.quarkus.test.junit.QuarkusTestProfile;
 import io.quarkus.test.junit.TestProfile;
@@ -37,6 +41,7 @@ import jakarta.annotation.Nullable;
 import jakarta.inject.Inject;
 import jakarta.ws.rs.core.SecurityContext;
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.lang.reflect.Method;
 import java.time.Clock;
 import java.util.Arrays;
@@ -48,16 +53,29 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.function.Function;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.commons.lang3.NotImplementedException;
 import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.ContentScanTask;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileMetadata;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MetadataUpdate;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RowDelta;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.SortOrder;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableMetadataParser;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.UpdateRequirement;
+import org.apache.iceberg.UpdateRequirements;
 import org.apache.iceberg.UpdateSchema;
 import org.apache.iceberg.catalog.CatalogTests;
 import org.apache.iceberg.catalog.Namespace;
@@ -68,8 +86,11 @@ import org.apache.iceberg.exceptions.CommitFailedException;
 import org.apache.iceberg.exceptions.ForbiddenException;
 import org.apache.iceberg.exceptions.NoSuchNamespaceException;
 import org.apache.iceberg.inmemory.InMemoryFileIO;
+import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.rest.requests.UpdateTableRequest;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.CharSequenceSet;
 import org.apache.polaris.core.PolarisCallContext;
 import org.apache.polaris.core.PolarisDiagnostics;
 import org.apache.polaris.core.admin.model.AwsStorageConfigInfo;
@@ -111,6 +132,7 @@ import 
org.apache.polaris.core.storage.aws.AwsStorageConfigurationInfo;
 import org.apache.polaris.core.storage.cache.StorageCredentialCache;
 import org.apache.polaris.service.admin.PolarisAdminService;
 import org.apache.polaris.service.catalog.PolarisPassthroughResolutionView;
+import org.apache.polaris.service.catalog.iceberg.CatalogHandlerUtils;
 import org.apache.polaris.service.catalog.iceberg.IcebergCatalog;
 import org.apache.polaris.service.catalog.io.DefaultFileIOFactory;
 import org.apache.polaris.service.catalog.io.ExceptionMappingFileIO;
@@ -135,7 +157,9 @@ import org.apache.polaris.service.task.TaskFileIOSupplier;
 import org.apache.polaris.service.types.NotificationRequest;
 import org.apache.polaris.service.types.NotificationType;
 import org.apache.polaris.service.types.TableUpdateNotification;
+import org.assertj.core.api.AbstractCollectionAssert;
 import org.assertj.core.api.Assertions;
+import org.assertj.core.api.ListAssert;
 import org.assertj.core.api.ThrowableAssert.ThrowingCallable;
 import org.assertj.core.configuration.PreferredAssumptionException;
 import org.junit.jupiter.api.AfterEach;
@@ -164,6 +188,15 @@ public abstract class IcebergCatalogTest extends 
CatalogTests<IcebergCatalog> {
         PreferredAssumptionException.JUNIT5);
   }
 
+  protected static final DeleteFile FILE_A_DELETES =
+      FileMetadata.deleteFileBuilder(SPEC)
+          .ofPositionDeletes()
+          .withPath("/path/to/data-a-deletes.parquet")
+          .withFileSizeInBytes(10)
+          .withPartitionPath("id_bucket=0") // easy way to set partition data for now
+          .withRecordCount(1)
+          .build();
+
   public static class Profile implements QuarkusTestProfile {
 
     @Override
@@ -547,6 +580,332 @@ public abstract class IcebergCatalogTest extends 
CatalogTests<IcebergCatalog> {
         .hasMessageContaining("Parent");
   }
 
+  @Test
+  public void testConcurrentWritesWithRollbackNonEmptyTable() {
+    IcebergCatalog catalog = this.catalog();
+    if (this.requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, 
SCHEMA).withPartitionSpec(SPEC).create();
+    this.assertNoFiles(table);
+
+    // commit FILE_A
+    catalog.loadTable(TABLE).newFastAppend().appendFile(FILE_A).commit();
+    this.assertFiles(catalog.loadTable(TABLE), FILE_A);
+    table.refresh();
+
+    long lastSnapshotId = table.currentSnapshot().snapshotId();
+
+    // Apply the deletes based on FILE_A
+    // this should conflict when we try to commit without the change.
+    RowDelta originalRowDelta =
+        table
+            .newRowDelta()
+            .addDeletes(FILE_A_DELETES)
+            .validateFromSnapshot(lastSnapshotId)
+            .validateDataFilesExist(List.of(FILE_A.location()));
+    // Make client ready with updates, don't reach out to IRC server yet
+    Snapshot s = originalRowDelta.apply();
+    TableOperations ops = ((BaseTable) catalog.loadTable(TABLE)).operations();
+    TableMetadata base = ops.current();
+    TableMetadata.Builder update = TableMetadata.buildFrom(base);
+    update.setBranchSnapshot(s, "main");
+    TableMetadata updatedMetadata = update.build();
+    List<MetadataUpdate> updates = updatedMetadata.changes();
+    List<UpdateRequirement> requirements = 
UpdateRequirements.forUpdateTable(base, updates);
+    UpdateTableRequest request = UpdateTableRequest.create(TABLE, 
requirements, updates);
+
+    // replace FILE_A with FILE_B
+    // set the snapshot property in the summary to make this snapshot
+    // rollback-able.
+    catalog
+        .loadTable(TABLE)
+        .newRewrite()
+        .addFile(FILE_B)
+        .deleteFile(FILE_A)
+        .set("polaris.internal.conflict-resolution.by-operation-type.replace", 
"rollback")
+        .commit();
+
+    try {
+      // Now call IRC server to commit delete operation.
+      CatalogHandlerUtils catalogHandlerUtils = new CatalogHandlerUtils(5, 
true);
+      catalogHandlerUtils.commit(((BaseTable) 
catalog.loadTable(TABLE)).operations(), request);
+    } catch (Exception e) {
+      fail("Rollback Compaction on conflict feature failed : " + e);
+    }
+
+    table.refresh();
+
+    // Assert only 2 snapshots and no snapshot of REPLACE left.
+    Snapshot currentSnapshot = 
table.snapshot(table.refs().get("main").snapshotId());
+    int totalSnapshots = 1;
+    while (currentSnapshot.parentId() != null) {
+      // no snapshot in the hierarchy for REPLACE operations
+      
assertThat(currentSnapshot.operation()).isNotEqualTo(DataOperations.REPLACE);
+      currentSnapshot = table.snapshot(currentSnapshot.parentId());
+      totalSnapshots += 1;
+    }
+    assertThat(totalSnapshots).isEqualTo(2);
+
+    // Inspect the files 1 DELETE file i.e. FILE_A_DELETES and 1 DATA FILE 
FILE_A
+    try {
+      try (CloseableIterable<FileScanTask> tasks = 
table.newScan().planFiles()) {
+        List<CharSequence> dataFilePaths =
+            Streams.stream(tasks)
+                .map(ContentScanTask::file)
+                .map(ContentFile::location)
+                .collect(Collectors.toList());
+        List<CharSequence> deleteFilePaths =
+            Streams.stream(tasks)
+                .flatMap(t -> t.deletes().stream().map(ContentFile::location))
+                .collect(Collectors.toList());
+        ((ListAssert)
+                Assertions.assertThat(dataFilePaths)
+                    .as("Should contain expected number of data files", new 
Object[0]))
+            .hasSize(1);
+        ((ListAssert)
+                Assertions.assertThat(deleteFilePaths)
+                    .as("Should contain expected number of delete files", new 
Object[0]))
+            .hasSize(1);
+        ((AbstractCollectionAssert)
+                Assertions.assertThat(CharSequenceSet.of(dataFilePaths))
+                    .as("Should contain correct file paths", new Object[0]))
+            .isEqualTo(
+                CharSequenceSet.of(
+                    Iterables.transform(Arrays.asList(FILE_A), 
ContentFile::location)));
+        ((AbstractCollectionAssert)
+                Assertions.assertThat(CharSequenceSet.of(deleteFilePaths))
+                    .as("Should contain correct file paths", new Object[0]))
+            .isEqualTo(
+                CharSequenceSet.of(
+                    Iterables.transform(Arrays.asList(FILE_A_DELETES), 
ContentFile::location)));
+      }
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  @Test
+  public void 
testConcurrentWritesWithRollbackWithNonReplaceSnapshotInBetween() {
+    IcebergCatalog catalog = this.catalog();
+    if (this.requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, 
SCHEMA).withPartitionSpec(SPEC).create();
+    this.assertNoFiles(table);
+
+    // commit FILE_A
+    catalog.loadTable(TABLE).newFastAppend().appendFile(FILE_A).commit();
+    this.assertFiles(catalog.loadTable(TABLE), FILE_A);
+    table.refresh();
+
+    long lastSnapshotId = table.currentSnapshot().snapshotId();
+
+    // Apply the deletes based on FILE_A
+    // this should conflict when we try to commit without the change.
+    RowDelta originalRowDelta =
+        table
+            .newRowDelta()
+            .addDeletes(FILE_A_DELETES)
+            .validateFromSnapshot(lastSnapshotId)
+            .validateDataFilesExist(List.of(FILE_A.location()));
+    // Make client ready with updates, don't reach out to IRC server yet
+    Snapshot s = originalRowDelta.apply();
+    TableOperations ops = ((BaseTable) catalog.loadTable(TABLE)).operations();
+    TableMetadata base = ops.current();
+    TableMetadata.Builder update = TableMetadata.buildFrom(base);
+    update.setBranchSnapshot(s, "main");
+    TableMetadata updatedMetadata = update.build();
+    List<MetadataUpdate> updates = updatedMetadata.changes();
+    List<UpdateRequirement> requirements = 
UpdateRequirements.forUpdateTable(base, updates);
+    UpdateTableRequest request = UpdateTableRequest.create(TABLE, 
requirements, updates);
+
+    // replace FILE_A with FILE_B
+    // commit the transaction.
+    catalog
+        .loadTable(TABLE)
+        .newRewrite()
+        .addFile(FILE_B)
+        .deleteFile(FILE_A)
+        .set("polaris.internal.conflict-resolution.by-operation-type.replace", 
"rollback")
+        .commit();
+
+    // commit FILE_C
+    catalog.loadTable(TABLE).newFastAppend().appendFile(FILE_C).commit();
+    CatalogHandlerUtils catalogHandlerUtils = new CatalogHandlerUtils(5, true);
+    Assertions.assertThatThrownBy(
+            () ->
+                catalogHandlerUtils.commit(
+                    ((BaseTable) catalog.loadTable(TABLE)).operations(), 
request))
+        .isInstanceOf(CommitFailedException.class)
+        .hasMessageContaining("Requirement failed: branch main has changed");
+
+    table.refresh();
+
+    // Assert only 3 snapshots
+    Snapshot currentSnapshot = 
table.snapshot(table.refs().get("main").snapshotId());
+    int totalSnapshots = 1;
+    while (currentSnapshot.parentId() != null) {
+      currentSnapshot = table.snapshot(currentSnapshot.parentId());
+      totalSnapshots += 1;
+    }
+    assertThat(totalSnapshots).isEqualTo(3);
+    this.assertFiles(catalog.loadTable(TABLE), FILE_B, FILE_C);
+  }
+
+  @Test
+  public void
+      testConcurrentWritesWithRollbackEnableWithToRollbackSnapshotReferencedByOtherBranch() {
+    IcebergCatalog catalog = this.catalog();
+    if (this.requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).withPartitionSpec(SPEC).create();
+    this.assertNoFiles(table);
+
+    // commit FILE_A
+    catalog.loadTable(TABLE).newFastAppend().appendFile(FILE_A).commit();
+    this.assertFiles(catalog.loadTable(TABLE), FILE_A);
+    table.refresh();
+
+    long lastSnapshotId = table.currentSnapshot().snapshotId();
+
+    // Apply the deletes based on FILE_A
+    // this should conflict when we try to commit without the change.
+    RowDelta originalRowDelta =
+        table
+            .newRowDelta()
+            .addDeletes(FILE_A_DELETES)
+            .validateFromSnapshot(lastSnapshotId)
+            .validateDataFilesExist(List.of(FILE_A.location()));
+    // Make client ready with updates, don't reach out to IRC server yet
+    Snapshot s = originalRowDelta.apply();
+    TableOperations ops = ((BaseTable) catalog.loadTable(TABLE)).operations();
+    TableMetadata base = ops.current();
+    TableMetadata.Builder update = TableMetadata.buildFrom(base);
+    update.setBranchSnapshot(s, "main");
+    TableMetadata updatedMetadata = update.build();
+    List<MetadataUpdate> updates = updatedMetadata.changes();
+    List<UpdateRequirement> requirements = UpdateRequirements.forUpdateTable(base, updates);
+    UpdateTableRequest request = UpdateTableRequest.create(TABLE, requirements, updates);
+
+    // replace FILE_A with FILE_B
+    catalog
+        .loadTable(TABLE)
+        .newRewrite()
+        .addFile(FILE_B)
+        .deleteFile(FILE_A)
+        .set("polaris.internal.conflict-resolution.by-operation-type.replace", "rollback")
+        .commit();
+
+    Table t = catalog.loadTable(TABLE);
+    // create branch "non-main" from main's current (REPLACE) snapshot, so it stays referenced
+    t.manageSnapshots()
+        .createBranch("non-main")
+        .setCurrentSnapshot(t.currentSnapshot().snapshotId())
+        .commit();
+    // now add more files to non-main branch
+    catalog.loadTable(TABLE).newFastAppend().appendFile(FILE_C).toBranch("non-main").commit();
+    CatalogHandlerUtils catalogHandlerUtils = new CatalogHandlerUtils(5, true);
+    Assertions.assertThatThrownBy(
+            () ->
+                catalogHandlerUtils.commit(
+                    ((BaseTable) catalog.loadTable(TABLE)).operations(), request))
+        .isInstanceOf(CommitFailedException.class)
+        .hasMessageContaining("Requirement failed: branch main has changed");
+
+    table.refresh();
+
+    // Assert main still has only 2 snapshots (FILE_A append + REPLACE): nothing was rolled back
+    Snapshot currentSnapshot = table.snapshot(table.refs().get("main").snapshotId());
+    int totalSnapshots = 1;
+    while (currentSnapshot.parentId() != null) {
+      currentSnapshot = table.snapshot(currentSnapshot.parentId());
+      totalSnapshots += 1;
+    }
+    assertThat(totalSnapshots).isEqualTo(2);
+    this.assertFiles(catalog.loadTable(TABLE), FILE_B);
+  }
+
+  @Test
+  public void testConcurrentWritesWithRollbackWithConcurrentWritesToDifferentBranches() {
+    IcebergCatalog catalog = this.catalog();
+    if (this.requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).withPartitionSpec(SPEC).create();
+    this.assertNoFiles(table);
+
+    // commit FILE_A to main branch
+    catalog.loadTable(TABLE).newFastAppend().appendFile(FILE_A).commit();
+    this.assertFiles(catalog.loadTable(TABLE), FILE_A);
+    table.refresh();
+
+    Table t = catalog.loadTable(TABLE);
+    // create branch "non-main" from main's current (FILE_A append) snapshot
+    t.manageSnapshots()
+        .createBranch("non-main")
+        .setCurrentSnapshot(t.currentSnapshot().snapshotId())
+        .commit();
+
+    long lastSnapshotId = table.currentSnapshot().snapshotId();
+
+    // Apply the deletes based on FILE_A
+    // this should conflict when we try to commit without the change.
+    RowDelta originalRowDelta =
+        table
+            .newRowDelta()
+            .addDeletes(FILE_A_DELETES)
+            .validateFromSnapshot(lastSnapshotId)
+            .validateDataFilesExist(List.of(FILE_A.location()));
+    // Make client ready with updates, don't reach out to IRC server yet
+    Snapshot s = originalRowDelta.apply();
+    TableOperations ops = ((BaseTable) catalog.loadTable(TABLE)).operations();
+    TableMetadata base = ops.current();
+    TableMetadata.Builder update = TableMetadata.buildFrom(base);
+    update.setBranchSnapshot(s, "main");
+    TableMetadata updatedMetadata = update.build();
+    List<MetadataUpdate> updates = updatedMetadata.changes();
+    List<UpdateRequirement> requirements = UpdateRequirements.forUpdateTable(base, updates);
+    UpdateTableRequest request = UpdateTableRequest.create(TABLE, requirements, updates);
+
+    // replace FILE_A with FILE_B on main branch
+    catalog
+        .loadTable(TABLE)
+        .newRewrite()
+        .addFile(FILE_B)
+        .deleteFile(FILE_A)
+        .set("polaris.internal.conflict-resolution.by-operation-type.replace", "rollback")
+        .commit();
+
+    // now add more files to non-main branch, this will make sequence number non monotonic for main
+    // branch
+    catalog.loadTable(TABLE).newFastAppend().appendFile(FILE_C).toBranch("non-main").commit();
+    CatalogHandlerUtils catalogHandlerUtils = new CatalogHandlerUtils(5, true);
+    Assertions.assertThatThrownBy(
+            () ->
+                catalogHandlerUtils.commit(
+                    ((BaseTable) catalog.loadTable(TABLE)).operations(), request))
+        .isInstanceOf(CommitFailedException.class)
+        .hasMessageContaining("Requirement failed: branch main has changed");
+
+    table.refresh();
+
+    // Assert main still has only 2 snapshots (FILE_A append + REPLACE): nothing was rolled back
+    Snapshot currentSnapshot = table.snapshot(table.refs().get("main").snapshotId());
+    int totalSnapshots = 1;
+    while (currentSnapshot.parentId() != null) {
+      currentSnapshot = table.snapshot(currentSnapshot.parentId());
+      totalSnapshots += 1;
+    }
+    assertThat(totalSnapshots).isEqualTo(2);
+    this.assertFiles(catalog.loadTable(TABLE), FILE_B);
+  }
+
   @Test
   public void testValidateNotificationWhenTableAndNamespacesDontExist() {
     Assumptions.assumeTrue(
diff --git 
a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/CatalogHandlerUtils.java
 
b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/CatalogHandlerUtils.java
index aa99d53f5..ae879ea5f 100644
--- 
a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/CatalogHandlerUtils.java
+++ 
b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/CatalogHandlerUtils.java
@@ -22,16 +22,21 @@ import static 
org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS_DEFAUL
 import static 
org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS_DEFAULT;
 import static 
org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import jakarta.enterprise.context.ApplicationScoped;
 import jakarta.inject.Inject;
+import java.lang.reflect.Field;
 import java.time.OffsetDateTime;
 import java.time.ZoneOffset;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -39,9 +44,13 @@ import java.util.stream.Collectors;
 import org.apache.iceberg.BaseMetadataTable;
 import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.BaseTransaction;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.MetadataUpdate;
 import org.apache.iceberg.MetadataUpdate.UpgradeFormatVersion;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotRef;
 import org.apache.iceberg.SortOrder;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
@@ -74,6 +83,8 @@ import org.apache.iceberg.rest.responses.LoadTableResponse;
 import org.apache.iceberg.rest.responses.LoadViewResponse;
 import org.apache.iceberg.rest.responses.UpdateNamespacePropertiesResponse;
 import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.iceberg.util.Tasks;
 import org.apache.iceberg.view.BaseView;
 import org.apache.iceberg.view.SQLViewRepresentation;
@@ -85,6 +96,8 @@ import org.apache.iceberg.view.ViewRepresentation;
 import org.apache.polaris.core.config.FeatureConfiguration;
 import org.apache.polaris.core.config.PolarisConfigurationStore;
 import org.apache.polaris.core.context.RealmContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * CODE_COPIED_TO_POLARIS Copied from CatalogHandler in Iceberg 1.8.0 Contains 
a collection of
@@ -92,17 +105,41 @@ import org.apache.polaris.core.context.RealmContext;
  */
 @ApplicationScoped
 public class CatalogHandlerUtils {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(CatalogHandlerUtils.class);
+
   private static final Schema EMPTY_SCHEMA = new Schema();
   private static final String INITIAL_PAGE_TOKEN = "";
+  private static final String CONFLICT_RESOLUTION_ACTION =
+      "polaris.internal.conflict-resolution.by-operation-type.replace";
+  private static final Field LAST_SEQUENCE_NUMBER_FIELD;
 
-  private final RealmContext realmContext;
-  private final PolarisConfigurationStore configurationStore;
+  static {
+    try {
+      LAST_SEQUENCE_NUMBER_FIELD =
+          TableMetadata.Builder.class.getDeclaredField("lastSequenceNumber");
+      LAST_SEQUENCE_NUMBER_FIELD.setAccessible(true);
+    } catch (NoSuchFieldException e) {
+      throw new RuntimeException("Unable to access field", e);
+    }
+  }
+
+  private final int maxCommitRetries;
+  private final boolean rollbackCompactionEnabled;
 
   @Inject
   public CatalogHandlerUtils(
       RealmContext realmContext, PolarisConfigurationStore configurationStore) 
{
-    this.realmContext = realmContext;
-    this.configurationStore = configurationStore;
+    this(
+        configurationStore.getConfiguration(
+            realmContext, FeatureConfiguration.ICEBERG_COMMIT_MAX_RETRIES),
+        configurationStore.getConfiguration(
+            realmContext, 
FeatureConfiguration.ICEBERG_ROLLBACK_COMPACTION_ON_CONFLICTS));
+  }
+
+  @VisibleForTesting
+  public CatalogHandlerUtils(int maxCommitRetries, boolean 
rollbackCompactionEnabled) {
+    this.maxCommitRetries = maxCommitRetries;
+    this.rollbackCompactionEnabled = rollbackCompactionEnabled;
   }
 
   /**
@@ -421,11 +458,12 @@ public class CatalogHandlerUtils {
     return ops.current();
   }
 
-  protected TableMetadata commit(TableOperations ops, UpdateTableRequest request) {
+  @VisibleForTesting
+  public TableMetadata commit(TableOperations ops, UpdateTableRequest request) {
     AtomicBoolean isRetry = new AtomicBoolean(false);
     try {
       Tasks.foreach(ops)
-          .retry(maxCommitRetries())
+          .retry(maxCommitRetries)
           .exponentialBackoff(
               COMMIT_MIN_RETRY_WAIT_MS_DEFAULT,
               COMMIT_MAX_RETRY_WAIT_MS_DEFAULT,
@@ -435,30 +473,106 @@ public class CatalogHandlerUtils {
           .run(
               taskOps -> {
                 TableMetadata base = isRetry.get() ? taskOps.refresh() : taskOps.current();
                 isRetry.set(true);
-
-                // validate requirements
+                TableMetadata.Builder metadataBuilder = TableMetadata.buildFrom(base);
+                TableMetadata newBase = base;
                 try {
-                  request.requirements().forEach(requirement -> requirement.validate(base));
+                  request.requirements().forEach((requirement) -> requirement.validate(base));
+                } catch (CommitFailedException e) {
+                  if (!rollbackCompactionEnabled) {
+                    // wrap and rethrow outside of tasks to avoid unnecessary retry
+                    throw new ValidationFailureException(e);
+                  }
+                  LOGGER.debug(
+                      "Attempting to Rollback replace operations for table={}, with current-snapshot-id={}",
+                      base.uuid(),
+                      base.currentSnapshot() == null ? null : base.currentSnapshot().snapshotId());
+                  UpdateRequirement.AssertRefSnapshotID assertRefSnapshotId =
+                      findAssertRefSnapshotID(request);
+                  MetadataUpdate.SetSnapshotRef setSnapshotRef = findSetSnapshotRefUpdate(request);
+
+                  if (assertRefSnapshotId == null || setSnapshotRef == null) {
+                    // This implies the request was not trying to add a snapshot.
+                    LOGGER.debug(
+                        "Giving up on Rollback replace operations for table={}, with current-snapshot-id={}, as operation doesn't attempts to add a single snapshot",
+                        base.uuid(),
+                        base.currentSnapshot() == null ? null : base.currentSnapshot().snapshotId());
+                    // wrap and rethrow outside of tasks to avoid unnecessary retry
+                    throw new ValidationFailureException(e);
+                  }
+
+                  // snapshot-id the client expects the ref to currently point at
+                  long expectedCurrentSnapshotId = assertRefSnapshotId.snapshotId();
+
+                  MetadataUpdate.AddSnapshot snapshotToBeAdded = findAddSnapshotUpdate(request);
+                  if (snapshotToBeAdded == null) {
+                    // Re-throw if there's no snapshot to be added.
+                    // wrap and rethrow outside of tasks to avoid unnecessary retry
+                    throw new ValidationFailureException(e);
+                  }
+
+                  LOGGER.info(
+                      "Attempting to Rollback replace operation for table={}, with current-snapshot-id={}, to snapshot={}",
+                      base.uuid(),
+                      base.currentSnapshot() == null ? null : base.currentSnapshot().snapshotId(),
+                      snapshotToBeAdded.snapshot().snapshotId());
+
+                  List<MetadataUpdate> metadataUpdates =
+                      generateUpdatesToRemoveNoopSnapshot(
+                          base, expectedCurrentSnapshotId, setSnapshotRef.name());
+
+                  if (metadataUpdates == null || metadataUpdates.isEmpty()) {
+                    // Nothing can be done: not every snapshot between expectedCurrentSnapshotId
+                    // and the current tip of the ref is a rollback-able no-op (REPLACE)
+                    // snapshot, hence re-throw the caught exception.
+                    // wrap and rethrow outside of tasks to avoid unnecessary retry
+                    throw new ValidationFailureException(e);
+                  }
+                  // Set back the ref we wanted to set, back to the snapshot-id
+                  // the client is expecting the table to be at.
+                  metadataBuilder.setBranchSnapshot(
+                      expectedCurrentSnapshotId, setSnapshotRef.name());
+
+                  // apply the remove snapshots update in the current metadata.
+                  // NOTE: we need to setRef to expectedCurrentSnapshotId first and then apply
+                  // remove, as otherwise the remove will drop the reference.
+                  // NOTE: we can skip removing the now orphan base. It's not a hard requirement.
+                  // just something good to do, and not leave for Remove Orphans.
+                  // The ref now points at the parent of the snapshot being committed.
+                  metadataUpdates.forEach((update -> update.applyTo(metadataBuilder)));
+                  newBase =
+                      setAppropriateLastSeqNumber(
+                              metadataBuilder,
+                              base.uuid(),
+                              base.lastSequenceNumber(),
+                              base.snapshot(expectedCurrentSnapshotId).sequenceNumber())
+                          .build();
+                  LOGGER.info(
+                      "Successfully roll-backed replace operation for table={}, with current-snapshot-id={}, to snapshot={}",
+                      base.uuid(),
+                      base.currentSnapshot() == null ? null : base.currentSnapshot().snapshotId(),
+                      newBase.currentSnapshot() == null ? null : newBase.currentSnapshot().snapshotId());
+                }
+                // Re-validate the requirements against the (possibly rolled-back) metadata.
+                try {
+                  TableMetadata baseWithRemovedSnaps = newBase;
+                  request
+                      .requirements()
+                      .forEach((requirement) -> requirement.validate(baseWithRemovedSnaps));
                 } catch (CommitFailedException e) {
                   // wrap and rethrow outside of tasks to avoid unnecessary retry
                   throw new ValidationFailureException(e);
                 }
 
-                // apply changes
-                TableMetadata.Builder metadataBuilder = TableMetadata.buildFrom(base);
-                request.updates().forEach(update -> update.applyTo(metadataBuilder));
-
-                TableMetadata updated = metadataBuilder.build();
+                TableMetadata.Builder newMetadataBuilder = TableMetadata.buildFrom(newBase);
+                request.updates().forEach((update) -> update.applyTo(newMetadataBuilder));
+                TableMetadata updated = newMetadataBuilder.build();
                 if (updated.changes().isEmpty()) {
                   // do not commit if the metadata has not changed
                   return;
                 }
-
-                // commit
                 taskOps.commit(base, updated);
               });
-
     } catch (ValidationFailureException e) {
       throw e.wrapped();
     }
@@ -466,6 +580,140 @@ public class CatalogHandlerUtils {
     return ops.current();
   }
 
+  /**
+   * Returns the single {@link UpdateRequirement.AssertRefSnapshotID} carried by {@code request}.
+   *
+   * <p>Rolling back is only attempted when the request asserts exactly one ref; with zero or with
+   * several ref assertions the situation is treated as unsafe and {@code null} is returned.
+   *
+   * @param request the incoming table update request
+   * @return the one ref-snapshot assertion, or {@code null} if there is not exactly one
+   */
+  private UpdateRequirement.AssertRefSnapshotID findAssertRefSnapshotID(
+      UpdateTableRequest request) {
+    UpdateRequirement.AssertRefSnapshotID onlyMatch = null;
+    for (UpdateRequirement candidate : request.requirements()) {
+      if (!(candidate instanceof UpdateRequirement.AssertRefSnapshotID)) {
+        continue;
+      }
+      if (onlyMatch != null) {
+        // A second ref assertion: ambiguous, so refuse to pick one.
+        return null;
+      }
+      onlyMatch = (UpdateRequirement.AssertRefSnapshotID) candidate;
+    }
+    return onlyMatch;
+  }
+
+  /**
+   * Builds the metadata update that drops rollback-eligible snapshots sitting between the current
+   * tip of {@code updateRefName} and {@code expectedCurrentSnapshotId}.
+   *
+   * <p>The ref's ancestry is walked from its tip towards {@code expectedCurrentSnapshotId}, and
+   * every snapshot passed on the way is collected for removal. The method gives up and returns
+   * {@code null} when:
+   *
+   * <ul>
+   *   <li>the ref does not exist, or its tip snapshot is not present in {@code base};
+   *   <li>the table's last sequence number differs from the tip snapshot's sequence number;
+   *   <li>a snapshot on the path is missing from {@code base}, is not accepted by {@link
+   *       #isRollbackSnapshot(Snapshot)}, or is reachable from another branch or tag;
+   *   <li>the ancestry ends before {@code expectedCurrentSnapshotId} is reached.
+   * </ul>
+   *
+   * @param base current table metadata
+   * @param expectedCurrentSnapshotId snapshot id the committing client expects the ref to point at
+   * @param updateRefName name of the ref being committed to
+   * @return a mutable list holding one {@link MetadataUpdate.RemoveSnapshots}, or {@code null} if
+   *     the rollback cannot be performed
+   */
+  private List<MetadataUpdate> generateUpdatesToRemoveNoopSnapshot(
+      TableMetadata base, long expectedCurrentSnapshotId, String updateRefName) {
+    SnapshotRef updateRef = base.ref(updateRefName);
+    if (updateRef == null) {
+      // Nothing to roll back on a ref that does not exist; previously this was an NPE.
+      LOGGER.debug(
+          "Giving up rolling back table {}, ref {} does not exist", base.uuid(), updateRefName);
+      return null;
+    }
+
+    // Find all the snapshots we want to retain which are not part of the current branch.
+    Set<Long> idsToRetain = Sets.newHashSet();
+    for (Map.Entry<String, SnapshotRef> ref : base.refs().entrySet()) {
+      if (ref.getKey().equals(updateRefName)) {
+        continue;
+      }
+      long otherRefTip = ref.getValue().snapshotId();
+      idsToRetain.add(otherRefTip);
+      // Always check the ancestry for both branches and tags, mostly for the case where a branch
+      // was created and then dropped, then a tag was created, and a rollback happened after that
+      // tag was dropped and the branch was re-created on it.
+      for (Snapshot ancestor : SnapshotUtil.ancestorsOf(otherRefTip, base::snapshot)) {
+        idsToRetain.add(ancestor.snapshotId());
+      }
+    }
+
+    Long snapshotId = updateRef.snapshotId(); // current tip of the given branch
+    Snapshot tip = base.snapshot(snapshotId);
+    if (tip == null) {
+      LOGGER.debug(
+          "Giving up rolling back table {}, tip snapshot {} of ref {} is missing",
+          base.uuid(),
+          snapshotId,
+          updateRefName);
+      return null;
+    }
+
+    // Ensure this branch holds the latest sequence number; otherwise the table's current sequence
+    // number is not the one of the most recent snapshot the ref points to.
+    long expectedSequenceNumber = base.lastSequenceNumber();
+    if (expectedSequenceNumber != tip.sequenceNumber()) {
+      LOGGER.debug(
+          "Giving up rolling back table {} to snapshot {}, ref current snapshot sequence number {} is not equal expected sequence number {}",
+          base.uuid(),
+          snapshotId,
+          tip.sequenceNumber(),
+          expectedSequenceNumber);
+      return null;
+    }
+
+    Set<Long> snapshotsToRemove = new LinkedHashSet<>();
+    while (snapshotId != null && !Objects.equals(snapshotId, expectedCurrentSnapshotId)) {
+      Snapshot snap = base.snapshot(snapshotId);
+      if (snap == null) {
+        // The parent chain points at a snapshot that is no longer in the metadata (e.g. expired).
+        LOGGER.debug(
+            "Giving up rolling back table {} to snapshot {}, snapshot is missing from table metadata",
+            base.uuid(),
+            snapshotId);
+        break;
+      }
+      if (!isRollbackSnapshot(snap)) {
+        LOGGER.debug(
+            "Giving up rolling back table {} to snapshot {}, snapshot to be removed is not a rollback-eligible replace operation",
+            base.uuid(),
+            snapshotId);
+        break;
+      }
+      if (idsToRetain.contains(snapshotId)) {
+        LOGGER.debug(
+            "Giving up rolling back table {} to snapshot {}, snapshot to be removed referenced by another branch or tag ancestor",
+            base.uuid(),
+            snapshotId);
+        break;
+      }
+      snapshotsToRemove.add(snapshotId);
+      snapshotId = snap.parentId();
+    }
+
+    if (!Objects.equals(snapshotId, expectedCurrentSnapshotId)) {
+      // The walk stopped early or ran off the end of the lineage.
+      return null;
+    }
+    List<MetadataUpdate> updateToRemoveSnapshot = new ArrayList<>();
+    updateToRemoveSnapshot.add(new MetadataUpdate.RemoveSnapshots(snapshotsToRemove));
+    return updateToRemoveSnapshot;
+  }
+
+  /**
+   * Tells whether {@code snapshot} may be discarded to resolve a commit conflict.
+   *
+   * <p>Only {@link DataOperations#REPLACE} snapshots whose summary maps {@code
+   * CONFLICT_RESOLUTION_ACTION} to {@code "rollback"} (case-insensitive) qualify; a missing entry
+   * counts as not eligible.
+   */
+  private boolean isRollbackSnapshot(Snapshot snapshot) {
+    if (!DataOperations.REPLACE.equals(snapshot.operation())) {
+      return false;
+    }
+    String conflictAction =
+        PropertyUtil.propertyAsString(snapshot.summary(), CONFLICT_RESOLUTION_ACTION, "");
+    return conflictAction.equalsIgnoreCase("rollback");
+  }
+
+  /**
+   * Returns the single {@link MetadataUpdate.SetSnapshotRef} update carried by {@code request}.
+   *
+   * <p>If the request sets no ref, or sets more than one, rolling back is not considered safe and
+   * {@code null} is returned.
+   *
+   * @param request the incoming table update request
+   * @return the one set-snapshot-ref update, or {@code null} if there is not exactly one
+   */
+  private MetadataUpdate.SetSnapshotRef findSetSnapshotRefUpdate(UpdateTableRequest request) {
+    MetadataUpdate.SetSnapshotRef onlyMatch = null;
+    for (MetadataUpdate candidate : request.updates()) {
+      if (!(candidate instanceof MetadataUpdate.SetSnapshotRef)) {
+        continue;
+      }
+      if (onlyMatch != null) {
+        // More than one ref update: ambiguous, bail out.
+        return null;
+      }
+      onlyMatch = (MetadataUpdate.SetSnapshotRef) candidate;
+    }
+    return onlyMatch;
+  }
+
+  /**
+   * Returns the single {@link MetadataUpdate.AddSnapshot} update carried by {@code request}.
+   *
+   * <p>If the request adds no snapshot, or adds more than one, rolling back is not considered safe
+   * and {@code null} is returned.
+   *
+   * @param request the incoming table update request
+   * @return the one add-snapshot update, or {@code null} if there is not exactly one
+   */
+  private MetadataUpdate.AddSnapshot findAddSnapshotUpdate(UpdateTableRequest request) {
+    MetadataUpdate.AddSnapshot onlyMatch = null;
+    for (MetadataUpdate candidate : request.updates()) {
+      if (!(candidate instanceof MetadataUpdate.AddSnapshot)) {
+        continue;
+      }
+      if (onlyMatch != null) {
+        // More than one added snapshot: ambiguous, bail out.
+        return null;
+      }
+      onlyMatch = (MetadataUpdate.AddSnapshot) candidate;
+    }
+    return onlyMatch;
+  }
+
+  /**
+   * Moves the builder's tracked last sequence number back to {@code expectedSequenceNumber}.
+   *
+   * <p>Sequence numbers are expected to increase monotonically from snapshot to snapshot, and the
+   * client has already generated the manifest list, so the incoming snapshot's sequence number
+   * cannot be changed. The only remaining option is to rewind the sequence number tracked by the
+   * metadata to that of the current tip of the branch, so the new commit applies on top of it.
+   *
+   * <p>TODO: Get rid of the reflection call once TableMetadata has an API for it.
+   *
+   * @param metadataBuilder builder that is mutated in place
+   * @param tableUUID table identifier, only used in the log line
+   * @param currentSequenceNumber sequence number before the change, only used in the log line
+   * @param expectedSequenceNumber sequence number to store in the builder
+   * @return {@code metadataBuilder}, for chaining
+   * @throws RuntimeException wrapping the {@link IllegalAccessException} if the reflective write
+   *     is rejected
+   */
+  private TableMetadata.Builder setAppropriateLastSeqNumber(
+      TableMetadata.Builder metadataBuilder,
+      String tableUUID,
+      long currentSequenceNumber,
+      long expectedSequenceNumber) {
+    try {
+      LAST_SEQUENCE_NUMBER_FIELD.set(metadataBuilder, expectedSequenceNumber);
+    } catch (IllegalAccessException ex) {
+      throw new RuntimeException(ex);
+    }
+    // Only reached when the reflective write succeeded.
+    LOGGER.info(
+        "Setting table uuid:{} last sequence number from:{} to {}",
+        tableUUID,
+        currentSequenceNumber,
+        expectedSequenceNumber);
+    return metadataBuilder;
+  }
+
   private BaseView asBaseView(View view) {
     Preconditions.checkState(
         view instanceof BaseView, "Cannot wrap catalog that does not produce 
BaseView");
@@ -565,7 +813,7 @@ public class CatalogHandlerUtils {
     AtomicBoolean isRetry = new AtomicBoolean(false);
     try {
       Tasks.foreach(ops)
-          .retry(maxCommitRetries())
+          .retry(maxCommitRetries)
           .exponentialBackoff(
               COMMIT_MIN_RETRY_WAIT_MS_DEFAULT,
               COMMIT_MAX_RETRY_WAIT_MS_DEFAULT,
@@ -606,9 +854,4 @@ public class CatalogHandlerUtils {
 
     return ops.current();
   }
-
-  private int maxCommitRetries() {
-    return configurationStore.getConfiguration(
-        realmContext, FeatureConfiguration.ICEBERG_COMMIT_MAX_RETRIES);
-  }
 }


Reply via email to