This is an automated email from the ASF dual-hosted git repository.
amoghj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new c67c9124d3 Core, Spark: Spark writes/actions should only perform cleanup if failure is cleanable (#10373)
c67c9124d3 is described below
commit c67c9124d324207d742307982de31e5ff2ebcd01
Author: Amogh Jahagirdar <[email protected]>
AuthorDate: Thu Jun 20 08:27:59 2024 -0700
Core, Spark: Spark writes/actions should only perform cleanup if failure is cleanable (#10373)
---
.../actions/BaseRewriteDataFilesAction.java | 16 ++++---
.../actions/RewriteDataFilesCommitManager.java | 9 +++-
.../RewritePositionDeletesCommitManager.java | 9 +++-
.../spark/actions/TestRewriteDataFilesAction.java | 56 ++++++++++++++++++++--
.../spark/actions/TestRewriteDataFilesAction.java | 56 ++++++++++++++++++++--
.../spark/actions/RewriteManifestsSparkAction.java | 8 +++-
.../spark/source/SparkPositionDeltaWrite.java | 10 ++--
.../apache/iceberg/spark/source/SparkWrite.java | 10 ++--
.../spark/actions/TestRewriteDataFilesAction.java | 56 ++++++++++++++++++++--
9 files changed, 193 insertions(+), 37 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java b/core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java
index 5f229be579..c0f2fc6174 100644
--- a/core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java
+++ b/core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java
@@ -31,6 +31,7 @@ import org.apache.iceberg.RewriteFiles;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.exceptions.CleanableFailure;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
@@ -308,12 +309,15 @@ public abstract class BaseRewriteDataFilesAction<ThisT>
LOG.warn("Commit state unknown, cannot clean up files that may have been committed", e);
throw e;
} catch (Exception e) {
- LOG.warn("Failed to commit rewrite, cleaning up rewritten files", e);
- Tasks.foreach(Iterables.transform(addedDataFiles, f -> f.path().toString()))
- .noRetry()
- .suppressFailureWhenFinished()
- .onFailure((location, exc) -> LOG.warn("Failed to delete: {}", location, exc))
- .run(fileIO::deleteFile);
+ if (e instanceof CleanableFailure) {
+ LOG.warn("Failed to commit rewrite, cleaning up rewritten files", e);
+ Tasks.foreach(Iterables.transform(addedDataFiles, f -> f.path().toString()))
+ .noRetry()
+ .suppressFailureWhenFinished()
+ .onFailure((location, exc) -> LOG.warn("Failed to delete: {}", location, exc))
+ .run(fileIO::deleteFile);
+ }
+
throw e;
}
}
diff --git a/core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesCommitManager.java b/core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesCommitManager.java
index 7f89db467d..45b4bcf0a4 100644
--- a/core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesCommitManager.java
+++ b/core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesCommitManager.java
@@ -23,6 +23,7 @@ import java.util.Set;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.RewriteFiles;
import org.apache.iceberg.Table;
+import org.apache.iceberg.exceptions.CleanableFailure;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -119,8 +120,12 @@ public class RewriteDataFilesCommitManager {
e);
throw e;
} catch (Exception e) {
- LOG.error("Cannot commit groups {}, attempting to clean up written files", rewriteGroups, e);
- rewriteGroups.forEach(this::abortFileGroup);
+ if (e instanceof CleanableFailure) {
+ LOG.error(
+ "Cannot commit groups {}, attempting to clean up written files", rewriteGroups, e);
+ rewriteGroups.forEach(this::abortFileGroup);
+ }
+
throw e;
}
}
diff --git a/core/src/main/java/org/apache/iceberg/actions/RewritePositionDeletesCommitManager.java b/core/src/main/java/org/apache/iceberg/actions/RewritePositionDeletesCommitManager.java
index 01b2f7528e..b1322d5e58 100644
--- a/core/src/main/java/org/apache/iceberg/actions/RewritePositionDeletesCommitManager.java
+++ b/core/src/main/java/org/apache/iceberg/actions/RewritePositionDeletesCommitManager.java
@@ -24,6 +24,7 @@ import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.RewriteFiles;
import org.apache.iceberg.Table;
+import org.apache.iceberg.exceptions.CleanableFailure;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -102,8 +103,12 @@ public class RewritePositionDeletesCommitManager {
e);
throw e;
} catch (Exception e) {
- LOG.error("Cannot commit groups {}, attempting to clean up written files", rewriteGroups, e);
- rewriteGroups.forEach(this::abort);
+ if (e instanceof CleanableFailure) {
+ LOG.error(
+ "Cannot commit groups {}, attempting to clean up written files", rewriteGroups, e);
+ rewriteGroups.forEach(this::abort);
+ }
+
throw e;
}
}
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index 2934494bba..c978be9e3d 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -77,6 +77,7 @@ import org.apache.iceberg.deletes.PositionDeleteWriter;
import org.apache.iceberg.encryption.EncryptedFiles;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.encryption.EncryptionKeyMetadata;
+import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
@@ -673,14 +674,14 @@ public class TestRewriteDataFilesAction extends SparkTestBase {
RewriteDataFilesCommitManager util = spy(new RewriteDataFilesCommitManager(table));
// Fail to commit
- doThrow(new RuntimeException("Commit Failure")).when(util).commitFileGroups(any());
+ doThrow(new CommitFailedException("Commit Failure")).when(util).commitFileGroups(any());
doReturn(util).when(spyRewrite).commitManager(table.currentSnapshot().snapshotId());
assertThatThrownBy(() -> spyRewrite.execute())
.as("Should fail entire rewrite if commit fails")
.isInstanceOf(RuntimeException.class)
- .hasMessage("Commit Failure");
+ .hasMessageContaining("Cannot commit rewrite");
table.refresh();
@@ -692,6 +693,40 @@ public class TestRewriteDataFilesAction extends SparkTestBase {
shouldHaveACleanCache(table);
}
+ @Test
+ public void testCommitFailsWithUncleanableFailure() {
+ Table table = createTable(20);
+ int fileSize = averageFileSize(table);
+
+ List<Object[]> originalData = currentData();
+
+ RewriteDataFilesSparkAction realRewrite =
+ basicRewrite(table)
+ .option(
+ RewriteDataFiles.MAX_FILE_GROUP_SIZE_BYTES, Integer.toString(fileSize * 2 + 1000));
+
+ RewriteDataFilesSparkAction spyRewrite = spy(realRewrite);
+ RewriteDataFilesCommitManager util = spy(new RewriteDataFilesCommitManager(table));
+
+ // Fail to commit with an arbitrary failure and validate that orphans are not cleaned up
+ doThrow(new RuntimeException("Arbitrary Failure")).when(util).commitFileGroups(any());
+
+ doReturn(util).when(spyRewrite).commitManager(table.currentSnapshot().snapshotId());
+
+ assertThatThrownBy(spyRewrite::execute)
+ .isInstanceOf(RuntimeException.class)
+ .hasMessageContaining("Arbitrary Failure");
+
+ table.refresh();
+
+ List<Object[]> postRewriteData = currentData();
+ assertEquals("We shouldn't have changed the data", originalData, postRewriteData);
+
+ shouldHaveSnapshots(table, 1);
+ shouldHaveOrphans(table);
+ shouldHaveACleanCache(table);
+ }
+
@Test
public void testParallelSingleCommitWithRewriteFailure() {
Table table = createTable(20);
@@ -709,13 +744,13 @@ public class TestRewriteDataFilesAction extends SparkTestBase {
// Fail groups 1, 3, and 7 during rewrite
GroupInfoMatcher failGroup = new GroupInfoMatcher(1, 3, 7);
- doThrow(new RuntimeException("Rewrite Failed"))
+ doThrow(new CommitFailedException("Rewrite Failed"))
.when(spyRewrite)
.rewriteFiles(any(), argThat(failGroup));
assertThatThrownBy(() -> spyRewrite.execute())
.as("Should fail entire rewrite if part fails")
- .isInstanceOf(RuntimeException.class)
+ .isInstanceOf(CommitFailedException.class)
.hasMessage("Rewrite Failed");
table.refresh();
@@ -830,7 +865,7 @@ public class TestRewriteDataFilesAction extends SparkTestBase {
// First and Third commits work, second does not
doCallRealMethod()
- .doThrow(new RuntimeException("Commit Failed"))
+ .doThrow(new CommitFailedException("Commit Failed"))
.doCallRealMethod()
.when(util)
.commitFileGroups(any());
@@ -1562,6 +1597,17 @@ public class TestRewriteDataFilesAction extends SparkTestBase {
.orphanFileLocations());
}
+ protected void shouldHaveOrphans(Table table) {
+ assertThat(
+ actions()
+ .deleteOrphanFiles(table)
+ .olderThan(System.currentTimeMillis())
+ .execute()
+ .orphanFileLocations())
+ .as("Should have found orphan files")
+ .isNotEmpty();
+ }
+
protected void shouldHaveACleanCache(Table table) {
Assert.assertEquals(
"Should not have any entries in cache", ImmutableSet.of(), cacheContents(table));
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index 66f16437f5..ba173d0249 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -77,6 +77,7 @@ import org.apache.iceberg.deletes.PositionDeleteWriter;
import org.apache.iceberg.encryption.EncryptedFiles;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.encryption.EncryptionKeyMetadata;
+import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
@@ -707,13 +708,13 @@ public class TestRewriteDataFilesAction extends SparkTestBase {
RewriteDataFilesCommitManager util = spy(new RewriteDataFilesCommitManager(table));
// Fail to commit
- doThrow(new RuntimeException("Commit Failure")).when(util).commitFileGroups(any());
+ doThrow(new CommitFailedException("Commit Failure")).when(util).commitFileGroups(any());
doReturn(util).when(spyRewrite).commitManager(table.currentSnapshot().snapshotId());
assertThatThrownBy(spyRewrite::execute)
.isInstanceOf(RuntimeException.class)
- .hasMessage("Commit Failure");
+ .hasMessageContaining("Cannot commit rewrite");
table.refresh();
@@ -725,6 +726,40 @@ public class TestRewriteDataFilesAction extends SparkTestBase {
shouldHaveACleanCache(table);
}
+ @Test
+ public void testCommitFailsWithUncleanableFailure() {
+ Table table = createTable(20);
+ int fileSize = averageFileSize(table);
+
+ List<Object[]> originalData = currentData();
+
+ RewriteDataFilesSparkAction realRewrite =
+ basicRewrite(table)
+ .option(
+ RewriteDataFiles.MAX_FILE_GROUP_SIZE_BYTES, Integer.toString(fileSize * 2 + 1000));
+
+ RewriteDataFilesSparkAction spyRewrite = spy(realRewrite);
+ RewriteDataFilesCommitManager util = spy(new RewriteDataFilesCommitManager(table));
+
+ // Fail to commit with an arbitrary failure and validate that orphans are not cleaned up
+ doThrow(new RuntimeException("Arbitrary Failure")).when(util).commitFileGroups(any());
+
+ doReturn(util).when(spyRewrite).commitManager(table.currentSnapshot().snapshotId());
+
+ assertThatThrownBy(spyRewrite::execute)
+ .isInstanceOf(RuntimeException.class)
+ .hasMessageContaining("Arbitrary Failure");
+
+ table.refresh();
+
+ List<Object[]> postRewriteData = currentData();
+ assertEquals("We shouldn't have changed the data", originalData, postRewriteData);
+
+ shouldHaveSnapshots(table, 1);
+ shouldHaveOrphans(table);
+ shouldHaveACleanCache(table);
+ }
+
@Test
public void testParallelSingleCommitWithRewriteFailure() {
Table table = createTable(20);
@@ -742,12 +777,12 @@ public class TestRewriteDataFilesAction extends SparkTestBase {
// Fail groups 1, 3, and 7 during rewrite
GroupInfoMatcher failGroup = new GroupInfoMatcher(1, 3, 7);
- doThrow(new RuntimeException("Rewrite Failed"))
+ doThrow(new CommitFailedException("Rewrite Failed"))
.when(spyRewrite)
.rewriteFiles(any(), argThat(failGroup));
assertThatThrownBy(spyRewrite::execute)
- .isInstanceOf(RuntimeException.class)
+ .isInstanceOf(CommitFailedException.class)
.hasMessage("Rewrite Failed");
table.refresh();
@@ -866,7 +901,7 @@ public class TestRewriteDataFilesAction extends SparkTestBase {
// First and Third commits work, second does not
doCallRealMethod()
- .doThrow(new RuntimeException("Commit Failed"))
+ .doThrow(new CommitFailedException("Commit Failed"))
.doCallRealMethod()
.when(util)
.commitFileGroups(any());
@@ -1599,6 +1634,17 @@ public class TestRewriteDataFilesAction extends SparkTestBase {
.orphanFileLocations());
}
+ protected void shouldHaveOrphans(Table table) {
+ assertThat(
+ actions()
+ .deleteOrphanFiles(table)
+ .olderThan(System.currentTimeMillis())
+ .execute()
+ .orphanFileLocations())
+ .as("Should have found orphan files")
+ .isNotEmpty();
+ }
+
protected void shouldHaveACleanCache(Table table) {
Assert.assertEquals(
"Should not have any entries in cache", ImmutableSet.of(), cacheContents(table));
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
index 8ec3b44f92..e9edfeb985 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
@@ -46,6 +46,7 @@ import org.apache.iceberg.TableOperations;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.actions.ImmutableRewriteManifests;
import org.apache.iceberg.actions.RewriteManifests;
+import org.apache.iceberg.exceptions.CleanableFailure;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.io.OutputFile;
@@ -355,8 +356,11 @@ public class RewriteManifestsSparkAction
// don't clean up added manifest files, because they may have been successfully committed.
throw commitStateUnknownException;
} catch (Exception e) {
- // delete all new manifests because the rewrite failed
- deleteFiles(Iterables.transform(addedManifests, ManifestFile::path));
+ if (e instanceof CleanableFailure) {
+ // delete all new manifests because the rewrite failed
+ deleteFiles(Iterables.transform(addedManifests, ManifestFile::path));
+ }
+
throw e;
}
}
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
index a964f76863..02c87b53e7 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
@@ -44,7 +44,7 @@ import org.apache.iceberg.SnapshotUpdate;
import org.apache.iceberg.Table;
import org.apache.iceberg.deletes.DeleteGranularity;
import org.apache.iceberg.deletes.PositionDelete;
-import org.apache.iceberg.exceptions.CommitStateUnknownException;
+import org.apache.iceberg.exceptions.CleanableFailure;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.BasePositionDeltaWriter;
@@ -105,7 +105,7 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde
private final Context context;
private final Map<String, String> writeProperties;
- private boolean cleanupOnAbort = true;
+ private boolean cleanupOnAbort = false;
SparkPositionDeltaWrite(
SparkSession spark,
@@ -304,9 +304,9 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde
operation.commit(); // abort is automatically called if this fails
long duration = System.currentTimeMillis() - start;
LOG.info("Committed in {} ms", duration);
- } catch (CommitStateUnknownException commitStateUnknownException) {
- cleanupOnAbort = false;
- throw commitStateUnknownException;
+ } catch (Exception e) {
+ cleanupOnAbort = e instanceof CleanableFailure;
+ throw e;
}
}
}
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
index d23c473bb4..e4a0eb700b 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
@@ -40,7 +40,7 @@ import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.SnapshotUpdate;
import org.apache.iceberg.Table;
-import org.apache.iceberg.exceptions.CommitStateUnknownException;
+import org.apache.iceberg.exceptions.CleanableFailure;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.ClusteredDataWriter;
@@ -102,7 +102,7 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering {
private final SparkWriteRequirements writeRequirements;
private final Map<String, String> writeProperties;
- private boolean cleanupOnAbort = true;
+ private boolean cleanupOnAbort = false;
SparkWrite(
SparkSession spark,
@@ -233,9 +233,9 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering {
operation.commit(); // abort is automatically called if this fails
long duration = System.currentTimeMillis() - start;
LOG.info("Committed in {} ms", duration);
- } catch (CommitStateUnknownException commitStateUnknownException) {
- cleanupOnAbort = false;
- throw commitStateUnknownException;
+ } catch (Exception e) {
+ cleanupOnAbort = e instanceof CleanableFailure;
+ throw e;
}
}
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index 16082d78f0..b67ee87c7d 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -79,6 +79,7 @@ import org.apache.iceberg.deletes.PositionDeleteWriter;
import org.apache.iceberg.encryption.EncryptedFiles;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.encryption.EncryptionKeyMetadata;
+import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
@@ -725,13 +726,13 @@ public class TestRewriteDataFilesAction extends TestBase {
RewriteDataFilesCommitManager util = spy(new RewriteDataFilesCommitManager(table));
// Fail to commit
- doThrow(new RuntimeException("Commit Failure")).when(util).commitFileGroups(any());
+ doThrow(new CommitFailedException("Commit Failure")).when(util).commitFileGroups(any());
doReturn(util).when(spyRewrite).commitManager(table.currentSnapshot().snapshotId());
assertThatThrownBy(spyRewrite::execute)
.isInstanceOf(RuntimeException.class)
- .hasMessage("Commit Failure");
+ .hasMessageContaining("Cannot commit rewrite");
table.refresh();
@@ -743,6 +744,40 @@ public class TestRewriteDataFilesAction extends TestBase {
shouldHaveACleanCache(table);
}
+ @Test
+ public void testCommitFailsWithUncleanableFailure() {
+ Table table = createTable(20);
+ int fileSize = averageFileSize(table);
+
+ List<Object[]> originalData = currentData();
+
+ RewriteDataFilesSparkAction realRewrite =
+ basicRewrite(table)
+ .option(
+ RewriteDataFiles.MAX_FILE_GROUP_SIZE_BYTES, Integer.toString(fileSize * 2 + 1000));
+
+ RewriteDataFilesSparkAction spyRewrite = spy(realRewrite);
+ RewriteDataFilesCommitManager util = spy(new RewriteDataFilesCommitManager(table));
+
+ // Fail to commit with an arbitrary failure and validate that orphans are not cleaned up
+ doThrow(new RuntimeException("Arbitrary Failure")).when(util).commitFileGroups(any());
+
+ doReturn(util).when(spyRewrite).commitManager(table.currentSnapshot().snapshotId());
+
+ assertThatThrownBy(spyRewrite::execute)
+ .isInstanceOf(RuntimeException.class)
+ .hasMessageContaining("Arbitrary Failure");
+
+ table.refresh();
+
+ List<Object[]> postRewriteData = currentData();
+ assertEquals("We shouldn't have changed the data", originalData, postRewriteData);
+
+ shouldHaveSnapshots(table, 1);
+ shouldHaveOrphans(table);
+ shouldHaveACleanCache(table);
+ }
+
@Test
public void testParallelSingleCommitWithRewriteFailure() {
Table table = createTable(20);
@@ -760,12 +795,12 @@ public class TestRewriteDataFilesAction extends TestBase {
// Fail groups 1, 3, and 7 during rewrite
GroupInfoMatcher failGroup = new GroupInfoMatcher(1, 3, 7);
- doThrow(new RuntimeException("Rewrite Failed"))
+ doThrow(new CommitFailedException("Rewrite Failed"))
.when(spyRewrite)
.rewriteFiles(any(), argThat(failGroup));
assertThatThrownBy(spyRewrite::execute)
- .isInstanceOf(RuntimeException.class)
+ .isInstanceOf(CommitFailedException.class)
.hasMessage("Rewrite Failed");
table.refresh();
@@ -884,7 +919,7 @@ public class TestRewriteDataFilesAction extends TestBase {
// First and Third commits work, second does not
doCallRealMethod()
- .doThrow(new RuntimeException("Commit Failed"))
+ .doThrow(new CommitFailedException("Commit Failed"))
.doCallRealMethod()
.when(util)
.commitFileGroups(any());
@@ -1681,6 +1716,17 @@ public class TestRewriteDataFilesAction extends TestBase {
.isEmpty();
}
+ protected void shouldHaveOrphans(Table table) {
+ assertThat(
+ actions()
+ .deleteOrphanFiles(table)
+ .olderThan(System.currentTimeMillis())
+ .execute()
+ .orphanFileLocations())
+ .as("Should have found orphan files")
+ .isNotEmpty();
+ }
+
protected void shouldHaveACleanCache(Table table) {
assertThat(cacheContents(table)).as("Should not have any entries in cache").isEmpty();
}