flyrain commented on code in PR #3360:
URL: https://github.com/apache/polaris/pull/3360#discussion_r2893505462
##########
integration-tests/src/main/java/org/apache/polaris/service/it/test/PolarisRestCatalogIntegrationBase.java:
##########
@@ -1189,6 +1189,101 @@ public void
testMultipleConflictingCommitsToSingleTableInTransaction() {
assertThat(latestCommittedSchema.asStruct()).isEqualTo(originalSchema.asStruct());
}
+ @Test
+ public void testCoalescedConflictOnOneTableRollsBackEntireTransaction() {
+ Namespace namespace = Namespace.of("coalescingAtomicNs");
+ TableIdentifier goodId = TableIdentifier.of(namespace, "goodTable");
+ TableIdentifier conflictId = TableIdentifier.of(namespace,
"conflictTable");
+
+ if (requiresNamespaceCreate()) {
+ catalog().createNamespace(namespace);
+ }
+
+ catalog().createTable(goodId, SCHEMA);
+ catalog().createTable(conflictId, SCHEMA);
+
+ Schema originalGoodSchema = catalog().loadTable(goodId).schema();
+ Schema originalConflictSchema = catalog().loadTable(conflictId).schema();
+
+ // goodTable: a single non-conflicting schema change
+ Transaction goodTx = catalog().loadTable(goodId).newTransaction();
+ goodTx.updateSchema().addColumn("new_col", Types.LongType.get()).commit();
+
+ // conflictTable: two independent transactions that both rename the same
column,
+ // producing two TableCommits for the same table. The first rename
succeeds,
+ // but the second conflicts because the schema has already changed.
+ Table conflictTable = catalog().loadTable(conflictId);
+ Transaction conflictTx1 = conflictTable.newTransaction();
+ Transaction conflictTx2 = conflictTable.newTransaction();
+ conflictTx1.updateSchema().renameColumn("data", "renamed-col1").commit();
+ conflictTx2.updateSchema().renameColumn("data", "renamed-col2").commit();
+
+ TableCommit goodCommit =
+ TableCommit.create(
+ goodId,
+ ((BaseTransaction) goodTx).startMetadata(),
+ ((BaseTransaction) goodTx).currentMetadata());
+ TableCommit conflictCommit1 =
+ TableCommit.create(
+ conflictId,
+ ((BaseTransaction) conflictTx1).startMetadata(),
+ ((BaseTransaction) conflictTx1).currentMetadata());
+ TableCommit conflictCommit2 =
+ TableCommit.create(
+ conflictId,
+ ((BaseTransaction) conflictTx2).startMetadata(),
+ ((BaseTransaction) conflictTx2).currentMetadata());
+
+ // The coalescing logic groups conflictCommit1 and conflictCommit2
together.
+ // The first rename succeeds, but the second fails requirement validation
+ // (schema ID changed). This should fail the entire transaction atomically.
+ assertThatThrownBy(
+ () -> restCatalog.commitTransaction(goodCommit, conflictCommit1,
conflictCommit2))
+ .isInstanceOf(CommitFailedException.class);
+
+ // Verify atomicity: neither table should have changed.
+ assertThat(catalog().loadTable(goodId).schema().asStruct())
+ .isEqualTo(originalGoodSchema.asStruct());
+ assertThat(catalog().loadTable(conflictId).schema().asStruct())
+ .isEqualTo(originalConflictSchema.asStruct());
+ }
+
+ @Test
+ public void
testMultipleNonConflictingUpdatesToSameTableWithSchemaAndProperties() {
+ Namespace namespace = Namespace.of("coalescingMixedNs");
+ TableIdentifier identifier = TableIdentifier.of(namespace,
"coalescingMixedTable");
+
+ if (requiresNamespaceCreate()) {
+ catalog().createNamespace(namespace);
+ }
+
+ // Use a single Iceberg transaction that performs both a schema change and
a property update.
+ // This produces a single TableCommit containing multiple metadata
updates, verifying
+ // that the server correctly handles multiple update types within a single
commit request.
+ Table table = catalog().buildTable(identifier, SCHEMA).create();
+ Transaction transaction = table.newTransaction();
+
+ UpdateSchema updateSchema =
+ transaction.updateSchema().addColumn("new_col", Types.LongType.get());
+ Schema expectedSchema = updateSchema.apply();
+ updateSchema.commit();
+
+ transaction.updateProperties().set("prop-key", "prop-val").commit();
Review Comment:
Is the behavior deterministic when we have two property updates to the same key, like this?
1. transaction.updateProperties().set("prop-key", "prop-val1").commit();
2. transaction.updateProperties().set("prop-key", "prop-val2").commit();
If not, we may want to check with the Iceberg community to clarify the
behavior. Not a blocker.
##########
runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java:
##########
@@ -1033,60 +1034,82 @@ public void commitTransaction(CommitTransactionRequest
commitTransactionRequest)
new TransactionWorkspaceMetaStoreManager(diagnostics(),
metaStoreManager());
((IcebergCatalog)
baseCatalog).setMetaStoreManager(transactionMetaStoreManager);
+ // Group all changes by table identifier to handle them atomically.
+ // This prevents conflicts when multiple changes target the same table
entity.
+ // LinkedHashMap preserves insertion order for deterministic processing.
+ Map<TableIdentifier, List<UpdateTableRequest>> changesByTable = new
LinkedHashMap<>();
+ for (UpdateTableRequest change : commitTransactionRequest.tableChanges()) {
+ if (CatalogHandlerUtils.isCreate(change)) {
+ throw new BadRequestException(
+ "Unsupported operation: commitTranaction with
updateForStagedCreate: %s", change);
+ }
+ changesByTable.computeIfAbsent(change.identifier(), k -> new
ArrayList<>()).add(change);
+ }
+
+ // Process each table's changes in order.
+ // Note: All UpdateTableRequests for a given table are coalesced into a
single metadata
+ // update and a single tableOps.commit(), which results in one Polaris
entity update per
+ // table. This is subtly different from applying each UpdateTableRequest
as an independent
+ // commit (as if each were under a lock). Requirements are still validated
sequentially
+ // against the evolving metadata, so conflicts are detected correctly.
+ // See also the TODO in TransactionWorkspaceMetaStoreManager for a more
general (but more
+ // complex) alternative that would intercept at the MetaStoreManager layer.
List<TableMetadata> tableMetadataObjs = new ArrayList<>();
- commitTransactionRequest.tableChanges().stream()
- .forEach(
- change -> {
- Table table = baseCatalog.loadTable(change.identifier());
- if (!(table instanceof BaseTable baseTable)) {
- throw new IllegalStateException(
- "Cannot wrap catalog that does not produce BaseTable");
- }
- if (CatalogHandlerUtils.isCreate(change)) {
- throw new BadRequestException(
- "Unsupported operation: commitTranaction with
updateForStagedCreate: %s",
- change);
+ changesByTable.forEach(
+ (tableIdentifier, changes) -> {
+ Table table = baseCatalog.loadTable(tableIdentifier);
+ if (!(table instanceof BaseTable baseTable)) {
+ throw new IllegalStateException("Cannot wrap catalog that does not
produce BaseTable");
+ }
+
+ TableOperations tableOps = baseTable.operations();
+ TableMetadata baseMetadata = tableOps.current();
+
+ // Apply each change sequentially: validate requirements against
current state,
+ // then apply updates. This ensures conflicts are detected (e.g., if
two changes
+ // both expect schema ID 0, the second will fail after the first
increments it).
+ TableMetadata currentMetadata = baseMetadata;
+ for (UpdateTableRequest change : changes) {
+ // Validate requirements against the current metadata state
+ final TableMetadata metadataForValidation = currentMetadata;
+ change
+ .requirements()
+ .forEach(requirement ->
requirement.validate(metadataForValidation));
+
+ // TODO: Refactor to share/reconcile the update-application logic
below with
+ // CatalogHandlerUtils to avoid divergence as complexity grows.
+ TableMetadata.Builder metadataBuilder =
TableMetadata.buildFrom(currentMetadata);
+ for (MetadataUpdate singleUpdate : change.updates()) {
+ // Note: If location-overlap checking is refactored to be
atomic, we could
+ // support validation within a single multi-table transaction as
well, but
+ // will need to update the TransactionWorkspaceMetaStoreManager
to better
+ // expose the concept of being able to read uncommitted updates.
+ if (singleUpdate instanceof MetadataUpdate.SetLocation
setLocation) {
+ if (!currentMetadata.location().equals(setLocation.location())
+ && !realmConfig()
+
.getConfig(FeatureConfiguration.ALLOW_NAMESPACE_LOCATION_OVERLAP)) {
Review Comment:
This check allows the `setLocation` operation whenever
ALLOW_NAMESPACE_LOCATION_OVERLAP is true. I don't think that behavior is correct.
cc @dennishuo @collado-mike
However, it isn't a blocker for me, as this PR doesn't change the logic.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]