This is an automated email from the ASF dual-hosted git repository. aokolnychyi pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push: new 2c890109c1 Spark: Fix usage of staging location when optimizing metadata (#8959) 2c890109c1 is described below commit 2c890109c17bfb971490cab007be23029b81bad8 Author: Anton Okolnychyi <aokolnyc...@apple.com> AuthorDate: Thu Nov 2 20:39:45 2023 -0700 Spark: Fix usage of staging location when optimizing metadata (#8959) --- .../spark/actions/RewriteManifestsSparkAction.java | 42 ++++++++++++---------- .../spark/actions/TestRewriteManifestsAction.java | 33 +++++++++++++++-- .../spark/actions/RewriteManifestsSparkAction.java | 42 ++++++++++++---------- .../spark/actions/TestRewriteManifestsAction.java | 33 +++++++++++++++-- .../spark/actions/RewriteManifestsSparkAction.java | 42 ++++++++++++---------- .../spark/actions/TestRewriteManifestsAction.java | 33 +++++++++++++++-- .../spark/actions/RewriteManifestsSparkAction.java | 42 ++++++++++++---------- .../spark/actions/TestRewriteManifestsAction.java | 33 +++++++++++++++-- 8 files changed, 216 insertions(+), 84 deletions(-) diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java index 854232a62d..dd287ea193 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java @@ -73,8 +73,10 @@ import org.slf4j.LoggerFactory; * * <p>By default, this action rewrites all manifests for the current partition spec and writes the * result to the metadata folder. The behavior can be modified by passing a custom predicate to - * {@link #rewriteIf(Predicate)} and a custom spec id to {@link #specId(int)}. In addition, there is - * a way to configure a custom location for new manifests via {@link #stagingLocation}. + * {@link #rewriteIf(Predicate)} and a custom spec ID to {@link #specId(int)}. In addition, there is + * a way to configure a custom location for staged manifests via {@link #stagingLocation(String)}. + * The provided staging location will be ignored if snapshot ID inheritance is enabled. In such + * cases, the manifests are always written to the metadata folder and committed without staging. */ public class RewriteManifestsSparkAction extends BaseSnapshotUpdateSparkAction<RewriteManifestsSparkAction> implements RewriteManifests { @@ -88,10 +90,11 @@ public class RewriteManifestsSparkAction private final Table table; private final int formatVersion; private final long targetManifestSizeBytes; + private final boolean shouldStageManifests; private PartitionSpec spec = null; private Predicate<ManifestFile> predicate = manifest -> true; - private String stagingLocation = null; + private String outputLocation = null; RewriteManifestsSparkAction(SparkSession spark, Table table) { super(spark); @@ -104,13 +107,20 @@ public class RewriteManifestsSparkAction TableProperties.MANIFEST_TARGET_SIZE_BYTES, TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFAULT); - // default the staging location to the metadata location + // default the output location to the metadata location TableOperations ops = ((HasTableOperations) table).operations(); Path metadataFilePath = new Path(ops.metadataFileLocation("file")); - this.stagingLocation = metadataFilePath.getParent().toString(); + this.outputLocation = metadataFilePath.getParent().toString(); // use the current table format version for new manifests this.formatVersion = ops.current().formatVersion(); + + boolean snapshotIdInheritanceEnabled = + PropertyUtil.propertyAsBoolean( + table.properties(), + TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, + TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT); + this.shouldStageManifests = formatVersion == 1 && !snapshotIdInheritanceEnabled; } @Override @@ -133,15 +143,17 @@ public class RewriteManifestsSparkAction @Override public RewriteManifestsSparkAction stagingLocation(String newStagingLocation) { - this.stagingLocation = newStagingLocation; + if (shouldStageManifests) { + this.outputLocation = newStagingLocation; + } else { + LOG.warn("Ignoring provided staging location as new manifests will be committed directly"); + } return this; } @Override public RewriteManifests.Result execute() { - String desc = - String.format( - "Rewriting manifests (staging location=%s) of %s", stagingLocation, table.name()); + String desc = String.format("Rewriting manifests in %s", table.name()); JobGroupInfo info = newJobGroupInfo("REWRITE-MANIFESTS", desc); return withJobGroupInfo(info, this::doExecute); } @@ -236,7 +248,7 @@ public class RewriteManifestsSparkAction toManifests( tableBroadcast, maxNumManifestEntries, - stagingLocation, + outputLocation, formatVersion, combinedPartitionType, spec, @@ -267,7 +279,7 @@ public class RewriteManifestsSparkAction toManifests( tableBroadcast, maxNumManifestEntries, - stagingLocation, + outputLocation, formatVersion, combinedPartitionType, spec, @@ -320,18 +332,12 @@ public class RewriteManifestsSparkAction private void replaceManifests( Iterable<ManifestFile> deletedManifests, Iterable<ManifestFile> addedManifests) { try { - boolean snapshotIdInheritanceEnabled = - PropertyUtil.propertyAsBoolean( - table.properties(), - TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, - TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT); - org.apache.iceberg.RewriteManifests rewriteManifests = table.rewriteManifests(); deletedManifests.forEach(rewriteManifests::deleteManifest); addedManifests.forEach(rewriteManifests::addManifest); commit(rewriteManifests); - if (formatVersion == 1 && !snapshotIdInheritanceEnabled) { + if (shouldStageManifests) { // delete new manifests as they were rewritten before the commit deleteFiles(Iterables.transform(addedManifests, ManifestFile::path)); } diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java index eadcfc4261..a5dd0054da 100644 --- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java +++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java @@ -24,6 +24,7 @@ import static org.apache.iceberg.ValidationHelpers.files; import static org.apache.iceberg.ValidationHelpers.snapshotIds; import static org.apache.iceberg.ValidationHelpers.validateDataManifest; import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assumptions.assumeThat; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.spy; @@ -92,6 +93,7 @@ public class TestRewriteManifestsAction extends SparkTestBase { private final String snapshotIdInheritanceEnabled; private final String useCaching; private final int formatVersion; + private final boolean shouldStageManifests; private String tableLocation = null; public TestRewriteManifestsAction( @@ -99,6 +101,7 @@ public class TestRewriteManifestsAction extends SparkTestBase { this.snapshotIdInheritanceEnabled = snapshotIdInheritanceEnabled; this.useCaching = useCaching; this.formatVersion = formatVersion; + this.shouldStageManifests = formatVersion == 1 && snapshotIdInheritanceEnabled.equals("false"); } @Before @@ -166,6 +169,7 @@ public class TestRewriteManifestsAction extends SparkTestBase { "Action should rewrite 2 manifests", 2, Iterables.size(result.rewrittenManifests())); Assert.assertEquals( "Action should add 1 manifests", 1, Iterables.size(result.addedManifests())); + assertManifestsLocation(result.addedManifests()); table.refresh(); @@ -312,6 +316,7 @@ public class TestRewriteManifestsAction extends SparkTestBase { "Action should rewrite 4 manifests", 4, Iterables.size(result.rewrittenManifests())); Assert.assertEquals( "Action should add 2 manifests", 2, Iterables.size(result.addedManifests())); + assertManifestsLocation(result.addedManifests()); table.refresh(); @@ -376,12 +381,14 @@ public class TestRewriteManifestsAction extends SparkTestBase { SparkActions actions = SparkActions.get(); + String rewriteStagingLocation = temp.newFolder().toString(); + RewriteManifests.Result result = actions .rewriteManifests(table) .rewriteIf(manifest -> true) .option(RewriteManifestsSparkAction.USE_CACHING, useCaching) - .stagingLocation(temp.newFolder().toString()) + .stagingLocation(rewriteStagingLocation) .execute(); Assert.assertEquals( @@ -390,6 +397,7 @@ public class TestRewriteManifestsAction extends SparkTestBase { result.rewrittenManifests()); Assert.assertEquals( "Action should add 1 manifest", 1, Iterables.size(result.addedManifests())); + assertManifestsLocation(result.addedManifests(), rewriteStagingLocation); } finally { spark.sql("DROP TABLE parquet_table"); @@ -428,18 +436,21 @@ public class TestRewriteManifestsAction extends SparkTestBase { SparkActions actions = SparkActions.get(); + String stagingLocation = temp.newFolder().toString(); + RewriteManifests.Result result = actions .rewriteManifests(table) .rewriteIf(manifest -> true) .option(RewriteManifestsSparkAction.USE_CACHING, useCaching) - .stagingLocation(temp.newFolder().toString()) + .stagingLocation(stagingLocation) .execute(); Assert.assertEquals( "Action should rewrite 1 manifest", 1, Iterables.size(result.rewrittenManifests())); Assert.assertEquals( "Action should add 2 manifests", 2, Iterables.size(result.addedManifests())); + assertManifestsLocation(result.addedManifests(), stagingLocation); table.refresh(); @@ -481,6 +492,8 @@ public class TestRewriteManifestsAction extends SparkTestBase { SparkActions actions = SparkActions.get(); + String stagingLocation = temp.newFolder().toString(); + // rewrite only the first manifest RewriteManifests.Result result = actions @@ -489,7 +502,7 @@ public class TestRewriteManifestsAction extends SparkTestBase { manifest -> (manifest.path().equals(manifests.get(0).path()) || (manifest.path().equals(manifests.get(1).path())))) - .stagingLocation(temp.newFolder().toString()) + .stagingLocation(stagingLocation) .option(RewriteManifestsSparkAction.USE_CACHING, useCaching) .execute(); @@ -497,6 +510,7 @@ public class TestRewriteManifestsAction extends SparkTestBase { "Action should rewrite 2 manifest", 2, Iterables.size(result.rewrittenManifests())); Assert.assertEquals( "Action should add 1 manifests", 1, Iterables.size(result.addedManifests())); + assertManifestsLocation(result.addedManifests(), stagingLocation); table.refresh(); @@ -560,6 +574,7 @@ public class TestRewriteManifestsAction extends SparkTestBase { "Action should rewrite 2 manifests", 2, Iterables.size(result.rewrittenManifests())); Assert.assertEquals( "Action should add 1 manifests", 1, Iterables.size(result.addedManifests())); + assertManifestsLocation(result.addedManifests()); table.refresh(); @@ -616,4 +631,16 @@ public class TestRewriteManifestsAction extends SparkTestBase { return totalSize / numEntries; } + + private void assertManifestsLocation(Iterable<ManifestFile> manifests) { + assertManifestsLocation(manifests, null); + } + + private void assertManifestsLocation(Iterable<ManifestFile> manifests, String stagingLocation) { + if (shouldStageManifests && stagingLocation != null) { + assertThat(manifests).allMatch(manifest -> manifest.path().startsWith(stagingLocation)); + } else { + assertThat(manifests).allMatch(manifest -> manifest.path().startsWith(tableLocation)); + } + } } diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java index 87fbe2de2f..bc2ef23067 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java @@ -73,8 +73,10 @@ import org.slf4j.LoggerFactory; * * <p>By default, this action rewrites all manifests for the current partition spec and writes the * result to the metadata folder. The behavior can be modified by passing a custom predicate to - * {@link #rewriteIf(Predicate)} and a custom spec id to {@link #specId(int)}. In addition, there is - * a way to configure a custom location for new manifests via {@link #stagingLocation}. + * {@link #rewriteIf(Predicate)} and a custom spec ID to {@link #specId(int)}. In addition, there is + * a way to configure a custom location for staged manifests via {@link #stagingLocation(String)}. + * The provided staging location will be ignored if snapshot ID inheritance is enabled. In such + * cases, the manifests are always written to the metadata folder and committed without staging. */ public class RewriteManifestsSparkAction extends BaseSnapshotUpdateSparkAction<RewriteManifestsSparkAction> implements RewriteManifests { @@ -88,10 +90,11 @@ public class RewriteManifestsSparkAction private final Table table; private final int formatVersion; private final long targetManifestSizeBytes; + private final boolean shouldStageManifests; private PartitionSpec spec = null; private Predicate<ManifestFile> predicate = manifest -> true; - private String stagingLocation = null; + private String outputLocation = null; RewriteManifestsSparkAction(SparkSession spark, Table table) { super(spark); @@ -104,13 +107,20 @@ public class RewriteManifestsSparkAction TableProperties.MANIFEST_TARGET_SIZE_BYTES, TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFAULT); - // default the staging location to the metadata location + // default the output location to the metadata location TableOperations ops = ((HasTableOperations) table).operations(); Path metadataFilePath = new Path(ops.metadataFileLocation("file")); - this.stagingLocation = metadataFilePath.getParent().toString(); + this.outputLocation = metadataFilePath.getParent().toString(); // use the current table format version for new manifests this.formatVersion = ops.current().formatVersion(); + + boolean snapshotIdInheritanceEnabled = + PropertyUtil.propertyAsBoolean( + table.properties(), + TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, + TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT); + this.shouldStageManifests = formatVersion == 1 && !snapshotIdInheritanceEnabled; } @Override @@ -133,15 +143,17 @@ public class RewriteManifestsSparkAction @Override public RewriteManifestsSparkAction stagingLocation(String newStagingLocation) { - this.stagingLocation = newStagingLocation; + if (shouldStageManifests) { + this.outputLocation = newStagingLocation; + } else { + LOG.warn("Ignoring provided staging location as new manifests will be committed directly"); + } return this; } @Override public RewriteManifests.Result execute() { - String desc = - String.format( - "Rewriting manifests (staging location=%s) of %s", stagingLocation, table.name()); + String desc = String.format("Rewriting manifests in %s", table.name()); JobGroupInfo info = newJobGroupInfo("REWRITE-MANIFESTS", desc); return withJobGroupInfo(info, this::doExecute); } @@ -236,7 +248,7 @@ public class RewriteManifestsSparkAction toManifests( tableBroadcast, maxNumManifestEntries, - stagingLocation, + outputLocation, formatVersion, combinedPartitionType, spec, @@ -267,7 +279,7 @@ public class RewriteManifestsSparkAction toManifests( tableBroadcast, maxNumManifestEntries, - stagingLocation, + outputLocation, formatVersion, combinedPartitionType, spec, @@ -320,18 +332,12 @@ public class RewriteManifestsSparkAction private void replaceManifests( Iterable<ManifestFile> deletedManifests, Iterable<ManifestFile> addedManifests) { try { - boolean snapshotIdInheritanceEnabled = - PropertyUtil.propertyAsBoolean( - table.properties(), - TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, - TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT); - org.apache.iceberg.RewriteManifests rewriteManifests = table.rewriteManifests(); deletedManifests.forEach(rewriteManifests::deleteManifest); addedManifests.forEach(rewriteManifests::addManifest); commit(rewriteManifests); - if (formatVersion == 1 && !snapshotIdInheritanceEnabled) { + if (shouldStageManifests) { // delete new manifests as they were rewritten before the commit deleteFiles(Iterables.transform(addedManifests, ManifestFile::path)); } diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java index eadcfc4261..a5dd0054da 100644 --- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java +++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java @@ -24,6 +24,7 @@ import static org.apache.iceberg.ValidationHelpers.files; import static org.apache.iceberg.ValidationHelpers.snapshotIds; import static org.apache.iceberg.ValidationHelpers.validateDataManifest; import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assumptions.assumeThat; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.spy; @@ -92,6 +93,7 @@ public class TestRewriteManifestsAction extends SparkTestBase { private final String snapshotIdInheritanceEnabled; private final String useCaching; private final int formatVersion; + private final boolean shouldStageManifests; private String tableLocation = null; public TestRewriteManifestsAction( @@ -99,6 +101,7 @@ public class TestRewriteManifestsAction extends SparkTestBase { this.snapshotIdInheritanceEnabled = snapshotIdInheritanceEnabled; this.useCaching = useCaching; this.formatVersion = formatVersion; + this.shouldStageManifests = formatVersion == 1 && snapshotIdInheritanceEnabled.equals("false"); } @Before @@ -166,6 +169,7 @@ public class TestRewriteManifestsAction extends SparkTestBase { "Action should rewrite 2 manifests", 2, Iterables.size(result.rewrittenManifests())); Assert.assertEquals( "Action should add 1 manifests", 1, Iterables.size(result.addedManifests())); + assertManifestsLocation(result.addedManifests()); table.refresh(); @@ -312,6 +316,7 @@ public class TestRewriteManifestsAction extends SparkTestBase { "Action should rewrite 4 manifests", 4, Iterables.size(result.rewrittenManifests())); Assert.assertEquals( "Action should add 2 manifests", 2, Iterables.size(result.addedManifests())); + assertManifestsLocation(result.addedManifests()); table.refresh(); @@ -376,12 +381,14 @@ public class TestRewriteManifestsAction extends SparkTestBase { SparkActions actions = SparkActions.get(); + String rewriteStagingLocation = temp.newFolder().toString(); + RewriteManifests.Result result = actions .rewriteManifests(table) .rewriteIf(manifest -> true) .option(RewriteManifestsSparkAction.USE_CACHING, useCaching) - .stagingLocation(temp.newFolder().toString()) + .stagingLocation(rewriteStagingLocation) .execute(); Assert.assertEquals( @@ -390,6 +397,7 @@ public class TestRewriteManifestsAction extends SparkTestBase { result.rewrittenManifests()); Assert.assertEquals( "Action should add 1 manifest", 1, Iterables.size(result.addedManifests())); + assertManifestsLocation(result.addedManifests(), rewriteStagingLocation); } finally { spark.sql("DROP TABLE parquet_table"); @@ -428,18 +436,21 @@ public class TestRewriteManifestsAction extends SparkTestBase { SparkActions actions = SparkActions.get(); + String stagingLocation = temp.newFolder().toString(); + RewriteManifests.Result result = actions .rewriteManifests(table) .rewriteIf(manifest -> true) .option(RewriteManifestsSparkAction.USE_CACHING, useCaching) - .stagingLocation(temp.newFolder().toString()) + .stagingLocation(stagingLocation) .execute(); Assert.assertEquals( "Action should rewrite 1 manifest", 1, Iterables.size(result.rewrittenManifests())); Assert.assertEquals( "Action should add 2 manifests", 2, Iterables.size(result.addedManifests())); + assertManifestsLocation(result.addedManifests(), stagingLocation); table.refresh(); @@ -481,6 +492,8 @@ public class TestRewriteManifestsAction extends SparkTestBase { SparkActions actions = SparkActions.get(); + String stagingLocation = temp.newFolder().toString(); + // rewrite only the first manifest RewriteManifests.Result result = actions @@ -489,7 +502,7 @@ public class TestRewriteManifestsAction extends SparkTestBase { manifest -> (manifest.path().equals(manifests.get(0).path()) || (manifest.path().equals(manifests.get(1).path())))) - .stagingLocation(temp.newFolder().toString()) + .stagingLocation(stagingLocation) .option(RewriteManifestsSparkAction.USE_CACHING, useCaching) .execute(); @@ -497,6 +510,7 @@ public class TestRewriteManifestsAction extends SparkTestBase { "Action should rewrite 2 manifest", 2, Iterables.size(result.rewrittenManifests())); Assert.assertEquals( "Action should add 1 manifests", 1, Iterables.size(result.addedManifests())); + assertManifestsLocation(result.addedManifests(), stagingLocation); table.refresh(); @@ -560,6 +574,7 @@ public class TestRewriteManifestsAction extends SparkTestBase { "Action should rewrite 2 manifests", 2, Iterables.size(result.rewrittenManifests())); Assert.assertEquals( "Action should add 1 manifests", 1, Iterables.size(result.addedManifests())); + assertManifestsLocation(result.addedManifests()); table.refresh(); @@ -616,4 +631,16 @@ public class TestRewriteManifestsAction extends SparkTestBase { return totalSize / numEntries; } + + private void assertManifestsLocation(Iterable<ManifestFile> manifests) { + assertManifestsLocation(manifests, null); + } + + private void assertManifestsLocation(Iterable<ManifestFile> manifests, String stagingLocation) { + if (shouldStageManifests && stagingLocation != null) { + assertThat(manifests).allMatch(manifest -> manifest.path().startsWith(stagingLocation)); + } else { + assertThat(manifests).allMatch(manifest -> manifest.path().startsWith(tableLocation)); + } + } } diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java index 87fbe2de2f..bc2ef23067 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java @@ -73,8 +73,10 @@ import org.slf4j.LoggerFactory; * * <p>By default, this action rewrites all manifests for the current partition spec and writes the * result to the metadata folder. The behavior can be modified by passing a custom predicate to - * {@link #rewriteIf(Predicate)} and a custom spec id to {@link #specId(int)}. In addition, there is - * a way to configure a custom location for new manifests via {@link #stagingLocation}. + * {@link #rewriteIf(Predicate)} and a custom spec ID to {@link #specId(int)}. In addition, there is + * a way to configure a custom location for staged manifests via {@link #stagingLocation(String)}. + * The provided staging location will be ignored if snapshot ID inheritance is enabled. In such + * cases, the manifests are always written to the metadata folder and committed without staging. */ public class RewriteManifestsSparkAction extends BaseSnapshotUpdateSparkAction<RewriteManifestsSparkAction> implements RewriteManifests { @@ -88,10 +90,11 @@ public class RewriteManifestsSparkAction private final Table table; private final int formatVersion; private final long targetManifestSizeBytes; + private final boolean shouldStageManifests; private PartitionSpec spec = null; private Predicate<ManifestFile> predicate = manifest -> true; - private String stagingLocation = null; + private String outputLocation = null; RewriteManifestsSparkAction(SparkSession spark, Table table) { super(spark); @@ -104,13 +107,20 @@ public class RewriteManifestsSparkAction TableProperties.MANIFEST_TARGET_SIZE_BYTES, TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFAULT); - // default the staging location to the metadata location + // default the output location to the metadata location TableOperations ops = ((HasTableOperations) table).operations(); Path metadataFilePath = new Path(ops.metadataFileLocation("file")); - this.stagingLocation = metadataFilePath.getParent().toString(); + this.outputLocation = metadataFilePath.getParent().toString(); // use the current table format version for new manifests this.formatVersion = ops.current().formatVersion(); + + boolean snapshotIdInheritanceEnabled = + PropertyUtil.propertyAsBoolean( + table.properties(), + TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, + TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT); + this.shouldStageManifests = formatVersion == 1 && !snapshotIdInheritanceEnabled; } @Override @@ -133,15 +143,17 @@ public class RewriteManifestsSparkAction @Override public RewriteManifestsSparkAction stagingLocation(String newStagingLocation) { - this.stagingLocation = newStagingLocation; + if (shouldStageManifests) { + this.outputLocation = newStagingLocation; + } else { + LOG.warn("Ignoring provided staging location as new manifests will be committed directly"); + } return this; } @Override public RewriteManifests.Result execute() { - String desc = - String.format( - "Rewriting manifests (staging location=%s) of %s", stagingLocation, table.name()); + String desc = String.format("Rewriting manifests in %s", table.name()); JobGroupInfo info = newJobGroupInfo("REWRITE-MANIFESTS", desc); return withJobGroupInfo(info, this::doExecute); } @@ -236,7 +248,7 @@ public class RewriteManifestsSparkAction toManifests( tableBroadcast, maxNumManifestEntries, - stagingLocation, + outputLocation, formatVersion, combinedPartitionType, spec, @@ -267,7 +279,7 @@ public class RewriteManifestsSparkAction toManifests( tableBroadcast, maxNumManifestEntries, - stagingLocation, + outputLocation, formatVersion, combinedPartitionType, spec, @@ -320,18 +332,12 @@ public class RewriteManifestsSparkAction private void replaceManifests( Iterable<ManifestFile> deletedManifests, Iterable<ManifestFile> addedManifests) { try { - boolean snapshotIdInheritanceEnabled = - PropertyUtil.propertyAsBoolean( - table.properties(), - TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, - TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT); - org.apache.iceberg.RewriteManifests rewriteManifests = table.rewriteManifests(); deletedManifests.forEach(rewriteManifests::deleteManifest); addedManifests.forEach(rewriteManifests::addManifest); commit(rewriteManifests); - if (formatVersion == 1 && !snapshotIdInheritanceEnabled) { + if (shouldStageManifests) { // delete new manifests as they were rewritten before the commit deleteFiles(Iterables.transform(addedManifests, ManifestFile::path)); } diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java index 3522decec0..4ce5ba4e9d 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java @@ -24,6 +24,7 @@ import static org.apache.iceberg.ValidationHelpers.files; import static org.apache.iceberg.ValidationHelpers.snapshotIds; import static org.apache.iceberg.ValidationHelpers.validateDataManifest; import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assumptions.assumeThat; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.spy; @@ -92,6 +93,7 @@ public class TestRewriteManifestsAction extends SparkTestBase { private final String snapshotIdInheritanceEnabled; private final String useCaching; private final int formatVersion; + private final boolean shouldStageManifests; private String tableLocation = null; public TestRewriteManifestsAction( @@ -99,6 +101,7 @@ public class TestRewriteManifestsAction extends SparkTestBase { this.snapshotIdInheritanceEnabled = snapshotIdInheritanceEnabled; this.useCaching = useCaching; this.formatVersion = formatVersion; + this.shouldStageManifests = formatVersion == 1 && snapshotIdInheritanceEnabled.equals("false"); } @Before @@ -166,6 +169,7 @@ public class TestRewriteManifestsAction extends SparkTestBase { "Action should rewrite 2 manifests", 2, Iterables.size(result.rewrittenManifests())); Assert.assertEquals( "Action should add 1 manifests", 1, Iterables.size(result.addedManifests())); + assertManifestsLocation(result.addedManifests()); table.refresh(); @@ -312,6 +316,7 @@ public class TestRewriteManifestsAction extends SparkTestBase { "Action should rewrite 4 manifests", 4, Iterables.size(result.rewrittenManifests())); Assert.assertEquals( "Action should add 2 manifests", 2, Iterables.size(result.addedManifests())); + assertManifestsLocation(result.addedManifests()); table.refresh(); @@ -376,12 +381,14 @@ public class TestRewriteManifestsAction extends SparkTestBase { SparkActions actions = SparkActions.get(); + String rewriteStagingLocation = temp.newFolder().toString(); + RewriteManifests.Result result = actions .rewriteManifests(table) .rewriteIf(manifest -> true) .option(RewriteManifestsSparkAction.USE_CACHING, useCaching) - .stagingLocation(temp.newFolder().toString()) + .stagingLocation(rewriteStagingLocation) .execute(); Assert.assertEquals( @@ -390,6 +397,7 @@ public class TestRewriteManifestsAction extends SparkTestBase { result.rewrittenManifests()); Assert.assertEquals( "Action should add 1 manifest", 1, Iterables.size(result.addedManifests())); + assertManifestsLocation(result.addedManifests(), rewriteStagingLocation); } finally { spark.sql("DROP TABLE parquet_table"); @@ -428,18 +436,21 @@ public class TestRewriteManifestsAction extends SparkTestBase { SparkActions actions = SparkActions.get(); + String stagingLocation = temp.newFolder().toString(); + RewriteManifests.Result result = actions .rewriteManifests(table) .rewriteIf(manifest -> true) .option(RewriteManifestsSparkAction.USE_CACHING, useCaching) - .stagingLocation(temp.newFolder().toString()) + .stagingLocation(stagingLocation) .execute(); Assert.assertEquals( "Action should rewrite 1 manifest", 1, Iterables.size(result.rewrittenManifests())); Assert.assertEquals( "Action should add 2 manifests", 2, Iterables.size(result.addedManifests())); + assertManifestsLocation(result.addedManifests(), stagingLocation); table.refresh(); @@ -481,6 +492,8 @@ public class TestRewriteManifestsAction extends SparkTestBase { SparkActions actions = SparkActions.get(); + String stagingLocation = temp.newFolder().toString(); + // rewrite only the first manifest RewriteManifests.Result result = actions @@ -489,7 +502,7 @@ public class TestRewriteManifestsAction extends SparkTestBase { manifest -> (manifest.path().equals(manifests.get(0).path()) || (manifest.path().equals(manifests.get(1).path())))) - .stagingLocation(temp.newFolder().toString()) + .stagingLocation(stagingLocation) .option(RewriteManifestsSparkAction.USE_CACHING, useCaching) .execute(); @@ -497,6 +510,7 @@ public class TestRewriteManifestsAction extends SparkTestBase { "Action should rewrite 2 manifest", 2, Iterables.size(result.rewrittenManifests())); Assert.assertEquals( "Action should add 1 manifests", 1, Iterables.size(result.addedManifests())); + assertManifestsLocation(result.addedManifests(), stagingLocation); table.refresh(); @@ -560,6 +574,7 @@ public class TestRewriteManifestsAction extends SparkTestBase { "Action should rewrite 2 manifests", 2, Iterables.size(result.rewrittenManifests())); Assert.assertEquals( "Action should add 1 manifests", 1, Iterables.size(result.addedManifests())); + assertManifestsLocation(result.addedManifests()); table.refresh(); @@ -616,4 +631,16 @@ public class TestRewriteManifestsAction extends SparkTestBase { return totalSize / numEntries; } + + private void assertManifestsLocation(Iterable<ManifestFile> manifests) { + assertManifestsLocation(manifests, null); + } + + private void assertManifestsLocation(Iterable<ManifestFile> manifests, String stagingLocation) { + if (shouldStageManifests && stagingLocation != null) { + assertThat(manifests).allMatch(manifest -> manifest.path().startsWith(stagingLocation)); + } else { + assertThat(manifests).allMatch(manifest -> manifest.path().startsWith(tableLocation)); + } + } } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java index 87fbe2de2f..bc2ef23067 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java @@ -73,8 +73,10 @@ import org.slf4j.LoggerFactory; * * <p>By default, this action rewrites all manifests for the current partition spec and writes the * result to the metadata folder. The behavior can be modified by passing a custom predicate to - * {@link #rewriteIf(Predicate)} and a custom spec id to {@link #specId(int)}. In addition, there is - * a way to configure a custom location for new manifests via {@link #stagingLocation}. + * {@link #rewriteIf(Predicate)} and a custom spec ID to {@link #specId(int)}. In addition, there is + * a way to configure a custom location for staged manifests via {@link #stagingLocation(String)}. + * The provided staging location will be ignored if snapshot ID inheritance is enabled. In such + * cases, the manifests are always written to the metadata folder and committed without staging. */ public class RewriteManifestsSparkAction extends BaseSnapshotUpdateSparkAction<RewriteManifestsSparkAction> implements RewriteManifests { @@ -88,10 +90,11 @@ public class RewriteManifestsSparkAction private final Table table; private final int formatVersion; private final long targetManifestSizeBytes; + private final boolean shouldStageManifests; private PartitionSpec spec = null; private Predicate<ManifestFile> predicate = manifest -> true; - private String stagingLocation = null; + private String outputLocation = null; RewriteManifestsSparkAction(SparkSession spark, Table table) { super(spark); @@ -104,13 +107,20 @@ public class RewriteManifestsSparkAction TableProperties.MANIFEST_TARGET_SIZE_BYTES, TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFAULT); - // default the staging location to the metadata location + // default the output location to the metadata location TableOperations ops = ((HasTableOperations) table).operations(); Path metadataFilePath = new Path(ops.metadataFileLocation("file")); - this.stagingLocation = metadataFilePath.getParent().toString(); + this.outputLocation = metadataFilePath.getParent().toString(); // use the current table format version for new manifests this.formatVersion = ops.current().formatVersion(); + + boolean snapshotIdInheritanceEnabled = + PropertyUtil.propertyAsBoolean( + table.properties(), + TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, + TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT); + this.shouldStageManifests = formatVersion == 1 && !snapshotIdInheritanceEnabled; } @Override @@ -133,15 +143,17 @@ public class RewriteManifestsSparkAction @Override public RewriteManifestsSparkAction stagingLocation(String newStagingLocation) { - this.stagingLocation = newStagingLocation; + if (shouldStageManifests) { + this.outputLocation = newStagingLocation; + } else { + LOG.warn("Ignoring provided staging location as new manifests will be committed directly"); + } return this; } @Override public RewriteManifests.Result execute() { - String desc = - String.format( - "Rewriting manifests (staging location=%s) of %s", stagingLocation, table.name()); + String desc = String.format("Rewriting manifests in %s", table.name()); JobGroupInfo info = newJobGroupInfo("REWRITE-MANIFESTS", desc); return withJobGroupInfo(info, this::doExecute); } @@ -236,7 +248,7 @@ public class RewriteManifestsSparkAction toManifests( tableBroadcast, maxNumManifestEntries, - stagingLocation, + outputLocation, formatVersion, combinedPartitionType, spec, @@ -267,7 +279,7 @@ public class RewriteManifestsSparkAction toManifests( tableBroadcast, maxNumManifestEntries, - stagingLocation, + outputLocation, formatVersion, combinedPartitionType, spec, @@ -320,18 +332,12 @@ public class RewriteManifestsSparkAction private void replaceManifests( Iterable<ManifestFile> deletedManifests, Iterable<ManifestFile> addedManifests) { try { - boolean snapshotIdInheritanceEnabled = - PropertyUtil.propertyAsBoolean( - table.properties(), - TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, - TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT); - org.apache.iceberg.RewriteManifests rewriteManifests = table.rewriteManifests(); deletedManifests.forEach(rewriteManifests::deleteManifest); addedManifests.forEach(rewriteManifests::addManifest); commit(rewriteManifests); - if (formatVersion == 1 && !snapshotIdInheritanceEnabled) { + if (shouldStageManifests) { // delete new manifests as they were rewritten before the commit deleteFiles(Iterables.transform(addedManifests, ManifestFile::path)); } diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java index 3522decec0..4ce5ba4e9d 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java @@ -24,6 +24,7 @@ import static org.apache.iceberg.ValidationHelpers.files; import static org.apache.iceberg.ValidationHelpers.snapshotIds; import static org.apache.iceberg.ValidationHelpers.validateDataManifest; import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assumptions.assumeThat; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.spy; @@ -92,6 +93,7 @@ public class TestRewriteManifestsAction extends SparkTestBase { private final String snapshotIdInheritanceEnabled; private final String useCaching; private final int formatVersion; + private final boolean shouldStageManifests; private String tableLocation = null; public TestRewriteManifestsAction( @@ -99,6 +101,7 @@ public class TestRewriteManifestsAction extends SparkTestBase { this.snapshotIdInheritanceEnabled = snapshotIdInheritanceEnabled; this.useCaching = useCaching; this.formatVersion = formatVersion; + this.shouldStageManifests = formatVersion == 1 && snapshotIdInheritanceEnabled.equals("false"); } @Before @@ -166,6 +169,7 @@ public class TestRewriteManifestsAction extends SparkTestBase { "Action should rewrite 2 manifests", 2, Iterables.size(result.rewrittenManifests())); Assert.assertEquals( "Action should add 1 manifests", 1, Iterables.size(result.addedManifests())); + assertManifestsLocation(result.addedManifests()); table.refresh(); @@ -312,6 +316,7 @@ public class TestRewriteManifestsAction extends SparkTestBase { "Action should rewrite 4 manifests", 4, Iterables.size(result.rewrittenManifests())); Assert.assertEquals( "Action should add 2 manifests", 2, Iterables.size(result.addedManifests())); + assertManifestsLocation(result.addedManifests()); table.refresh(); @@ -376,12 +381,14 @@ public class TestRewriteManifestsAction extends SparkTestBase { SparkActions actions = SparkActions.get(); + String rewriteStagingLocation = temp.newFolder().toString(); + RewriteManifests.Result result = actions .rewriteManifests(table) .rewriteIf(manifest -> true) .option(RewriteManifestsSparkAction.USE_CACHING, useCaching) - .stagingLocation(temp.newFolder().toString()) + .stagingLocation(rewriteStagingLocation) .execute(); Assert.assertEquals( @@ -390,6 +397,7 @@ public class TestRewriteManifestsAction extends SparkTestBase { result.rewrittenManifests()); Assert.assertEquals( "Action should add 1 manifest", 1, Iterables.size(result.addedManifests())); + assertManifestsLocation(result.addedManifests(), rewriteStagingLocation); } finally { spark.sql("DROP TABLE parquet_table"); @@ -428,18 +436,21 @@ public class TestRewriteManifestsAction extends SparkTestBase { SparkActions actions = SparkActions.get(); + String stagingLocation = temp.newFolder().toString(); + RewriteManifests.Result result = actions .rewriteManifests(table) .rewriteIf(manifest -> true) .option(RewriteManifestsSparkAction.USE_CACHING, useCaching) - .stagingLocation(temp.newFolder().toString()) + .stagingLocation(stagingLocation) .execute(); Assert.assertEquals( "Action should rewrite 1 manifest", 1, Iterables.size(result.rewrittenManifests())); Assert.assertEquals( "Action should add 2 manifests", 2, Iterables.size(result.addedManifests())); + assertManifestsLocation(result.addedManifests(), stagingLocation); table.refresh(); @@ -481,6 +492,8 @@ public class TestRewriteManifestsAction extends SparkTestBase { SparkActions actions = SparkActions.get(); + String stagingLocation = temp.newFolder().toString(); + // rewrite only the first manifest RewriteManifests.Result result = actions @@ -489,7 +502,7 @@ public class TestRewriteManifestsAction extends SparkTestBase { manifest -> (manifest.path().equals(manifests.get(0).path()) || (manifest.path().equals(manifests.get(1).path())))) - .stagingLocation(temp.newFolder().toString()) + .stagingLocation(stagingLocation) .option(RewriteManifestsSparkAction.USE_CACHING, useCaching) .execute(); @@ -497,6 +510,7 @@ public class TestRewriteManifestsAction extends SparkTestBase { "Action should rewrite 2 manifest", 2, Iterables.size(result.rewrittenManifests())); Assert.assertEquals( "Action should add 1 manifests", 1, Iterables.size(result.addedManifests())); + assertManifestsLocation(result.addedManifests(), stagingLocation); table.refresh(); @@ -560,6 +574,7 @@ public class TestRewriteManifestsAction extends SparkTestBase { "Action should rewrite 2 manifests", 2, Iterables.size(result.rewrittenManifests())); Assert.assertEquals( "Action should add 1 manifests", 1, Iterables.size(result.addedManifests())); + assertManifestsLocation(result.addedManifests()); table.refresh(); @@ -616,4 +631,16 @@ public class TestRewriteManifestsAction extends SparkTestBase { return totalSize / numEntries; } + + private void assertManifestsLocation(Iterable<ManifestFile> manifests) { + assertManifestsLocation(manifests, null); + } + + private void assertManifestsLocation(Iterable<ManifestFile> manifests, String stagingLocation) { + if (shouldStageManifests && stagingLocation != null) { + assertThat(manifests).allMatch(manifest -> manifest.path().startsWith(stagingLocation)); + } else { + assertThat(manifests).allMatch(manifest -> manifest.path().startsWith(tableLocation)); + } + } }