This is an automated email from the ASF dual-hosted git repository.

singhpk234 pushed a commit to branch release/1.0.x
in repository https://gitbox.apache.org/repos/asf/polaris.git
The following commit(s) were added to refs/heads/release/1.0.x by this push:
     new 9d1514b49  Feature: Rollback compaction on conflict (#1285)
9d1514b49 is described below

commit 9d1514b4906a6a66c1ca2e45aa4e72ff848a044c
Author: Prashant Singh <35593236+singhpk...@users.noreply.github.com>
AuthorDate: Fri Jun 20 07:59:34 2025 -0700

    Feature: Rollback compaction on conflict (#1285)

    The intention is to make the catalog smarter: on a commit conflict, revert
    the compaction commits so that the writers actually adding or removing
    table data can succeed. In a sense, compaction is always treated as a
    lower-priority process.

    Presently, the REST catalog client creates the snapshot and asks the REST
    server to apply it, given as a combination of requirements and updates.
    When a property is enabled at the table level, Polaris can apply some
    basic inference and generate additional metadata updates: it rolls back
    the commit created by compaction and lets the write succeed.

    I had a PR in OSS which was essentially doing this at the client end, but
    we think it's best to do this at the server end, to support more such
    clients.

    How to use this: enable the catalog-level configuration
    polaris.config.rollback.compaction.on-conflicts.enabled. When it is
    enabled, Polaris resolves conflicts at the server end by rolling back
    those REPLACE-operation snapshots that have the property
    polaris.internal.conflict-resolution.by-operation-type.replace set to
    rollback in their snapshot summary.

    A sample use case: a Polaris deployment has this config enabled, and an
    auto-compaction (maintenance) job that updates the table state adds that
    property to its snapshot summary. Now a backfill process that has been
    running for 8 hours wants to commit, but can't, because the compaction
    job committed before it. In this case it will reach out to Polaris, and
    Polaris will see if the snapshot of compaction aka replace snapsho [...]
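    To make the flow concrete, here is a minimal sketch of both sides of the
    feature. The catalog handle, table name, and data files below are
    hypothetical placeholders; the config key and the snapshot summary
    property are the ones introduced by this change:

        // Server side (Polaris): enable the catalog-level flag
        //   polaris.config.rollback.compaction.on-conflicts.enabled = true

        // Client side: a compaction job marks its REPLACE snapshot as
        // rollback-able, assuming an Iceberg Catalog handle `catalog` and
        // DataFile instances are in scope.
        Table table = catalog.loadTable(TableIdentifier.of("db", "events"));
        table
            .newRewrite() // produces a snapshot with operation=replace
            .addFile(compactedFile) // rewritten output (hypothetical)
            .deleteFile(smallFileA) // inputs compacted away (hypothetical)
            .deleteFile(smallFileB)
            .set("polaris.internal.conflict-resolution.by-operation-type.replace", "rollback")
            .commit();

    A concurrent writer whose commit conflicts only with such REPLACE
    snapshots can then succeed: Polaris rolls them back server-side and
    applies the writer's update, as exercised by the tests below.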
    Devlist: https://lists.apache.org/thread/8k8t77dgk1vc124fnb61932bdp9kf1lc
---
 .../polaris/core/config/FeatureConfiguration.java |  11 +
 .../quarkus/catalog/IcebergCatalogTest.java       | 359 +++++++++++++++++++++
 .../catalog/iceberg/CatalogHandlerUtils.java      | 289 +++++++++++++++--
 3 files changed, 636 insertions(+), 23 deletions(-)

diff --git a/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java b/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java
index 1e0e96329..fe265c307 100644
--- a/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java
+++ b/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java
@@ -287,4 +287,15 @@ public class FeatureConfiguration<T> extends PolarisConfiguration<T> {
                   + "This should only be set to 'true' for tests!")
           .defaultValue(false)
           .buildFeatureConfiguration();
+
+  public static final FeatureConfiguration<Boolean> ICEBERG_ROLLBACK_COMPACTION_ON_CONFLICTS =
+      PolarisConfiguration.<Boolean>builder()
+          .key("ICEBERG_ROLLBACK_COMPACTION_ON_CONFLICTS")
+          .catalogConfig("polaris.config.rollback.compaction.on-conflicts.enabled")
+          .description(
+              "Rollback replace snapshots created by compaction which have "
+                  + "polaris.internal.conflict-resolution.by-operation-type.replace property set to rollback "
+                  + "in their snapshot summary")
+          .defaultValue(false)
+          .buildFeatureConfiguration();
 }
diff --git a/runtime/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java b/runtime/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java
index 2b4c4205c..becd3dcfe 100644
--- a/runtime/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java
+++ b/runtime/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java
@@ -19,6 +19,8 @@
 package org.apache.polaris.service.quarkus.catalog;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Fail.fail;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.isA;
 import static org.mockito.Mockito.doReturn;
@@ -29,6 +31,8 @@ import static org.mockito.Mockito.when;
 import com.azure.core.exception.HttpResponseException;
 import com.google.cloud.storage.StorageException;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Streams;
 import io.quarkus.test.junit.QuarkusMock;
 import io.quarkus.test.junit.QuarkusTestProfile;
 import io.quarkus.test.junit.TestProfile;
@@ -37,6 +41,7 @@ import jakarta.annotation.Nullable;
 import jakarta.inject.Inject;
 import jakarta.ws.rs.core.SecurityContext;
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.lang.reflect.Method;
 import java.time.Clock;
 import java.util.Arrays;
@@ -48,16 +53,29 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.function.Function;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.commons.lang3.NotImplementedException;
 import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.ContentScanTask;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileMetadata;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MetadataUpdate;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RowDelta;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.SortOrder;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableMetadataParser;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.UpdateRequirement;
+import org.apache.iceberg.UpdateRequirements;
 import org.apache.iceberg.UpdateSchema;
 import org.apache.iceberg.catalog.CatalogTests;
 import org.apache.iceberg.catalog.Namespace;
@@ -68,8 +86,11 @@ import org.apache.iceberg.exceptions.CommitFailedException;
 import org.apache.iceberg.exceptions.ForbiddenException;
 import org.apache.iceberg.exceptions.NoSuchNamespaceException;
 import org.apache.iceberg.inmemory.InMemoryFileIO;
+import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.rest.requests.UpdateTableRequest;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.CharSequenceSet;
 import org.apache.polaris.core.PolarisCallContext;
 import org.apache.polaris.core.PolarisDiagnostics;
 import org.apache.polaris.core.admin.model.AwsStorageConfigInfo;
@@ -111,6 +132,7 @@ import org.apache.polaris.core.storage.aws.AwsStorageConfigurationInfo;
 import org.apache.polaris.core.storage.cache.StorageCredentialCache;
 import org.apache.polaris.service.admin.PolarisAdminService;
 import org.apache.polaris.service.catalog.PolarisPassthroughResolutionView;
+import org.apache.polaris.service.catalog.iceberg.CatalogHandlerUtils;
 import org.apache.polaris.service.catalog.iceberg.IcebergCatalog;
 import org.apache.polaris.service.catalog.io.DefaultFileIOFactory;
 import org.apache.polaris.service.catalog.io.ExceptionMappingFileIO;
@@ -135,7 +157,9 @@ import org.apache.polaris.service.task.TaskFileIOSupplier;
 import org.apache.polaris.service.types.NotificationRequest;
 import org.apache.polaris.service.types.NotificationType;
 import org.apache.polaris.service.types.TableUpdateNotification;
+import org.assertj.core.api.AbstractCollectionAssert;
 import org.assertj.core.api.Assertions;
+import org.assertj.core.api.ListAssert;
 import org.assertj.core.api.ThrowableAssert.ThrowingCallable;
 import org.assertj.core.configuration.PreferredAssumptionException;
 import org.junit.jupiter.api.AfterEach;
@@ -164,6 +188,15 @@ public abstract class IcebergCatalogTest extends CatalogTests<IcebergCatalog> {
         PreferredAssumptionException.JUNIT5);
   }
 
+  DeleteFile FILE_A_DELETES =
+      FileMetadata.deleteFileBuilder(SPEC)
+          .ofPositionDeletes()
+          .withPath("/path/to/data-a-deletes.parquet")
+          .withFileSizeInBytes(10)
+          .withPartitionPath("id_bucket=0") // easy way to set partition data for now
+          .withRecordCount(1)
+          .build();
+
   public static class Profile implements QuarkusTestProfile {
 
     @Override
@@ -547,6 +580,332 @@ public abstract class IcebergCatalogTest extends CatalogTests<IcebergCatalog> {
         .hasMessageContaining("Parent");
   }
 
+  @Test
+  public void testConcurrentWritesWithRollbackNonEmptyTable() {
+    IcebergCatalog catalog = this.catalog();
+    if (this.requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).withPartitionSpec(SPEC).create();
+    this.assertNoFiles(table);
+
+    // commit FILE_A
+    catalog.loadTable(TABLE).newFastAppend().appendFile(FILE_A).commit();
+    this.assertFiles(catalog.loadTable(TABLE), FILE_A);
+    table.refresh();
+
+    long lastSnapshotId = table.currentSnapshot().snapshotId();
+
+    // Apply the deletes based on FILE_A;
+    // this should conflict when we try to commit without the change.
+    RowDelta originalRowDelta =
+        table
+            .newRowDelta()
+            .addDeletes(FILE_A_DELETES)
+            .validateFromSnapshot(lastSnapshotId)
+            .validateDataFilesExist(List.of(FILE_A.location()));
+    // Make the client ready with updates; don't reach out to the IRC server yet
+    Snapshot s = originalRowDelta.apply();
+    TableOperations ops = ((BaseTable) catalog.loadTable(TABLE)).operations();
+    TableMetadata base = ops.current();
+    TableMetadata.Builder update = TableMetadata.buildFrom(base);
+    update.setBranchSnapshot(s, "main");
+    TableMetadata updatedMetadata = update.build();
+    List<MetadataUpdate> updates = updatedMetadata.changes();
+    List<UpdateRequirement> requirements = UpdateRequirements.forUpdateTable(base, updates);
+    UpdateTableRequest request = UpdateTableRequest.create(TABLE, requirements, updates);
+
+    // replace FILE_A with FILE_B;
+    // set the snapshot property in the summary to make this snapshot
+    // rollback-able.
+    catalog
+        .loadTable(TABLE)
+        .newRewrite()
+        .addFile(FILE_B)
+        .deleteFile(FILE_A)
+        .set("polaris.internal.conflict-resolution.by-operation-type.replace", "rollback")
+        .commit();
+
+    try {
+      // Now call the IRC server to commit the delete operation.
+      CatalogHandlerUtils catalogHandlerUtils = new CatalogHandlerUtils(5, true);
+      catalogHandlerUtils.commit(((BaseTable) catalog.loadTable(TABLE)).operations(), request);
+    } catch (Exception e) {
+      fail("Rollback Compaction on conflict feature failed : " + e);
+    }
+
+    table.refresh();
+
+    // Assert only 2 snapshots and no snapshot of REPLACE left.
+    Snapshot currentSnapshot = table.snapshot(table.refs().get("main").snapshotId());
+    int totalSnapshots = 1;
+    while (currentSnapshot.parentId() != null) {
+      // no snapshot in the hierarchy for REPLACE operations
+      assertThat(currentSnapshot.operation()).isNotEqualTo(DataOperations.REPLACE);
+      currentSnapshot = table.snapshot(currentSnapshot.parentId());
+      totalSnapshots += 1;
+    }
+    assertThat(totalSnapshots).isEqualTo(2);
+
+    // Inspect the files: 1 delete file, i.e. FILE_A_DELETES, and 1 data file, FILE_A
+    try {
+      try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
+        List<CharSequence> dataFilePaths =
+            Streams.stream(tasks)
+                .map(ContentScanTask::file)
+                .map(ContentFile::location)
+                .collect(Collectors.toList());
+        List<CharSequence> deleteFilePaths =
+            Streams.stream(tasks)
+                .flatMap(t -> t.deletes().stream().map(ContentFile::location))
+                .collect(Collectors.toList());
+        ((ListAssert)
+                Assertions.assertThat(dataFilePaths)
+                    .as("Should contain expected number of data files", new Object[0]))
+            .hasSize(1);
+        ((ListAssert)
+                Assertions.assertThat(deleteFilePaths)
+                    .as("Should contain expected number of delete files", new Object[0]))
+            .hasSize(1);
+        ((AbstractCollectionAssert)
+                Assertions.assertThat(CharSequenceSet.of(dataFilePaths))
+                    .as("Should contain correct file paths", new Object[0]))
+            .isEqualTo(
+                CharSequenceSet.of(
+                    Iterables.transform(Arrays.asList(FILE_A), ContentFile::location)));
+        ((AbstractCollectionAssert)
+                Assertions.assertThat(CharSequenceSet.of(deleteFilePaths))
+                    .as("Should contain correct file paths", new Object[0]))
+            .isEqualTo(
+                CharSequenceSet.of(
+                    Iterables.transform(Arrays.asList(FILE_A_DELETES), ContentFile::location)));
+      }
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  @Test
+  public void testConcurrentWritesWithRollbackWithNonReplaceSnapshotInBetween() {
+    IcebergCatalog catalog = this.catalog();
+    if (this.requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).withPartitionSpec(SPEC).create();
+    this.assertNoFiles(table);
+
+    // commit FILE_A
+    catalog.loadTable(TABLE).newFastAppend().appendFile(FILE_A).commit();
+    this.assertFiles(catalog.loadTable(TABLE), FILE_A);
+    table.refresh();
+
+    long lastSnapshotId = table.currentSnapshot().snapshotId();
+
+    // Apply the deletes based on FILE_A;
+    // this should conflict when we try to commit without the change.
+    RowDelta originalRowDelta =
+        table
+            .newRowDelta()
+            .addDeletes(FILE_A_DELETES)
+            .validateFromSnapshot(lastSnapshotId)
+            .validateDataFilesExist(List.of(FILE_A.location()));
+    // Make the client ready with updates; don't reach out to the IRC server yet
+    Snapshot s = originalRowDelta.apply();
+    TableOperations ops = ((BaseTable) catalog.loadTable(TABLE)).operations();
+    TableMetadata base = ops.current();
+    TableMetadata.Builder update = TableMetadata.buildFrom(base);
+    update.setBranchSnapshot(s, "main");
+    TableMetadata updatedMetadata = update.build();
+    List<MetadataUpdate> updates = updatedMetadata.changes();
+    List<UpdateRequirement> requirements = UpdateRequirements.forUpdateTable(base, updates);
+    UpdateTableRequest request = UpdateTableRequest.create(TABLE, requirements, updates);
+
+    // replace FILE_A with FILE_B;
+    // commit the transaction.
+    catalog
+        .loadTable(TABLE)
+        .newRewrite()
+        .addFile(FILE_B)
+        .deleteFile(FILE_A)
+        .set("polaris.internal.conflict-resolution.by-operation-type.replace", "rollback")
+        .commit();
+
+    // commit FILE_C
+    catalog.loadTable(TABLE).newFastAppend().appendFile(FILE_C).commit();
+    CatalogHandlerUtils catalogHandlerUtils = new CatalogHandlerUtils(5, true);
+    Assertions.assertThatThrownBy(
+            () ->
+                catalogHandlerUtils.commit(
+                    ((BaseTable) catalog.loadTable(TABLE)).operations(), request))
+        .isInstanceOf(CommitFailedException.class)
+        .hasMessageContaining("Requirement failed: branch main has changed");
+
+    table.refresh();
+
+    // Assert only 3 snapshots
+    Snapshot currentSnapshot = table.snapshot(table.refs().get("main").snapshotId());
+    int totalSnapshots = 1;
+    while (currentSnapshot.parentId() != null) {
+      currentSnapshot = table.snapshot(currentSnapshot.parentId());
+      totalSnapshots += 1;
+    }
+    assertThat(totalSnapshots).isEqualTo(3);
+    this.assertFiles(catalog.loadTable(TABLE), FILE_B, FILE_C);
+  }
+
+  @Test
+  public void
+      testConcurrentWritesWithRollbackEnableWithToRollbackSnapshotReferencedByOtherBranch() {
+    IcebergCatalog catalog = this.catalog();
+    if (this.requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).withPartitionSpec(SPEC).create();
+    this.assertNoFiles(table);
+
+    // commit FILE_A
+    catalog.loadTable(TABLE).newFastAppend().appendFile(FILE_A).commit();
+    this.assertFiles(catalog.loadTable(TABLE), FILE_A);
+    table.refresh();
+
+    long lastSnapshotId = table.currentSnapshot().snapshotId();
+
+    // Apply the deletes based on FILE_A;
+    // this should conflict when we try to commit without the change.
+    RowDelta originalRowDelta =
+        table
+            .newRowDelta()
+            .addDeletes(FILE_A_DELETES)
+            .validateFromSnapshot(lastSnapshotId)
+            .validateDataFilesExist(List.of(FILE_A.location()));
+    // Make the client ready with updates; don't reach out to the IRC server yet
+    Snapshot s = originalRowDelta.apply();
+    TableOperations ops = ((BaseTable) catalog.loadTable(TABLE)).operations();
+    TableMetadata base = ops.current();
+    TableMetadata.Builder update = TableMetadata.buildFrom(base);
+    update.setBranchSnapshot(s, "main");
+    TableMetadata updatedMetadata = update.build();
+    List<MetadataUpdate> updates = updatedMetadata.changes();
+    List<UpdateRequirement> requirements = UpdateRequirements.forUpdateTable(base, updates);
+    UpdateTableRequest request = UpdateTableRequest.create(TABLE, requirements, updates);
+
+    // replace FILE_A with FILE_B
+    catalog
+        .loadTable(TABLE)
+        .newRewrite()
+        .addFile(FILE_B)
+        .deleteFile(FILE_A)
+        .set("polaris.internal.conflict-resolution.by-operation-type.replace", "rollback")
+        .commit();
+
+    Table t = catalog.loadTable(TABLE);
+    // add another branch B
+    t.manageSnapshots()
+        .createBranch("non-main")
+        .setCurrentSnapshot(t.currentSnapshot().snapshotId())
+        .commit();
+    // now add more files to the non-main branch
+    catalog.loadTable(TABLE).newFastAppend().appendFile(FILE_C).toBranch("non-main").commit();
+    CatalogHandlerUtils catalogHandlerUtils = new CatalogHandlerUtils(5, true);
+    Assertions.assertThatThrownBy(
+            () ->
+                catalogHandlerUtils.commit(
+                    ((BaseTable) catalog.loadTable(TABLE)).operations(), request))
+        .isInstanceOf(CommitFailedException.class)
+        .hasMessageContaining("Requirement failed: branch main has changed");
+
+    table.refresh();
+
+    // Assert only 2 snapshots
+    Snapshot currentSnapshot = table.snapshot(table.refs().get("main").snapshotId());
+    int totalSnapshots = 1;
+    while (currentSnapshot.parentId() != null) {
+      currentSnapshot = table.snapshot(currentSnapshot.parentId());
+      totalSnapshots += 1;
+    }
+    assertThat(totalSnapshots).isEqualTo(2);
+    this.assertFiles(catalog.loadTable(TABLE), FILE_B);
+  }
+
+  @Test
+  public void testConcurrentWritesWithRollbackWithConcurrentWritesToDifferentBranches() {
+    IcebergCatalog catalog = this.catalog();
+    if (this.requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).withPartitionSpec(SPEC).create();
+    this.assertNoFiles(table);
+
+    // commit FILE_A to main branch
+    catalog.loadTable(TABLE).newFastAppend().appendFile(FILE_A).commit();
+    this.assertFiles(catalog.loadTable(TABLE), FILE_A);
+    table.refresh();
+
+    Table t = catalog.loadTable(TABLE);
+    // add another branch B
+    t.manageSnapshots()
+        .createBranch("non-main")
+        .setCurrentSnapshot(t.currentSnapshot().snapshotId())
+        .commit();
+
+    long lastSnapshotId = table.currentSnapshot().snapshotId();
+
+    // Apply the deletes based on FILE_A;
+    // this should conflict when we try to commit without the change.
+    RowDelta originalRowDelta =
+        table
+            .newRowDelta()
+            .addDeletes(FILE_A_DELETES)
+            .validateFromSnapshot(lastSnapshotId)
+            .validateDataFilesExist(List.of(FILE_A.location()));
+    // Make the client ready with updates; don't reach out to the IRC server yet
+    Snapshot s = originalRowDelta.apply();
+    TableOperations ops = ((BaseTable) catalog.loadTable(TABLE)).operations();
+    TableMetadata base = ops.current();
+    TableMetadata.Builder update = TableMetadata.buildFrom(base);
+    update.setBranchSnapshot(s, "main");
+    TableMetadata updatedMetadata = update.build();
+    List<MetadataUpdate> updates = updatedMetadata.changes();
+    List<UpdateRequirement> requirements = UpdateRequirements.forUpdateTable(base, updates);
+    UpdateTableRequest request = UpdateTableRequest.create(TABLE, requirements, updates);
+
+    // replace FILE_A with FILE_B on main branch
+    catalog
+        .loadTable(TABLE)
+        .newRewrite()
+        .addFile(FILE_B)
+        .deleteFile(FILE_A)
+        .set("polaris.internal.conflict-resolution.by-operation-type.replace", "rollback")
+        .commit();
+
+    // now add more files to the non-main branch; this will make the sequence number
+    // non-monotonic for the main branch
+    catalog.loadTable(TABLE).newFastAppend().appendFile(FILE_C).toBranch("non-main").commit();
+    CatalogHandlerUtils catalogHandlerUtils = new CatalogHandlerUtils(5, true);
+    Assertions.assertThatThrownBy(
+            () ->
+                catalogHandlerUtils.commit(
+                    ((BaseTable) catalog.loadTable(TABLE)).operations(), request))
+        .isInstanceOf(CommitFailedException.class)
+        .hasMessageContaining("Requirement failed: branch main has changed");
+
+    table.refresh();
+
+    // Assert only 2 snapshots
+    Snapshot currentSnapshot = table.snapshot(table.refs().get("main").snapshotId());
+    int totalSnapshots = 1;
+    while (currentSnapshot.parentId() != null) {
+      currentSnapshot = table.snapshot(currentSnapshot.parentId());
+      totalSnapshots += 1;
+    }
+    assertThat(totalSnapshots).isEqualTo(2);
+    this.assertFiles(catalog.loadTable(TABLE), FILE_B);
+  }
+
   @Test
   public void testValidateNotificationWhenTableAndNamespacesDontExist() {
     Assumptions.assumeTrue(
diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/CatalogHandlerUtils.java b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/CatalogHandlerUtils.java
index aa99d53f5..ae879ea5f 100644
--- a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/CatalogHandlerUtils.java
+++ b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/CatalogHandlerUtils.java
@@ -22,16 +22,21 @@ import static org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS_DEFAUL
 import static org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS_DEFAULT;
 import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import jakarta.enterprise.context.ApplicationScoped;
 import jakarta.inject.Inject;
+import java.lang.reflect.Field;
 import java.time.OffsetDateTime;
 import java.time.ZoneOffset;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -39,9 +44,13 @@ import java.util.stream.Collectors;
 import org.apache.iceberg.BaseMetadataTable;
 import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.BaseTransaction;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.MetadataUpdate;
 import org.apache.iceberg.MetadataUpdate.UpgradeFormatVersion;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotRef;
 import org.apache.iceberg.SortOrder;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
@@ -74,6 +83,8 @@ import org.apache.iceberg.rest.responses.LoadTableResponse;
 import org.apache.iceberg.rest.responses.LoadViewResponse;
 import org.apache.iceberg.rest.responses.UpdateNamespacePropertiesResponse;
 import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.iceberg.util.Tasks;
 import org.apache.iceberg.view.BaseView;
 import org.apache.iceberg.view.SQLViewRepresentation;
@@ -85,6 +96,8 @@ import org.apache.iceberg.view.ViewRepresentation;
 import org.apache.polaris.core.config.FeatureConfiguration;
 import org.apache.polaris.core.config.PolarisConfigurationStore;
 import org.apache.polaris.core.context.RealmContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * CODE_COPIED_TO_POLARIS Copied from CatalogHandler in Iceberg 1.8.0 Contains a collection of
@@ -92,17 +105,41 @@ import org.apache.polaris.core.context.RealmContext;
  */
 @ApplicationScoped
 public class CatalogHandlerUtils {
+  private static final Logger LOGGER = LoggerFactory.getLogger(CatalogHandlerUtils.class);
+
   private static final Schema EMPTY_SCHEMA = new Schema();
   private static final String INITIAL_PAGE_TOKEN = "";
+  private static final String CONFLICT_RESOLUTION_ACTION =
+      "polaris.internal.conflict-resolution.by-operation-type.replace";
+  private static final Field LAST_SEQUENCE_NUMBER_FIELD;
 
-  private final RealmContext realmContext;
-  private final PolarisConfigurationStore configurationStore;
+  static {
+    try {
+      LAST_SEQUENCE_NUMBER_FIELD =
+          TableMetadata.Builder.class.getDeclaredField("lastSequenceNumber");
+      LAST_SEQUENCE_NUMBER_FIELD.setAccessible(true);
+    } catch (NoSuchFieldException e) {
+      throw new RuntimeException("Unable to access field", e);
+    }
+  }
+
+  private final int maxCommitRetries;
+  private final boolean rollbackCompactionEnabled;
 
   @Inject
   public CatalogHandlerUtils(
       RealmContext realmContext, PolarisConfigurationStore configurationStore) {
-    this.realmContext = realmContext;
-    this.configurationStore = configurationStore;
+    this(
+        configurationStore.getConfiguration(
+            realmContext, FeatureConfiguration.ICEBERG_COMMIT_MAX_RETRIES),
+        configurationStore.getConfiguration(
+            realmContext, FeatureConfiguration.ICEBERG_ROLLBACK_COMPACTION_ON_CONFLICTS));
+  }
+
+  @VisibleForTesting
+  public CatalogHandlerUtils(int maxCommitRetries, boolean rollbackCompactionEnabled) {
+    this.maxCommitRetries = maxCommitRetries;
+    this.rollbackCompactionEnabled = rollbackCompactionEnabled;
   }
 
   /**
@@ -421,11 +458,12 @@ public class CatalogHandlerUtils {
     return ops.current();
   }
 
-  protected TableMetadata commit(TableOperations ops, UpdateTableRequest request) {
+  @VisibleForTesting
+  public TableMetadata commit(TableOperations ops, UpdateTableRequest request) {
     AtomicBoolean isRetry = new AtomicBoolean(false);
     try {
       Tasks.foreach(ops)
-          .retry(maxCommitRetries())
+          .retry(maxCommitRetries)
          .exponentialBackoff(
               COMMIT_MIN_RETRY_WAIT_MS_DEFAULT,
               COMMIT_MAX_RETRY_WAIT_MS_DEFAULT,
@@ -435,30 +473,106 @@ public class CatalogHandlerUtils {
           .run(
               taskOps -> {
                 TableMetadata base = isRetry.get() ? taskOps.refresh() : taskOps.current();
-
                 isRetry.set(true);
-                // validate requirements
+                TableMetadata.Builder metadataBuilder = TableMetadata.buildFrom(base);
+                TableMetadata newBase = base;
                 try {
-                  request.requirements().forEach(requirement -> requirement.validate(base));
+                  request.requirements().forEach((requirement) -> requirement.validate(base));
+                } catch (CommitFailedException e) {
+                  if (!rollbackCompactionEnabled) {
+                    // wrap and rethrow outside of tasks to avoid unnecessary retry
+                    throw new ValidationFailureException(e);
+                  }
+                  LOGGER.debug(
+                      "Attempting to rollback replace operations for table={}, with current-snapshot-id={}",
+                      base.uuid(),
+                      base.currentSnapshot().snapshotId());
+                  UpdateRequirement.AssertRefSnapshotID assertRefSnapshotId =
+                      findAssertRefSnapshotID(request);
+                  MetadataUpdate.SetSnapshotRef setSnapshotRef = findSetSnapshotRefUpdate(request);
+
+                  if (assertRefSnapshotId == null || setSnapshotRef == null) {
+                    // This implies the request was not trying to add a snapshot.
+                    LOGGER.debug(
+                        "Giving up on rollback of replace operations for table={}, with current-snapshot-id={}, as the operation doesn't attempt to add a single snapshot",
+                        base.uuid(),
+                        base.currentSnapshot().snapshotId());
+                    // wrap and rethrow outside of tasks to avoid unnecessary retry
+                    throw new ValidationFailureException(e);
+                  }
+
+                  // the snapshot-id at which the client expects the table's current snapshot to be
+                  long expectedCurrentSnapshotId = assertRefSnapshotId.snapshotId();
+
+                  MetadataUpdate.AddSnapshot snapshotToBeAdded = findAddSnapshotUpdate(request);
+                  if (snapshotToBeAdded == null) {
+                    // Re-throw if there is no snapshot to be added.
+                    // wrap and rethrow outside of tasks to avoid unnecessary retry
+                    throw new ValidationFailureException(e);
+                  }
+
+                  LOGGER.info(
+                      "Attempting to rollback replace operation for table={}, with current-snapshot-id={}, to snapshot={}",
+                      base.uuid(),
+                      base.currentSnapshot().snapshotId(),
+                      snapshotToBeAdded.snapshot().snapshotId());
+
+                  List<MetadataUpdate> metadataUpdates =
+                      generateUpdatesToRemoveNoopSnapshot(
+                          base, expectedCurrentSnapshotId, setSnapshotRef.name());
+
+                  if (metadataUpdates == null || metadataUpdates.isEmpty()) {
+                    // Nothing can be done, as this implies that not all snapshots between
+                    // expectedCurrentSnapshotId and the current snapshot were no-op (REPLACE)
+                    // snapshots; hence re-throw the exception caught.
+                    // wrap and rethrow outside of tasks to avoid unnecessary retry
+                    throw new ValidationFailureException(e);
+                  }
+                  // Set the ref we wanted to update back to the snapshot-id
+                  // that the client expects the table to be at.
+                  metadataBuilder.setBranchSnapshot(
+                      expectedCurrentSnapshotId, setSnapshotRef.name());
+
+                  // apply the remove-snapshots update to the current metadata.
+                  // NOTE: we need to setRef to expectedCurrentSnapshotId first and then apply the
+                  // remove, as otherwise the remove would drop the reference.
+                  // NOTE: we can skip removing the now-orphaned base; it's not a hard requirement,
+                  // just something good to do rather than leaving it for Remove Orphans.
+                  // The ref is now correctly rolled back to the parent of the snapshot to be committed.
+                  metadataUpdates.forEach((update -> update.applyTo(metadataBuilder)));
+                  newBase =
+                      setAppropriateLastSeqNumber(
+                              metadataBuilder,
+                              base.uuid(),
+                              base.lastSequenceNumber(),
+                              base.snapshot(expectedCurrentSnapshotId).sequenceNumber())
+                          .build();
+                  LOGGER.info(
+                      "Successfully rolled back replace operation for table={}, with current-snapshot-id={}, to snapshot={}",
+                      base.uuid(),
+                      base.currentSnapshot().snapshotId(),
+                      newBase.currentSnapshot().snapshotId());
+                }
+                // double-check that the requirements pass now.
+                try {
+                  TableMetadata baseWithRemovedSnaps = newBase;
+                  request
+                      .requirements()
+                      .forEach((requirement) -> requirement.validate(baseWithRemovedSnaps));
                 } catch (CommitFailedException e) {
                   // wrap and rethrow outside of tasks to avoid unnecessary retry
                   throw new ValidationFailureException(e);
                 }
 
-                // apply changes
-                TableMetadata.Builder metadataBuilder = TableMetadata.buildFrom(base);
-                request.updates().forEach(update -> update.applyTo(metadataBuilder));
-
-                TableMetadata updated = metadataBuilder.build();
+                TableMetadata.Builder newMetadataBuilder = TableMetadata.buildFrom(newBase);
+                request.updates().forEach((update) -> update.applyTo(newMetadataBuilder));
+                TableMetadata updated = newMetadataBuilder.build();
                 if (updated.changes().isEmpty()) {
                   // do not commit if the metadata has not changed
                   return;
                 }
-
-                // commit
                 taskOps.commit(base, updated);
               });
-
     } catch (ValidationFailureException e) {
       throw e.wrapped();
     }
@@ -466,6 +580,140 @@ public class CatalogHandlerUtils {
     return ops.current();
   }
 
+  private UpdateRequirement.AssertRefSnapshotID findAssertRefSnapshotID(
+      UpdateTableRequest request) {
+    UpdateRequirement.AssertRefSnapshotID assertRefSnapshotID = null;
+    int total = 0;
+    for (UpdateRequirement requirement : request.requirements()) {
+      if (requirement instanceof UpdateRequirement.AssertRefSnapshotID) {
+        ++total;
+        assertRefSnapshotID = (UpdateRequirement.AssertRefSnapshotID) requirement;
+      }
+    }
+
+    // if > 1 ref assertion, then it's not safe to roll back; make this a no-op.
+    return total != 1 ? null : assertRefSnapshotID;
+  }
+
+  private List<MetadataUpdate> generateUpdatesToRemoveNoopSnapshot(
+      TableMetadata base, long expectedCurrentSnapshotId, String updateRefName) {
+    // find all the snapshots we want to retain that are not part of the current branch.
+    Set<Long> idsToRetain = Sets.newHashSet();
+    for (Map.Entry<String, SnapshotRef> ref : base.refs().entrySet()) {
+      String refName = ref.getKey();
+      SnapshotRef snapshotRef = ref.getValue();
+      if (refName.equals(updateRefName)) {
+        continue;
+      }
+      idsToRetain.add(ref.getValue().snapshotId());
+      // Always check the ancestry for both branches and tags,
+      // mostly for the case where a branch was created and then dropped,
+      // then a tag was created, then rollback happened, and post that the tag
+      // was dropped and the branch was re-created on it.
+      for (Snapshot ancestor : SnapshotUtil.ancestorsOf(snapshotRef.snapshotId(), base::snapshot)) {
+        idsToRetain.add(ancestor.snapshotId());
+      }
+    }
+
+    List<MetadataUpdate> updateToRemoveSnapshot = new ArrayList<>();
+    Long snapshotId = base.ref(updateRefName).snapshotId(); // current tip of the given branch
+    // ensure this branch has the latest sequence number.
+    long expectedSequenceNumber = base.lastSequenceNumber();
+    // Unexpected state, as the table's current sequence number is not equal to that of the
+    // most recent snapshot the ref points to.
+    if (expectedSequenceNumber != base.snapshot(snapshotId).sequenceNumber()) {
+      LOGGER.debug(
+          "Giving up rolling back table {} to snapshot {}, ref current snapshot sequence number {} is not equal to expected sequence number {}",
+          base.uuid(),
+          snapshotId,
+          base.snapshot(snapshotId).sequenceNumber(),
+          expectedSequenceNumber);
+      return null;
+    }
+    Set<Long> snapshotsToRemove = new LinkedHashSet<>();
+    while (snapshotId != null && !Objects.equals(snapshotId, expectedCurrentSnapshotId)) {
+      Snapshot snap = base.snapshot(snapshotId);
+      if (!isRollbackSnapshot(snap) || idsToRetain.contains(snapshotId)) {
+        // Either we encountered a non no-op snapshot, or the snapshot is referenced by
+        // another branch or tag.
+        LOGGER.debug(
+            "Giving up rolling back table {} to snapshot {}, snapshot to be removed referenced by another branch or tag ancestor",
+            base.uuid(),
+            snapshotId);
+        break;
+      }
+      snapshotsToRemove.add(snap.snapshotId());
+      snapshotId = snap.parentId();
+    }
+
+    boolean wasExpectedSnapshotReached = Objects.equals(snapshotId, expectedCurrentSnapshotId);
+    updateToRemoveSnapshot.add(new MetadataUpdate.RemoveSnapshots(snapshotsToRemove));
+    return wasExpectedSnapshotReached ? updateToRemoveSnapshot : null;
+  }
+
+  private boolean isRollbackSnapshot(Snapshot snapshot) {
+    // Only REPLACE snapshots whose summary sets CONFLICT_RESOLUTION_ACTION to "rollback"
+    // are allowed to be rolled back.
+    return DataOperations.REPLACE.equals(snapshot.operation())
+        && PropertyUtil.propertyAsString(snapshot.summary(), CONFLICT_RESOLUTION_ACTION, "")
+            .equalsIgnoreCase("rollback");
+  }
+
+  private MetadataUpdate.SetSnapshotRef findSetSnapshotRefUpdate(UpdateTableRequest request) {
+    int total = 0;
+    MetadataUpdate.SetSnapshotRef setSnapshotRefUpdate = null;
+    // find the SetSnapshotRef update
+    for (MetadataUpdate update : request.updates()) {
+      if (update instanceof MetadataUpdate.SetSnapshotRef) {
+        total++;
+        setSnapshotRefUpdate = (MetadataUpdate.SetSnapshotRef) update;
+      }
+    }
+
+    // if > 1 ref update, then it's not safe to roll back; make this a no-op.
+    return total != 1 ? null : setSnapshotRefUpdate;
+  }
+
+  private MetadataUpdate.AddSnapshot findAddSnapshotUpdate(UpdateTableRequest request) {
+    int total = 0;
+    MetadataUpdate.AddSnapshot addSnapshot = null;
+    // find the AddSnapshot update
+    for (MetadataUpdate update : request.updates()) {
+      if (update instanceof MetadataUpdate.AddSnapshot) {
+        total++;
+        addSnapshot = (MetadataUpdate.AddSnapshot) update;
+      }
+    }
+
+    // if > 1 AddSnapshot update, then it's not safe to roll back; make this a no-op.
+    return total != 1 ? null : addSnapshot;
+  }
+
+  private TableMetadata.Builder setAppropriateLastSeqNumber(
+      TableMetadata.Builder metadataBuilder,
+      String tableUUID,
+      long currentSequenceNumber,
+      long expectedSequenceNumber) {
+    // TODO: Get rid of the reflection call once TableMetadata has an API for it.
+    // Move the lastSequenceNumber back, to apply the snapshot properly on the
+    // current metadata. Sequence numbers are considered to increase monotonically
+    // snapshot over snapshot; the client generates the manifest list, and hence
+    // the sequence number can't be changed for a snapshot. The only possible option
+    // then is to change the sequenceNumber tracked by metadata.json.
+    try {
+      // this should point to the sequence number that the current tip of the
+      // branch belongs to, as the new commit will be applied on top of it.
+      LAST_SEQUENCE_NUMBER_FIELD.set(metadataBuilder, expectedSequenceNumber);
+      LOGGER.info(
+          "Setting table uuid:{} last sequence number from:{} to {}",
+          tableUUID,
+          currentSequenceNumber,
+          expectedSequenceNumber);
+    } catch (IllegalAccessException ex) {
+      throw new RuntimeException(ex);
+    }
+    return metadataBuilder;
+  }
+
   private BaseView asBaseView(View view) {
     Preconditions.checkState(
         view instanceof BaseView, "Cannot wrap catalog that does not produce BaseView");
@@ -565,7 +813,7 @@ public class CatalogHandlerUtils {
     AtomicBoolean isRetry = new AtomicBoolean(false);
     try {
       Tasks.foreach(ops)
-          .retry(maxCommitRetries())
+          .retry(maxCommitRetries)
           .exponentialBackoff(
               COMMIT_MIN_RETRY_WAIT_MS_DEFAULT,
               COMMIT_MAX_RETRY_WAIT_MS_DEFAULT,
@@ -606,9 +854,4 @@
     return ops.current();
   }
-
-  private int maxCommitRetries() {
-    return configurationStore.getConfiguration(
-        realmContext, FeatureConfiguration.ICEBERG_COMMIT_MAX_RETRIES);
-  }
 }
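
For intuition, a worked example of the sequence-number rewind performed by
setAppropriateLastSeqNumber above (snapshot ids and sequence numbers here are
hypothetical):

    main: S1 (seq=1) <- S2 (seq=2, operation=replace, summary property set to rollback)

    A client computed its new snapshot S3 against S1, so its manifest list was
    written with seq=2. The table, however, is already at lastSequenceNumber=2
    because of S2, so S3 cannot be applied on top of the current metadata.

    The rollback path in commit() therefore:
      1. points main back at S1 via setBranchSnapshot(S1, "main"),
      2. drops S2 via MetadataUpdate.RemoveSnapshots({S2}),
      3. rewinds lastSequenceNumber to seq(S1) = 1 through the reflective
         field write.

    Applying the client's updates then assigns seq=2 to S3, keeping sequence
    numbers monotonic on main. When another branch has already consumed a
    newer sequence number, generateUpdatesToRemoveNoopSnapshot refuses the
    rollback, as the last two tests exercise.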