This is an automated email from the ASF dual-hosted git repository.

amoghj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new c67c9124d3 Core, Spark: Spark writes/actions should only perform cleanup if failure is cleanable (#10373)
c67c9124d3 is described below

commit c67c9124d324207d742307982de31e5ff2ebcd01
Author: Amogh Jahagirdar <[email protected]>
AuthorDate: Thu Jun 20 08:27:59 2024 -0700

    Core, Spark: Spark writes/actions should only perform cleanup if failure is cleanable (#10373)
---
 .../actions/BaseRewriteDataFilesAction.java        | 16 ++++---
 .../actions/RewriteDataFilesCommitManager.java     |  9 +++-
 .../RewritePositionDeletesCommitManager.java       |  9 +++-
 .../spark/actions/TestRewriteDataFilesAction.java  | 56 ++++++++++++++++++++--
 .../spark/actions/TestRewriteDataFilesAction.java  | 56 ++++++++++++++++++++--
 .../spark/actions/RewriteManifestsSparkAction.java |  8 +++-
 .../spark/source/SparkPositionDeltaWrite.java      | 10 ++--
 .../apache/iceberg/spark/source/SparkWrite.java    | 10 ++--
 .../spark/actions/TestRewriteDataFilesAction.java  | 56 ++++++++++++++++++++--
 9 files changed, 193 insertions(+), 37 deletions(-)

diff --git 
a/core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java 
b/core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java
index 5f229be579..c0f2fc6174 100644
--- 
a/core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java
+++ 
b/core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java
@@ -31,6 +31,7 @@ import org.apache.iceberg.RewriteFiles;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.exceptions.CleanableFailure;
 import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
@@ -308,12 +309,15 @@ public abstract class BaseRewriteDataFilesAction<ThisT>
       LOG.warn("Commit state unknown, cannot clean up files that may have been 
committed", e);
       throw e;
     } catch (Exception e) {
-      LOG.warn("Failed to commit rewrite, cleaning up rewritten files", e);
-      Tasks.foreach(Iterables.transform(addedDataFiles, f -> 
f.path().toString()))
-          .noRetry()
-          .suppressFailureWhenFinished()
-          .onFailure((location, exc) -> LOG.warn("Failed to delete: {}", 
location, exc))
-          .run(fileIO::deleteFile);
+      if (e instanceof CleanableFailure) {
+        LOG.warn("Failed to commit rewrite, cleaning up rewritten files", e);
+        Tasks.foreach(Iterables.transform(addedDataFiles, f -> 
f.path().toString()))
+            .noRetry()
+            .suppressFailureWhenFinished()
+            .onFailure((location, exc) -> LOG.warn("Failed to delete: {}", 
location, exc))
+            .run(fileIO::deleteFile);
+      }
+
       throw e;
     }
   }
diff --git 
a/core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesCommitManager.java
 
b/core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesCommitManager.java
index 7f89db467d..45b4bcf0a4 100644
--- 
a/core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesCommitManager.java
+++ 
b/core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesCommitManager.java
@@ -23,6 +23,7 @@ import java.util.Set;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.RewriteFiles;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.exceptions.CleanableFailure;
 import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -119,8 +120,12 @@ public class RewriteDataFilesCommitManager {
           e);
       throw e;
     } catch (Exception e) {
-      LOG.error("Cannot commit groups {}, attempting to clean up written 
files", rewriteGroups, e);
-      rewriteGroups.forEach(this::abortFileGroup);
+      if (e instanceof CleanableFailure) {
+        LOG.error(
+            "Cannot commit groups {}, attempting to clean up written files", 
rewriteGroups, e);
+        rewriteGroups.forEach(this::abortFileGroup);
+      }
+
       throw e;
     }
   }
diff --git 
a/core/src/main/java/org/apache/iceberg/actions/RewritePositionDeletesCommitManager.java
 
b/core/src/main/java/org/apache/iceberg/actions/RewritePositionDeletesCommitManager.java
index 01b2f7528e..b1322d5e58 100644
--- 
a/core/src/main/java/org/apache/iceberg/actions/RewritePositionDeletesCommitManager.java
+++ 
b/core/src/main/java/org/apache/iceberg/actions/RewritePositionDeletesCommitManager.java
@@ -24,6 +24,7 @@ import org.apache.iceberg.CatalogUtil;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.RewriteFiles;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.exceptions.CleanableFailure;
 import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -102,8 +103,12 @@ public class RewritePositionDeletesCommitManager {
           e);
       throw e;
     } catch (Exception e) {
-      LOG.error("Cannot commit groups {}, attempting to clean up written 
files", rewriteGroups, e);
-      rewriteGroups.forEach(this::abort);
+      if (e instanceof CleanableFailure) {
+        LOG.error(
+            "Cannot commit groups {}, attempting to clean up written files", 
rewriteGroups, e);
+        rewriteGroups.forEach(this::abort);
+      }
+
       throw e;
     }
   }
diff --git 
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
 
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index 2934494bba..c978be9e3d 100644
--- 
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++ 
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -77,6 +77,7 @@ import org.apache.iceberg.deletes.PositionDeleteWriter;
 import org.apache.iceberg.encryption.EncryptedFiles;
 import org.apache.iceberg.encryption.EncryptedOutputFile;
 import org.apache.iceberg.encryption.EncryptionKeyMetadata;
+import org.apache.iceberg.exceptions.CommitFailedException;
 import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
@@ -673,14 +674,14 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
     RewriteDataFilesCommitManager util = spy(new 
RewriteDataFilesCommitManager(table));
 
     // Fail to commit
-    doThrow(new RuntimeException("Commit 
Failure")).when(util).commitFileGroups(any());
+    doThrow(new CommitFailedException("Commit 
Failure")).when(util).commitFileGroups(any());
 
     
doReturn(util).when(spyRewrite).commitManager(table.currentSnapshot().snapshotId());
 
     assertThatThrownBy(() -> spyRewrite.execute())
         .as("Should fail entire rewrite if commit fails")
         .isInstanceOf(RuntimeException.class)
-        .hasMessage("Commit Failure");
+        .hasMessageContaining("Cannot commit rewrite");
 
     table.refresh();
 
@@ -692,6 +693,40 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
     shouldHaveACleanCache(table);
   }
 
+  @Test
+  public void testCommitFailsWithUncleanableFailure() {
+    Table table = createTable(20);
+    int fileSize = averageFileSize(table);
+
+    List<Object[]> originalData = currentData();
+
+    RewriteDataFilesSparkAction realRewrite =
+        basicRewrite(table)
+            .option(
+                RewriteDataFiles.MAX_FILE_GROUP_SIZE_BYTES, 
Integer.toString(fileSize * 2 + 1000));
+
+    RewriteDataFilesSparkAction spyRewrite = spy(realRewrite);
+    RewriteDataFilesCommitManager util = spy(new 
RewriteDataFilesCommitManager(table));
+
+    // Fail to commit with an arbitrary failure and validate that orphans are 
not cleaned up
+    doThrow(new RuntimeException("Arbitrary 
Failure")).when(util).commitFileGroups(any());
+
+    
doReturn(util).when(spyRewrite).commitManager(table.currentSnapshot().snapshotId());
+
+    assertThatThrownBy(spyRewrite::execute)
+        .isInstanceOf(RuntimeException.class)
+        .hasMessageContaining("Arbitrary Failure");
+
+    table.refresh();
+
+    List<Object[]> postRewriteData = currentData();
+    assertEquals("We shouldn't have changed the data", originalData, 
postRewriteData);
+
+    shouldHaveSnapshots(table, 1);
+    shouldHaveOrphans(table);
+    shouldHaveACleanCache(table);
+  }
+
   @Test
   public void testParallelSingleCommitWithRewriteFailure() {
     Table table = createTable(20);
@@ -709,13 +744,13 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
 
     // Fail groups 1, 3, and 7 during rewrite
     GroupInfoMatcher failGroup = new GroupInfoMatcher(1, 3, 7);
-    doThrow(new RuntimeException("Rewrite Failed"))
+    doThrow(new CommitFailedException("Rewrite Failed"))
         .when(spyRewrite)
         .rewriteFiles(any(), argThat(failGroup));
 
     assertThatThrownBy(() -> spyRewrite.execute())
         .as("Should fail entire rewrite if part fails")
-        .isInstanceOf(RuntimeException.class)
+        .isInstanceOf(CommitFailedException.class)
         .hasMessage("Rewrite Failed");
 
     table.refresh();
@@ -830,7 +865,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
 
     // First and Third commits work, second does not
     doCallRealMethod()
-        .doThrow(new RuntimeException("Commit Failed"))
+        .doThrow(new CommitFailedException("Commit Failed"))
         .doCallRealMethod()
         .when(util)
         .commitFileGroups(any());
@@ -1562,6 +1597,17 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
             .orphanFileLocations());
   }
 
+  protected void shouldHaveOrphans(Table table) {
+    assertThat(
+            actions()
+                .deleteOrphanFiles(table)
+                .olderThan(System.currentTimeMillis())
+                .execute()
+                .orphanFileLocations())
+        .as("Should have found orphan files")
+        .isNotEmpty();
+  }
+
   protected void shouldHaveACleanCache(Table table) {
     Assert.assertEquals(
         "Should not have any entries in cache", ImmutableSet.of(), 
cacheContents(table));
diff --git 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index 66f16437f5..ba173d0249 100644
--- 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++ 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -77,6 +77,7 @@ import org.apache.iceberg.deletes.PositionDeleteWriter;
 import org.apache.iceberg.encryption.EncryptedFiles;
 import org.apache.iceberg.encryption.EncryptedOutputFile;
 import org.apache.iceberg.encryption.EncryptionKeyMetadata;
+import org.apache.iceberg.exceptions.CommitFailedException;
 import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
@@ -707,13 +708,13 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
     RewriteDataFilesCommitManager util = spy(new 
RewriteDataFilesCommitManager(table));
 
     // Fail to commit
-    doThrow(new RuntimeException("Commit 
Failure")).when(util).commitFileGroups(any());
+    doThrow(new CommitFailedException("Commit 
Failure")).when(util).commitFileGroups(any());
 
     
doReturn(util).when(spyRewrite).commitManager(table.currentSnapshot().snapshotId());
 
     assertThatThrownBy(spyRewrite::execute)
         .isInstanceOf(RuntimeException.class)
-        .hasMessage("Commit Failure");
+        .hasMessageContaining("Cannot commit rewrite");
 
     table.refresh();
 
@@ -725,6 +726,40 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
     shouldHaveACleanCache(table);
   }
 
+  @Test
+  public void testCommitFailsWithUncleanableFailure() {
+    Table table = createTable(20);
+    int fileSize = averageFileSize(table);
+
+    List<Object[]> originalData = currentData();
+
+    RewriteDataFilesSparkAction realRewrite =
+        basicRewrite(table)
+            .option(
+                RewriteDataFiles.MAX_FILE_GROUP_SIZE_BYTES, 
Integer.toString(fileSize * 2 + 1000));
+
+    RewriteDataFilesSparkAction spyRewrite = spy(realRewrite);
+    RewriteDataFilesCommitManager util = spy(new 
RewriteDataFilesCommitManager(table));
+
+    // Fail to commit with an arbitrary failure and validate that orphans are 
not cleaned up
+    doThrow(new RuntimeException("Arbitrary 
Failure")).when(util).commitFileGroups(any());
+
+    
doReturn(util).when(spyRewrite).commitManager(table.currentSnapshot().snapshotId());
+
+    assertThatThrownBy(spyRewrite::execute)
+        .isInstanceOf(RuntimeException.class)
+        .hasMessageContaining("Arbitrary Failure");
+
+    table.refresh();
+
+    List<Object[]> postRewriteData = currentData();
+    assertEquals("We shouldn't have changed the data", originalData, 
postRewriteData);
+
+    shouldHaveSnapshots(table, 1);
+    shouldHaveOrphans(table);
+    shouldHaveACleanCache(table);
+  }
+
   @Test
   public void testParallelSingleCommitWithRewriteFailure() {
     Table table = createTable(20);
@@ -742,12 +777,12 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
 
     // Fail groups 1, 3, and 7 during rewrite
     GroupInfoMatcher failGroup = new GroupInfoMatcher(1, 3, 7);
-    doThrow(new RuntimeException("Rewrite Failed"))
+    doThrow(new CommitFailedException("Rewrite Failed"))
         .when(spyRewrite)
         .rewriteFiles(any(), argThat(failGroup));
 
     assertThatThrownBy(spyRewrite::execute)
-        .isInstanceOf(RuntimeException.class)
+        .isInstanceOf(CommitFailedException.class)
         .hasMessage("Rewrite Failed");
 
     table.refresh();
@@ -866,7 +901,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
 
     // First and Third commits work, second does not
     doCallRealMethod()
-        .doThrow(new RuntimeException("Commit Failed"))
+        .doThrow(new CommitFailedException("Commit Failed"))
         .doCallRealMethod()
         .when(util)
         .commitFileGroups(any());
@@ -1599,6 +1634,17 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
             .orphanFileLocations());
   }
 
+  protected void shouldHaveOrphans(Table table) {
+    assertThat(
+            actions()
+                .deleteOrphanFiles(table)
+                .olderThan(System.currentTimeMillis())
+                .execute()
+                .orphanFileLocations())
+        .as("Should have found orphan files")
+        .isNotEmpty();
+  }
+
   protected void shouldHaveACleanCache(Table table) {
     Assert.assertEquals(
         "Should not have any entries in cache", ImmutableSet.of(), 
cacheContents(table));
diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
index 8ec3b44f92..e9edfeb985 100644
--- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
+++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
@@ -46,6 +46,7 @@ import org.apache.iceberg.TableOperations;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.actions.ImmutableRewriteManifests;
 import org.apache.iceberg.actions.RewriteManifests;
+import org.apache.iceberg.exceptions.CleanableFailure;
 import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.io.OutputFile;
@@ -355,8 +356,11 @@ public class RewriteManifestsSparkAction
       // don't clean up added manifest files, because they may have been 
successfully committed.
       throw commitStateUnknownException;
     } catch (Exception e) {
-      // delete all new manifests because the rewrite failed
-      deleteFiles(Iterables.transform(addedManifests, ManifestFile::path));
+      if (e instanceof CleanableFailure) {
+        // delete all new manifests because the rewrite failed
+        deleteFiles(Iterables.transform(addedManifests, ManifestFile::path));
+      }
+
       throw e;
     }
   }
diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
index a964f76863..02c87b53e7 100644
--- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
+++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
@@ -44,7 +44,7 @@ import org.apache.iceberg.SnapshotUpdate;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.deletes.DeleteGranularity;
 import org.apache.iceberg.deletes.PositionDelete;
-import org.apache.iceberg.exceptions.CommitStateUnknownException;
+import org.apache.iceberg.exceptions.CleanableFailure;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.BasePositionDeltaWriter;
@@ -105,7 +105,7 @@ class SparkPositionDeltaWrite implements DeltaWrite, 
RequiresDistributionAndOrde
   private final Context context;
   private final Map<String, String> writeProperties;
 
-  private boolean cleanupOnAbort = true;
+  private boolean cleanupOnAbort = false;
 
   SparkPositionDeltaWrite(
       SparkSession spark,
@@ -304,9 +304,9 @@ class SparkPositionDeltaWrite implements DeltaWrite, 
RequiresDistributionAndOrde
         operation.commit(); // abort is automatically called if this fails
         long duration = System.currentTimeMillis() - start;
         LOG.info("Committed in {} ms", duration);
-      } catch (CommitStateUnknownException commitStateUnknownException) {
-        cleanupOnAbort = false;
-        throw commitStateUnknownException;
+      } catch (Exception e) {
+        cleanupOnAbort = e instanceof CleanableFailure;
+        throw e;
       }
     }
   }
diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
index d23c473bb4..e4a0eb700b 100644
--- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
+++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
@@ -40,7 +40,7 @@ import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.SnapshotSummary;
 import org.apache.iceberg.SnapshotUpdate;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.exceptions.CommitStateUnknownException;
+import org.apache.iceberg.exceptions.CleanableFailure;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.ClusteredDataWriter;
@@ -102,7 +102,7 @@ abstract class SparkWrite implements Write, 
RequiresDistributionAndOrdering {
   private final SparkWriteRequirements writeRequirements;
   private final Map<String, String> writeProperties;
 
-  private boolean cleanupOnAbort = true;
+  private boolean cleanupOnAbort = false;
 
   SparkWrite(
       SparkSession spark,
@@ -233,9 +233,9 @@ abstract class SparkWrite implements Write, 
RequiresDistributionAndOrdering {
       operation.commit(); // abort is automatically called if this fails
       long duration = System.currentTimeMillis() - start;
       LOG.info("Committed in {} ms", duration);
-    } catch (CommitStateUnknownException commitStateUnknownException) {
-      cleanupOnAbort = false;
-      throw commitStateUnknownException;
+    } catch (Exception e) {
+      cleanupOnAbort = e instanceof CleanableFailure;
+      throw e;
     }
   }
 
diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index 16082d78f0..b67ee87c7d 100644
--- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -79,6 +79,7 @@ import org.apache.iceberg.deletes.PositionDeleteWriter;
 import org.apache.iceberg.encryption.EncryptedFiles;
 import org.apache.iceberg.encryption.EncryptedOutputFile;
 import org.apache.iceberg.encryption.EncryptionKeyMetadata;
+import org.apache.iceberg.exceptions.CommitFailedException;
 import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
@@ -725,13 +726,13 @@ public class TestRewriteDataFilesAction extends TestBase {
     RewriteDataFilesCommitManager util = spy(new 
RewriteDataFilesCommitManager(table));
 
     // Fail to commit
-    doThrow(new RuntimeException("Commit 
Failure")).when(util).commitFileGroups(any());
+    doThrow(new CommitFailedException("Commit 
Failure")).when(util).commitFileGroups(any());
 
     
doReturn(util).when(spyRewrite).commitManager(table.currentSnapshot().snapshotId());
 
     assertThatThrownBy(spyRewrite::execute)
         .isInstanceOf(RuntimeException.class)
-        .hasMessage("Commit Failure");
+        .hasMessageContaining("Cannot commit rewrite");
 
     table.refresh();
 
@@ -743,6 +744,40 @@ public class TestRewriteDataFilesAction extends TestBase {
     shouldHaveACleanCache(table);
   }
 
+  @Test
+  public void testCommitFailsWithUncleanableFailure() {
+    Table table = createTable(20);
+    int fileSize = averageFileSize(table);
+
+    List<Object[]> originalData = currentData();
+
+    RewriteDataFilesSparkAction realRewrite =
+        basicRewrite(table)
+            .option(
+                RewriteDataFiles.MAX_FILE_GROUP_SIZE_BYTES, 
Integer.toString(fileSize * 2 + 1000));
+
+    RewriteDataFilesSparkAction spyRewrite = spy(realRewrite);
+    RewriteDataFilesCommitManager util = spy(new 
RewriteDataFilesCommitManager(table));
+
+    // Fail to commit with an arbitrary failure and validate that orphans are 
not cleaned up
+    doThrow(new RuntimeException("Arbitrary 
Failure")).when(util).commitFileGroups(any());
+
+    
doReturn(util).when(spyRewrite).commitManager(table.currentSnapshot().snapshotId());
+
+    assertThatThrownBy(spyRewrite::execute)
+        .isInstanceOf(RuntimeException.class)
+        .hasMessageContaining("Arbitrary Failure");
+
+    table.refresh();
+
+    List<Object[]> postRewriteData = currentData();
+    assertEquals("We shouldn't have changed the data", originalData, 
postRewriteData);
+
+    shouldHaveSnapshots(table, 1);
+    shouldHaveOrphans(table);
+    shouldHaveACleanCache(table);
+  }
+
   @Test
   public void testParallelSingleCommitWithRewriteFailure() {
     Table table = createTable(20);
@@ -760,12 +795,12 @@ public class TestRewriteDataFilesAction extends TestBase {
 
     // Fail groups 1, 3, and 7 during rewrite
     GroupInfoMatcher failGroup = new GroupInfoMatcher(1, 3, 7);
-    doThrow(new RuntimeException("Rewrite Failed"))
+    doThrow(new CommitFailedException("Rewrite Failed"))
         .when(spyRewrite)
         .rewriteFiles(any(), argThat(failGroup));
 
     assertThatThrownBy(spyRewrite::execute)
-        .isInstanceOf(RuntimeException.class)
+        .isInstanceOf(CommitFailedException.class)
         .hasMessage("Rewrite Failed");
 
     table.refresh();
@@ -884,7 +919,7 @@ public class TestRewriteDataFilesAction extends TestBase {
 
     // First and Third commits work, second does not
     doCallRealMethod()
-        .doThrow(new RuntimeException("Commit Failed"))
+        .doThrow(new CommitFailedException("Commit Failed"))
         .doCallRealMethod()
         .when(util)
         .commitFileGroups(any());
@@ -1681,6 +1716,17 @@ public class TestRewriteDataFilesAction extends TestBase 
{
         .isEmpty();
   }
 
+  protected void shouldHaveOrphans(Table table) {
+    assertThat(
+            actions()
+                .deleteOrphanFiles(table)
+                .olderThan(System.currentTimeMillis())
+                .execute()
+                .orphanFileLocations())
+        .as("Should have found orphan files")
+        .isNotEmpty();
+  }
+
   protected void shouldHaveACleanCache(Table table) {
     assertThat(cacheContents(table)).as("Should not have any entries in 
cache").isEmpty();
   }

Reply via email to