nastra commented on code in PR #4464:
URL: https://github.com/apache/iceberg/pull/4464#discussion_r845271918


##########
core/src/test/java/org/apache/iceberg/TestTransaction.java:
##########
@@ -420,6 +421,35 @@ public void testMultipleUpdateTransactionRetryMergeCleanup() {
     Assert.assertFalse("Append manifest should be deleted", new File(appendManifest.path()).exists());
   }
 
+  @Test
+  public void testTransactionRetrySchemaUpdate() {
+    // use only one retry
+    table.updateProperties()
+        .set(TableProperties.COMMIT_NUM_RETRIES, "1")
+        .commit();
+
+    // start a transaction
+    Transaction txn = table.newTransaction();
+    // add column "new-column"
+    txn.updateSchema()
+        .addColumn("new-column", Types.IntegerType.get())
+        .commit();
+    int schemaId = txn.table().schema().schemaId();
+
+    // directly update the table for adding "another-column" (which causes in-progress txn commit fail)
+    table.updateSchema()
+        .addColumn("another-column", Types.IntegerType.get())
+        .commit();
+    int conflictSchemaId = table.schema().schemaId();

Review Comment:
   nit: conflictSchemaId -> conflictingSchemaId



##########
core/src/main/java/org/apache/iceberg/BaseTransaction.java:
##########
@@ -437,6 +419,40 @@ private void commitSimpleTransaction() {
     }
   }
 
+  private void cleanUpOnCommitFailure() {
+    // the commit failed and no files were committed. clean up each update.
+    Tasks.foreach(updates)
+        .suppressFailureWhenFinished()
+        .run(update -> {
+          if (update instanceof SnapshotProducer) {
+            ((SnapshotProducer) update).cleanAll();
+          }
+        });
+
+    // delete all files that were cleaned up
+    Tasks.foreach(deletedFiles)
+        .suppressFailureWhenFinished()
+        .onFailure((file, exc) -> LOG.warn("Failed to delete uncommitted file: {}", file, exc))
+        .run(ops.io()::deleteFile);
+  }
+
+  private void applyUpdates(TableOperations underlyingOps) {
+    if (base != underlyingOps.refresh()) {
+      // use refreshed the metadata
+      this.base = underlyingOps.current();
+      this.current = underlyingOps.current();
+      for (PendingUpdate update : updates) {
+        // re-commit each update in the chain to apply it and update current
+        try {
+          update.commit();
+        } catch (CommitFailedException e) {
+          // Cannot pass even with retry. So, break the retry-loop.

Review Comment:
   shall we add more details as to why it cannot pass?
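For context on `cleanUpOnCommitFailure` in this hunk: as I read the builder chain, `Tasks.foreach(...).suppressFailureWhenFinished().onFailure(...)` attempts every item, reports each failure to the callback, and does not rethrow at the end. Here is a plain-Java sketch of that best-effort pattern. It is not Iceberg's `Tasks` implementation, and `BestEffortCleanupSketch` and `runAll` are hypothetical names.

```java
import java.util.function.BiConsumer;
import java.util.function.Consumer;

/**
 * Sketch only (not Iceberg's Tasks utility): best-effort cleanup where every
 * item is attempted, each failure goes to a callback, and nothing is rethrown.
 */
public class BestEffortCleanupSketch {

  static <T> void runAll(Iterable<T> items, Consumer<T> task, BiConsumer<T, Exception> onFailure) {
    for (T item : items) {
      try {
        task.accept(item);
      } catch (RuntimeException e) {
        // Report and continue: one file that cannot be deleted should not
        // leave the remaining uncommitted files behind.
        onFailure.accept(item, e);
      }
    }
    // No exception escapes, similar in spirit to suppressFailureWhenFinished().
  }
}
```

With a callback that logs (as `LOG.warn(...)` does in the hunk), a single failed delete shows up as a warning while the other files are still removed.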



##########
core/src/main/java/org/apache/iceberg/BaseTransaction.java:
##########
@@ -728,4 +744,20 @@ TableOperations ops() {
   Set<String> deletedFiles() {
     return deletedFiles;
   }
+
+  /**
+   * Exception used to avoid retrying {@link PendingUpdate} when it is failed with {@link CommitFailedException}.
+   */
+  private static class ValidationFailureException extends RuntimeException {

Review Comment:
   should we move the `ValidationFailureException` from `CatalogHandlers` to a different place and re-use it here rather than re-creating the same class?
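A rough sketch of how one shared `ValidationFailureException` could serve both call sites, with the "why it cannot pass" reasoning spelled out in the comment. Everything here is hypothetical stand-in code. `CommitFailedException` is a local class rather than Iceberg's, the `commit` loop stands in for the retrying `Tasks` call, and the names and placement of a shared class are assumptions, not what `CatalogHandlers` or this PR actually contain.

```java
import java.util.List;
import java.util.function.BooleanSupplier;

/**
 * Sketch (not Iceberg code): a retry loop that retries a failed metadata swap
 * but stops at once when re-applying the pending updates fails validation.
 */
public class RetryStopSketch {

  /** Hypothetical stand-in for Iceberg's CommitFailedException. */
  static class CommitFailedException extends RuntimeException {
    CommitFailedException(String message) {
      super(message);
    }
  }

  /** Hypothetical shared wrapper that marks a failure as not retryable. */
  static class ValidationFailureException extends RuntimeException {
    private final CommitFailedException wrapped;

    ValidationFailureException(CommitFailedException cause) {
      super(cause);
      this.wrapped = cause;
    }

    CommitFailedException wrapped() {
      return wrapped;
    }
  }

  static void applyUpdates(List<Runnable> updates) {
    for (Runnable update : updates) {
      try {
        update.run();
      } catch (CommitFailedException e) {
        // The update was re-applied on top of freshly refreshed metadata and
        // still failed, so it conflicts with the new table state itself.
        // Another attempt would typically refresh to the same state and fail
        // the same way, so signal the retry loop to stop.
        throw new ValidationFailureException(e);
      }
    }
  }

  /** Retries only on CommitFailedException and returns the attempt count. */
  static int commit(int numRetries, BooleanSupplier swapSucceeds, List<Runnable> updates) {
    for (int attempt = 1; ; attempt++) {
      try {
        applyUpdates(updates);
        if (!swapSucceeds.getAsBoolean()) {
          throw new CommitFailedException("table metadata changed concurrently");
        }
        return attempt;
      } catch (ValidationFailureException e) {
        throw e.wrapped(); // surface the original failure without retrying
      } catch (CommitFailedException e) {
        if (attempt > numRetries) {
          throw e; // retries exhausted
        }
      }
    }
  }
}
```

With these stand-ins, an update that fails validation runs once and its original `CommitFailedException` is rethrown, while a failed metadata swap is retried up to `numRetries` more times.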



##########
core/src/test/java/org/apache/iceberg/TestTransaction.java:
##########
@@ -420,6 +421,35 @@ public void testMultipleUpdateTransactionRetryMergeCleanup() {
     Assert.assertFalse("Append manifest should be deleted", new File(appendManifest.path()).exists());
   }
 
+  @Test
+  public void testTransactionRetrySchemaUpdate() {
+    // use only one retry
+    table.updateProperties()
+        .set(TableProperties.COMMIT_NUM_RETRIES, "1")
+        .commit();
+
+    // start a transaction
+    Transaction txn = table.newTransaction();
+    // add column "new-column"
+    txn.updateSchema()
+        .addColumn("new-column", Types.IntegerType.get())
+        .commit();
+    int schemaId = txn.table().schema().schemaId();
+
+    // directly update the table for adding "another-column" (which causes in-progress txn commit fail)
+    table.updateSchema()
+        .addColumn("another-column", Types.IntegerType.get())
+        .commit();
+    int conflictSchemaId = table.schema().schemaId();
+
+    Assert.assertEquals("both the schema id should be same to become conflicting", conflictSchemaId, schemaId);

Review Comment:
   nit: maybe change to `Both schema IDs should be the same in order to cause a conflict`
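Why the two IDs match, in a simplified model: assume (as a simplification, not Iceberg's actual `TableMetadata` code) that a new schema gets the highest existing schema ID plus one. The transaction and the direct `table.updateSchema()` both start from the same base metadata, so they pick the same ID. `SchemaIdSketch` and `nextSchemaId` are hypothetical names.

```java
import java.util.Collections;
import java.util.List;

/** Simplified model of schema-ID assignment; not Iceberg's implementation. */
public class SchemaIdSketch {

  // Simplifying assumption: new ID = highest existing schema ID + 1.
  static int nextSchemaId(List<Integer> existingSchemaIds) {
    return Collections.max(existingSchemaIds) + 1;
  }

  public static void main(String[] args) {
    List<Integer> base = List.of(0); // the table starts with schema 0

    // The transaction adds "new-column" on top of the base it started from.
    int schemaId = nextSchemaId(base);

    // The direct update adds "another-column" on top of the same base.
    int conflictingSchemaId = nextSchemaId(base);

    System.out.println(schemaId + " " + conflictingSchemaId); // prints "1 1"
  }
}
```

Under this model both updates claim ID 1 for different schemas, which is the collision the assertion checks before the transaction commits.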



-- 
This is an automated message from the Apache Git Service.