dennishuo commented on code in PR #3360:
URL: https://github.com/apache/polaris/pull/3360#discussion_r2667107261
##########
runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java:
##########
@@ -1042,57 +1043,71 @@ public void commitTransaction(CommitTransactionRequest commitTransactionRequest)
new TransactionWorkspaceMetaStoreManager(diagnostics, metaStoreManager);
((IcebergCatalog) baseCatalog).setMetaStoreManager(transactionMetaStoreManager);
- commitTransactionRequest.tableChanges().stream()
- .forEach(
- change -> {
- Table table = baseCatalog.loadTable(change.identifier());
- if (!(table instanceof BaseTable baseTable)) {
- throw new IllegalStateException(
- "Cannot wrap catalog that does not produce BaseTable");
- }
- if (isCreate(change)) {
- throw new BadRequestException(
- "Unsupported operation: commitTranaction with updateForStagedCreate: %s",
- change);
- }
+ // Group all changes by table identifier to handle them atomically
+ // This prevents conflicts when multiple changes target the same table entity
+ // LinkedHashMap preserves insertion order for deterministic processing
+ Map<TableIdentifier, List<UpdateTableRequest>> changesByTable = new LinkedHashMap<>();
+ for (UpdateTableRequest change : commitTransactionRequest.tableChanges()) {
+ if (isCreate(change)) {
+ throw new BadRequestException(
+ "Unsupported operation: commitTranaction with updateForStagedCreate: %s", change);
+ }
+ changesByTable.computeIfAbsent(change.identifier(), k -> new ArrayList<>()).add(change);
+ }
- TableOperations tableOps = baseTable.operations();
- TableMetadata currentMetadata = tableOps.current();
-
- // Validate requirements; any CommitFailedExceptions will fail the overall request
- change.requirements().forEach(requirement -> requirement.validate(currentMetadata));
-
- // Apply changes
- TableMetadata.Builder metadataBuilder = TableMetadata.buildFrom(currentMetadata);
- change.updates().stream()
- .forEach(
- singleUpdate -> {
- // Note: If location-overlap checking is refactored to be atomic, we could
- // support validation within a single multi-table transaction as well, but
- // will need to update the TransactionWorkspaceMetaStoreManager to better
- // expose the concept of being able to read uncommitted updates.
- if (singleUpdate instanceof MetadataUpdate.SetLocation setLocation) {
- if (!currentMetadata.location().equals(setLocation.location())
- && !realmConfig.getConfig(
- FeatureConfiguration.ALLOW_NAMESPACE_LOCATION_OVERLAP)) {
- throw new BadRequestException(
- "Unsupported operation: commitTransaction containing SetLocation"
- + " for table '%s' and new location '%s'",
- change.identifier(),
- ((MetadataUpdate.SetLocation) singleUpdate).location());
- }
- }
-
- // Apply updates to builder
- singleUpdate.applyTo(metadataBuilder);
- });
-
- // Commit into transaction workspace we swapped the baseCatalog to use
- TableMetadata updatedMetadata = metadataBuilder.build();
- if (!updatedMetadata.changes().isEmpty()) {
- tableOps.commit(currentMetadata, updatedMetadata);
+ // Process each table's changes in order
+ changesByTable.forEach(
+ (tableIdentifier, changes) -> {
+ Table table = baseCatalog.loadTable(tableIdentifier);
+ if (!(table instanceof BaseTable baseTable)) {
+ throw new IllegalStateException("Cannot wrap catalog that does not produce BaseTable");
+ }
+
+ TableOperations tableOps = baseTable.operations();
+ TableMetadata baseMetadata = tableOps.current();
+
+ // Apply each change sequentially: validate requirements against current state,
+ // then apply updates. This ensures conflicts are detected (e.g., if two changes
+ // both expect schema ID 0, the second will fail after the first increments it).
+ TableMetadata currentMetadata = baseMetadata;
+ for (UpdateTableRequest change : changes) {
+ // Validate requirements against the current metadata state
+ final TableMetadata metadataForValidation = currentMetadata;
+ change
+ .requirements()
+ .forEach(requirement -> requirement.validate(metadataForValidation));
+
+ // Apply this change's updates
+ TableMetadata.Builder metadataBuilder = TableMetadata.buildFrom(currentMetadata);
+ for (MetadataUpdate singleUpdate : change.updates()) {
+ // Note: If location-overlap checking is refactored to be atomic, we could
+ // support validation within a single multi-table transaction as well, but
+ // will need to update the TransactionWorkspaceMetaStoreManager to better
+ // expose the concept of being able to read uncommitted updates.
+ if (singleUpdate instanceof MetadataUpdate.SetLocation setLocation) {
+ if (!currentMetadata.location().equals(setLocation.location())
+ && !realmConfig.getConfig(
+ FeatureConfiguration.ALLOW_NAMESPACE_LOCATION_OVERLAP)) {
+ throw new BadRequestException(
+ "Unsupported operation: commitTransaction containing SetLocation"
+ + " for table '%s' and new location '%s'",
+ change.identifier(), ((MetadataUpdate.SetLocation) singleUpdate).location());
+ }
}
- });
+
+ // Apply updates to builder
Review Comment:
Maybe add a `// TODO` to refactor this to better share/reconcile with the
[update-application logic in CatalogHandlerUtils](https://github.com/apache/polaris/blob/8abf19a06f809c8d1d0764b44e613c2614ce2c67/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/CatalogHandlerUtils.java#L477)
(I understand this divergence was already latent and not introduced by this
PR, but as the update-application logic here grows in complexity, the
divergence is going to get a lot worse).
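Roughly the kind of shape that TODO could point toward, as a minimal sketch: a single helper that validates a request's requirements and then applies its updates, which both the single-table path and `commitTransaction` would call. `Meta`, `Requirement`, `Update`, `Change` and `SharedUpdateLogic` are hypothetical, simplified stand-ins, not the real Iceberg `TableMetadata`/`UpdateRequirement`/`MetadataUpdate` types or the actual `CatalogHandlerUtils` API.

```java
import java.util.List;

// Hypothetical, simplified stand-ins for Iceberg's TableMetadata, UpdateRequirement,
// MetadataUpdate and UpdateTableRequest. These are NOT the real Iceberg types.
record Meta(int schemaId, String location) {}

interface Requirement {
  void validate(Meta current); // throws IllegalStateException on conflict
}

interface Update {
  Meta applyTo(Meta current);
}

record Change(List<Requirement> requirements, List<Update> updates) {}

// Hypothetical shared helper: one place that validates requirements and applies
// updates, callable from both the single-table and the multi-table commit paths.
final class SharedUpdateLogic {
  private SharedUpdateLogic() {}

  static Meta applyChange(Meta current, Change change) {
    for (Requirement requirement : change.requirements()) {
      requirement.validate(current);
    }
    Meta next = current;
    for (Update update : change.updates()) {
      next = update.applyTo(next);
    }
    return next;
  }

  // The multi-table path folds the same helper over one table's grouped changes,
  // so each change is validated against the result of the previous one.
  static Meta applyChanges(Meta base, List<Change> changes) {
    Meta current = base;
    for (Change change : changes) {
      current = applyChange(current, change);
    }
    return current;
  }
}
```

Folding `applyChange` over a table's grouped changes mirrors the sequential validation in this PR: if two changes both require schema ID 0, the second one fails once the first has bumped it.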
##########
runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java:
##########
@@ -1042,57 +1043,71 @@ public void commitTransaction(CommitTransactionRequest commitTransactionRequest)
+ // Process each table's changes in order
+ changesByTable.forEach(
Review Comment:
Yeah, the logic also looks correct to me. +1 to adding a comment on one
subtlety, though: we're coalescing all the updates for a given table into a
single Polaris entity update, which behaves slightly differently than if the
caller expected the various `UpdateTableRequest`s in this `commitTransaction`
to behave as if each were applied independently, one after another under a
lock.
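To make the difference concrete, a minimal sketch that assumes (hypothetically) every persisted entity write bumps `entityVersion` by exactly one: coalescing yields one write per table, while independent application yields one write per `UpdateTableRequest`. `CoalescingDemo` and `TableChange` are illustrative names, not Polaris code.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model, not Polaris code: each persisted entity write is assumed
// to bump that table's entityVersion by exactly one.
final class CoalescingDemo {
  private CoalescingDemo() {}

  // Simplified stand-in for an UpdateTableRequest: just the target table and a label.
  record TableChange(String table, String description) {}

  // Coalesced (this PR): group by table in first-seen order, then perform a
  // single entity write per table no matter how many requests targeted it.
  static Map<String, Integer> coalescedVersionBumps(List<TableChange> changes) {
    Map<String, List<TableChange>> byTable = new LinkedHashMap<>();
    for (TableChange change : changes) {
      byTable.computeIfAbsent(change.table(), k -> new ArrayList<>()).add(change);
    }
    Map<String, Integer> bumps = new LinkedHashMap<>();
    byTable.keySet().forEach(table -> bumps.put(table, 1));
    return bumps;
  }

  // Independent (as if each request were committed alone, in order, under a lock):
  // one entity write, and therefore one version bump, per request.
  static Map<String, Integer> independentVersionBumps(List<TableChange> changes) {
    Map<String, Integer> bumps = new LinkedHashMap<>();
    for (TableChange change : changes) {
      bumps.merge(change.table(), 1, Integer::sum);
    }
    return bumps;
  }
}
```

With two requests against `db.t1` and one against `db.t2`, the coalesced path bumps each table once, while independent application bumps `db.t1` twice; anything keyed off `entityVersion` (events, optimistic-concurrency checks) would observe that difference.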
Note that this was a known limitation, referenced in a TODO in
`TransactionWorkspaceMetaStoreManager`:
https://github.com/apache/polaris/blob/8abf19a06f809c8d1d0764b44e613c2614ce2c67/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java#L84
// TODO: If we want to support the semantic of opening a transaction in which multiple
// reads and writes occur on the same entities, where the reads are expected to see the writes
// within the transaction workspace that haven't actually been committed, we can augment this
// class by allowing these pendingUpdates to represent the latest state of the entity if we
// also increment entityVersion. We'd need to store both a "latest view" of all updated entities
// to serve reads within the same transaction while also storing the ordered list of
// pendingUpdates that ultimately need to be applied in order within the real MetaStoreManager.
The alternative "fix" described there, which is more general but also more
complex and probably has pitfalls, is to actually queue up the sequential
mutations per entity in that "uncommitted persistence layer".
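A minimal sketch of that alternative, assuming a simplified in-memory store: reads inside the transaction see a "latest view" of pending writes, each write bumps `entityVersion`, and the ordered list of pending updates is applied at commit time. `Entity`, `Workspace` and `flush()` are hypothetical names; a real implementation would presumably apply each queued update through the MetaStoreManager with version checks rather than a plain map `put`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.UnaryOperator;

// Hypothetical sketch of the "uncommitted persistence layer" from the TODO.
// Entity, Workspace and flush() are illustrative names, not Polaris APIs.
record Entity(long id, long entityVersion, String payload) {}

final class Workspace {
  private final Map<Long, Entity> committed;                // stand-in for the real store
  private final Map<Long, Entity> latestView = new HashMap<>(); // serves in-transaction reads
  private final List<Entity> pendingUpdates = new ArrayList<>(); // ordered writes to apply

  Workspace(Map<Long, Entity> committed) {
    this.committed = committed;
  }

  // Reads inside the transaction see uncommitted writes first.
  Optional<Entity> read(long id) {
    Entity pending = latestView.get(id);
    return Optional.ofNullable(pending != null ? pending : committed.get(id));
  }

  // Each write produces a new entityVersion, refreshes the latest view and is queued in order.
  Entity write(long id, UnaryOperator<String> mutation) {
    Entity current = read(id).orElseThrow(() -> new IllegalArgumentException("no entity " + id));
    Entity next = new Entity(id, current.entityVersion() + 1, mutation.apply(current.payload()));
    latestView.put(id, next);
    pendingUpdates.add(next);
    return next;
  }

  // At commit, pending updates are applied in order to the backing map
  // (assumption: a real store would compare-and-swap on entityVersion here).
  void flush() {
    for (Entity e : pendingUpdates) {
      committed.put(e.id(), e);
    }
    pendingUpdates.clear();
    latestView.clear();
  }

  List<Entity> pending() {
    return List.copyOf(pendingUpdates);
  }
}
```

Here two writes to the same entity within one transaction produce two queued updates and two version bumps, so `entityVersion` increments match the actual update requests, at the cost of the extra bookkeeping.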
The main implication would be that, by plugging into the MetaStoreManager
layer, we could intercept writes while still inheriting other relevant hooks,
such as event generation, entityVersion increments that directly match the
actual update requests, etc.
But I'm in favor of this more targeted change-coalescing fix here for now.
We could update or remove the TODO in `TransactionWorkspaceMetaStoreManager`
and/or leave a comment in the code here referencing the other approach, so
that anyone making future changes to how we handle this can more easily sort
through the options.