singhpk234 commented on code in PR #1285: URL: https://github.com/apache/polaris/pull/1285#discussion_r2061942672
########## service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java: ########## @@ -762,7 +777,190 @@ public LoadTableResponse updateTable( if (isStaticFacade(catalog)) { throw new BadRequestException("Cannot update table on static-facade external catalogs."); } - return CatalogHandlers.updateTable(baseCatalog, tableIdentifier, applyUpdateFilters(request)); + // TODO: pending discussion if table property is right way, or a writer specific knob is + // required. + return updateTableWithRollback(baseCatalog, tableIdentifier, applyUpdateFilters(request)); + } + + // TODO: Clean this up when CatalogHandler become extensible. + // Copy of CatalogHandler#update + private static LoadTableResponse updateTableWithRollback( + Catalog catalog, TableIdentifier ident, UpdateTableRequest request) { + Schema EMPTY_SCHEMA = new Schema(new Types.NestedField[0]); + TableMetadata finalMetadata; + if (isCreate(request)) { + Transaction transaction = + catalog.buildTable(ident, EMPTY_SCHEMA).createOrReplaceTransaction(); + if (!(transaction instanceof BaseTransaction)) { + throw new IllegalStateException( + "Cannot wrap catalog that does not produce BaseTransaction"); + } + + BaseTransaction baseTransaction = (BaseTransaction) transaction; + finalMetadata = create(baseTransaction.underlyingOps(), request); + } else { + Table table = catalog.loadTable(ident); + if (!(table instanceof BaseTable)) { + throw new IllegalStateException("Cannot wrap catalog that does not produce BaseTable"); + } + + TableOperations ops = ((BaseTable) table).operations(); + finalMetadata = commit(ops, request); + } + + return LoadTableResponse.builder().withTableMetadata(finalMetadata).build(); + } + + // TODO: Clean this up when CatalogHandler become extensible. + // Copy of CatalogHandler#create + private static TableMetadata create(TableOperations ops, UpdateTableRequest request) { + request.requirements().forEach((requirement) -> requirement.validate(ops.current())); + Optional<Integer> formatVersion = + request.updates().stream() + .filter((update) -> update instanceof MetadataUpdate.UpgradeFormatVersion) + .map((update) -> ((MetadataUpdate.UpgradeFormatVersion) update).formatVersion()) + .findFirst(); + TableMetadata.Builder builder = + (TableMetadata.Builder) + formatVersion + .map(TableMetadata::buildFromEmpty) + .orElseGet(TableMetadata::buildFromEmpty); + request.updates().forEach((update) -> update.applyTo(builder)); + ops.commit((TableMetadata) null, builder.build()); + return ops.current(); + } + + @VisibleForTesting + // TODO: Clean this up when CatalogHandler become extensible. + // Copy of CatalogHandler#commit + public static TableMetadata commit(TableOperations ops, UpdateTableRequest request) { + AtomicBoolean isRetry = new AtomicBoolean(false); + + try { + Tasks.foreach(new TableOperations[] {ops}) + .retry(4) + .exponentialBackoff(100L, 60000L, 1800000L, (double) 2.0F) + .onlyRetryOn(CommitFailedException.class) + .run( + (taskOps) -> { + TableMetadata base = isRetry.get() ? taskOps.refresh() : taskOps.current(); + isRetry.set(true); + // Prev PR: https://github.com/apache/iceberg/pull/5888 + boolean rollbackCompaction = + PropertyUtil.propertyAsBoolean( + taskOps.current().properties(), ROLLBACK_REPLACE_ENABLED_PROPERTY, false); + + TableMetadata.Builder metadataBuilder = TableMetadata.buildFrom(base); + TableMetadata newBase = base; + try { + request.requirements().forEach((requirement) -> requirement.validate(base)); + } catch (CommitFailedException e) { + if (!rollbackCompaction) { + throw new ValidationFailureException(e); + } + // Since snapshot has already been created at the client end. + // Nothing much can be done, we can move this + // to writer specific thing, but it would be cool if catalog does this for us. + // Inspect that the requirements states that snapshot + // ref needs to be asserted this usually means in the update section + // it has addSnapshot and setSnapshotRef + UpdateRequirement.AssertRefSnapshotID addSnapshot = null; + int found = 0; + for (UpdateRequirement requirement : request.requirements()) { + // there should be only add snapshot request + if (requirement instanceof UpdateRequirement.AssertRefSnapshotID) { + ++found; + addSnapshot = (UpdateRequirement.AssertRefSnapshotID) requirement; + } + } + + if (found != 1) { + // TODO: handle this case, find min snapshot id, to rollback to give it creates + // lineage + // lets not complicate things rn + throw new ValidationFailureException(e); + } + + Long parentSnapshotId = addSnapshot.snapshotId(); + // so we will first check all the snapshots on the top of + // base on which the snapshot we want to commit is of type REPLACE ops. + Long parentToRollbackTo = base.currentSnapshot().snapshotId(); + List<MetadataUpdate> updateToRemoveSnapshot = new ArrayList<>(); + while (!Objects.equals(parentToRollbackTo, parentSnapshotId)) { + Snapshot snap = ops.current().snapshot(parentToRollbackTo); + if (!DataOperations.REPLACE.equals(snap.operation())) { + break; + } + updateToRemoveSnapshot.add( + new MetadataUpdate.RemoveSnapshot(snap.snapshotId())); + parentToRollbackTo = snap.parentId(); + } + + MetadataUpdate.SetSnapshotRef ref = null; + found = 0; + // find the SetRefName snapshot update + for (MetadataUpdate update : request.updates()) { + if (update instanceof MetadataUpdate.SetSnapshotRef) { + ++found; + ref = (MetadataUpdate.SetSnapshotRef) update; + } + } + + if (found != 1 || !Objects.equals(parentToRollbackTo, parentSnapshotId)) { Review Comment: Ack, extracted the cross branch ref, validation and also added an UT for this scenario, -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@polaris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org