dennishuo commented on code in PR #3360:
URL: https://github.com/apache/polaris/pull/3360#discussion_r2667107261


##########
runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java:
##########
@@ -1042,57 +1043,71 @@ public void commitTransaction(CommitTransactionRequest 
commitTransactionRequest)
         new TransactionWorkspaceMetaStoreManager(diagnostics, 
metaStoreManager);
     ((IcebergCatalog) 
baseCatalog).setMetaStoreManager(transactionMetaStoreManager);
 
-    commitTransactionRequest.tableChanges().stream()
-        .forEach(
-            change -> {
-              Table table = baseCatalog.loadTable(change.identifier());
-              if (!(table instanceof BaseTable baseTable)) {
-                throw new IllegalStateException(
-                    "Cannot wrap catalog that does not produce BaseTable");
-              }
-              if (isCreate(change)) {
-                throw new BadRequestException(
-                    "Unsupported operation: commitTranaction with 
updateForStagedCreate: %s",
-                    change);
-              }
+    // Group all changes by table identifier to handle them atomically
+    // This prevents conflicts when multiple changes target the same table 
entity
+    // LinkedHashMap preserves insertion order for deterministic processing
+    Map<TableIdentifier, List<UpdateTableRequest>> changesByTable = new 
LinkedHashMap<>();
+    for (UpdateTableRequest change : commitTransactionRequest.tableChanges()) {
+      if (isCreate(change)) {
+        throw new BadRequestException(
+            "Unsupported operation: commitTranaction with 
updateForStagedCreate: %s", change);
+      }
+      changesByTable.computeIfAbsent(change.identifier(), k -> new 
ArrayList<>()).add(change);
+    }
 
-              TableOperations tableOps = baseTable.operations();
-              TableMetadata currentMetadata = tableOps.current();
-
-              // Validate requirements; any CommitFailedExceptions will fail 
the overall request
-              change.requirements().forEach(requirement -> 
requirement.validate(currentMetadata));
-
-              // Apply changes
-              TableMetadata.Builder metadataBuilder = 
TableMetadata.buildFrom(currentMetadata);
-              change.updates().stream()
-                  .forEach(
-                      singleUpdate -> {
-                        // Note: If location-overlap checking is refactored to 
be atomic, we could
-                        // support validation within a single multi-table 
transaction as well, but
-                        // will need to update the 
TransactionWorkspaceMetaStoreManager to better
-                        // expose the concept of being able to read 
uncommitted updates.
-                        if (singleUpdate instanceof MetadataUpdate.SetLocation 
setLocation) {
-                          if 
(!currentMetadata.location().equals(setLocation.location())
-                              && !realmConfig.getConfig(
-                                  
FeatureConfiguration.ALLOW_NAMESPACE_LOCATION_OVERLAP)) {
-                            throw new BadRequestException(
-                                "Unsupported operation: commitTransaction 
containing SetLocation"
-                                    + " for table '%s' and new location '%s'",
-                                change.identifier(),
-                                ((MetadataUpdate.SetLocation) 
singleUpdate).location());
-                          }
-                        }
-
-                        // Apply updates to builder
-                        singleUpdate.applyTo(metadataBuilder);
-                      });
-
-              // Commit into transaction workspace we swapped the baseCatalog 
to use
-              TableMetadata updatedMetadata = metadataBuilder.build();
-              if (!updatedMetadata.changes().isEmpty()) {
-                tableOps.commit(currentMetadata, updatedMetadata);
+    // Process each table's changes in order
+    changesByTable.forEach(
+        (tableIdentifier, changes) -> {
+          Table table = baseCatalog.loadTable(tableIdentifier);
+          if (!(table instanceof BaseTable baseTable)) {
+            throw new IllegalStateException("Cannot wrap catalog that does not 
produce BaseTable");
+          }
+
+          TableOperations tableOps = baseTable.operations();
+          TableMetadata baseMetadata = tableOps.current();
+
+          // Apply each change sequentially: validate requirements against 
current state,
+          // then apply updates. This ensures conflicts are detected (e.g., if 
two changes
+          // both expect schema ID 0, the second will fail after the first 
increments it).
+          TableMetadata currentMetadata = baseMetadata;
+          for (UpdateTableRequest change : changes) {
+            // Validate requirements against the current metadata state
+            final TableMetadata metadataForValidation = currentMetadata;
+            change
+                .requirements()
+                .forEach(requirement -> 
requirement.validate(metadataForValidation));
+
+            // Apply this change's updates
+            TableMetadata.Builder metadataBuilder = 
TableMetadata.buildFrom(currentMetadata);
+            for (MetadataUpdate singleUpdate : change.updates()) {
+              // Note: If location-overlap checking is refactored to be 
atomic, we could
+              // support validation within a single multi-table transaction as 
well, but
+              // will need to update the TransactionWorkspaceMetaStoreManager 
to better
+              // expose the concept of being able to read uncommitted updates.
+              if (singleUpdate instanceof MetadataUpdate.SetLocation 
setLocation) {
+                if (!currentMetadata.location().equals(setLocation.location())
+                    && !realmConfig.getConfig(
+                        
FeatureConfiguration.ALLOW_NAMESPACE_LOCATION_OVERLAP)) {
+                  throw new BadRequestException(
+                      "Unsupported operation: commitTransaction containing 
SetLocation"
+                          + " for table '%s' and new location '%s'",
+                      change.identifier(), ((MetadataUpdate.SetLocation) 
singleUpdate).location());
+                }
               }
-            });
+
+              // Apply updates to builder

Review Comment:
   Maybe add a `// TODO` to refactor this to better share/reconcile with the 
[update-application logic in 
CatalogHandlerUtils](https://github.com/apache/polaris/blob/8abf19a06f809c8d1d0764b44e613c2614ce2c67/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/CatalogHandlerUtils.java#L477)
 (I understand this divergence was already latent and not introduced by this 
PR, but as the update-application logic grows in complexity here it's going to 
start getting a lot worse).



##########
runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java:
##########
@@ -1042,57 +1043,71 @@ public void commitTransaction(CommitTransactionRequest 
commitTransactionRequest)
         new TransactionWorkspaceMetaStoreManager(diagnostics, 
metaStoreManager);
     ((IcebergCatalog) 
baseCatalog).setMetaStoreManager(transactionMetaStoreManager);
 
-    commitTransactionRequest.tableChanges().stream()
-        .forEach(
-            change -> {
-              Table table = baseCatalog.loadTable(change.identifier());
-              if (!(table instanceof BaseTable baseTable)) {
-                throw new IllegalStateException(
-                    "Cannot wrap catalog that does not produce BaseTable");
-              }
-              if (isCreate(change)) {
-                throw new BadRequestException(
-                    "Unsupported operation: commitTranaction with 
updateForStagedCreate: %s",
-                    change);
-              }
+    // Group all changes by table identifier to handle them atomically
+    // This prevents conflicts when multiple changes target the same table 
entity
+    // LinkedHashMap preserves insertion order for deterministic processing
+    Map<TableIdentifier, List<UpdateTableRequest>> changesByTable = new 
LinkedHashMap<>();
+    for (UpdateTableRequest change : commitTransactionRequest.tableChanges()) {
+      if (isCreate(change)) {
+        throw new BadRequestException(
+            "Unsupported operation: commitTranaction with 
updateForStagedCreate: %s", change);
+      }
+      changesByTable.computeIfAbsent(change.identifier(), k -> new 
ArrayList<>()).add(change);
+    }
 
-              TableOperations tableOps = baseTable.operations();
-              TableMetadata currentMetadata = tableOps.current();
-
-              // Validate requirements; any CommitFailedExceptions will fail 
the overall request
-              change.requirements().forEach(requirement -> 
requirement.validate(currentMetadata));
-
-              // Apply changes
-              TableMetadata.Builder metadataBuilder = 
TableMetadata.buildFrom(currentMetadata);
-              change.updates().stream()
-                  .forEach(
-                      singleUpdate -> {
-                        // Note: If location-overlap checking is refactored to 
be atomic, we could
-                        // support validation within a single multi-table 
transaction as well, but
-                        // will need to update the 
TransactionWorkspaceMetaStoreManager to better
-                        // expose the concept of being able to read 
uncommitted updates.
-                        if (singleUpdate instanceof MetadataUpdate.SetLocation 
setLocation) {
-                          if 
(!currentMetadata.location().equals(setLocation.location())
-                              && !realmConfig.getConfig(
-                                  
FeatureConfiguration.ALLOW_NAMESPACE_LOCATION_OVERLAP)) {
-                            throw new BadRequestException(
-                                "Unsupported operation: commitTransaction 
containing SetLocation"
-                                    + " for table '%s' and new location '%s'",
-                                change.identifier(),
-                                ((MetadataUpdate.SetLocation) 
singleUpdate).location());
-                          }
-                        }
-
-                        // Apply updates to builder
-                        singleUpdate.applyTo(metadataBuilder);
-                      });
-
-              // Commit into transaction workspace we swapped the baseCatalog 
to use
-              TableMetadata updatedMetadata = metadataBuilder.build();
-              if (!updatedMetadata.changes().isEmpty()) {
-                tableOps.commit(currentMetadata, updatedMetadata);
+    // Process each table's changes in order
+    changesByTable.forEach(

Review Comment:
   Yeah, the logic also looks correct to me. +1 to adding a comment, though, on 
the subtlety that we're coalescing all the updates for a given table into a 
single Polaris entity update — a slightly different behavior than if the caller 
expected the various `UpdateTableRequests` in this `commitTransaction` to each 
be applied independently (but as if under a lock).
   
   Note that this issue was a known limitation, and referenced in a TODO in 
`TransactionWorkspaceMetaStoreManager`:
   
   
https://github.com/apache/polaris/blob/8abf19a06f809c8d1d0764b44e613c2614ce2c67/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java#L84
   
       // TODO: If we want to support the semantic of opening a transaction in 
which multiple
       // reads and writes occur on the same entities, where the reads are 
expected to see the writes
       // within the transaction workspace that haven't actually been 
committed, we can augment this
       // class by allowing these pendingUpdates to represent the latest state 
of the entity if we
       // also increment entityVersion. We'd need to store both a "latest view" 
of all updated entities
       // to serve reads within the same transaction while also storing the 
ordered list of
       // pendingUpdates that ultimately need to be applied in order within the 
real MetaStoreManager.
   
   The alternative "fix" described there — more general, but more complex and 
probably with its own pitfalls — is to really queue up the sequential mutations 
per entity in that "uncommitted persistence layer".
   
   The main implication is that if we plug into the MetaStoreManager 
layer, we can intercept the updates while still inheriting other relevant 
hooks, such as generating events, having entityVersion increments directly 
match actual update requests, etc.
   
   But I'm in favor of this more targeted change-coalescing fix here for now. 
We could either update/remove the TODO in 
`TransactionWorkspaceMetaStoreManager` and/or leave a comment in the code here 
referencing the other approach so any future changes to the way we handle these 
can more easily sort through it.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to