This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 2c890109c1 Spark: Fix usage of staging location when optimizing 
metadata (#8959)
2c890109c1 is described below

commit 2c890109c17bfb971490cab007be23029b81bad8
Author: Anton Okolnychyi <aokolnyc...@apple.com>
AuthorDate: Thu Nov 2 20:39:45 2023 -0700

    Spark: Fix usage of staging location when optimizing metadata (#8959)
---
 .../spark/actions/RewriteManifestsSparkAction.java | 42 ++++++++++++----------
 .../spark/actions/TestRewriteManifestsAction.java  | 33 +++++++++++++++--
 .../spark/actions/RewriteManifestsSparkAction.java | 42 ++++++++++++----------
 .../spark/actions/TestRewriteManifestsAction.java  | 33 +++++++++++++++--
 .../spark/actions/RewriteManifestsSparkAction.java | 42 ++++++++++++----------
 .../spark/actions/TestRewriteManifestsAction.java  | 33 +++++++++++++++--
 .../spark/actions/RewriteManifestsSparkAction.java | 42 ++++++++++++----------
 .../spark/actions/TestRewriteManifestsAction.java  | 33 +++++++++++++++--
 8 files changed, 216 insertions(+), 84 deletions(-)

diff --git 
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
 
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
index 854232a62d..dd287ea193 100644
--- 
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
+++ 
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
@@ -73,8 +73,10 @@ import org.slf4j.LoggerFactory;
  *
  * <p>By default, this action rewrites all manifests for the current partition 
spec and writes the
  * result to the metadata folder. The behavior can be modified by passing a 
custom predicate to
- * {@link #rewriteIf(Predicate)} and a custom spec id to {@link #specId(int)}. 
In addition, there is
- * a way to configure a custom location for new manifests via {@link 
#stagingLocation}.
+ * {@link #rewriteIf(Predicate)} and a custom spec ID to {@link #specId(int)}. 
In addition, there is
+ * a way to configure a custom location for staged manifests via {@link 
#stagingLocation(String)}.
+ * The provided staging location will be ignored if snapshot ID inheritance is 
enabled. In such
+ * cases, the manifests are always written to the metadata folder and 
committed without staging.
  */
 public class RewriteManifestsSparkAction
     extends BaseSnapshotUpdateSparkAction<RewriteManifestsSparkAction> 
implements RewriteManifests {
@@ -88,10 +90,11 @@ public class RewriteManifestsSparkAction
   private final Table table;
   private final int formatVersion;
   private final long targetManifestSizeBytes;
+  private final boolean shouldStageManifests;
 
   private PartitionSpec spec = null;
   private Predicate<ManifestFile> predicate = manifest -> true;
-  private String stagingLocation = null;
+  private String outputLocation = null;
 
   RewriteManifestsSparkAction(SparkSession spark, Table table) {
     super(spark);
@@ -104,13 +107,20 @@ public class RewriteManifestsSparkAction
             TableProperties.MANIFEST_TARGET_SIZE_BYTES,
             TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFAULT);
 
-    // default the staging location to the metadata location
+    // default the output location to the metadata location
     TableOperations ops = ((HasTableOperations) table).operations();
     Path metadataFilePath = new Path(ops.metadataFileLocation("file"));
-    this.stagingLocation = metadataFilePath.getParent().toString();
+    this.outputLocation = metadataFilePath.getParent().toString();
 
     // use the current table format version for new manifests
     this.formatVersion = ops.current().formatVersion();
+
+    boolean snapshotIdInheritanceEnabled =
+        PropertyUtil.propertyAsBoolean(
+            table.properties(),
+            TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED,
+            TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT);
+    this.shouldStageManifests = formatVersion == 1 && 
!snapshotIdInheritanceEnabled;
   }
 
   @Override
@@ -133,15 +143,17 @@ public class RewriteManifestsSparkAction
 
   @Override
   public RewriteManifestsSparkAction stagingLocation(String 
newStagingLocation) {
-    this.stagingLocation = newStagingLocation;
+    if (shouldStageManifests) {
+      this.outputLocation = newStagingLocation;
+    } else {
+      LOG.warn("Ignoring provided staging location as new manifests will be 
committed directly");
+    }
     return this;
   }
 
   @Override
   public RewriteManifests.Result execute() {
-    String desc =
-        String.format(
-            "Rewriting manifests (staging location=%s) of %s", 
stagingLocation, table.name());
+    String desc = String.format("Rewriting manifests in %s", table.name());
     JobGroupInfo info = newJobGroupInfo("REWRITE-MANIFESTS", desc);
     return withJobGroupInfo(info, this::doExecute);
   }
@@ -236,7 +248,7 @@ public class RewriteManifestsSparkAction
             toManifests(
                 tableBroadcast,
                 maxNumManifestEntries,
-                stagingLocation,
+                outputLocation,
                 formatVersion,
                 combinedPartitionType,
                 spec,
@@ -267,7 +279,7 @@ public class RewriteManifestsSparkAction
                   toManifests(
                       tableBroadcast,
                       maxNumManifestEntries,
-                      stagingLocation,
+                      outputLocation,
                       formatVersion,
                       combinedPartitionType,
                       spec,
@@ -320,18 +332,12 @@ public class RewriteManifestsSparkAction
   private void replaceManifests(
       Iterable<ManifestFile> deletedManifests, Iterable<ManifestFile> 
addedManifests) {
     try {
-      boolean snapshotIdInheritanceEnabled =
-          PropertyUtil.propertyAsBoolean(
-              table.properties(),
-              TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED,
-              TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT);
-
       org.apache.iceberg.RewriteManifests rewriteManifests = 
table.rewriteManifests();
       deletedManifests.forEach(rewriteManifests::deleteManifest);
       addedManifests.forEach(rewriteManifests::addManifest);
       commit(rewriteManifests);
 
-      if (formatVersion == 1 && !snapshotIdInheritanceEnabled) {
+      if (shouldStageManifests) {
         // delete new manifests as they were rewritten before the commit
         deleteFiles(Iterables.transform(addedManifests, ManifestFile::path));
       }
diff --git 
a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
 
b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
index eadcfc4261..a5dd0054da 100644
--- 
a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
+++ 
b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
@@ -24,6 +24,7 @@ import static org.apache.iceberg.ValidationHelpers.files;
 import static org.apache.iceberg.ValidationHelpers.snapshotIds;
 import static org.apache.iceberg.ValidationHelpers.validateDataManifest;
 import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assumptions.assumeThat;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.spy;
@@ -92,6 +93,7 @@ public class TestRewriteManifestsAction extends SparkTestBase 
{
   private final String snapshotIdInheritanceEnabled;
   private final String useCaching;
   private final int formatVersion;
+  private final boolean shouldStageManifests;
   private String tableLocation = null;
 
   public TestRewriteManifestsAction(
@@ -99,6 +101,7 @@ public class TestRewriteManifestsAction extends 
SparkTestBase {
     this.snapshotIdInheritanceEnabled = snapshotIdInheritanceEnabled;
     this.useCaching = useCaching;
     this.formatVersion = formatVersion;
+    this.shouldStageManifests = formatVersion == 1 && 
snapshotIdInheritanceEnabled.equals("false");
   }
 
   @Before
@@ -166,6 +169,7 @@ public class TestRewriteManifestsAction extends 
SparkTestBase {
         "Action should rewrite 2 manifests", 2, 
Iterables.size(result.rewrittenManifests()));
     Assert.assertEquals(
         "Action should add 1 manifests", 1, 
Iterables.size(result.addedManifests()));
+    assertManifestsLocation(result.addedManifests());
 
     table.refresh();
 
@@ -312,6 +316,7 @@ public class TestRewriteManifestsAction extends 
SparkTestBase {
         "Action should rewrite 4 manifests", 4, 
Iterables.size(result.rewrittenManifests()));
     Assert.assertEquals(
         "Action should add 2 manifests", 2, 
Iterables.size(result.addedManifests()));
+    assertManifestsLocation(result.addedManifests());
 
     table.refresh();
 
@@ -376,12 +381,14 @@ public class TestRewriteManifestsAction extends 
SparkTestBase {
 
       SparkActions actions = SparkActions.get();
 
+      String rewriteStagingLocation = temp.newFolder().toString();
+
       RewriteManifests.Result result =
           actions
               .rewriteManifests(table)
               .rewriteIf(manifest -> true)
               .option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
-              .stagingLocation(temp.newFolder().toString())
+              .stagingLocation(rewriteStagingLocation)
               .execute();
 
       Assert.assertEquals(
@@ -390,6 +397,7 @@ public class TestRewriteManifestsAction extends 
SparkTestBase {
           result.rewrittenManifests());
       Assert.assertEquals(
           "Action should add 1 manifest", 1, 
Iterables.size(result.addedManifests()));
+      assertManifestsLocation(result.addedManifests(), rewriteStagingLocation);
 
     } finally {
       spark.sql("DROP TABLE parquet_table");
@@ -428,18 +436,21 @@ public class TestRewriteManifestsAction extends 
SparkTestBase {
 
     SparkActions actions = SparkActions.get();
 
+    String stagingLocation = temp.newFolder().toString();
+
     RewriteManifests.Result result =
         actions
             .rewriteManifests(table)
             .rewriteIf(manifest -> true)
             .option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
-            .stagingLocation(temp.newFolder().toString())
+            .stagingLocation(stagingLocation)
             .execute();
 
     Assert.assertEquals(
         "Action should rewrite 1 manifest", 1, 
Iterables.size(result.rewrittenManifests()));
     Assert.assertEquals(
         "Action should add 2 manifests", 2, 
Iterables.size(result.addedManifests()));
+    assertManifestsLocation(result.addedManifests(), stagingLocation);
 
     table.refresh();
 
@@ -481,6 +492,8 @@ public class TestRewriteManifestsAction extends 
SparkTestBase {
 
     SparkActions actions = SparkActions.get();
 
+    String stagingLocation = temp.newFolder().toString();
+
     // rewrite only the first manifest
     RewriteManifests.Result result =
         actions
@@ -489,7 +502,7 @@ public class TestRewriteManifestsAction extends 
SparkTestBase {
                 manifest ->
                     (manifest.path().equals(manifests.get(0).path())
                         || (manifest.path().equals(manifests.get(1).path()))))
-            .stagingLocation(temp.newFolder().toString())
+            .stagingLocation(stagingLocation)
             .option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
             .execute();
 
@@ -497,6 +510,7 @@ public class TestRewriteManifestsAction extends 
SparkTestBase {
         "Action should rewrite 2 manifest", 2, 
Iterables.size(result.rewrittenManifests()));
     Assert.assertEquals(
         "Action should add 1 manifests", 1, 
Iterables.size(result.addedManifests()));
+    assertManifestsLocation(result.addedManifests(), stagingLocation);
 
     table.refresh();
 
@@ -560,6 +574,7 @@ public class TestRewriteManifestsAction extends 
SparkTestBase {
         "Action should rewrite 2 manifests", 2, 
Iterables.size(result.rewrittenManifests()));
     Assert.assertEquals(
         "Action should add 1 manifests", 1, 
Iterables.size(result.addedManifests()));
+    assertManifestsLocation(result.addedManifests());
 
     table.refresh();
 
@@ -616,4 +631,16 @@ public class TestRewriteManifestsAction extends 
SparkTestBase {
 
     return totalSize / numEntries;
   }
+
+  private void assertManifestsLocation(Iterable<ManifestFile> manifests) {
+    assertManifestsLocation(manifests, null);
+  }
+
+  private void assertManifestsLocation(Iterable<ManifestFile> manifests, 
String stagingLocation) {
+    if (shouldStageManifests && stagingLocation != null) {
+      assertThat(manifests).allMatch(manifest -> 
manifest.path().startsWith(stagingLocation));
+    } else {
+      assertThat(manifests).allMatch(manifest -> 
manifest.path().startsWith(tableLocation));
+    }
+  }
 }
diff --git 
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
 
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
index 87fbe2de2f..bc2ef23067 100644
--- 
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
+++ 
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
@@ -73,8 +73,10 @@ import org.slf4j.LoggerFactory;
  *
  * <p>By default, this action rewrites all manifests for the current partition 
spec and writes the
  * result to the metadata folder. The behavior can be modified by passing a 
custom predicate to
- * {@link #rewriteIf(Predicate)} and a custom spec id to {@link #specId(int)}. 
In addition, there is
- * a way to configure a custom location for new manifests via {@link 
#stagingLocation}.
+ * {@link #rewriteIf(Predicate)} and a custom spec ID to {@link #specId(int)}. 
In addition, there is
+ * a way to configure a custom location for staged manifests via {@link 
#stagingLocation(String)}.
+ * The provided staging location will be ignored if snapshot ID inheritance is 
enabled. In such
+ * cases, the manifests are always written to the metadata folder and 
committed without staging.
  */
 public class RewriteManifestsSparkAction
     extends BaseSnapshotUpdateSparkAction<RewriteManifestsSparkAction> 
implements RewriteManifests {
@@ -88,10 +90,11 @@ public class RewriteManifestsSparkAction
   private final Table table;
   private final int formatVersion;
   private final long targetManifestSizeBytes;
+  private final boolean shouldStageManifests;
 
   private PartitionSpec spec = null;
   private Predicate<ManifestFile> predicate = manifest -> true;
-  private String stagingLocation = null;
+  private String outputLocation = null;
 
   RewriteManifestsSparkAction(SparkSession spark, Table table) {
     super(spark);
@@ -104,13 +107,20 @@ public class RewriteManifestsSparkAction
             TableProperties.MANIFEST_TARGET_SIZE_BYTES,
             TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFAULT);
 
-    // default the staging location to the metadata location
+    // default the output location to the metadata location
     TableOperations ops = ((HasTableOperations) table).operations();
     Path metadataFilePath = new Path(ops.metadataFileLocation("file"));
-    this.stagingLocation = metadataFilePath.getParent().toString();
+    this.outputLocation = metadataFilePath.getParent().toString();
 
     // use the current table format version for new manifests
     this.formatVersion = ops.current().formatVersion();
+
+    boolean snapshotIdInheritanceEnabled =
+        PropertyUtil.propertyAsBoolean(
+            table.properties(),
+            TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED,
+            TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT);
+    this.shouldStageManifests = formatVersion == 1 && 
!snapshotIdInheritanceEnabled;
   }
 
   @Override
@@ -133,15 +143,17 @@ public class RewriteManifestsSparkAction
 
   @Override
   public RewriteManifestsSparkAction stagingLocation(String 
newStagingLocation) {
-    this.stagingLocation = newStagingLocation;
+    if (shouldStageManifests) {
+      this.outputLocation = newStagingLocation;
+    } else {
+      LOG.warn("Ignoring provided staging location as new manifests will be 
committed directly");
+    }
     return this;
   }
 
   @Override
   public RewriteManifests.Result execute() {
-    String desc =
-        String.format(
-            "Rewriting manifests (staging location=%s) of %s", 
stagingLocation, table.name());
+    String desc = String.format("Rewriting manifests in %s", table.name());
     JobGroupInfo info = newJobGroupInfo("REWRITE-MANIFESTS", desc);
     return withJobGroupInfo(info, this::doExecute);
   }
@@ -236,7 +248,7 @@ public class RewriteManifestsSparkAction
             toManifests(
                 tableBroadcast,
                 maxNumManifestEntries,
-                stagingLocation,
+                outputLocation,
                 formatVersion,
                 combinedPartitionType,
                 spec,
@@ -267,7 +279,7 @@ public class RewriteManifestsSparkAction
                   toManifests(
                       tableBroadcast,
                       maxNumManifestEntries,
-                      stagingLocation,
+                      outputLocation,
                       formatVersion,
                       combinedPartitionType,
                       spec,
@@ -320,18 +332,12 @@ public class RewriteManifestsSparkAction
   private void replaceManifests(
       Iterable<ManifestFile> deletedManifests, Iterable<ManifestFile> 
addedManifests) {
     try {
-      boolean snapshotIdInheritanceEnabled =
-          PropertyUtil.propertyAsBoolean(
-              table.properties(),
-              TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED,
-              TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT);
-
       org.apache.iceberg.RewriteManifests rewriteManifests = 
table.rewriteManifests();
       deletedManifests.forEach(rewriteManifests::deleteManifest);
       addedManifests.forEach(rewriteManifests::addManifest);
       commit(rewriteManifests);
 
-      if (formatVersion == 1 && !snapshotIdInheritanceEnabled) {
+      if (shouldStageManifests) {
         // delete new manifests as they were rewritten before the commit
         deleteFiles(Iterables.transform(addedManifests, ManifestFile::path));
       }
diff --git 
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
 
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
index eadcfc4261..a5dd0054da 100644
--- 
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
+++ 
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
@@ -24,6 +24,7 @@ import static org.apache.iceberg.ValidationHelpers.files;
 import static org.apache.iceberg.ValidationHelpers.snapshotIds;
 import static org.apache.iceberg.ValidationHelpers.validateDataManifest;
 import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assumptions.assumeThat;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.spy;
@@ -92,6 +93,7 @@ public class TestRewriteManifestsAction extends SparkTestBase 
{
   private final String snapshotIdInheritanceEnabled;
   private final String useCaching;
   private final int formatVersion;
+  private final boolean shouldStageManifests;
   private String tableLocation = null;
 
   public TestRewriteManifestsAction(
@@ -99,6 +101,7 @@ public class TestRewriteManifestsAction extends 
SparkTestBase {
     this.snapshotIdInheritanceEnabled = snapshotIdInheritanceEnabled;
     this.useCaching = useCaching;
     this.formatVersion = formatVersion;
+    this.shouldStageManifests = formatVersion == 1 && 
snapshotIdInheritanceEnabled.equals("false");
   }
 
   @Before
@@ -166,6 +169,7 @@ public class TestRewriteManifestsAction extends 
SparkTestBase {
         "Action should rewrite 2 manifests", 2, 
Iterables.size(result.rewrittenManifests()));
     Assert.assertEquals(
         "Action should add 1 manifests", 1, 
Iterables.size(result.addedManifests()));
+    assertManifestsLocation(result.addedManifests());
 
     table.refresh();
 
@@ -312,6 +316,7 @@ public class TestRewriteManifestsAction extends 
SparkTestBase {
         "Action should rewrite 4 manifests", 4, 
Iterables.size(result.rewrittenManifests()));
     Assert.assertEquals(
         "Action should add 2 manifests", 2, 
Iterables.size(result.addedManifests()));
+    assertManifestsLocation(result.addedManifests());
 
     table.refresh();
 
@@ -376,12 +381,14 @@ public class TestRewriteManifestsAction extends 
SparkTestBase {
 
       SparkActions actions = SparkActions.get();
 
+      String rewriteStagingLocation = temp.newFolder().toString();
+
       RewriteManifests.Result result =
           actions
               .rewriteManifests(table)
               .rewriteIf(manifest -> true)
               .option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
-              .stagingLocation(temp.newFolder().toString())
+              .stagingLocation(rewriteStagingLocation)
               .execute();
 
       Assert.assertEquals(
@@ -390,6 +397,7 @@ public class TestRewriteManifestsAction extends 
SparkTestBase {
           result.rewrittenManifests());
       Assert.assertEquals(
           "Action should add 1 manifest", 1, 
Iterables.size(result.addedManifests()));
+      assertManifestsLocation(result.addedManifests(), rewriteStagingLocation);
 
     } finally {
       spark.sql("DROP TABLE parquet_table");
@@ -428,18 +436,21 @@ public class TestRewriteManifestsAction extends 
SparkTestBase {
 
     SparkActions actions = SparkActions.get();
 
+    String stagingLocation = temp.newFolder().toString();
+
     RewriteManifests.Result result =
         actions
             .rewriteManifests(table)
             .rewriteIf(manifest -> true)
             .option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
-            .stagingLocation(temp.newFolder().toString())
+            .stagingLocation(stagingLocation)
             .execute();
 
     Assert.assertEquals(
         "Action should rewrite 1 manifest", 1, 
Iterables.size(result.rewrittenManifests()));
     Assert.assertEquals(
         "Action should add 2 manifests", 2, 
Iterables.size(result.addedManifests()));
+    assertManifestsLocation(result.addedManifests(), stagingLocation);
 
     table.refresh();
 
@@ -481,6 +492,8 @@ public class TestRewriteManifestsAction extends 
SparkTestBase {
 
     SparkActions actions = SparkActions.get();
 
+    String stagingLocation = temp.newFolder().toString();
+
     // rewrite only the first manifest
     RewriteManifests.Result result =
         actions
@@ -489,7 +502,7 @@ public class TestRewriteManifestsAction extends 
SparkTestBase {
                 manifest ->
                     (manifest.path().equals(manifests.get(0).path())
                         || (manifest.path().equals(manifests.get(1).path()))))
-            .stagingLocation(temp.newFolder().toString())
+            .stagingLocation(stagingLocation)
             .option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
             .execute();
 
@@ -497,6 +510,7 @@ public class TestRewriteManifestsAction extends 
SparkTestBase {
         "Action should rewrite 2 manifest", 2, 
Iterables.size(result.rewrittenManifests()));
     Assert.assertEquals(
         "Action should add 1 manifests", 1, 
Iterables.size(result.addedManifests()));
+    assertManifestsLocation(result.addedManifests(), stagingLocation);
 
     table.refresh();
 
@@ -560,6 +574,7 @@ public class TestRewriteManifestsAction extends 
SparkTestBase {
         "Action should rewrite 2 manifests", 2, 
Iterables.size(result.rewrittenManifests()));
     Assert.assertEquals(
         "Action should add 1 manifests", 1, 
Iterables.size(result.addedManifests()));
+    assertManifestsLocation(result.addedManifests());
 
     table.refresh();
 
@@ -616,4 +631,16 @@ public class TestRewriteManifestsAction extends 
SparkTestBase {
 
     return totalSize / numEntries;
   }
+
+  private void assertManifestsLocation(Iterable<ManifestFile> manifests) {
+    assertManifestsLocation(manifests, null);
+  }
+
+  private void assertManifestsLocation(Iterable<ManifestFile> manifests, 
String stagingLocation) {
+    if (shouldStageManifests && stagingLocation != null) {
+      assertThat(manifests).allMatch(manifest -> 
manifest.path().startsWith(stagingLocation));
+    } else {
+      assertThat(manifests).allMatch(manifest -> 
manifest.path().startsWith(tableLocation));
+    }
+  }
 }
diff --git 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
index 87fbe2de2f..bc2ef23067 100644
--- 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
+++ 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
@@ -73,8 +73,10 @@ import org.slf4j.LoggerFactory;
  *
  * <p>By default, this action rewrites all manifests for the current partition 
spec and writes the
  * result to the metadata folder. The behavior can be modified by passing a 
custom predicate to
- * {@link #rewriteIf(Predicate)} and a custom spec id to {@link #specId(int)}. 
In addition, there is
- * a way to configure a custom location for new manifests via {@link 
#stagingLocation}.
+ * {@link #rewriteIf(Predicate)} and a custom spec ID to {@link #specId(int)}. 
In addition, there is
+ * a way to configure a custom location for staged manifests via {@link 
#stagingLocation(String)}.
+ * The provided staging location will be ignored if snapshot ID inheritance is 
enabled. In such
+ * cases, the manifests are always written to the metadata folder and 
committed without staging.
  */
 public class RewriteManifestsSparkAction
     extends BaseSnapshotUpdateSparkAction<RewriteManifestsSparkAction> 
implements RewriteManifests {
@@ -88,10 +90,11 @@ public class RewriteManifestsSparkAction
   private final Table table;
   private final int formatVersion;
   private final long targetManifestSizeBytes;
+  private final boolean shouldStageManifests;
 
   private PartitionSpec spec = null;
   private Predicate<ManifestFile> predicate = manifest -> true;
-  private String stagingLocation = null;
+  private String outputLocation = null;
 
   RewriteManifestsSparkAction(SparkSession spark, Table table) {
     super(spark);
@@ -104,13 +107,20 @@ public class RewriteManifestsSparkAction
             TableProperties.MANIFEST_TARGET_SIZE_BYTES,
             TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFAULT);
 
-    // default the staging location to the metadata location
+    // default the output location to the metadata location
     TableOperations ops = ((HasTableOperations) table).operations();
     Path metadataFilePath = new Path(ops.metadataFileLocation("file"));
-    this.stagingLocation = metadataFilePath.getParent().toString();
+    this.outputLocation = metadataFilePath.getParent().toString();
 
     // use the current table format version for new manifests
     this.formatVersion = ops.current().formatVersion();
+
+    boolean snapshotIdInheritanceEnabled =
+        PropertyUtil.propertyAsBoolean(
+            table.properties(),
+            TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED,
+            TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT);
+    this.shouldStageManifests = formatVersion == 1 && 
!snapshotIdInheritanceEnabled;
   }
 
   @Override
@@ -133,15 +143,17 @@ public class RewriteManifestsSparkAction
 
   @Override
   public RewriteManifestsSparkAction stagingLocation(String 
newStagingLocation) {
-    this.stagingLocation = newStagingLocation;
+    if (shouldStageManifests) {
+      this.outputLocation = newStagingLocation;
+    } else {
+      LOG.warn("Ignoring provided staging location as new manifests will be 
committed directly");
+    }
     return this;
   }
 
   @Override
   public RewriteManifests.Result execute() {
-    String desc =
-        String.format(
-            "Rewriting manifests (staging location=%s) of %s", 
stagingLocation, table.name());
+    String desc = String.format("Rewriting manifests in %s", table.name());
     JobGroupInfo info = newJobGroupInfo("REWRITE-MANIFESTS", desc);
     return withJobGroupInfo(info, this::doExecute);
   }
@@ -236,7 +248,7 @@ public class RewriteManifestsSparkAction
             toManifests(
                 tableBroadcast,
                 maxNumManifestEntries,
-                stagingLocation,
+                outputLocation,
                 formatVersion,
                 combinedPartitionType,
                 spec,
@@ -267,7 +279,7 @@ public class RewriteManifestsSparkAction
                   toManifests(
                       tableBroadcast,
                       maxNumManifestEntries,
-                      stagingLocation,
+                      outputLocation,
                       formatVersion,
                       combinedPartitionType,
                       spec,
@@ -320,18 +332,12 @@ public class RewriteManifestsSparkAction
   private void replaceManifests(
       Iterable<ManifestFile> deletedManifests, Iterable<ManifestFile> 
addedManifests) {
     try {
-      boolean snapshotIdInheritanceEnabled =
-          PropertyUtil.propertyAsBoolean(
-              table.properties(),
-              TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED,
-              TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT);
-
       org.apache.iceberg.RewriteManifests rewriteManifests = 
table.rewriteManifests();
       deletedManifests.forEach(rewriteManifests::deleteManifest);
       addedManifests.forEach(rewriteManifests::addManifest);
       commit(rewriteManifests);
 
-      if (formatVersion == 1 && !snapshotIdInheritanceEnabled) {
+      if (shouldStageManifests) {
         // delete new manifests as they were rewritten before the commit
         deleteFiles(Iterables.transform(addedManifests, ManifestFile::path));
       }
diff --git 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
index 3522decec0..4ce5ba4e9d 100644
--- 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
+++ 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
@@ -24,6 +24,7 @@ import static org.apache.iceberg.ValidationHelpers.files;
 import static org.apache.iceberg.ValidationHelpers.snapshotIds;
 import static org.apache.iceberg.ValidationHelpers.validateDataManifest;
 import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assumptions.assumeThat;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.spy;
@@ -92,6 +93,7 @@ public class TestRewriteManifestsAction extends SparkTestBase 
{
   private final String snapshotIdInheritanceEnabled;
   private final String useCaching;
   private final int formatVersion;
+  private final boolean shouldStageManifests;
   private String tableLocation = null;
 
   public TestRewriteManifestsAction(
@@ -99,6 +101,7 @@ public class TestRewriteManifestsAction extends 
SparkTestBase {
     this.snapshotIdInheritanceEnabled = snapshotIdInheritanceEnabled;
     this.useCaching = useCaching;
     this.formatVersion = formatVersion;
+    this.shouldStageManifests = formatVersion == 1 && 
snapshotIdInheritanceEnabled.equals("false");
   }
 
   @Before
@@ -166,6 +169,7 @@ public class TestRewriteManifestsAction extends 
SparkTestBase {
         "Action should rewrite 2 manifests", 2, 
Iterables.size(result.rewrittenManifests()));
     Assert.assertEquals(
         "Action should add 1 manifests", 1, 
Iterables.size(result.addedManifests()));
+    assertManifestsLocation(result.addedManifests());
 
     table.refresh();
 
@@ -312,6 +316,7 @@ public class TestRewriteManifestsAction extends 
SparkTestBase {
         "Action should rewrite 4 manifests", 4, 
Iterables.size(result.rewrittenManifests()));
     Assert.assertEquals(
         "Action should add 2 manifests", 2, 
Iterables.size(result.addedManifests()));
+    assertManifestsLocation(result.addedManifests());
 
     table.refresh();
 
@@ -376,12 +381,14 @@ public class TestRewriteManifestsAction extends 
SparkTestBase {
 
       SparkActions actions = SparkActions.get();
 
+      String rewriteStagingLocation = temp.newFolder().toString();
+
       RewriteManifests.Result result =
           actions
               .rewriteManifests(table)
               .rewriteIf(manifest -> true)
               .option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
-              .stagingLocation(temp.newFolder().toString())
+              .stagingLocation(rewriteStagingLocation)
               .execute();
 
       Assert.assertEquals(
@@ -390,6 +397,7 @@ public class TestRewriteManifestsAction extends 
SparkTestBase {
           result.rewrittenManifests());
       Assert.assertEquals(
           "Action should add 1 manifest", 1, 
Iterables.size(result.addedManifests()));
+      assertManifestsLocation(result.addedManifests(), rewriteStagingLocation);
 
     } finally {
       spark.sql("DROP TABLE parquet_table");
@@ -428,18 +436,21 @@ public class TestRewriteManifestsAction extends 
SparkTestBase {
 
     SparkActions actions = SparkActions.get();
 
+    String stagingLocation = temp.newFolder().toString();
+
     RewriteManifests.Result result =
         actions
             .rewriteManifests(table)
             .rewriteIf(manifest -> true)
             .option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
-            .stagingLocation(temp.newFolder().toString())
+            .stagingLocation(stagingLocation)
             .execute();
 
     Assert.assertEquals(
         "Action should rewrite 1 manifest", 1, 
Iterables.size(result.rewrittenManifests()));
     Assert.assertEquals(
         "Action should add 2 manifests", 2, 
Iterables.size(result.addedManifests()));
+    assertManifestsLocation(result.addedManifests(), stagingLocation);
 
     table.refresh();
 
@@ -481,6 +492,8 @@ public class TestRewriteManifestsAction extends 
SparkTestBase {
 
     SparkActions actions = SparkActions.get();
 
+    String stagingLocation = temp.newFolder().toString();
+
     // rewrite only the first manifest
     RewriteManifests.Result result =
         actions
@@ -489,7 +502,7 @@ public class TestRewriteManifestsAction extends 
SparkTestBase {
                 manifest ->
                     (manifest.path().equals(manifests.get(0).path())
                         || (manifest.path().equals(manifests.get(1).path()))))
-            .stagingLocation(temp.newFolder().toString())
+            .stagingLocation(stagingLocation)
             .option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
             .execute();
 
@@ -497,6 +510,7 @@ public class TestRewriteManifestsAction extends 
SparkTestBase {
         "Action should rewrite 2 manifest", 2, 
Iterables.size(result.rewrittenManifests()));
     Assert.assertEquals(
         "Action should add 1 manifests", 1, 
Iterables.size(result.addedManifests()));
+    assertManifestsLocation(result.addedManifests(), stagingLocation);
 
     table.refresh();
 
@@ -560,6 +574,7 @@ public class TestRewriteManifestsAction extends 
SparkTestBase {
         "Action should rewrite 2 manifests", 2, 
Iterables.size(result.rewrittenManifests()));
     Assert.assertEquals(
         "Action should add 1 manifests", 1, 
Iterables.size(result.addedManifests()));
+    assertManifestsLocation(result.addedManifests());
 
     table.refresh();
 
@@ -616,4 +631,16 @@ public class TestRewriteManifestsAction extends 
SparkTestBase {
 
     return totalSize / numEntries;
   }
+
+  private void assertManifestsLocation(Iterable<ManifestFile> manifests) {
+    assertManifestsLocation(manifests, null);
+  }
+
+  private void assertManifestsLocation(Iterable<ManifestFile> manifests, 
String stagingLocation) {
+    if (shouldStageManifests && stagingLocation != null) {
+      assertThat(manifests).allMatch(manifest -> 
manifest.path().startsWith(stagingLocation));
+    } else {
+      assertThat(manifests).allMatch(manifest -> 
manifest.path().startsWith(tableLocation));
+    }
+  }
 }
diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
index 87fbe2de2f..bc2ef23067 100644
--- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
+++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
@@ -73,8 +73,10 @@ import org.slf4j.LoggerFactory;
  *
  * <p>By default, this action rewrites all manifests for the current partition 
spec and writes the
  * result to the metadata folder. The behavior can be modified by passing a 
custom predicate to
- * {@link #rewriteIf(Predicate)} and a custom spec id to {@link #specId(int)}. 
In addition, there is
- * a way to configure a custom location for new manifests via {@link 
#stagingLocation}.
+ * {@link #rewriteIf(Predicate)} and a custom spec ID to {@link #specId(int)}. 
In addition, there is
+ * a way to configure a custom location for staged manifests via {@link 
#stagingLocation(String)}.
+ * The provided staging location will be ignored if snapshot ID inheritance is 
enabled. In such
+ * cases, the manifests are always written to the metadata folder and 
committed without staging.
  */
 public class RewriteManifestsSparkAction
     extends BaseSnapshotUpdateSparkAction<RewriteManifestsSparkAction> 
implements RewriteManifests {
@@ -88,10 +90,11 @@ public class RewriteManifestsSparkAction
   private final Table table;
   private final int formatVersion;
   private final long targetManifestSizeBytes;
+  private final boolean shouldStageManifests;
 
   private PartitionSpec spec = null;
   private Predicate<ManifestFile> predicate = manifest -> true;
-  private String stagingLocation = null;
+  private String outputLocation = null;
 
   RewriteManifestsSparkAction(SparkSession spark, Table table) {
     super(spark);
@@ -104,13 +107,20 @@ public class RewriteManifestsSparkAction
             TableProperties.MANIFEST_TARGET_SIZE_BYTES,
             TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFAULT);
 
-    // default the staging location to the metadata location
+    // default the output location to the metadata location
     TableOperations ops = ((HasTableOperations) table).operations();
     Path metadataFilePath = new Path(ops.metadataFileLocation("file"));
-    this.stagingLocation = metadataFilePath.getParent().toString();
+    this.outputLocation = metadataFilePath.getParent().toString();
 
     // use the current table format version for new manifests
     this.formatVersion = ops.current().formatVersion();
+
+    boolean snapshotIdInheritanceEnabled =
+        PropertyUtil.propertyAsBoolean(
+            table.properties(),
+            TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED,
+            TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT);
+    this.shouldStageManifests = formatVersion == 1 && 
!snapshotIdInheritanceEnabled;
   }
 
   @Override
@@ -133,15 +143,17 @@ public class RewriteManifestsSparkAction
 
   @Override
   public RewriteManifestsSparkAction stagingLocation(String 
newStagingLocation) {
-    this.stagingLocation = newStagingLocation;
+    if (shouldStageManifests) {
+      this.outputLocation = newStagingLocation;
+    } else {
+      LOG.warn("Ignoring provided staging location as new manifests will be 
committed directly");
+    }
     return this;
   }
 
   @Override
   public RewriteManifests.Result execute() {
-    String desc =
-        String.format(
-            "Rewriting manifests (staging location=%s) of %s", 
stagingLocation, table.name());
+    String desc = String.format("Rewriting manifests in %s", table.name());
     JobGroupInfo info = newJobGroupInfo("REWRITE-MANIFESTS", desc);
     return withJobGroupInfo(info, this::doExecute);
   }
@@ -236,7 +248,7 @@ public class RewriteManifestsSparkAction
             toManifests(
                 tableBroadcast,
                 maxNumManifestEntries,
-                stagingLocation,
+                outputLocation,
                 formatVersion,
                 combinedPartitionType,
                 spec,
@@ -267,7 +279,7 @@ public class RewriteManifestsSparkAction
                   toManifests(
                       tableBroadcast,
                       maxNumManifestEntries,
-                      stagingLocation,
+                      outputLocation,
                       formatVersion,
                       combinedPartitionType,
                       spec,
@@ -320,18 +332,12 @@ public class RewriteManifestsSparkAction
   private void replaceManifests(
       Iterable<ManifestFile> deletedManifests, Iterable<ManifestFile> 
addedManifests) {
     try {
-      boolean snapshotIdInheritanceEnabled =
-          PropertyUtil.propertyAsBoolean(
-              table.properties(),
-              TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED,
-              TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT);
-
       org.apache.iceberg.RewriteManifests rewriteManifests = 
table.rewriteManifests();
       deletedManifests.forEach(rewriteManifests::deleteManifest);
       addedManifests.forEach(rewriteManifests::addManifest);
       commit(rewriteManifests);
 
-      if (formatVersion == 1 && !snapshotIdInheritanceEnabled) {
+      if (shouldStageManifests) {
         // delete new manifests as they were rewritten before the commit
         deleteFiles(Iterables.transform(addedManifests, ManifestFile::path));
       }
diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
index 3522decec0..4ce5ba4e9d 100644
--- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
+++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
@@ -24,6 +24,7 @@ import static org.apache.iceberg.ValidationHelpers.files;
 import static org.apache.iceberg.ValidationHelpers.snapshotIds;
 import static org.apache.iceberg.ValidationHelpers.validateDataManifest;
 import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assumptions.assumeThat;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.spy;
@@ -92,6 +93,7 @@ public class TestRewriteManifestsAction extends SparkTestBase 
{
   private final String snapshotIdInheritanceEnabled;
   private final String useCaching;
   private final int formatVersion;
+  private final boolean shouldStageManifests;
   private String tableLocation = null;
 
   public TestRewriteManifestsAction(
@@ -99,6 +101,7 @@ public class TestRewriteManifestsAction extends 
SparkTestBase {
     this.snapshotIdInheritanceEnabled = snapshotIdInheritanceEnabled;
     this.useCaching = useCaching;
     this.formatVersion = formatVersion;
+    this.shouldStageManifests = formatVersion == 1 && 
snapshotIdInheritanceEnabled.equals("false");
   }
 
   @Before
@@ -166,6 +169,7 @@ public class TestRewriteManifestsAction extends 
SparkTestBase {
         "Action should rewrite 2 manifests", 2, 
Iterables.size(result.rewrittenManifests()));
     Assert.assertEquals(
         "Action should add 1 manifests", 1, 
Iterables.size(result.addedManifests()));
+    assertManifestsLocation(result.addedManifests());
 
     table.refresh();
 
@@ -312,6 +316,7 @@ public class TestRewriteManifestsAction extends 
SparkTestBase {
         "Action should rewrite 4 manifests", 4, 
Iterables.size(result.rewrittenManifests()));
     Assert.assertEquals(
         "Action should add 2 manifests", 2, 
Iterables.size(result.addedManifests()));
+    assertManifestsLocation(result.addedManifests());
 
     table.refresh();
 
@@ -376,12 +381,14 @@ public class TestRewriteManifestsAction extends 
SparkTestBase {
 
       SparkActions actions = SparkActions.get();
 
+      String rewriteStagingLocation = temp.newFolder().toString();
+
       RewriteManifests.Result result =
           actions
               .rewriteManifests(table)
               .rewriteIf(manifest -> true)
               .option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
-              .stagingLocation(temp.newFolder().toString())
+              .stagingLocation(rewriteStagingLocation)
               .execute();
 
       Assert.assertEquals(
@@ -390,6 +397,7 @@ public class TestRewriteManifestsAction extends 
SparkTestBase {
           result.rewrittenManifests());
       Assert.assertEquals(
           "Action should add 1 manifest", 1, 
Iterables.size(result.addedManifests()));
+      assertManifestsLocation(result.addedManifests(), rewriteStagingLocation);
 
     } finally {
       spark.sql("DROP TABLE parquet_table");
@@ -428,18 +436,21 @@ public class TestRewriteManifestsAction extends 
SparkTestBase {
 
     SparkActions actions = SparkActions.get();
 
+    String stagingLocation = temp.newFolder().toString();
+
     RewriteManifests.Result result =
         actions
             .rewriteManifests(table)
             .rewriteIf(manifest -> true)
             .option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
-            .stagingLocation(temp.newFolder().toString())
+            .stagingLocation(stagingLocation)
             .execute();
 
     Assert.assertEquals(
         "Action should rewrite 1 manifest", 1, 
Iterables.size(result.rewrittenManifests()));
     Assert.assertEquals(
         "Action should add 2 manifests", 2, 
Iterables.size(result.addedManifests()));
+    assertManifestsLocation(result.addedManifests(), stagingLocation);
 
     table.refresh();
 
@@ -481,6 +492,8 @@ public class TestRewriteManifestsAction extends 
SparkTestBase {
 
     SparkActions actions = SparkActions.get();
 
+    String stagingLocation = temp.newFolder().toString();
+
     // rewrite only the first manifest
     RewriteManifests.Result result =
         actions
@@ -489,7 +502,7 @@ public class TestRewriteManifestsAction extends 
SparkTestBase {
                 manifest ->
                     (manifest.path().equals(manifests.get(0).path())
                         || (manifest.path().equals(manifests.get(1).path()))))
-            .stagingLocation(temp.newFolder().toString())
+            .stagingLocation(stagingLocation)
             .option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
             .execute();
 
@@ -497,6 +510,7 @@ public class TestRewriteManifestsAction extends 
SparkTestBase {
         "Action should rewrite 2 manifest", 2, 
Iterables.size(result.rewrittenManifests()));
     Assert.assertEquals(
         "Action should add 1 manifests", 1, 
Iterables.size(result.addedManifests()));
+    assertManifestsLocation(result.addedManifests(), stagingLocation);
 
     table.refresh();
 
@@ -560,6 +574,7 @@ public class TestRewriteManifestsAction extends 
SparkTestBase {
         "Action should rewrite 2 manifests", 2, 
Iterables.size(result.rewrittenManifests()));
     Assert.assertEquals(
         "Action should add 1 manifests", 1, 
Iterables.size(result.addedManifests()));
+    assertManifestsLocation(result.addedManifests());
 
     table.refresh();
 
@@ -616,4 +631,16 @@ public class TestRewriteManifestsAction extends 
SparkTestBase {
 
     return totalSize / numEntries;
   }
+
+  private void assertManifestsLocation(Iterable<ManifestFile> manifests) {
+    assertManifestsLocation(manifests, null);
+  }
+
+  private void assertManifestsLocation(Iterable<ManifestFile> manifests, 
String stagingLocation) {
+    if (shouldStageManifests && stagingLocation != null) {
+      assertThat(manifests).allMatch(manifest -> 
manifest.path().startsWith(stagingLocation));
+    } else {
+      assertThat(manifests).allMatch(manifest -> 
manifest.path().startsWith(tableLocation));
+    }
+  }
 }


Reply via email to