dimas-b commented on code in PR #3360:
URL: https://github.com/apache/polaris/pull/3360#discussion_r2666006818
##########
runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java:
##########
@@ -1042,57 +1043,71 @@ public void commitTransaction(CommitTransactionRequest
commitTransactionRequest)
new TransactionWorkspaceMetaStoreManager(diagnostics,
metaStoreManager);
((IcebergCatalog)
baseCatalog).setMetaStoreManager(transactionMetaStoreManager);
- commitTransactionRequest.tableChanges().stream()
- .forEach(
- change -> {
- Table table = baseCatalog.loadTable(change.identifier());
- if (!(table instanceof BaseTable baseTable)) {
- throw new IllegalStateException(
- "Cannot wrap catalog that does not produce BaseTable");
- }
- if (isCreate(change)) {
- throw new BadRequestException(
- "Unsupported operation: commitTranaction with
updateForStagedCreate: %s",
- change);
- }
+ // Group all changes by table identifier to handle them atomically
+ // This prevents conflicts when multiple changes target the same table
entity
+ // LinkedHashMap preserves insertion order for deterministic processing
+ Map<TableIdentifier, List<UpdateTableRequest>> changesByTable = new
LinkedHashMap<>();
+ for (UpdateTableRequest change : commitTransactionRequest.tableChanges()) {
+ if (isCreate(change)) {
+ throw new BadRequestException(
+ "Unsupported operation: commitTranaction with
updateForStagedCreate: %s", change);
+ }
+ changesByTable.computeIfAbsent(change.identifier(), k -> new
ArrayList<>()).add(change);
Review Comment:
Does the Iceberg spec allow more than one table update in one
`commitTransaction` operation?
##########
runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java:
##########
@@ -1042,57 +1043,71 @@ public void commitTransaction(CommitTransactionRequest
commitTransactionRequest)
new TransactionWorkspaceMetaStoreManager(diagnostics,
metaStoreManager);
((IcebergCatalog)
baseCatalog).setMetaStoreManager(transactionMetaStoreManager);
- commitTransactionRequest.tableChanges().stream()
- .forEach(
- change -> {
- Table table = baseCatalog.loadTable(change.identifier());
- if (!(table instanceof BaseTable baseTable)) {
- throw new IllegalStateException(
- "Cannot wrap catalog that does not produce BaseTable");
- }
- if (isCreate(change)) {
- throw new BadRequestException(
- "Unsupported operation: commitTranaction with
updateForStagedCreate: %s",
- change);
- }
+ // Group all changes by table identifier to handle them atomically
+ // This prevents conflicts when multiple changes target the same table
entity
+ // LinkedHashMap preserves insertion order for deterministic processing
+ Map<TableIdentifier, List<UpdateTableRequest>> changesByTable = new
LinkedHashMap<>();
+ for (UpdateTableRequest change : commitTransactionRequest.tableChanges()) {
+ if (isCreate(change)) {
+ throw new BadRequestException(
+ "Unsupported operation: commitTranaction with
updateForStagedCreate: %s", change);
+ }
+ changesByTable.computeIfAbsent(change.identifier(), k -> new
ArrayList<>()).add(change);
Review Comment:
Does the Iceberg REST Catalog spec allow more than one table update in one
`commitTransaction` operation?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]