This is an automated email from the ASF dual-hosted git repository.
amoghj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 67e084c8d4 API: Support removeUnusedSpecs in ExpireSnapshots (#10755)
67e084c8d4 is described below
commit 67e084c8d47db68c6135f9837b9cff6f88da0309
Author: advancedxy <[email protected]>
AuthorDate: Wed Jan 8 02:26:46 2025 +0800
API: Support removeUnusedSpecs in ExpireSnapshots (#10755)
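Implement an API in ExpireSnapshots, cleanExpiredMetadata(boolean), to remove
partition specs that are no longer in use. Unused specs are found by a
reachability analysis: the default spec and every spec referenced by a
manifest of a retained snapshot are kept, and all other specs are removed,
so that table metadata does not keep growing with stale specs.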
Co-authored-by: Russell_Spitzer <[email protected]>
Co-authored-by: Amogh Jahagirdar <[email protected]>
---
.../java/org/apache/iceberg/ExpireSnapshots.java | 11 ++
.../org/apache/iceberg/IncrementalFileCleanup.java | 12 +--
.../java/org/apache/iceberg/MetadataUpdate.java | 17 +++
.../org/apache/iceberg/MetadataUpdateParser.java | 19 ++++
.../java/org/apache/iceberg/RemoveSnapshots.java | 28 +++++
.../java/org/apache/iceberg/TableMetadata.java | 13 +++
.../org/apache/iceberg/UpdateRequirements.java | 23 +++++
.../apache/iceberg/TestMetadataUpdateParser.java | 21 ++++
.../org/apache/iceberg/TestRemoveSnapshots.java | 85 +++++++++++++++
.../org/apache/iceberg/TestUpdateRequirements.java | 115 +++++++++++++++++++++
.../org/apache/iceberg/catalog/CatalogTests.java | 56 ++++++++++
11 files changed, 394 insertions(+), 6 deletions(-)
diff --git a/api/src/main/java/org/apache/iceberg/ExpireSnapshots.java
b/api/src/main/java/org/apache/iceberg/ExpireSnapshots.java
index f6524a1d4f..15a141eb8c 100644
--- a/api/src/main/java/org/apache/iceberg/ExpireSnapshots.java
+++ b/api/src/main/java/org/apache/iceberg/ExpireSnapshots.java
@@ -118,4 +118,15 @@ public interface ExpireSnapshots extends
PendingUpdate<List<Snapshot>> {
* @return this for method chaining
*/
ExpireSnapshots cleanExpiredFiles(boolean clean);
+
+ /**
+ * Enable cleaning up unused metadata, such as partition specs, schemas, etc.
+ *
+ * @param clean remove unused partition specs, schemas, or other metadata
when true
+ * @return this for method chaining
+ */
+ default ExpireSnapshots cleanExpiredMetadata(boolean clean) {
+ throw new UnsupportedOperationException(
+ this.getClass().getName() + " doesn't implement cleanExpiredMetadata");
+ }
}
diff --git a/core/src/main/java/org/apache/iceberg/IncrementalFileCleanup.java
b/core/src/main/java/org/apache/iceberg/IncrementalFileCleanup.java
index e1648514ef..77e7ecfdcc 100644
--- a/core/src/main/java/org/apache/iceberg/IncrementalFileCleanup.java
+++ b/core/src/main/java/org/apache/iceberg/IncrementalFileCleanup.java
@@ -20,6 +20,7 @@ package org.apache.iceberg;
import java.io.IOException;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
@@ -256,7 +257,8 @@ class IncrementalFileCleanup extends FileCleanupStrategy {
});
Set<String> filesToDelete =
- findFilesToDelete(manifestsToScan, manifestsToRevert, validIds,
afterExpiration);
+ findFilesToDelete(
+ manifestsToScan, manifestsToRevert, validIds,
beforeExpiration.specsById());
deleteFiles(filesToDelete, "data");
deleteFiles(manifestsToDelete, "manifest");
@@ -273,7 +275,7 @@ class IncrementalFileCleanup extends FileCleanupStrategy {
Set<ManifestFile> manifestsToScan,
Set<ManifestFile> manifestsToRevert,
Set<Long> validIds,
- TableMetadata current) {
+ Map<Integer, PartitionSpec> specsById) {
Set<String> filesToDelete = ConcurrentHashMap.newKeySet();
Tasks.foreach(manifestsToScan)
.retry(3)
@@ -285,8 +287,7 @@ class IncrementalFileCleanup extends FileCleanupStrategy {
.run(
manifest -> {
// the manifest has deletes, scan it to find files to delete
- try (ManifestReader<?> reader =
- ManifestFiles.open(manifest, fileIO, current.specsById())) {
+ try (ManifestReader<?> reader = ManifestFiles.open(manifest,
fileIO, specsById)) {
for (ManifestEntry<?> entry : reader.entries()) {
// if the snapshot ID of the DELETE entry is no longer
valid, the data can be
// deleted
@@ -311,8 +312,7 @@ class IncrementalFileCleanup extends FileCleanupStrategy {
.run(
manifest -> {
// the manifest has deletes, scan it to find files to delete
- try (ManifestReader<?> reader =
- ManifestFiles.open(manifest, fileIO, current.specsById())) {
+ try (ManifestReader<?> reader = ManifestFiles.open(manifest,
fileIO, specsById)) {
for (ManifestEntry<?> entry : reader.entries()) {
// delete any ADDED file from manifests that were reverted
if (entry.status() == ManifestEntry.Status.ADDED) {
diff --git a/core/src/main/java/org/apache/iceberg/MetadataUpdate.java
b/core/src/main/java/org/apache/iceberg/MetadataUpdate.java
index ba038c196e..ff008c9d43 100644
--- a/core/src/main/java/org/apache/iceberg/MetadataUpdate.java
+++ b/core/src/main/java/org/apache/iceberg/MetadataUpdate.java
@@ -175,6 +175,23 @@ public interface MetadataUpdate extends Serializable {
}
}
+ class RemovePartitionSpecs implements MetadataUpdate {
+ private final Set<Integer> specIds;
+
+ public RemovePartitionSpecs(Set<Integer> specIds) {
+ this.specIds = specIds;
+ }
+
+ public Set<Integer> specIds() {
+ return specIds;
+ }
+
+ @Override
+ public void applyTo(TableMetadata.Builder metadataBuilder) {
+ metadataBuilder.removeSpecs(specIds);
+ }
+ }
+
class AddSortOrder implements MetadataUpdate {
private final UnboundSortOrder sortOrder;
diff --git a/core/src/main/java/org/apache/iceberg/MetadataUpdateParser.java
b/core/src/main/java/org/apache/iceberg/MetadataUpdateParser.java
index 8cdfd3c72b..08d4b3398f 100644
--- a/core/src/main/java/org/apache/iceberg/MetadataUpdateParser.java
+++ b/core/src/main/java/org/apache/iceberg/MetadataUpdateParser.java
@@ -59,6 +59,7 @@ public class MetadataUpdateParser {
static final String SET_CURRENT_VIEW_VERSION = "set-current-view-version";
static final String SET_PARTITION_STATISTICS = "set-partition-statistics";
static final String REMOVE_PARTITION_STATISTICS =
"remove-partition-statistics";
+ static final String REMOVE_PARTITION_SPECS = "remove-partition-specs";
// AssignUUID
private static final String UUID = "uuid";
@@ -126,6 +127,9 @@ public class MetadataUpdateParser {
// SetCurrentViewVersion
private static final String VIEW_VERSION_ID = "view-version-id";
+ // RemovePartitionSpecs
+ private static final String SPEC_IDS = "spec-ids";
+
private static final Map<Class<? extends MetadataUpdate>, String> ACTIONS =
ImmutableMap.<Class<? extends MetadataUpdate>, String>builder()
.put(MetadataUpdate.AssignUUID.class, ASSIGN_UUID)
@@ -149,6 +153,7 @@ public class MetadataUpdateParser {
.put(MetadataUpdate.SetLocation.class, SET_LOCATION)
.put(MetadataUpdate.AddViewVersion.class, ADD_VIEW_VERSION)
.put(MetadataUpdate.SetCurrentViewVersion.class,
SET_CURRENT_VIEW_VERSION)
+ .put(MetadataUpdate.RemovePartitionSpecs.class,
REMOVE_PARTITION_SPECS)
.buildOrThrow();
public static String toJson(MetadataUpdate metadataUpdate) {
@@ -241,6 +246,9 @@ public class MetadataUpdateParser {
writeSetCurrentViewVersionId(
(MetadataUpdate.SetCurrentViewVersion) metadataUpdate, generator);
break;
+ case REMOVE_PARTITION_SPECS:
+ writeRemovePartitionSpecs((MetadataUpdate.RemovePartitionSpecs)
metadataUpdate, generator);
+ break;
default:
throw new IllegalArgumentException(
String.format(
@@ -312,6 +320,8 @@ public class MetadataUpdateParser {
return readAddViewVersion(jsonNode);
case SET_CURRENT_VIEW_VERSION:
return readCurrentViewVersionId(jsonNode);
+ case REMOVE_PARTITION_SPECS:
+ return readRemovePartitionSpecs(jsonNode);
default:
throw new UnsupportedOperationException(
String.format("Cannot convert metadata update action to json: %s",
action));
@@ -447,6 +457,11 @@ public class MetadataUpdateParser {
gen.writeNumberField(VIEW_VERSION_ID, metadataUpdate.versionId());
}
+ private static void writeRemovePartitionSpecs(
+ MetadataUpdate.RemovePartitionSpecs metadataUpdate, JsonGenerator gen)
throws IOException {
+ JsonUtil.writeIntegerArray(SPEC_IDS, metadataUpdate.specIds(), gen);
+ }
+
private static MetadataUpdate readAssignUUID(JsonNode node) {
String uuid = JsonUtil.getString(UUID, node);
return new MetadataUpdate.AssignUUID(uuid);
@@ -596,4 +611,8 @@ public class MetadataUpdateParser {
private static MetadataUpdate readCurrentViewVersionId(JsonNode node) {
return new
MetadataUpdate.SetCurrentViewVersion(JsonUtil.getInt(VIEW_VERSION_ID, node));
}
+
+ private static MetadataUpdate readRemovePartitionSpecs(JsonNode node) {
+ return new
MetadataUpdate.RemovePartitionSpecs(JsonUtil.getIntegerSet(SPEC_IDS, node));
+ }
}
diff --git a/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java
b/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java
index 7558ea7d8a..0cc8943341 100644
--- a/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java
+++ b/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java
@@ -41,6 +41,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
+import java.util.stream.Collectors;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -85,6 +86,7 @@ class RemoveSnapshots implements ExpireSnapshots {
private ExecutorService planExecutorService = ThreadPools.getWorkerPool();
private Boolean incrementalCleanup;
private boolean specifiedSnapshotId = false;
+ private boolean cleanExpiredMetadata = false;
RemoveSnapshots(TableOperations ops) {
this.ops = ops;
@@ -159,6 +161,12 @@ class RemoveSnapshots implements ExpireSnapshots {
return this;
}
+ @Override
+ public ExpireSnapshots cleanExpiredMetadata(boolean clean) {
+ this.cleanExpiredMetadata = clean;
+ return this;
+ }
+
@Override
public List<Snapshot> apply() {
TableMetadata updated = internalApply();
@@ -209,6 +217,26 @@ class RemoveSnapshots implements ExpireSnapshots {
.forEach(idsToRemove::add);
updatedMetaBuilder.removeSnapshots(idsToRemove);
+ if (cleanExpiredMetadata) {
+ // TODO: Support cleaning expired schema as well.
+ Set<Integer> reachableSpecs = Sets.newConcurrentHashSet();
+ reachableSpecs.add(base.defaultSpecId());
+ Tasks.foreach(idsToRetain)
+ .executeWith(planExecutorService)
+ .run(
+ snapshot ->
+ base.snapshot(snapshot).allManifests(ops.io()).stream()
+ .map(ManifestFile::partitionSpecId)
+ .forEach(reachableSpecs::add));
+
+ Set<Integer> specsToRemove =
+ base.specs().stream()
+ .map(PartitionSpec::specId)
+ .filter(specId -> !reachableSpecs.contains(specId))
+ .collect(Collectors.toSet());
+ updatedMetaBuilder.removeSpecs(specsToRemove);
+ }
+
return updatedMetaBuilder.build();
}
diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java
b/core/src/main/java/org/apache/iceberg/TableMetadata.java
index 19afb7af04..4ba3bdf8d7 100644
--- a/core/src/main/java/org/apache/iceberg/TableMetadata.java
+++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java
@@ -1138,6 +1138,19 @@ public class TableMetadata implements Serializable {
return this;
}
+ Builder removeSpecs(Iterable<Integer> specIds) {
+ Set<Integer> specIdsToRemove = Sets.newHashSet(specIds);
+ Preconditions.checkArgument(
+ !specIdsToRemove.contains(defaultSpecId), "Cannot remove the default
partition spec");
+
+ this.specs =
+ specs.stream()
+ .filter(s -> !specIdsToRemove.contains(s.specId()))
+ .collect(Collectors.toList());
+ changes.add(new MetadataUpdate.RemovePartitionSpecs(specIdsToRemove));
+ return this;
+ }
+
public Builder addPartitionSpec(UnboundPartitionSpec spec) {
addPartitionSpecInternal(spec.bind(schemasById.get(currentSchemaId)));
return this;
diff --git a/core/src/main/java/org/apache/iceberg/UpdateRequirements.java
b/core/src/main/java/org/apache/iceberg/UpdateRequirements.java
index d92c1a3742..95369d5193 100644
--- a/core/src/main/java/org/apache/iceberg/UpdateRequirements.java
+++ b/core/src/main/java/org/apache/iceberg/UpdateRequirements.java
@@ -105,6 +105,8 @@ public class UpdateRequirements {
update((MetadataUpdate.SetDefaultPartitionSpec) update);
} else if (update instanceof MetadataUpdate.SetDefaultSortOrder) {
update((MetadataUpdate.SetDefaultSortOrder) update);
+ } else if (update instanceof MetadataUpdate.RemovePartitionSpecs) {
+ update((MetadataUpdate.RemovePartitionSpecs) update);
}
return this;
@@ -173,6 +175,27 @@ public class UpdateRequirements {
}
}
+ private void update(MetadataUpdate.RemovePartitionSpecs unused) {
+ // require that the default partition spec has not changed
+ if (!setSpecId) {
+ if (base != null && !isReplace) {
+ require(new
UpdateRequirement.AssertDefaultSpecID(base.defaultSpecId()));
+ }
+ this.setSpecId = true;
+ }
+
+ // require that no branches have changed, so that old specs won't be
written.
+ if (base != null && !isReplace) {
+ base.refs()
+ .forEach(
+ (name, ref) -> {
+ if (ref.isBranch() && !name.equals(SnapshotRef.MAIN_BRANCH))
{
+ require(new UpdateRequirement.AssertRefSnapshotID(name,
ref.snapshotId()));
+ }
+ });
+ }
+ }
+
private List<UpdateRequirement> build() {
return requirements.build();
}
diff --git
a/core/src/test/java/org/apache/iceberg/TestMetadataUpdateParser.java
b/core/src/test/java/org/apache/iceberg/TestMetadataUpdateParser.java
index 79c3761fa8..6a65bf7628 100644
--- a/core/src/test/java/org/apache/iceberg/TestMetadataUpdateParser.java
+++ b/core/src/test/java/org/apache/iceberg/TestMetadataUpdateParser.java
@@ -912,6 +912,17 @@ public class TestMetadataUpdateParser {
.isEqualTo(json);
}
+ @Test
+ public void testRemovePartitionSpec() {
+ String action = MetadataUpdateParser.REMOVE_PARTITION_SPECS;
+ String json =
"{\"action\":\"remove-partition-specs\",\"spec-ids\":[1,2,3]}";
+ MetadataUpdate expected = new
MetadataUpdate.RemovePartitionSpecs(ImmutableSet.of(1, 2, 3));
+ assertEquals(action, expected, MetadataUpdateParser.fromJson(json));
+ assertThat(MetadataUpdateParser.toJson(expected))
+ .as("Remove partition specs should convert to the correct JSON value")
+ .isEqualTo(json);
+ }
+
public void assertEquals(
String action, MetadataUpdate expectedUpdate, MetadataUpdate
actualUpdate) {
switch (action) {
@@ -1016,6 +1027,11 @@ public class TestMetadataUpdateParser {
(MetadataUpdate.SetCurrentViewVersion) expectedUpdate,
(MetadataUpdate.SetCurrentViewVersion) actualUpdate);
break;
+ case MetadataUpdateParser.REMOVE_PARTITION_SPECS:
+ assertEqualsRemovePartitionSpecs(
+ (MetadataUpdate.RemovePartitionSpecs) expectedUpdate,
+ (MetadataUpdate.RemovePartitionSpecs) actualUpdate);
+ break;
default:
fail("Unrecognized metadata update action: " + action);
}
@@ -1237,6 +1253,11 @@ public class TestMetadataUpdateParser {
assertThat(actual.versionId()).isEqualTo(expected.versionId());
}
+ private static void assertEqualsRemovePartitionSpecs(
+ MetadataUpdate.RemovePartitionSpecs expected,
MetadataUpdate.RemovePartitionSpecs actual) {
+
assertThat(actual.specIds()).containsExactlyInAnyOrderElementsOf(expected.specIds());
+ }
+
private String createManifestListWithManifestFiles(long snapshotId, Long
parentSnapshotId)
throws IOException {
File manifestList = File.createTempFile("manifests", null, temp.toFile());
diff --git a/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java
b/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java
index 44bbd069e2..6f0b2aed39 100644
--- a/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java
+++ b/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java
@@ -37,6 +37,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.iceberg.ManifestEntry.Status;
import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.PositionOutputStream;
import org.apache.iceberg.puffin.Blob;
@@ -1620,6 +1621,90 @@ public class TestRemoveSnapshots extends TestBase {
assertThat(deletedFiles).isEqualTo(expectedDeletes);
}
+ @TestTemplate
+ public void testRemoveSpecDuringExpiration() {
+ DataFile file =
+ DataFiles.builder(table.spec())
+ .withPath("/path/to/data-0.parquet")
+ .withPartitionPath("data_bucket=0")
+ .withFileSizeInBytes(10)
+ .withRecordCount(100)
+ .build();
+ table.newAppend().appendFile(file).commit();
+ Snapshot append = table.currentSnapshot();
+ String appendManifest =
+ Iterables.getOnlyElement(
+ table.currentSnapshot().allManifests(table.io()).stream()
+ .map(ManifestFile::path)
+ .collect(Collectors.toList()));
+ table.newDelete().deleteFile(file).commit();
+ Snapshot delete = table.currentSnapshot();
+ String deleteManifest =
+ Iterables.getOnlyElement(
+ table.currentSnapshot().allManifests(table.io()).stream()
+ .map(ManifestFile::path)
+ .collect(Collectors.toList()));
+
+ table.updateSpec().addField("id_bucket", Expressions.bucket("id",
16)).commit();
+ PartitionSpec idAndDataBucketSpec = table.spec();
+ DataFile bucketFile =
+ DataFiles.builder(table.spec())
+ .withPath("/path/to/data-0-id-0.parquet")
+ .withFileSizeInBytes(10)
+ .withRecordCount(100)
+ .withPartitionPath("data_bucket=0/id_bucket=0")
+ .build();
+ table.newAppend().appendFile(bucketFile).commit();
+
+ Set<String> deletedFiles = Sets.newHashSet();
+ // Expiring snapshots should remove the data_bucket partition
+ removeSnapshots(table)
+ .expireOlderThan(System.currentTimeMillis())
+ .cleanExpiredMetadata(true)
+ .deleteWith(deletedFiles::add)
+ .commit();
+
+ assertThat(deletedFiles)
+ .containsExactlyInAnyOrder(
+ appendManifest,
+ deleteManifest,
+ file.location(),
+ append.manifestListLocation(),
+ delete.manifestListLocation());
+ assertThat(table.specs().keySet())
+ .as("Only id_bucket + data_bucket transform should exist")
+ .containsExactly(idAndDataBucketSpec.specId());
+ }
+
+ @TestTemplate
+ public void testRemoveSpecsDoesntRemoveDefaultSpec() throws IOException {
+ // The default spec for table is bucketed on data, but write using
unpartitioned
+ PartitionSpec dataBucketSpec = table.spec();
+ DataFile file =
+ DataFiles.builder(PartitionSpec.unpartitioned())
+ .withPath("/path/to/data-0.parquet")
+ .withFileSizeInBytes(10)
+ .withRecordCount(100)
+ .build();
+
+ table.newAppend().appendFile(file).commit();
+ Snapshot append = table.currentSnapshot();
+ table.newDelete().deleteFile(file).commit();
+
+ Set<String> deletedFiles = Sets.newHashSet();
+ // Expiring snapshots should remove only the unpartitioned spec
+ removeSnapshots(table)
+ .expireOlderThan(System.currentTimeMillis())
+ .cleanExpiredMetadata(true)
+ .deleteWith(deletedFiles::add)
+ .commit();
+
+
assertThat(deletedFiles).containsExactlyInAnyOrder(append.manifestListLocation());
+ assertThat(table.specs().keySet())
+ .as("Only data_bucket transform should exist")
+ .containsExactly(dataBucketSpec.specId());
+ }
+
private Set<String> manifestPaths(Snapshot snapshot, FileIO io) {
return
snapshot.allManifests(io).stream().map(ManifestFile::path).collect(Collectors.toSet());
}
diff --git a/core/src/test/java/org/apache/iceberg/TestUpdateRequirements.java
b/core/src/test/java/org/apache/iceberg/TestUpdateRequirements.java
index e5b3428508..7d8cc471ba 100644
--- a/core/src/test/java/org/apache/iceberg/TestUpdateRequirements.java
+++ b/core/src/test/java/org/apache/iceberg/TestUpdateRequirements.java
@@ -424,6 +424,121 @@ public class TestUpdateRequirements {
.hasMessage("Requirement failed: default partition spec changed:
expected id 0 != 1");
}
+ @Test
+ public void removePartitionSpec() {
+ int defaultSpecId = 3;
+ when(metadata.defaultSpecId()).thenReturn(defaultSpecId);
+
+ List<UpdateRequirement> requirements =
+ UpdateRequirements.forUpdateTable(
+ metadata,
+ ImmutableList.of(new
MetadataUpdate.RemovePartitionSpecs(Sets.newHashSet(1, 2))));
+ requirements.forEach(req -> req.validate(metadata));
+
+ assertThat(requirements)
+ .hasSize(2)
+ .hasOnlyElementsOfTypes(
+ UpdateRequirement.AssertTableUUID.class,
UpdateRequirement.AssertDefaultSpecID.class);
+
+ assertTableUUID(requirements);
+
+ assertThat(requirements)
+ .element(1)
+
.asInstanceOf(InstanceOfAssertFactories.type(UpdateRequirement.AssertDefaultSpecID.class))
+ .extracting(UpdateRequirement.AssertDefaultSpecID::specId)
+ .isEqualTo(defaultSpecId);
+ }
+
+ @Test
+ public void testRemovePartitionSpecsWithBranch() {
+ int defaultSpecId = 3;
+ long snapshotId = 42L;
+ when(metadata.defaultSpecId()).thenReturn(defaultSpecId);
+
+ String branch = "branch";
+ SnapshotRef snapshotRef = mock(SnapshotRef.class);
+ when(snapshotRef.snapshotId()).thenReturn(snapshotId);
+ when(snapshotRef.isBranch()).thenReturn(true);
+ when(metadata.refs()).thenReturn(ImmutableMap.of(branch, snapshotRef));
+ when(metadata.ref(branch)).thenReturn(snapshotRef);
+
+ List<UpdateRequirement> requirements =
+ UpdateRequirements.forUpdateTable(
+ metadata,
+ ImmutableList.of(new
MetadataUpdate.RemovePartitionSpecs(Sets.newHashSet(1, 2))));
+ requirements.forEach(req -> req.validate(metadata));
+
+ assertThat(requirements)
+ .hasSize(3)
+ .hasOnlyElementsOfTypes(
+ UpdateRequirement.AssertTableUUID.class,
+ UpdateRequirement.AssertDefaultSpecID.class,
+ UpdateRequirement.AssertRefSnapshotID.class);
+
+ assertTableUUID(requirements);
+
+ assertThat(requirements)
+ .element(1)
+
.asInstanceOf(InstanceOfAssertFactories.type(UpdateRequirement.AssertDefaultSpecID.class))
+ .extracting(UpdateRequirement.AssertDefaultSpecID::specId)
+ .isEqualTo(defaultSpecId);
+
+ assertThat(requirements)
+ .element(2)
+
.asInstanceOf(InstanceOfAssertFactories.type(UpdateRequirement.AssertRefSnapshotID.class))
+ .extracting(UpdateRequirement.AssertRefSnapshotID::snapshotId)
+ .isEqualTo(snapshotId);
+ }
+
+ @Test
+ public void testRemovePartitionSpecsFailure() {
+ int defaultSpecId = 3;
+ when(metadata.defaultSpecId()).thenReturn(defaultSpecId);
+ when(updated.defaultSpecId()).thenReturn(defaultSpecId + 1);
+
+ List<UpdateRequirement> requirements =
+ UpdateRequirements.forUpdateTable(
+ metadata,
+ ImmutableList.of(new
MetadataUpdate.RemovePartitionSpecs(Sets.newHashSet(1, 2))));
+
+ assertThatThrownBy(() -> requirements.forEach(req ->
req.validate(updated)))
+ .isInstanceOf(CommitFailedException.class)
+ .hasMessage(
+ "Requirement failed: default partition spec changed: expected id
%s != %s",
+ defaultSpecId, defaultSpecId + 1);
+ }
+
+ @Test
+ public void testRemovePartitionSpecsWithBranchFailure() {
+ int defaultSpecId = 3;
+ long snapshotId = 42L;
+ when(metadata.defaultSpecId()).thenReturn(defaultSpecId);
+ when(updated.defaultSpecId()).thenReturn(defaultSpecId);
+
+ String branch = "test";
+ SnapshotRef snapshotRef = mock(SnapshotRef.class);
+ when(snapshotRef.snapshotId()).thenReturn(snapshotId);
+ when(snapshotRef.isBranch()).thenReturn(true);
+ when(metadata.refs()).thenReturn(ImmutableMap.of(branch, snapshotRef));
+ when(metadata.ref(branch)).thenReturn(snapshotRef);
+
+ SnapshotRef updatedRef = mock(SnapshotRef.class);
+ when(updatedRef.snapshotId()).thenReturn(snapshotId + 1);
+ when(updatedRef.isBranch()).thenReturn(true);
+ when(updated.ref(branch)).thenReturn(updatedRef);
+
+ List<UpdateRequirement> requirements =
+ UpdateRequirements.forUpdateTable(
+ metadata,
+ ImmutableList.of(new
MetadataUpdate.RemovePartitionSpecs(Sets.newHashSet(1, 2))));
+
+ assertThatThrownBy(() -> requirements.forEach(req ->
req.validate(updated)))
+ .isInstanceOf(CommitFailedException.class)
+ .hasMessage(
+ "Requirement failed: branch %s has changed: expected id %s != %s",
+ branch, snapshotId, snapshotId + 1);
+ }
+
@Test
public void addSortOrder() {
List<UpdateRequirement> requirements =
diff --git a/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java
b/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java
index b1eef4c015..c5e91a6a9e 100644
--- a/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java
+++ b/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java
@@ -1293,6 +1293,62 @@ public abstract class CatalogTests<C extends Catalog &
SupportsNamespaces> {
assertThat(table.spec()).as("Loaded table should have expected
spec").isEqualTo(TABLE_SPEC);
}
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testRemoveUnusedSpec(boolean withBranch) {
+ String branch = "test";
+ C catalog = catalog();
+
+ if (requiresNamespaceCreate()) {
+ catalog.createNamespace(NS);
+ }
+
+ Table table =
+ catalog
+ .buildTable(TABLE, SCHEMA)
+ .withPartitionSpec(SPEC)
+ .withProperty(TableProperties.GC_ENABLED, "true")
+ .create();
+ PartitionSpec spec = table.spec();
+ // added a file to trigger snapshot expiration
+ table.newFastAppend().appendFile(FILE_A).commit();
+ if (withBranch) {
+ table.manageSnapshots().createBranch(branch).commit();
+ }
+ table.updateSpec().addField(Expressions.bucket("data", 16)).commit();
+ table.updateSpec().removeField(Expressions.bucket("data", 16)).commit();
+ table.updateSpec().addField("data").commit();
+ assertThat(table.specs()).as("Should have 3 total specs").hasSize(3);
+ PartitionSpec current = table.spec();
+ table.expireSnapshots().cleanExpiredMetadata(true).commit();
+
+ Table loaded = catalog.loadTable(TABLE);
+ assertThat(loaded.specs().values()).containsExactlyInAnyOrder(spec,
current);
+
+ // add a data file with current spec and remove the old data file
+ table.newDelete().deleteFile(FILE_A).commit();
+ DataFile anotherFile =
+ DataFiles.builder(current)
+ .withPath("/path/to/data-b.parquet")
+ .withFileSizeInBytes(10)
+ .withPartitionPath("id_bucket=0/data=123") // easy way to set
partition data for now
+ .withRecordCount(2) // needs at least one record or else metrics
will filter it out
+ .build();
+ table.newAppend().appendFile(anotherFile).commit();
+ table
+ .expireSnapshots()
+ .cleanExpiredFiles(false)
+ .expireOlderThan(table.currentSnapshot().timestampMillis())
+ .cleanExpiredMetadata(true)
+ .commit();
+ loaded = catalog.loadTable(TABLE);
+ if (withBranch) {
+ assertThat(loaded.specs().values()).containsExactlyInAnyOrder(spec,
current);
+ } else {
+ assertThat(loaded.specs().values()).containsExactlyInAnyOrder(current);
+ }
+ }
+
@Test
public void testUpdateTableSortOrder() {
C catalog = catalog();