eric-maynard commented on code in PR #1285:
URL: https://github.com/apache/polaris/pull/1285#discussion_r2050936747
##########
service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java:
##########
@@ -754,7 +769,199 @@ public LoadTableResponse updateTable(
if (isStaticFacade(catalog)) {
throw new BadRequestException("Cannot update table on static-facade
external catalogs.");
}
- return CatalogHandlers.updateTable(baseCatalog, tableIdentifier,
applyUpdateFilters(request));
+ // TODO: pending discussion if table property is right way, or a writer
specific knob is
+ // required.
+ return updateTableWithRollback(baseCatalog, tableIdentifier,
applyUpdateFilters(request));
+ }
+
+ private static TableMetadata create(TableOperations ops, UpdateTableRequest
request) {
+ request.requirements().forEach((requirement) ->
requirement.validate(ops.current()));
+ Optional<Integer> formatVersion =
+ request.updates().stream()
+ .filter((update) -> update instanceof
MetadataUpdate.UpgradeFormatVersion)
+ .map((update) -> ((MetadataUpdate.UpgradeFormatVersion)
update).formatVersion())
+ .findFirst();
+ TableMetadata.Builder builder =
+ (TableMetadata.Builder)
+ formatVersion
+ .map(TableMetadata::buildFromEmpty)
+ .orElseGet(TableMetadata::buildFromEmpty);
+ request.updates().forEach((update) -> update.applyTo(builder));
+ ops.commit((TableMetadata) null, builder.build());
+ return ops.current();
+ }
+
+ // TODO: Clean this up when CatalogHandler become extensible.
+ // Copy of CatalogHandler#update
+ private static LoadTableResponse updateTableWithRollback(
+ Catalog catalog, TableIdentifier ident, UpdateTableRequest request) {
+ Schema EMPTY_SCHEMA = new Schema(new Types.NestedField[0]);
+ TableMetadata finalMetadata;
+ if (isCreate(request)) {
+ Transaction transaction =
+ catalog.buildTable(ident, EMPTY_SCHEMA).createOrReplaceTransaction();
+ if (!(transaction instanceof BaseTransaction)) {
+ throw new IllegalStateException(
+ "Cannot wrap catalog that does not produce BaseTransaction");
+ }
+
+ BaseTransaction baseTransaction = (BaseTransaction) transaction;
+ finalMetadata = create(baseTransaction.underlyingOps(), request);
+ } else {
+ Table table = catalog.loadTable(ident);
+ if (!(table instanceof BaseTable)) {
+ throw new IllegalStateException("Cannot wrap catalog that does not
produce BaseTable");
+ }
+
+ TableOperations ops = ((BaseTable) table).operations();
+ finalMetadata = commit(ops, request);
+ }
+
+ return
LoadTableResponse.builder().withTableMetadata(finalMetadata).build();
+ }
+
+ @VisibleForTesting
+ public static TableMetadata commit(TableOperations ops, UpdateTableRequest
request) {
+ AtomicBoolean isRetry = new AtomicBoolean(false);
+
+ try {
+ Tasks.foreach(new TableOperations[] {ops})
+ .retry(4)
+ .exponentialBackoff(100L, 60000L, 1800000L, (double) 2.0F)
Review Comment:
Can we make these retry/backoff values configurable, or else move them into
named constants? Maybe express the delays using `Duration`?
##########
service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java:
##########
@@ -754,7 +769,199 @@ public LoadTableResponse updateTable(
if (isStaticFacade(catalog)) {
throw new BadRequestException("Cannot update table on static-facade
external catalogs.");
}
- return CatalogHandlers.updateTable(baseCatalog, tableIdentifier,
applyUpdateFilters(request));
+ // TODO: pending discussion if table property is right way, or a writer
specific knob is
+ // required.
+ return updateTableWithRollback(baseCatalog, tableIdentifier,
applyUpdateFilters(request));
+ }
+
+ private static TableMetadata create(TableOperations ops, UpdateTableRequest
request) {
Review Comment:
I know it's private, but can you add a doc comment here? I found the logic a
bit hard to follow at first.
##########
service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java:
##########
@@ -754,7 +769,199 @@ public LoadTableResponse updateTable(
if (isStaticFacade(catalog)) {
throw new BadRequestException("Cannot update table on static-facade
external catalogs.");
}
- return CatalogHandlers.updateTable(baseCatalog, tableIdentifier,
applyUpdateFilters(request));
+ // TODO: pending discussion if table property is right way, or a writer
specific knob is
+ // required.
+ return updateTableWithRollback(baseCatalog, tableIdentifier,
applyUpdateFilters(request));
+ }
+
+ private static TableMetadata create(TableOperations ops, UpdateTableRequest
request) {
+ request.requirements().forEach((requirement) ->
requirement.validate(ops.current()));
+ Optional<Integer> formatVersion =
+ request.updates().stream()
+ .filter((update) -> update instanceof
MetadataUpdate.UpgradeFormatVersion)
+ .map((update) -> ((MetadataUpdate.UpgradeFormatVersion)
update).formatVersion())
+ .findFirst();
+ TableMetadata.Builder builder =
+ (TableMetadata.Builder)
+ formatVersion
+ .map(TableMetadata::buildFromEmpty)
+ .orElseGet(TableMetadata::buildFromEmpty);
+ request.updates().forEach((update) -> update.applyTo(builder));
+ ops.commit((TableMetadata) null, builder.build());
+ return ops.current();
+ }
+
+ // TODO: Clean this up when CatalogHandler become extensible.
+ // Copy of CatalogHandler#update
+ private static LoadTableResponse updateTableWithRollback(
+ Catalog catalog, TableIdentifier ident, UpdateTableRequest request) {
+ Schema EMPTY_SCHEMA = new Schema(new Types.NestedField[0]);
+ TableMetadata finalMetadata;
+ if (isCreate(request)) {
+ Transaction transaction =
+ catalog.buildTable(ident, EMPTY_SCHEMA).createOrReplaceTransaction();
+ if (!(transaction instanceof BaseTransaction)) {
+ throw new IllegalStateException(
+ "Cannot wrap catalog that does not produce BaseTransaction");
+ }
+
+ BaseTransaction baseTransaction = (BaseTransaction) transaction;
+ finalMetadata = create(baseTransaction.underlyingOps(), request);
+ } else {
+ Table table = catalog.loadTable(ident);
+ if (!(table instanceof BaseTable)) {
+ throw new IllegalStateException("Cannot wrap catalog that does not
produce BaseTable");
+ }
+
+ TableOperations ops = ((BaseTable) table).operations();
+ finalMetadata = commit(ops, request);
+ }
+
+ return
LoadTableResponse.builder().withTableMetadata(finalMetadata).build();
+ }
+
+ @VisibleForTesting
+ public static TableMetadata commit(TableOperations ops, UpdateTableRequest
request) {
+ AtomicBoolean isRetry = new AtomicBoolean(false);
+
+ try {
+ Tasks.foreach(new TableOperations[] {ops})
+ .retry(4)
+ .exponentialBackoff(100L, 60000L, 1800000L, (double) 2.0F)
+ .onlyRetryOn(CommitFailedException.class)
+ .run(
+ (taskOps) -> {
+ TableMetadata base = isRetry.get() ? taskOps.refresh() :
taskOps.current();
+ isRetry.set(true);
+ // Prev PR: https://github.com/apache/iceberg/pull/5888
+ boolean rollbackCompaction =
+ PropertyUtil.propertyAsBoolean(
+ taskOps.current().properties(),
ROLLBACK_REPLACE_ENABLED_PROPERTY, false);
+
+ TableMetadata.Builder metadataBuilder =
TableMetadata.buildFrom(base);
+ TableMetadata newBase = base;
+ try {
+ request.requirements().forEach((requirement) ->
requirement.validate(base));
+ } catch (CommitFailedException e) {
+ if (!rollbackCompaction) {
+ throw new ValidationFailureException(e);
+ }
+ // Since snapshot has already been created at the client end.
+ // Nothing much can be done, we can move this
+ // to writer specific thing, but it would be cool if catalog
does this for us.
+ // Inspect that the requirements states that snapshot
+ // ref needs to be asserted this usually means in the update
section
+ // it has addSnapshot and setSnapshotRef
+ UpdateRequirement.AssertRefSnapshotID addSnapshot = null;
+ int found = 0;
+ for (UpdateRequirement requirement : request.requirements())
{
+ // there should be only add snapshot request
+ if (requirement instanceof
UpdateRequirement.AssertRefSnapshotID) {
+ ++found;
+ addSnapshot = (UpdateRequirement.AssertRefSnapshotID)
requirement;
+ }
+ }
+
+ if (found != 1) {
+ // TODO: handle this case, find min snapshot id, to
rollback to give it creates
+ // lineage
+ // lets not complicate things rn
+ throw new ValidationFailureException(e);
+ }
+
+ Long parentSnapshotId = addSnapshot.snapshotId();
+ // so we will first check all the snapshots on the top of
+ // base on which the snapshot we want to commit is of type
REPLACE ops.
+ Long parentToRollbackTo =
base.currentSnapshot().snapshotId();
+ List<MetadataUpdate> updateToRemoveSnapshot = new
ArrayList<>();
+ while (!Objects.equals(parentToRollbackTo,
parentSnapshotId)) {
+ Snapshot snap = ops.current().snapshot(parentToRollbackTo);
+ if (!DataOperations.REPLACE.equals(snap.operation())) {
+ break;
+ }
+ updateToRemoveSnapshot.add(
+ new MetadataUpdate.RemoveSnapshot(snap.snapshotId()));
+ parentToRollbackTo = snap.parentId();
+ }
+
+ MetadataUpdate.SetSnapshotRef ref = null;
+ found = 0;
+ // find the SetRefName snapshot update
+ for (MetadataUpdate update : request.updates()) {
+ if (update instanceof MetadataUpdate.SetSnapshotRef) {
+ ++found;
+ ref = (MetadataUpdate.SetSnapshotRef) update;
+ }
+ }
+
+ if (found != 1 || (!Objects.equals(parentToRollbackTo,
parentSnapshotId))) {
+ // nothing can be done as this implies there was a non
replace
+ // snapshot in between or there is more than setRef ops,
we don't know where
+ // to go.
+ throw new ValidationFailureException(e);
+ }
+
+ // first we should also set back the ref we wanted to set,
back to the base
+ // on which the current update is based on.
+ metadataBuilder.setBranchSnapshot(parentSnapshotId,
ref.name());
+
+ // apply the remove snapshots update in the current metadata.
+ // NOTE: we need to setRef to parent first and then apply
remove as the remove
+ // will drop. The tags / branch which don't have reference.
+ // NOTE: we can skip removing the now orphan base. Its not a
hard requirement.
+ // just something good to do, and not leave for Remove
Orphans.
+ updateToRemoveSnapshot.forEach((update ->
update.applyTo(metadataBuilder)));
+ // Ref rolled back update correctly to snapshot to be
committed parent now.
+ newBase = metadataBuilder.build();
+ // move the lastSequenceNumber back, to apply snapshot
properly.
+ // Seq number are considered increasing monotonically,
snapshot over snapshot, so
+ // this is important.
+ Class<?> clazz = newBase.getClass();
+ try {
+ Field field = clazz.getDeclaredField("lastSequenceNumber");
+ field.setAccessible(true);
+ // this should point to the sequence number that current
tip of the
+ // branch belongs to, as new commit is gon get applied on
top of this.
Review Comment:
```suggestion
// branch belongs to, as the new commit will be applied
on top of this.
```
##########
service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java:
##########
@@ -754,7 +769,199 @@ public LoadTableResponse updateTable(
if (isStaticFacade(catalog)) {
throw new BadRequestException("Cannot update table on static-facade
external catalogs.");
}
- return CatalogHandlers.updateTable(baseCatalog, tableIdentifier,
applyUpdateFilters(request));
+ // TODO: pending discussion if table property is right way, or a writer
specific knob is
+ // required.
+ return updateTableWithRollback(baseCatalog, tableIdentifier,
applyUpdateFilters(request));
+ }
+
+ private static TableMetadata create(TableOperations ops, UpdateTableRequest
request) {
+ request.requirements().forEach((requirement) ->
requirement.validate(ops.current()));
+ Optional<Integer> formatVersion =
+ request.updates().stream()
+ .filter((update) -> update instanceof
MetadataUpdate.UpgradeFormatVersion)
+ .map((update) -> ((MetadataUpdate.UpgradeFormatVersion)
update).formatVersion())
+ .findFirst();
+ TableMetadata.Builder builder =
+ (TableMetadata.Builder)
+ formatVersion
+ .map(TableMetadata::buildFromEmpty)
+ .orElseGet(TableMetadata::buildFromEmpty);
+ request.updates().forEach((update) -> update.applyTo(builder));
+ ops.commit((TableMetadata) null, builder.build());
+ return ops.current();
+ }
+
+ // TODO: Clean this up when CatalogHandler become extensible.
+ // Copy of CatalogHandler#update
+ private static LoadTableResponse updateTableWithRollback(
+ Catalog catalog, TableIdentifier ident, UpdateTableRequest request) {
+ Schema EMPTY_SCHEMA = new Schema(new Types.NestedField[0]);
+ TableMetadata finalMetadata;
+ if (isCreate(request)) {
+ Transaction transaction =
+ catalog.buildTable(ident, EMPTY_SCHEMA).createOrReplaceTransaction();
+ if (!(transaction instanceof BaseTransaction)) {
+ throw new IllegalStateException(
+ "Cannot wrap catalog that does not produce BaseTransaction");
+ }
+
+ BaseTransaction baseTransaction = (BaseTransaction) transaction;
+ finalMetadata = create(baseTransaction.underlyingOps(), request);
+ } else {
+ Table table = catalog.loadTable(ident);
+ if (!(table instanceof BaseTable)) {
+ throw new IllegalStateException("Cannot wrap catalog that does not
produce BaseTable");
+ }
+
+ TableOperations ops = ((BaseTable) table).operations();
+ finalMetadata = commit(ops, request);
+ }
+
+ return
LoadTableResponse.builder().withTableMetadata(finalMetadata).build();
+ }
+
+ @VisibleForTesting
+ public static TableMetadata commit(TableOperations ops, UpdateTableRequest
request) {
Review Comment:
For this method, too, I'm a bit conflicted about whether this logic should
really live in `IcebergCatalogHandler`. It seems a bit low-level for the
class... could we push it into `IcebergCatalog`?
##########
service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java:
##########
@@ -754,7 +769,199 @@ public LoadTableResponse updateTable(
if (isStaticFacade(catalog)) {
throw new BadRequestException("Cannot update table on static-facade
external catalogs.");
}
- return CatalogHandlers.updateTable(baseCatalog, tableIdentifier,
applyUpdateFilters(request));
+ // TODO: pending discussion if table property is right way, or a writer
specific knob is
+ // required.
+ return updateTableWithRollback(baseCatalog, tableIdentifier,
applyUpdateFilters(request));
+ }
+
+ private static TableMetadata create(TableOperations ops, UpdateTableRequest
request) {
+ request.requirements().forEach((requirement) ->
requirement.validate(ops.current()));
+ Optional<Integer> formatVersion =
+ request.updates().stream()
+ .filter((update) -> update instanceof
MetadataUpdate.UpgradeFormatVersion)
+ .map((update) -> ((MetadataUpdate.UpgradeFormatVersion)
update).formatVersion())
+ .findFirst();
+ TableMetadata.Builder builder =
+ (TableMetadata.Builder)
+ formatVersion
+ .map(TableMetadata::buildFromEmpty)
+ .orElseGet(TableMetadata::buildFromEmpty);
+ request.updates().forEach((update) -> update.applyTo(builder));
+ ops.commit((TableMetadata) null, builder.build());
+ return ops.current();
+ }
+
+ // TODO: Clean this up when CatalogHandler become extensible.
+ // Copy of CatalogHandler#update
+ private static LoadTableResponse updateTableWithRollback(
+ Catalog catalog, TableIdentifier ident, UpdateTableRequest request) {
+ Schema EMPTY_SCHEMA = new Schema(new Types.NestedField[0]);
+ TableMetadata finalMetadata;
+ if (isCreate(request)) {
+ Transaction transaction =
+ catalog.buildTable(ident, EMPTY_SCHEMA).createOrReplaceTransaction();
+ if (!(transaction instanceof BaseTransaction)) {
+ throw new IllegalStateException(
+ "Cannot wrap catalog that does not produce BaseTransaction");
+ }
+
+ BaseTransaction baseTransaction = (BaseTransaction) transaction;
+ finalMetadata = create(baseTransaction.underlyingOps(), request);
+ } else {
+ Table table = catalog.loadTable(ident);
+ if (!(table instanceof BaseTable)) {
+ throw new IllegalStateException("Cannot wrap catalog that does not
produce BaseTable");
+ }
+
+ TableOperations ops = ((BaseTable) table).operations();
+ finalMetadata = commit(ops, request);
+ }
+
+ return
LoadTableResponse.builder().withTableMetadata(finalMetadata).build();
+ }
+
+ @VisibleForTesting
+ public static TableMetadata commit(TableOperations ops, UpdateTableRequest
request) {
+ AtomicBoolean isRetry = new AtomicBoolean(false);
+
+ try {
+ Tasks.foreach(new TableOperations[] {ops})
+ .retry(4)
Review Comment:
Ditto: can we make the retry count configurable?
##########
service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java:
##########
@@ -754,7 +769,199 @@ public LoadTableResponse updateTable(
if (isStaticFacade(catalog)) {
throw new BadRequestException("Cannot update table on static-facade
external catalogs.");
}
- return CatalogHandlers.updateTable(baseCatalog, tableIdentifier,
applyUpdateFilters(request));
+ // TODO: pending discussion if table property is right way, or a writer
specific knob is
+ // required.
+ return updateTableWithRollback(baseCatalog, tableIdentifier,
applyUpdateFilters(request));
+ }
+
+ private static TableMetadata create(TableOperations ops, UpdateTableRequest
request) {
+ request.requirements().forEach((requirement) ->
requirement.validate(ops.current()));
+ Optional<Integer> formatVersion =
+ request.updates().stream()
+ .filter((update) -> update instanceof
MetadataUpdate.UpgradeFormatVersion)
+ .map((update) -> ((MetadataUpdate.UpgradeFormatVersion)
update).formatVersion())
+ .findFirst();
+ TableMetadata.Builder builder =
+ (TableMetadata.Builder)
+ formatVersion
+ .map(TableMetadata::buildFromEmpty)
+ .orElseGet(TableMetadata::buildFromEmpty);
+ request.updates().forEach((update) -> update.applyTo(builder));
+ ops.commit((TableMetadata) null, builder.build());
+ return ops.current();
+ }
+
+ // TODO: Clean this up when CatalogHandler become extensible.
+ // Copy of CatalogHandler#update
+ private static LoadTableResponse updateTableWithRollback(
+ Catalog catalog, TableIdentifier ident, UpdateTableRequest request) {
+ Schema EMPTY_SCHEMA = new Schema(new Types.NestedField[0]);
+ TableMetadata finalMetadata;
+ if (isCreate(request)) {
+ Transaction transaction =
+ catalog.buildTable(ident, EMPTY_SCHEMA).createOrReplaceTransaction();
+ if (!(transaction instanceof BaseTransaction)) {
+ throw new IllegalStateException(
+ "Cannot wrap catalog that does not produce BaseTransaction");
+ }
+
+ BaseTransaction baseTransaction = (BaseTransaction) transaction;
+ finalMetadata = create(baseTransaction.underlyingOps(), request);
+ } else {
+ Table table = catalog.loadTable(ident);
+ if (!(table instanceof BaseTable)) {
+ throw new IllegalStateException("Cannot wrap catalog that does not
produce BaseTable");
+ }
+
+ TableOperations ops = ((BaseTable) table).operations();
+ finalMetadata = commit(ops, request);
+ }
+
+ return
LoadTableResponse.builder().withTableMetadata(finalMetadata).build();
+ }
+
+ @VisibleForTesting
+ public static TableMetadata commit(TableOperations ops, UpdateTableRequest
request) {
Review Comment:
Same here: can we just use `baseCatalog` instead of making this method static?
##########
service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java:
##########
@@ -754,7 +769,199 @@ public LoadTableResponse updateTable(
if (isStaticFacade(catalog)) {
throw new BadRequestException("Cannot update table on static-facade
external catalogs.");
}
- return CatalogHandlers.updateTable(baseCatalog, tableIdentifier,
applyUpdateFilters(request));
+ // TODO: pending discussion if table property is right way, or a writer
specific knob is
+ // required.
+ return updateTableWithRollback(baseCatalog, tableIdentifier,
applyUpdateFilters(request));
+ }
+
+ private static TableMetadata create(TableOperations ops, UpdateTableRequest
request) {
+ request.requirements().forEach((requirement) ->
requirement.validate(ops.current()));
+ Optional<Integer> formatVersion =
+ request.updates().stream()
+ .filter((update) -> update instanceof
MetadataUpdate.UpgradeFormatVersion)
+ .map((update) -> ((MetadataUpdate.UpgradeFormatVersion)
update).formatVersion())
+ .findFirst();
+ TableMetadata.Builder builder =
+ (TableMetadata.Builder)
+ formatVersion
+ .map(TableMetadata::buildFromEmpty)
+ .orElseGet(TableMetadata::buildFromEmpty);
+ request.updates().forEach((update) -> update.applyTo(builder));
+ ops.commit((TableMetadata) null, builder.build());
+ return ops.current();
+ }
+
+ // TODO: Clean this up when CatalogHandler become extensible.
+ // Copy of CatalogHandler#update
+ private static LoadTableResponse updateTableWithRollback(
Review Comment:
Can we just use `baseCatalog` instead of making this static?
##########
service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java:
##########
@@ -754,7 +769,199 @@ public LoadTableResponse updateTable(
if (isStaticFacade(catalog)) {
throw new BadRequestException("Cannot update table on static-facade
external catalogs.");
}
- return CatalogHandlers.updateTable(baseCatalog, tableIdentifier,
applyUpdateFilters(request));
+ // TODO: pending discussion if table property is right way, or a writer
specific knob is
+ // required.
+ return updateTableWithRollback(baseCatalog, tableIdentifier,
applyUpdateFilters(request));
+ }
+
+ private static TableMetadata create(TableOperations ops, UpdateTableRequest
request) {
+ request.requirements().forEach((requirement) ->
requirement.validate(ops.current()));
+ Optional<Integer> formatVersion =
+ request.updates().stream()
+ .filter((update) -> update instanceof
MetadataUpdate.UpgradeFormatVersion)
+ .map((update) -> ((MetadataUpdate.UpgradeFormatVersion)
update).formatVersion())
+ .findFirst();
+ TableMetadata.Builder builder =
+ (TableMetadata.Builder)
+ formatVersion
+ .map(TableMetadata::buildFromEmpty)
+ .orElseGet(TableMetadata::buildFromEmpty);
+ request.updates().forEach((update) -> update.applyTo(builder));
+ ops.commit((TableMetadata) null, builder.build());
+ return ops.current();
+ }
+
+ // TODO: Clean this up when CatalogHandler become extensible.
+ // Copy of CatalogHandler#update
+ private static LoadTableResponse updateTableWithRollback(
+ Catalog catalog, TableIdentifier ident, UpdateTableRequest request) {
+ Schema EMPTY_SCHEMA = new Schema(new Types.NestedField[0]);
+ TableMetadata finalMetadata;
+ if (isCreate(request)) {
+ Transaction transaction =
+ catalog.buildTable(ident, EMPTY_SCHEMA).createOrReplaceTransaction();
+ if (!(transaction instanceof BaseTransaction)) {
+ throw new IllegalStateException(
+ "Cannot wrap catalog that does not produce BaseTransaction");
+ }
+
+ BaseTransaction baseTransaction = (BaseTransaction) transaction;
+ finalMetadata = create(baseTransaction.underlyingOps(), request);
+ } else {
+ Table table = catalog.loadTable(ident);
+ if (!(table instanceof BaseTable)) {
+ throw new IllegalStateException("Cannot wrap catalog that does not
produce BaseTable");
+ }
+
+ TableOperations ops = ((BaseTable) table).operations();
+ finalMetadata = commit(ops, request);
+ }
+
+ return
LoadTableResponse.builder().withTableMetadata(finalMetadata).build();
+ }
+
+ @VisibleForTesting
+ public static TableMetadata commit(TableOperations ops, UpdateTableRequest
request) {
+ AtomicBoolean isRetry = new AtomicBoolean(false);
+
+ try {
+ Tasks.foreach(new TableOperations[] {ops})
+ .retry(4)
+ .exponentialBackoff(100L, 60000L, 1800000L, (double) 2.0F)
+ .onlyRetryOn(CommitFailedException.class)
+ .run(
+ (taskOps) -> {
+ TableMetadata base = isRetry.get() ? taskOps.refresh() :
taskOps.current();
+ isRetry.set(true);
+ // Prev PR: https://github.com/apache/iceberg/pull/5888
+ boolean rollbackCompaction =
+ PropertyUtil.propertyAsBoolean(
+ taskOps.current().properties(),
ROLLBACK_REPLACE_ENABLED_PROPERTY, false);
+
+ TableMetadata.Builder metadataBuilder =
TableMetadata.buildFrom(base);
+ TableMetadata newBase = base;
+ try {
+ request.requirements().forEach((requirement) ->
requirement.validate(base));
+ } catch (CommitFailedException e) {
+ if (!rollbackCompaction) {
+ throw new ValidationFailureException(e);
+ }
+ // Since snapshot has already been created at the client end.
+ // Nothing much can be done, we can move this
+ // to writer specific thing, but it would be cool if catalog
does this for us.
+ // Inspect that the requirements states that snapshot
+ // ref needs to be asserted this usually means in the update
section
+ // it has addSnapshot and setSnapshotRef
+ UpdateRequirement.AssertRefSnapshotID addSnapshot = null;
+ int found = 0;
+ for (UpdateRequirement requirement : request.requirements())
{
+ // there should be only add snapshot request
+ if (requirement instanceof
UpdateRequirement.AssertRefSnapshotID) {
+ ++found;
+ addSnapshot = (UpdateRequirement.AssertRefSnapshotID)
requirement;
+ }
+ }
+
+ if (found != 1) {
+ // TODO: handle this case, find min snapshot id, to
rollback to give it creates
+ // lineage
+ // lets not complicate things rn
+ throw new ValidationFailureException(e);
+ }
+
+ Long parentSnapshotId = addSnapshot.snapshotId();
+ // so we will first check all the snapshots on the top of
+ // base on which the snapshot we want to commit is of type
REPLACE ops.
+ Long parentToRollbackTo =
base.currentSnapshot().snapshotId();
+ List<MetadataUpdate> updateToRemoveSnapshot = new
ArrayList<>();
+ while (!Objects.equals(parentToRollbackTo,
parentSnapshotId)) {
+ Snapshot snap = ops.current().snapshot(parentToRollbackTo);
+ if (!DataOperations.REPLACE.equals(snap.operation())) {
+ break;
+ }
+ updateToRemoveSnapshot.add(
+ new MetadataUpdate.RemoveSnapshot(snap.snapshotId()));
+ parentToRollbackTo = snap.parentId();
+ }
+
+ MetadataUpdate.SetSnapshotRef ref = null;
+ found = 0;
+ // find the SetRefName snapshot update
+ for (MetadataUpdate update : request.updates()) {
+ if (update instanceof MetadataUpdate.SetSnapshotRef) {
+ ++found;
+ ref = (MetadataUpdate.SetSnapshotRef) update;
+ }
+ }
+
+ if (found != 1 || (!Objects.equals(parentToRollbackTo,
parentSnapshotId))) {
+ // nothing can be done as this implies there was a non
replace
+ // snapshot in between or there is more than setRef ops,
we don't know where
+ // to go.
+ throw new ValidationFailureException(e);
+ }
+
+ // first we should also set back the ref we wanted to set,
back to the base
+ // on which the current update is based on.
+ metadataBuilder.setBranchSnapshot(parentSnapshotId,
ref.name());
+
+ // apply the remove snapshots update in the current metadata.
+ // NOTE: we need to setRef to parent first and then apply
remove as the remove
+ // will drop. The tags / branch which don't have reference.
+ // NOTE: we can skip removing the now orphan base. Its not a
hard requirement.
+ // just something good to do, and not leave for Remove
Orphans.
+ updateToRemoveSnapshot.forEach((update ->
update.applyTo(metadataBuilder)));
+ // Ref rolled back update correctly to snapshot to be
committed parent now.
+ newBase = metadataBuilder.build();
+ // move the lastSequenceNumber back, to apply snapshot
properly.
+ // Seq number are considered increasing monotonically,
snapshot over snapshot, so
+ // this is important.
+ Class<?> clazz = newBase.getClass();
+ try {
+ Field field = clazz.getDeclaredField("lastSequenceNumber");
+ field.setAccessible(true);
+ // this should point to the sequence number that current
tip of the
+ // branch belongs to, as new commit is gon get applied on
top of this.
+ field.set(newBase,
newBase.currentSnapshot().sequenceNumber());
+ } catch (NoSuchFieldException | IllegalAccessException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ // double check if the requirements passes now.
+ try {
+ TableMetadata baseWithRemovedSnaps = newBase;
+ request
+ .requirements()
+ .forEach((requirement) ->
requirement.validate(baseWithRemovedSnaps));
+ } catch (CommitFailedException e) {
+ throw new ValidationFailureException(e);
+ }
+
+ TableMetadata.Builder newMetadataBuilder =
TableMetadata.buildFrom(newBase);
+ request.updates().forEach((update) ->
update.applyTo(newMetadataBuilder));
+ TableMetadata updated = newMetadataBuilder.build();
+ // always commit this
+ taskOps.commit(base, updated);
+ });
+ } catch (ValidationFailureException e) {
+ throw e.wrapped();
+ }
+
+ return ops.current();
+ }
+
+ private static class ValidationFailureException extends RuntimeException {
Review Comment:
Let's move this into its own file.
##########
service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java:
##########
@@ -754,7 +769,199 @@ public LoadTableResponse updateTable(
if (isStaticFacade(catalog)) {
throw new BadRequestException("Cannot update table on static-facade
external catalogs.");
}
- return CatalogHandlers.updateTable(baseCatalog, tableIdentifier,
applyUpdateFilters(request));
+ // TODO: pending discussion if table property is right way, or a writer
specific knob is
+ // required.
+ return updateTableWithRollback(baseCatalog, tableIdentifier,
applyUpdateFilters(request));
+ }
+
+ private static TableMetadata create(TableOperations ops, UpdateTableRequest
request) {
+ request.requirements().forEach((requirement) ->
requirement.validate(ops.current()));
+ Optional<Integer> formatVersion =
+ request.updates().stream()
+ .filter((update) -> update instanceof
MetadataUpdate.UpgradeFormatVersion)
+ .map((update) -> ((MetadataUpdate.UpgradeFormatVersion)
update).formatVersion())
+ .findFirst();
+ TableMetadata.Builder builder =
+ (TableMetadata.Builder)
+ formatVersion
+ .map(TableMetadata::buildFromEmpty)
+ .orElseGet(TableMetadata::buildFromEmpty);
+ request.updates().forEach((update) -> update.applyTo(builder));
+ ops.commit((TableMetadata) null, builder.build());
+ return ops.current();
+ }
+
+ // TODO: Clean this up when CatalogHandler become extensible.
+ // Copy of CatalogHandler#update
+ private static LoadTableResponse updateTableWithRollback(
+ Catalog catalog, TableIdentifier ident, UpdateTableRequest request) {
+ Schema EMPTY_SCHEMA = new Schema(new Types.NestedField[0]);
+ TableMetadata finalMetadata;
+ if (isCreate(request)) {
+ Transaction transaction =
+ catalog.buildTable(ident, EMPTY_SCHEMA).createOrReplaceTransaction();
+ if (!(transaction instanceof BaseTransaction)) {
+ throw new IllegalStateException(
+ "Cannot wrap catalog that does not produce BaseTransaction");
+ }
+
+ BaseTransaction baseTransaction = (BaseTransaction) transaction;
+ finalMetadata = create(baseTransaction.underlyingOps(), request);
+ } else {
+ Table table = catalog.loadTable(ident);
+ if (!(table instanceof BaseTable)) {
+ throw new IllegalStateException("Cannot wrap catalog that does not
produce BaseTable");
+ }
+
+ TableOperations ops = ((BaseTable) table).operations();
+ finalMetadata = commit(ops, request);
+ }
+
+ return
LoadTableResponse.builder().withTableMetadata(finalMetadata).build();
+ }
+
+ @VisibleForTesting
+ public static TableMetadata commit(TableOperations ops, UpdateTableRequest
request) {
+ AtomicBoolean isRetry = new AtomicBoolean(false);
+
+ try {
+ Tasks.foreach(new TableOperations[] {ops})
+ .retry(4)
+ .exponentialBackoff(100L, 60000L, 1800000L, (double) 2.0F)
+ .onlyRetryOn(CommitFailedException.class)
+ .run(
+ (taskOps) -> {
+ TableMetadata base = isRetry.get() ? taskOps.refresh() :
taskOps.current();
+ isRetry.set(true);
+ // Prev PR: https://github.com/apache/iceberg/pull/5888
+ boolean rollbackCompaction =
+ PropertyUtil.propertyAsBoolean(
+ taskOps.current().properties(),
ROLLBACK_REPLACE_ENABLED_PROPERTY, false);
+
+ TableMetadata.Builder metadataBuilder =
TableMetadata.buildFrom(base);
+ TableMetadata newBase = base;
+ try {
+ request.requirements().forEach((requirement) ->
requirement.validate(base));
+ } catch (CommitFailedException e) {
+ if (!rollbackCompaction) {
+ throw new ValidationFailureException(e);
+ }
+ // Since snapshot has already been created at the client end.
+ // Nothing much can be done, we can move this
+ // to writer specific thing, but it would be cool if catalog
does this for us.
+ // Inspect that the requirements states that snapshot
+ // ref needs to be asserted this usually means in the update
section
+ // it has addSnapshot and setSnapshotRef
+ UpdateRequirement.AssertRefSnapshotID addSnapshot = null;
+ int found = 0;
+ for (UpdateRequirement requirement : request.requirements())
{
+ // there should be only add snapshot request
+ if (requirement instanceof
UpdateRequirement.AssertRefSnapshotID) {
+ ++found;
+ addSnapshot = (UpdateRequirement.AssertRefSnapshotID)
requirement;
+ }
+ }
+
+ if (found != 1) {
+ // TODO: handle this case, find min snapshot id, to
rollback to give it creates
+ // lineage
+ // lets not complicate things rn
+ throw new ValidationFailureException(e);
+ }
+
+ Long parentSnapshotId = addSnapshot.snapshotId();
+ // so we will first check all the snapshots on the top of
+ // base on which the snapshot we want to commit is of type
REPLACE ops.
+ Long parentToRollbackTo =
base.currentSnapshot().snapshotId();
+ List<MetadataUpdate> updateToRemoveSnapshot = new
ArrayList<>();
+ while (!Objects.equals(parentToRollbackTo,
parentSnapshotId)) {
+ Snapshot snap = ops.current().snapshot(parentToRollbackTo);
+ if (!DataOperations.REPLACE.equals(snap.operation())) {
+ break;
+ }
+ updateToRemoveSnapshot.add(
+ new MetadataUpdate.RemoveSnapshot(snap.snapshotId()));
+ parentToRollbackTo = snap.parentId();
+ }
+
+ MetadataUpdate.SetSnapshotRef ref = null;
+ found = 0;
+ // find the SetRefName snapshot update
+ for (MetadataUpdate update : request.updates()) {
+ if (update instanceof MetadataUpdate.SetSnapshotRef) {
+ ++found;
+ ref = (MetadataUpdate.SetSnapshotRef) update;
+ }
+ }
+
+ if (found != 1 || (!Objects.equals(parentToRollbackTo,
parentSnapshotId))) {
Review Comment:
nit: extra parentheses around `!Objects.equals(...)`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]