This is an automated email from the ASF dual-hosted git repository.

amoghj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 768e516e0b Add strict metadata cleanup to TableOperation. (#8397)
768e516e0b is described below

commit 768e516e0b138a6e996fe38fe5be4a0f624862db
Author: Amogh Jahagirdar <[email protected]>
AuthorDate: Thu Sep 7 11:06:09 2023 -0700

    Add strict metadata cleanup to TableOperation. (#8397)
    
    Co-authored-by: Ryan Blue <[email protected]>
    
    Strict metadata cleanup only triggers cleanup during a commit when the
    commit fails with an exception that implements the marker interface
    CleanableFailure. This will be used in SnapshotProducer and BaseTransaction,
    and is useful for catalogs like the REST catalog, where arbitrary HTTP
    client exceptions can be thrown (instead of the usual
    CommitStateUnknownException).
---
 .../iceberg/exceptions/BadRequestException.java    |   2 +-
 ...RequestException.java => CleanableFailure.java} |  19 +-
 .../iceberg/exceptions/CommitFailedException.java  |   2 +-
 .../iceberg/exceptions/ForbiddenException.java     |   2 +-
 .../exceptions/NoSuchIcebergTableException.java    |   2 +-
 .../exceptions/NoSuchNamespaceException.java       |   2 +-
 .../iceberg/exceptions/NoSuchTableException.java   |   2 +-
 .../iceberg/exceptions/NotAuthorizedException.java |   2 +-
 .../exceptions/ServiceUnavailableException.java    |   2 +-
 .../iceberg/exceptions/ValidationException.java    |   2 +-
 .../java/org/apache/iceberg/BaseTransaction.java   |  47 +++--
 .../java/org/apache/iceberg/SnapshotProducer.java  |  10 +-
 .../java/org/apache/iceberg/TableOperations.java   |  11 ++
 .../apache/iceberg/rest/RESTTableOperations.java   |   5 +
 .../org/apache/iceberg/catalog/CatalogTests.java   |   2 +-
 .../org/apache/iceberg/rest/TestRESTCatalog.java   | 210 +++++++++++++++++++++
 16 files changed, 272 insertions(+), 50 deletions(-)

diff --git 
a/api/src/main/java/org/apache/iceberg/exceptions/BadRequestException.java 
b/api/src/main/java/org/apache/iceberg/exceptions/BadRequestException.java
index 54c47eda91..c699c1ab22 100644
--- a/api/src/main/java/org/apache/iceberg/exceptions/BadRequestException.java
+++ b/api/src/main/java/org/apache/iceberg/exceptions/BadRequestException.java
@@ -21,7 +21,7 @@ package org.apache.iceberg.exceptions;
 import com.google.errorprone.annotations.FormatMethod;
 
 /** Exception thrown on HTTP 400 - Bad Request */
-public class BadRequestException extends RuntimeException {
+public class BadRequestException extends RuntimeException implements 
CleanableFailure {
   @FormatMethod
   public BadRequestException(String message, Object... args) {
     super(String.format(message, args));
diff --git 
a/api/src/main/java/org/apache/iceberg/exceptions/BadRequestException.java 
b/api/src/main/java/org/apache/iceberg/exceptions/CleanableFailure.java
similarity index 65%
copy from 
api/src/main/java/org/apache/iceberg/exceptions/BadRequestException.java
copy to api/src/main/java/org/apache/iceberg/exceptions/CleanableFailure.java
index 54c47eda91..1c9b1c5ce8 100644
--- a/api/src/main/java/org/apache/iceberg/exceptions/BadRequestException.java
+++ b/api/src/main/java/org/apache/iceberg/exceptions/CleanableFailure.java
@@ -18,17 +18,8 @@
  */
 package org.apache.iceberg.exceptions;
 
-import com.google.errorprone.annotations.FormatMethod;
-
-/** Exception thrown on HTTP 400 - Bad Request */
-public class BadRequestException extends RuntimeException {
-  @FormatMethod
-  public BadRequestException(String message, Object... args) {
-    super(String.format(message, args));
-  }
-
-  @FormatMethod
-  public BadRequestException(Throwable cause, String message, Object... args) {
-    super(String.format(message, args), cause);
-  }
-}
+/**
+ * A marker interface for commit exceptions where the state is known to be 
failure and uncommitted
+ * metadata can be cleaned up.
+ */
+public interface CleanableFailure {}
diff --git 
a/api/src/main/java/org/apache/iceberg/exceptions/CommitFailedException.java 
b/api/src/main/java/org/apache/iceberg/exceptions/CommitFailedException.java
index b9efe55d33..0a55f8ae52 100644
--- a/api/src/main/java/org/apache/iceberg/exceptions/CommitFailedException.java
+++ b/api/src/main/java/org/apache/iceberg/exceptions/CommitFailedException.java
@@ -21,7 +21,7 @@ package org.apache.iceberg.exceptions;
 import com.google.errorprone.annotations.FormatMethod;
 
 /** Exception raised when a commit fails because of out of date metadata. */
-public class CommitFailedException extends RuntimeException {
+public class CommitFailedException extends RuntimeException implements 
CleanableFailure {
   @FormatMethod
   public CommitFailedException(String message, Object... args) {
     super(String.format(message, args));
diff --git 
a/api/src/main/java/org/apache/iceberg/exceptions/ForbiddenException.java 
b/api/src/main/java/org/apache/iceberg/exceptions/ForbiddenException.java
index 0babdb48c7..f66b417fa7 100644
--- a/api/src/main/java/org/apache/iceberg/exceptions/ForbiddenException.java
+++ b/api/src/main/java/org/apache/iceberg/exceptions/ForbiddenException.java
@@ -21,7 +21,7 @@ package org.apache.iceberg.exceptions;
 import com.google.errorprone.annotations.FormatMethod;
 
 /** Exception thrown on HTTP 403 Forbidden - Failed authorization checks. */
-public class ForbiddenException extends RuntimeException {
+public class ForbiddenException extends RuntimeException implements 
CleanableFailure {
   @FormatMethod
   public ForbiddenException(String message, Object... args) {
     super(String.format(message, args));
diff --git 
a/api/src/main/java/org/apache/iceberg/exceptions/NoSuchIcebergTableException.java
 
b/api/src/main/java/org/apache/iceberg/exceptions/NoSuchIcebergTableException.java
index fb4da52a44..a996d0ffaf 100644
--- 
a/api/src/main/java/org/apache/iceberg/exceptions/NoSuchIcebergTableException.java
+++ 
b/api/src/main/java/org/apache/iceberg/exceptions/NoSuchIcebergTableException.java
@@ -21,7 +21,7 @@ package org.apache.iceberg.exceptions;
 import com.google.errorprone.annotations.FormatMethod;
 
 /** NoSuchTableException thrown when a table is found but it is not an Iceberg 
table. */
-public class NoSuchIcebergTableException extends NoSuchTableException {
+public class NoSuchIcebergTableException extends NoSuchTableException 
implements CleanableFailure {
   @FormatMethod
   public NoSuchIcebergTableException(String message, Object... args) {
     super(message, args);
diff --git 
a/api/src/main/java/org/apache/iceberg/exceptions/NoSuchNamespaceException.java 
b/api/src/main/java/org/apache/iceberg/exceptions/NoSuchNamespaceException.java
index ce8bb8d6a2..a998267a62 100644
--- 
a/api/src/main/java/org/apache/iceberg/exceptions/NoSuchNamespaceException.java
+++ 
b/api/src/main/java/org/apache/iceberg/exceptions/NoSuchNamespaceException.java
@@ -21,7 +21,7 @@ package org.apache.iceberg.exceptions;
 import com.google.errorprone.annotations.FormatMethod;
 
 /** Exception raised when attempting to load a namespace that does not exist. 
*/
-public class NoSuchNamespaceException extends RuntimeException {
+public class NoSuchNamespaceException extends RuntimeException implements 
CleanableFailure {
   @FormatMethod
   public NoSuchNamespaceException(String message, Object... args) {
     super(String.format(message, args));
diff --git 
a/api/src/main/java/org/apache/iceberg/exceptions/NoSuchTableException.java 
b/api/src/main/java/org/apache/iceberg/exceptions/NoSuchTableException.java
index 54b270d93a..a7976cec72 100644
--- a/api/src/main/java/org/apache/iceberg/exceptions/NoSuchTableException.java
+++ b/api/src/main/java/org/apache/iceberg/exceptions/NoSuchTableException.java
@@ -21,7 +21,7 @@ package org.apache.iceberg.exceptions;
 import com.google.errorprone.annotations.FormatMethod;
 
 /** Exception raised when attempting to load a table that does not exist. */
-public class NoSuchTableException extends RuntimeException {
+public class NoSuchTableException extends RuntimeException implements 
CleanableFailure {
   @FormatMethod
   public NoSuchTableException(String message, Object... args) {
     super(String.format(message, args));
diff --git 
a/api/src/main/java/org/apache/iceberg/exceptions/NotAuthorizedException.java 
b/api/src/main/java/org/apache/iceberg/exceptions/NotAuthorizedException.java
index 208c2f460a..cfaccc78c4 100644
--- 
a/api/src/main/java/org/apache/iceberg/exceptions/NotAuthorizedException.java
+++ 
b/api/src/main/java/org/apache/iceberg/exceptions/NotAuthorizedException.java
@@ -24,7 +24,7 @@ import com.google.errorprone.annotations.FormatMethod;
  * Exception thrown on HTTP 401 Unauthorized. The user is either not 
authenticated or failed
  * authorization checks.
  */
-public class NotAuthorizedException extends RESTException {
+public class NotAuthorizedException extends RESTException implements 
CleanableFailure {
   @FormatMethod
   public NotAuthorizedException(String message, Object... args) {
     super(message, args);
diff --git 
a/api/src/main/java/org/apache/iceberg/exceptions/ServiceUnavailableException.java
 
b/api/src/main/java/org/apache/iceberg/exceptions/ServiceUnavailableException.java
index df75473c5b..73df2a811d 100644
--- 
a/api/src/main/java/org/apache/iceberg/exceptions/ServiceUnavailableException.java
+++ 
b/api/src/main/java/org/apache/iceberg/exceptions/ServiceUnavailableException.java
@@ -21,7 +21,7 @@ package org.apache.iceberg.exceptions;
 import com.google.errorprone.annotations.FormatMethod;
 
 /** Exception thrown on HTTP 503: service is unavailable */
-public class ServiceUnavailableException extends RESTException {
+public class ServiceUnavailableException extends RESTException implements 
CleanableFailure {
   @FormatMethod
   public ServiceUnavailableException(String message, Object... args) {
     super(message, args);
diff --git 
a/api/src/main/java/org/apache/iceberg/exceptions/ValidationException.java 
b/api/src/main/java/org/apache/iceberg/exceptions/ValidationException.java
index dc40469df0..94dc79f1e5 100644
--- a/api/src/main/java/org/apache/iceberg/exceptions/ValidationException.java
+++ b/api/src/main/java/org/apache/iceberg/exceptions/ValidationException.java
@@ -32,7 +32,7 @@ import org.apache.iceberg.Schema;
  * <p>For example, this is thrown when attempting to create a table with a 
{@link PartitionSpec}
  * that is not compatible with the table {@link Schema}
  */
-public class ValidationException extends RuntimeException {
+public class ValidationException extends RuntimeException implements 
CleanableFailure {
   @FormatMethod
   public ValidationException(String message, Object... args) {
     super(String.format(message, args));
diff --git a/core/src/main/java/org/apache/iceberg/BaseTransaction.java 
b/core/src/main/java/org/apache/iceberg/BaseTransaction.java
index 61da776f4c..c1ebd554be 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTransaction.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTransaction.java
@@ -34,6 +34,7 @@ import java.util.Set;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.exceptions.CleanableFailure;
 import org.apache.iceberg.exceptions.CommitFailedException;
 import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.exceptions.NoSuchTableException;
@@ -318,18 +319,12 @@ public class BaseTransaction implements Transaction {
       throw e;
 
     } catch (RuntimeException e) {
-      // the commit failed and no files were committed. clean up each update.
-      Tasks.foreach(updates)
-          .suppressFailureWhenFinished()
-          .run(
-              update -> {
-                if (update instanceof SnapshotProducer) {
-                  ((SnapshotProducer) update).cleanAll();
-                }
-              });
+      // the commit failed and no files were committed. clean up each update
+      if (!ops.requireStrictCleanup() || e instanceof CleanableFailure) {
+        cleanAllUpdates();
+      }
 
       throw e;
-
     } finally {
       // create table never needs to retry because the table has no previous 
state. because retries
       // are not a
@@ -380,14 +375,9 @@ public class BaseTransaction implements Transaction {
 
     } catch (RuntimeException e) {
       // the commit failed and no files were committed. clean up each update.
-      Tasks.foreach(updates)
-          .suppressFailureWhenFinished()
-          .run(
-              update -> {
-                if (update instanceof SnapshotProducer) {
-                  ((SnapshotProducer) update).cleanAll();
-                }
-              });
+      if (!ops.requireStrictCleanup() || e instanceof CleanableFailure) {
+        cleanAllUpdates();
+      }
 
       throw e;
 
@@ -433,7 +423,10 @@ public class BaseTransaction implements Transaction {
       cleanUpOnCommitFailure();
       throw e.wrapped();
     } catch (RuntimeException e) {
-      cleanUpOnCommitFailure();
+      if (!ops.requireStrictCleanup() || e instanceof CleanableFailure) {
+        cleanUpOnCommitFailure();
+      }
+
       throw e;
     }
 
@@ -474,6 +467,16 @@ public class BaseTransaction implements Transaction {
 
   private void cleanUpOnCommitFailure() {
     // the commit failed and no files were committed. clean up each update.
+    cleanAllUpdates();
+
+    // delete all the uncommitted files
+    Tasks.foreach(deletedFiles)
+        .suppressFailureWhenFinished()
+        .onFailure((file, exc) -> LOG.warn("Failed to delete uncommitted file: 
{}", file, exc))
+        .run(ops.io()::deleteFile);
+  }
+
+  private void cleanAllUpdates() {
     Tasks.foreach(updates)
         .suppressFailureWhenFinished()
         .run(
@@ -482,12 +485,6 @@ public class BaseTransaction implements Transaction {
                 ((SnapshotProducer) update).cleanAll();
               }
             });
-
-    // delete all files that were cleaned up
-    Tasks.foreach(deletedFiles)
-        .suppressFailureWhenFinished()
-        .onFailure((file, exc) -> LOG.warn("Failed to delete uncommitted file: 
{}", file, exc))
-        .run(ops.io()::deleteFile);
   }
 
   private void applyUpdates(TableOperations underlyingOps) {
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java 
b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
index 5a6a01ea06..4ecdd21d93 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
@@ -43,6 +43,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
 import org.apache.iceberg.events.CreateSnapshotEvent;
 import org.apache.iceberg.events.Listeners;
+import org.apache.iceberg.exceptions.CleanableFailure;
 import org.apache.iceberg.exceptions.CommitFailedException;
 import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.exceptions.RuntimeIOException;
@@ -83,6 +84,7 @@ abstract class SnapshotProducer<ThisT> implements 
SnapshotUpdate<ThisT> {
   private final LoadingCache<ManifestFile, ManifestFile> manifestsWithMetadata;
 
   private final TableOperations ops;
+  private final boolean strictCleanup;
   private final String commitUUID = UUID.randomUUID().toString();
   private final AtomicInteger manifestCount = new AtomicInteger(0);
   private final AtomicInteger attempt = new AtomicInteger(0);
@@ -100,6 +102,7 @@ abstract class SnapshotProducer<ThisT> implements 
SnapshotUpdate<ThisT> {
 
   protected SnapshotProducer(TableOperations ops) {
     this.ops = ops;
+    this.strictCleanup = ops.requireStrictCleanup();
     this.base = ops.current();
     this.manifestsWithMetadata =
         Caffeine.newBuilder()
@@ -353,6 +356,7 @@ abstract class SnapshotProducer<ThisT> implements 
SnapshotUpdate<ThisT> {
   }
 
   @Override
+  @SuppressWarnings("checkstyle:CyclomaticComplexity")
   public void commit() {
     // this is always set to the latest commit attempt's snapshot id.
     AtomicLong newSnapshotId = new AtomicLong(-1L);
@@ -399,7 +403,11 @@ abstract class SnapshotProducer<ThisT> implements 
SnapshotUpdate<ThisT> {
     } catch (CommitStateUnknownException commitStateUnknownException) {
       throw commitStateUnknownException;
     } catch (RuntimeException e) {
-      Exceptions.suppressAndThrow(e, this::cleanAll);
+      if (!strictCleanup || e instanceof CleanableFailure) {
+        Exceptions.suppressAndThrow(e, this::cleanAll);
+      }
+
+      throw e;
     }
 
     try {
diff --git a/core/src/main/java/org/apache/iceberg/TableOperations.java 
b/core/src/main/java/org/apache/iceberg/TableOperations.java
index c00f42275e..6822809d79 100644
--- a/core/src/main/java/org/apache/iceberg/TableOperations.java
+++ b/core/src/main/java/org/apache/iceberg/TableOperations.java
@@ -20,6 +20,7 @@ package org.apache.iceberg;
 
 import org.apache.iceberg.encryption.EncryptionManager;
 import org.apache.iceberg.encryption.PlaintextEncryptionManager;
+import org.apache.iceberg.exceptions.CleanableFailure;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.LocationProvider;
 
@@ -115,4 +116,14 @@ public interface TableOperations {
   default long newSnapshotId() {
     return SnapshotIdGeneratorUtil.generateSnapshotID();
   }
+
+  /**
+   * Whether to clean up uncommitted metadata files only when a commit fails 
with a {@link
+   * CleanableFailure} exception.
+   *
+   * <p>This defaults to false: any unexpected exception will cause metadata 
files to be cleaned up.
+   */
+  default boolean requireStrictCleanup() {
+    return false;
+  }
 }
diff --git 
a/core/src/main/java/org/apache/iceberg/rest/RESTTableOperations.java 
b/core/src/main/java/org/apache/iceberg/rest/RESTTableOperations.java
index 0ce1afd93a..75c8bdd69f 100644
--- a/core/src/main/java/org/apache/iceberg/rest/RESTTableOperations.java
+++ b/core/src/main/java/org/apache/iceberg/rest/RESTTableOperations.java
@@ -240,4 +240,9 @@ class RESTTableOperations implements TableOperations {
       }
     };
   }
+
+  @Override
+  public boolean requireStrictCleanup() {
+    return true;
+  }
 }
diff --git a/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java 
b/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java
index 203b5fc381..61fbe39aea 100644
--- a/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java
+++ b/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java
@@ -113,7 +113,7 @@ public abstract class CatalogTests<C extends Catalog & 
SupportsNamespaces> {
   static final SortOrder REPLACE_WRITE_ORDER =
       SortOrder.builderFor(REPLACE_SCHEMA).asc(Expressions.bucket("id", 
16)).asc("id").build();
 
-  static final DataFile FILE_A =
+  protected static final DataFile FILE_A =
       DataFiles.builder(SPEC)
           .withPath("/path/to/data-a.parquet")
           .withFileSizeInBytes(10)
diff --git a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java 
b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java
index 4695ac1b0f..a45f23b4b4 100644
--- a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java
+++ b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java
@@ -23,6 +23,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
@@ -32,6 +33,7 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.file.Path;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
@@ -41,8 +43,10 @@ import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.BaseTransaction;
 import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MetadataUpdate;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SnapshotRef;
@@ -51,12 +55,16 @@ import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.Transaction;
 import org.apache.iceberg.UpdatePartitionSpec;
 import org.apache.iceberg.UpdateSchema;
+import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.CatalogTests;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.SessionCatalog;
 import org.apache.iceberg.catalog.TableCommit;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.NotAuthorizedException;
+import org.apache.iceberg.exceptions.NotFoundException;
+import org.apache.iceberg.exceptions.ServiceFailureException;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.jdbc.JdbcCatalog;
@@ -70,6 +78,7 @@ import 
org.apache.iceberg.rest.RESTSessionCatalog.SnapshotMode;
 import org.apache.iceberg.rest.auth.AuthSessionUtil;
 import org.apache.iceberg.rest.auth.OAuth2Properties;
 import org.apache.iceberg.rest.auth.OAuth2Util;
+import org.apache.iceberg.rest.requests.UpdateTableRequest;
 import org.apache.iceberg.rest.responses.ConfigResponse;
 import org.apache.iceberg.rest.responses.ErrorResponse;
 import org.apache.iceberg.rest.responses.LoadTableResponse;
@@ -87,11 +96,14 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 import org.mockito.stubbing.Answer;
 
 public class TestRESTCatalog extends CatalogTests<RESTCatalog> {
   private static final ObjectMapper MAPPER = RESTObjectMapper.mapper();
+  private static final ResourcePaths RESOURCE_PATHS =
+      ResourcePaths.forCatalogProperties(Maps.newHashMap());
 
   @TempDir public Path temp;
 
@@ -2003,4 +2015,202 @@ public class TestRESTCatalog extends 
CatalogTests<RESTCatalog> {
     assertThat(schema2.findField("new-column")).isNull();
     assertThat(schema2.columns()).hasSize(1);
   }
+
+  @Test
+  public void testCleanupUncommitedFilesForCleanableFailures() {
+    RESTCatalogAdapter adapter = Mockito.spy(new 
RESTCatalogAdapter(backendCatalog));
+    Catalog catalog = catalog(adapter);
+    catalog.createTable(TABLE, SCHEMA);
+    DataFile file =
+        DataFiles.builder(PartitionSpec.unpartitioned())
+            .withPath("/path/to/data-a.parquet")
+            .withFileSizeInBytes(10)
+            .withRecordCount(2)
+            .build();
+
+    Table table = catalog.loadTable(TABLE);
+    ArgumentCaptor<UpdateTableRequest> captor = 
ArgumentCaptor.forClass(UpdateTableRequest.class);
+    Mockito.doThrow(new NotAuthorizedException("not authorized"))
+        .when(adapter)
+        .post(any(), any(), any(), any(Map.class), any());
+    assertThatThrownBy(() -> 
catalog.loadTable(TABLE).newFastAppend().appendFile(file).commit())
+        .isInstanceOf(NotAuthorizedException.class);
+    verify(adapter, atLeastOnce())
+        .post(eq(RESOURCE_PATHS.table(TABLE)), captor.capture(), any(), 
any(Map.class), any());
+
+    // Extract the UpdateTableRequest to determine the path of the manifest 
list that should be
+    // cleaned up
+    UpdateTableRequest request = captor.getValue();
+    MetadataUpdate.AddSnapshot addSnapshot = (MetadataUpdate.AddSnapshot) 
request.updates().get(0);
+    Assertions.assertThatThrownBy(
+            () -> 
table.io().newInputFile(addSnapshot.snapshot().manifestListLocation()))
+        .isInstanceOf(NotFoundException.class);
+  }
+
+  @Test
+  public void testNoCleanupForNonCleanableExceptions() {
+    RESTCatalogAdapter adapter = Mockito.spy(new 
RESTCatalogAdapter(backendCatalog));
+    Catalog catalog = catalog(adapter);
+    catalog.createTable(TABLE, SCHEMA);
+    Table table = catalog.loadTable(TABLE);
+
+    ArgumentCaptor<UpdateTableRequest> captor = 
ArgumentCaptor.forClass(UpdateTableRequest.class);
+    Mockito.doThrow(new ServiceFailureException("some service failure"))
+        .when(adapter)
+        .post(any(), any(), any(), any(Map.class), any());
+    assertThatThrownBy(() -> 
catalog.loadTable(TABLE).newFastAppend().appendFile(FILE_A).commit())
+        .isInstanceOf(ServiceFailureException.class);
+    verify(adapter, atLeastOnce())
+        .post(eq(RESOURCE_PATHS.table(TABLE)), captor.capture(), any(), 
any(Map.class), any());
+
+    // Extract the UpdateTableRequest to determine the path of the manifest 
list that should still
+    // exist even though the commit failed
+    UpdateTableRequest request = captor.getValue();
+    MetadataUpdate.AddSnapshot addSnapshot = (MetadataUpdate.AddSnapshot) 
request.updates().get(0);
+    Assertions.assertThat(
+            
table.io().newInputFile(addSnapshot.snapshot().manifestListLocation()).exists())
+        .isTrue();
+  }
+
+  @Test
+  public void testCleanupCleanableExceptionsCreate() {
+    RESTCatalogAdapter adapter = Mockito.spy(new 
RESTCatalogAdapter(backendCatalog));
+    Catalog catalog = catalog(adapter);
+    catalog.createTable(TABLE, SCHEMA);
+    TableIdentifier newTable = TableIdentifier.of(TABLE.namespace(), 
"some_table");
+    ArgumentCaptor<UpdateTableRequest> captor = 
ArgumentCaptor.forClass(UpdateTableRequest.class);
+    Mockito.doThrow(new NotAuthorizedException("not authorized"))
+        .when(adapter)
+        .post(eq(RESOURCE_PATHS.table(newTable)), any(), any(), 
any(Map.class), any());
+
+    Transaction createTableTransaction = 
catalog.newCreateTableTransaction(newTable, SCHEMA);
+    createTableTransaction.newAppend().appendFile(FILE_A).commit();
+    assertThatThrownBy(createTableTransaction::commitTransaction)
+        .isInstanceOf(NotAuthorizedException.class);
+    verify(adapter, atLeastOnce())
+        .post(eq(RESOURCE_PATHS.table(newTable)), captor.capture(), any(), 
any(Map.class), any());
+    UpdateTableRequest request = captor.getValue();
+    Optional<MetadataUpdate> appendSnapshot =
+        request.updates().stream()
+            .filter(update -> update instanceof MetadataUpdate.AddSnapshot)
+            .findFirst();
+
+    assertThat(appendSnapshot).isPresent();
+    MetadataUpdate.AddSnapshot addSnapshot = (MetadataUpdate.AddSnapshot) 
appendSnapshot.get();
+    Assertions.assertThatThrownBy(
+            () ->
+                catalog
+                    .loadTable(TABLE)
+                    .io()
+                    
.newInputFile(addSnapshot.snapshot().manifestListLocation()))
+        .isInstanceOf(NotFoundException.class);
+  }
+
+  @Test
+  public void testNoCleanupForNonCleanableCreateTransaction() {
+    RESTCatalogAdapter adapter = Mockito.spy(new 
RESTCatalogAdapter(backendCatalog));
+    Catalog catalog = catalog(adapter);
+    catalog.createTable(TABLE, SCHEMA);
+    TableIdentifier newTable = TableIdentifier.of(TABLE.namespace(), 
"some_table");
+    Mockito.doThrow(new ServiceFailureException("some service failure"))
+        .when(adapter)
+        .post(eq(RESOURCE_PATHS.table(newTable)), any(), any(), 
any(Map.class), any());
+    ArgumentCaptor<UpdateTableRequest> captor = 
ArgumentCaptor.forClass(UpdateTableRequest.class);
+    Transaction createTableTransaction = 
catalog.newCreateTableTransaction(newTable, SCHEMA);
+    createTableTransaction.newAppend().appendFile(FILE_A).commit();
+    assertThatThrownBy(createTableTransaction::commitTransaction)
+        .isInstanceOf(ServiceFailureException.class);
+    verify(adapter, atLeastOnce())
+        .post(eq(RESOURCE_PATHS.table(newTable)), captor.capture(), any(), 
any(Map.class), any());
+    UpdateTableRequest request = captor.getValue();
+    Optional<MetadataUpdate> appendSnapshot =
+        request.updates().stream()
+            .filter(update -> update instanceof MetadataUpdate.AddSnapshot)
+            .findFirst();
+    assertThat(appendSnapshot).isPresent();
+
+    MetadataUpdate.AddSnapshot addSnapshot = (MetadataUpdate.AddSnapshot) 
appendSnapshot.get();
+    Assertions.assertThat(
+            catalog
+                .loadTable(TABLE)
+                .io()
+                .newInputFile(addSnapshot.snapshot().manifestListLocation())
+                .exists())
+        .isTrue();
+  }
+
+  @Test
+  public void testCleanupCleanableExceptionsReplace() {
+    RESTCatalogAdapter adapter = Mockito.spy(new 
RESTCatalogAdapter(backendCatalog));
+    Catalog catalog = catalog(adapter);
+    catalog.createTable(TABLE, SCHEMA);
+    ArgumentCaptor<UpdateTableRequest> captor = 
ArgumentCaptor.forClass(UpdateTableRequest.class);
+    Mockito.doThrow(new NotAuthorizedException("not authorized"))
+        .when(adapter)
+        .post(eq(RESOURCE_PATHS.table(TABLE)), any(), any(), any(Map.class), 
any());
+
+    Transaction replaceTableTransaction = 
catalog.newReplaceTableTransaction(TABLE, SCHEMA, false);
+    replaceTableTransaction.newAppend().appendFile(FILE_A).commit();
+    assertThatThrownBy(replaceTableTransaction::commitTransaction)
+        .isInstanceOf(NotAuthorizedException.class);
+    verify(adapter, atLeastOnce())
+        .post(eq(RESOURCE_PATHS.table(TABLE)), captor.capture(), any(), 
any(Map.class), any());
+    UpdateTableRequest request = captor.getValue();
+    Optional<MetadataUpdate> appendSnapshot =
+        request.updates().stream()
+            .filter(update -> update instanceof MetadataUpdate.AddSnapshot)
+            .findFirst();
+
+    assertThat(appendSnapshot).isPresent();
+    MetadataUpdate.AddSnapshot addSnapshot = (MetadataUpdate.AddSnapshot) 
appendSnapshot.get();
+    Assertions.assertThatThrownBy(
+            () ->
+                catalog
+                    .loadTable(TABLE)
+                    .io()
+                    
.newInputFile(addSnapshot.snapshot().manifestListLocation()))
+        .isInstanceOf(NotFoundException.class);
+  }
+
+  @Test
+  public void testNoCleanupForNonCleanableReplaceTransaction() {
+    RESTCatalogAdapter adapter = Mockito.spy(new 
RESTCatalogAdapter(backendCatalog));
+    Catalog catalog = catalog(adapter);
+    catalog.createTable(TABLE, SCHEMA);
+    Mockito.doThrow(new ServiceFailureException("some service failure"))
+        .when(adapter)
+        .post(eq(RESOURCE_PATHS.table(TABLE)), any(), any(), any(Map.class), 
any());
+    ArgumentCaptor<UpdateTableRequest> captor = 
ArgumentCaptor.forClass(UpdateTableRequest.class);
+    Transaction replaceTableTransaction = 
catalog.newReplaceTableTransaction(TABLE, SCHEMA, false);
+    replaceTableTransaction.newAppend().appendFile(FILE_A).commit();
+    assertThatThrownBy(replaceTableTransaction::commitTransaction)
+        .isInstanceOf(ServiceFailureException.class);
+    verify(adapter, atLeastOnce())
+        .post(eq(RESOURCE_PATHS.table(TABLE)), captor.capture(), any(), 
any(Map.class), any());
+    UpdateTableRequest request = captor.getValue();
+    Optional<MetadataUpdate> appendSnapshot =
+        request.updates().stream()
+            .filter(update -> update instanceof MetadataUpdate.AddSnapshot)
+            .findFirst();
+    assertThat(appendSnapshot).isPresent();
+
+    MetadataUpdate.AddSnapshot addSnapshot = (MetadataUpdate.AddSnapshot) 
appendSnapshot.get();
+    Assertions.assertThat(
+            catalog
+                .loadTable(TABLE)
+                .io()
+                .newInputFile(addSnapshot.snapshot().manifestListLocation())
+                .exists())
+        .isTrue();
+  }
+
+  private Catalog catalog(RESTCatalogAdapter adapter) {
+    RESTCatalog catalog =
+        new RESTCatalog(SessionCatalog.SessionContext.createEmpty(), (config) 
-> adapter);
+    catalog.initialize(
+        "test",
+        ImmutableMap.of(
+            CatalogProperties.FILE_IO_IMPL, 
"org.apache.iceberg.inmemory.InMemoryFileIO"));
+    return catalog;
+  }
 }

Reply via email to