This is an automated email from the ASF dual-hosted git repository.

amoghj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 67e084c8d4 API: Support removeUnusedSpecs in ExpireSnapshots (#10755)
67e084c8d4 is described below

commit 67e084c8d47db68c6135f9837b9cff6f88da0309
Author: advancedxy <[email protected]>
AuthorDate: Wed Jan 8 02:26:46 2025 +0800

    API: Support removeUnusedSpecs in ExpireSnapshots (#10755)
    
    Implement an API in ExpireSnapshots to remove partition specs that are no longer in
use, determined by a reachability analysis over the retained snapshots, so that
table metadata size does not grow unbounded.
    
    Co-authored-by: Russell_Spitzer <[email protected]>
    Co-authored-by: Amogh Jahagirdar <[email protected]>
---
 .../java/org/apache/iceberg/ExpireSnapshots.java   |  11 ++
 .../org/apache/iceberg/IncrementalFileCleanup.java |  12 +--
 .../java/org/apache/iceberg/MetadataUpdate.java    |  17 +++
 .../org/apache/iceberg/MetadataUpdateParser.java   |  19 ++++
 .../java/org/apache/iceberg/RemoveSnapshots.java   |  28 +++++
 .../java/org/apache/iceberg/TableMetadata.java     |  13 +++
 .../org/apache/iceberg/UpdateRequirements.java     |  23 +++++
 .../apache/iceberg/TestMetadataUpdateParser.java   |  21 ++++
 .../org/apache/iceberg/TestRemoveSnapshots.java    |  85 +++++++++++++++
 .../org/apache/iceberg/TestUpdateRequirements.java | 115 +++++++++++++++++++++
 .../org/apache/iceberg/catalog/CatalogTests.java   |  56 ++++++++++
 11 files changed, 394 insertions(+), 6 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/ExpireSnapshots.java 
b/api/src/main/java/org/apache/iceberg/ExpireSnapshots.java
index f6524a1d4f..15a141eb8c 100644
--- a/api/src/main/java/org/apache/iceberg/ExpireSnapshots.java
+++ b/api/src/main/java/org/apache/iceberg/ExpireSnapshots.java
@@ -118,4 +118,15 @@ public interface ExpireSnapshots extends 
PendingUpdate<List<Snapshot>> {
    * @return this for method chaining
    */
   ExpireSnapshots cleanExpiredFiles(boolean clean);
+
+  /**
+   * Enable cleaning up unused metadata, such as partition specs, schemas, etc.
+   *
+   * @param clean remove unused partition specs, schemas, or other metadata 
when true
+   * @return this for method chaining
+   */
+  default ExpireSnapshots cleanExpiredMetadata(boolean clean) {
+    throw new UnsupportedOperationException(
+        this.getClass().getName() + " doesn't implement cleanExpiredMetadata");
+  }
 }
diff --git a/core/src/main/java/org/apache/iceberg/IncrementalFileCleanup.java 
b/core/src/main/java/org/apache/iceberg/IncrementalFileCleanup.java
index e1648514ef..77e7ecfdcc 100644
--- a/core/src/main/java/org/apache/iceberg/IncrementalFileCleanup.java
+++ b/core/src/main/java/org/apache/iceberg/IncrementalFileCleanup.java
@@ -20,6 +20,7 @@ package org.apache.iceberg;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
@@ -256,7 +257,8 @@ class IncrementalFileCleanup extends FileCleanupStrategy {
             });
 
     Set<String> filesToDelete =
-        findFilesToDelete(manifestsToScan, manifestsToRevert, validIds, 
afterExpiration);
+        findFilesToDelete(
+            manifestsToScan, manifestsToRevert, validIds, 
beforeExpiration.specsById());
 
     deleteFiles(filesToDelete, "data");
     deleteFiles(manifestsToDelete, "manifest");
@@ -273,7 +275,7 @@ class IncrementalFileCleanup extends FileCleanupStrategy {
       Set<ManifestFile> manifestsToScan,
       Set<ManifestFile> manifestsToRevert,
       Set<Long> validIds,
-      TableMetadata current) {
+      Map<Integer, PartitionSpec> specsById) {
     Set<String> filesToDelete = ConcurrentHashMap.newKeySet();
     Tasks.foreach(manifestsToScan)
         .retry(3)
@@ -285,8 +287,7 @@ class IncrementalFileCleanup extends FileCleanupStrategy {
         .run(
             manifest -> {
               // the manifest has deletes, scan it to find files to delete
-              try (ManifestReader<?> reader =
-                  ManifestFiles.open(manifest, fileIO, current.specsById())) {
+              try (ManifestReader<?> reader = ManifestFiles.open(manifest, 
fileIO, specsById)) {
                 for (ManifestEntry<?> entry : reader.entries()) {
                   // if the snapshot ID of the DELETE entry is no longer 
valid, the data can be
                   // deleted
@@ -311,8 +312,7 @@ class IncrementalFileCleanup extends FileCleanupStrategy {
         .run(
             manifest -> {
               // the manifest has deletes, scan it to find files to delete
-              try (ManifestReader<?> reader =
-                  ManifestFiles.open(manifest, fileIO, current.specsById())) {
+              try (ManifestReader<?> reader = ManifestFiles.open(manifest, 
fileIO, specsById)) {
                 for (ManifestEntry<?> entry : reader.entries()) {
                   // delete any ADDED file from manifests that were reverted
                   if (entry.status() == ManifestEntry.Status.ADDED) {
diff --git a/core/src/main/java/org/apache/iceberg/MetadataUpdate.java 
b/core/src/main/java/org/apache/iceberg/MetadataUpdate.java
index ba038c196e..ff008c9d43 100644
--- a/core/src/main/java/org/apache/iceberg/MetadataUpdate.java
+++ b/core/src/main/java/org/apache/iceberg/MetadataUpdate.java
@@ -175,6 +175,23 @@ public interface MetadataUpdate extends Serializable {
     }
   }
 
+  class RemovePartitionSpecs implements MetadataUpdate {
+    private final Set<Integer> specIds;
+
+    public RemovePartitionSpecs(Set<Integer> specIds) {
+      this.specIds = specIds;
+    }
+
+    public Set<Integer> specIds() {
+      return specIds;
+    }
+
+    @Override
+    public void applyTo(TableMetadata.Builder metadataBuilder) {
+      metadataBuilder.removeSpecs(specIds);
+    }
+  }
+
   class AddSortOrder implements MetadataUpdate {
     private final UnboundSortOrder sortOrder;
 
diff --git a/core/src/main/java/org/apache/iceberg/MetadataUpdateParser.java 
b/core/src/main/java/org/apache/iceberg/MetadataUpdateParser.java
index 8cdfd3c72b..08d4b3398f 100644
--- a/core/src/main/java/org/apache/iceberg/MetadataUpdateParser.java
+++ b/core/src/main/java/org/apache/iceberg/MetadataUpdateParser.java
@@ -59,6 +59,7 @@ public class MetadataUpdateParser {
   static final String SET_CURRENT_VIEW_VERSION = "set-current-view-version";
   static final String SET_PARTITION_STATISTICS = "set-partition-statistics";
   static final String REMOVE_PARTITION_STATISTICS = 
"remove-partition-statistics";
+  static final String REMOVE_PARTITION_SPECS = "remove-partition-specs";
 
   // AssignUUID
   private static final String UUID = "uuid";
@@ -126,6 +127,9 @@ public class MetadataUpdateParser {
   // SetCurrentViewVersion
   private static final String VIEW_VERSION_ID = "view-version-id";
 
+  // RemovePartitionSpecs
+  private static final String SPEC_IDS = "spec-ids";
+
   private static final Map<Class<? extends MetadataUpdate>, String> ACTIONS =
       ImmutableMap.<Class<? extends MetadataUpdate>, String>builder()
           .put(MetadataUpdate.AssignUUID.class, ASSIGN_UUID)
@@ -149,6 +153,7 @@ public class MetadataUpdateParser {
           .put(MetadataUpdate.SetLocation.class, SET_LOCATION)
           .put(MetadataUpdate.AddViewVersion.class, ADD_VIEW_VERSION)
           .put(MetadataUpdate.SetCurrentViewVersion.class, 
SET_CURRENT_VIEW_VERSION)
+          .put(MetadataUpdate.RemovePartitionSpecs.class, 
REMOVE_PARTITION_SPECS)
           .buildOrThrow();
 
   public static String toJson(MetadataUpdate metadataUpdate) {
@@ -241,6 +246,9 @@ public class MetadataUpdateParser {
         writeSetCurrentViewVersionId(
             (MetadataUpdate.SetCurrentViewVersion) metadataUpdate, generator);
         break;
+      case REMOVE_PARTITION_SPECS:
+        writeRemovePartitionSpecs((MetadataUpdate.RemovePartitionSpecs) 
metadataUpdate, generator);
+        break;
       default:
         throw new IllegalArgumentException(
             String.format(
@@ -312,6 +320,8 @@ public class MetadataUpdateParser {
         return readAddViewVersion(jsonNode);
       case SET_CURRENT_VIEW_VERSION:
         return readCurrentViewVersionId(jsonNode);
+      case REMOVE_PARTITION_SPECS:
+        return readRemovePartitionSpecs(jsonNode);
       default:
         throw new UnsupportedOperationException(
             String.format("Cannot convert metadata update action to json: %s", 
action));
@@ -447,6 +457,11 @@ public class MetadataUpdateParser {
     gen.writeNumberField(VIEW_VERSION_ID, metadataUpdate.versionId());
   }
 
+  private static void writeRemovePartitionSpecs(
+      MetadataUpdate.RemovePartitionSpecs metadataUpdate, JsonGenerator gen) 
throws IOException {
+    JsonUtil.writeIntegerArray(SPEC_IDS, metadataUpdate.specIds(), gen);
+  }
+
   private static MetadataUpdate readAssignUUID(JsonNode node) {
     String uuid = JsonUtil.getString(UUID, node);
     return new MetadataUpdate.AssignUUID(uuid);
@@ -596,4 +611,8 @@ public class MetadataUpdateParser {
   private static MetadataUpdate readCurrentViewVersionId(JsonNode node) {
     return new 
MetadataUpdate.SetCurrentViewVersion(JsonUtil.getInt(VIEW_VERSION_ID, node));
   }
+
+  private static MetadataUpdate readRemovePartitionSpecs(JsonNode node) {
+    return new 
MetadataUpdate.RemovePartitionSpecs(JsonUtil.getIntegerSet(SPEC_IDS, node));
+  }
 }
diff --git a/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java 
b/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java
index 7558ea7d8a..0cc8943341 100644
--- a/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java
+++ b/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java
@@ -41,6 +41,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.function.Consumer;
+import java.util.stream.Collectors;
 import org.apache.iceberg.exceptions.CommitFailedException;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -85,6 +86,7 @@ class RemoveSnapshots implements ExpireSnapshots {
   private ExecutorService planExecutorService = ThreadPools.getWorkerPool();
   private Boolean incrementalCleanup;
   private boolean specifiedSnapshotId = false;
+  private boolean cleanExpiredMetadata = false;
 
   RemoveSnapshots(TableOperations ops) {
     this.ops = ops;
@@ -159,6 +161,12 @@ class RemoveSnapshots implements ExpireSnapshots {
     return this;
   }
 
+  @Override
+  public ExpireSnapshots cleanExpiredMetadata(boolean clean) {
+    this.cleanExpiredMetadata = clean;
+    return this;
+  }
+
   @Override
   public List<Snapshot> apply() {
     TableMetadata updated = internalApply();
@@ -209,6 +217,26 @@ class RemoveSnapshots implements ExpireSnapshots {
         .forEach(idsToRemove::add);
     updatedMetaBuilder.removeSnapshots(idsToRemove);
 
+    if (cleanExpiredMetadata) {
+      // TODO: Support cleaning expired schema as well.
+      Set<Integer> reachableSpecs = Sets.newConcurrentHashSet();
+      reachableSpecs.add(base.defaultSpecId());
+      Tasks.foreach(idsToRetain)
+          .executeWith(planExecutorService)
+          .run(
+              snapshot ->
+                  base.snapshot(snapshot).allManifests(ops.io()).stream()
+                      .map(ManifestFile::partitionSpecId)
+                      .forEach(reachableSpecs::add));
+
+      Set<Integer> specsToRemove =
+          base.specs().stream()
+              .map(PartitionSpec::specId)
+              .filter(specId -> !reachableSpecs.contains(specId))
+              .collect(Collectors.toSet());
+      updatedMetaBuilder.removeSpecs(specsToRemove);
+    }
+
     return updatedMetaBuilder.build();
   }
 
diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java 
b/core/src/main/java/org/apache/iceberg/TableMetadata.java
index 19afb7af04..4ba3bdf8d7 100644
--- a/core/src/main/java/org/apache/iceberg/TableMetadata.java
+++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java
@@ -1138,6 +1138,19 @@ public class TableMetadata implements Serializable {
       return this;
     }
 
+    Builder removeSpecs(Iterable<Integer> specIds) {
+      Set<Integer> specIdsToRemove = Sets.newHashSet(specIds);
+      Preconditions.checkArgument(
+          !specIdsToRemove.contains(defaultSpecId), "Cannot remove the default 
partition spec");
+
+      this.specs =
+          specs.stream()
+              .filter(s -> !specIdsToRemove.contains(s.specId()))
+              .collect(Collectors.toList());
+      changes.add(new MetadataUpdate.RemovePartitionSpecs(specIdsToRemove));
+      return this;
+    }
+
     public Builder addPartitionSpec(UnboundPartitionSpec spec) {
       addPartitionSpecInternal(spec.bind(schemasById.get(currentSchemaId)));
       return this;
diff --git a/core/src/main/java/org/apache/iceberg/UpdateRequirements.java 
b/core/src/main/java/org/apache/iceberg/UpdateRequirements.java
index d92c1a3742..95369d5193 100644
--- a/core/src/main/java/org/apache/iceberg/UpdateRequirements.java
+++ b/core/src/main/java/org/apache/iceberg/UpdateRequirements.java
@@ -105,6 +105,8 @@ public class UpdateRequirements {
         update((MetadataUpdate.SetDefaultPartitionSpec) update);
       } else if (update instanceof MetadataUpdate.SetDefaultSortOrder) {
         update((MetadataUpdate.SetDefaultSortOrder) update);
+      } else if (update instanceof MetadataUpdate.RemovePartitionSpecs) {
+        update((MetadataUpdate.RemovePartitionSpecs) update);
       }
 
       return this;
@@ -173,6 +175,27 @@ public class UpdateRequirements {
       }
     }
 
+    private void update(MetadataUpdate.RemovePartitionSpecs unused) {
+      // require that the default partition spec has not changed
+      if (!setSpecId) {
+        if (base != null && !isReplace) {
+          require(new 
UpdateRequirement.AssertDefaultSpecID(base.defaultSpecId()));
+        }
+        this.setSpecId = true;
+      }
+
+      // require that no branches have changed, so that old specs won't be 
written.
+      if (base != null && !isReplace) {
+        base.refs()
+            .forEach(
+                (name, ref) -> {
+                  if (ref.isBranch() && !name.equals(SnapshotRef.MAIN_BRANCH)) 
{
+                    require(new UpdateRequirement.AssertRefSnapshotID(name, 
ref.snapshotId()));
+                  }
+                });
+      }
+    }
+
     private List<UpdateRequirement> build() {
       return requirements.build();
     }
diff --git 
a/core/src/test/java/org/apache/iceberg/TestMetadataUpdateParser.java 
b/core/src/test/java/org/apache/iceberg/TestMetadataUpdateParser.java
index 79c3761fa8..6a65bf7628 100644
--- a/core/src/test/java/org/apache/iceberg/TestMetadataUpdateParser.java
+++ b/core/src/test/java/org/apache/iceberg/TestMetadataUpdateParser.java
@@ -912,6 +912,17 @@ public class TestMetadataUpdateParser {
         .isEqualTo(json);
   }
 
+  @Test
+  public void testRemovePartitionSpec() {
+    String action = MetadataUpdateParser.REMOVE_PARTITION_SPECS;
+    String json = 
"{\"action\":\"remove-partition-specs\",\"spec-ids\":[1,2,3]}";
+    MetadataUpdate expected = new 
MetadataUpdate.RemovePartitionSpecs(ImmutableSet.of(1, 2, 3));
+    assertEquals(action, expected, MetadataUpdateParser.fromJson(json));
+    assertThat(MetadataUpdateParser.toJson(expected))
+        .as("Remove partition specs should convert to the correct JSON value")
+        .isEqualTo(json);
+  }
+
   public void assertEquals(
       String action, MetadataUpdate expectedUpdate, MetadataUpdate 
actualUpdate) {
     switch (action) {
@@ -1016,6 +1027,11 @@ public class TestMetadataUpdateParser {
             (MetadataUpdate.SetCurrentViewVersion) expectedUpdate,
             (MetadataUpdate.SetCurrentViewVersion) actualUpdate);
         break;
+      case MetadataUpdateParser.REMOVE_PARTITION_SPECS:
+        assertEqualsRemovePartitionSpecs(
+            (MetadataUpdate.RemovePartitionSpecs) expectedUpdate,
+            (MetadataUpdate.RemovePartitionSpecs) actualUpdate);
+        break;
       default:
         fail("Unrecognized metadata update action: " + action);
     }
@@ -1237,6 +1253,11 @@ public class TestMetadataUpdateParser {
     assertThat(actual.versionId()).isEqualTo(expected.versionId());
   }
 
+  private static void assertEqualsRemovePartitionSpecs(
+      MetadataUpdate.RemovePartitionSpecs expected, 
MetadataUpdate.RemovePartitionSpecs actual) {
+    
assertThat(actual.specIds()).containsExactlyInAnyOrderElementsOf(expected.specIds());
+  }
+
   private String createManifestListWithManifestFiles(long snapshotId, Long 
parentSnapshotId)
       throws IOException {
     File manifestList = File.createTempFile("manifests", null, temp.toFile());
diff --git a/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java 
b/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java
index 44bbd069e2..6f0b2aed39 100644
--- a/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java
+++ b/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java
@@ -37,6 +37,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import org.apache.iceberg.ManifestEntry.Status;
 import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.PositionOutputStream;
 import org.apache.iceberg.puffin.Blob;
@@ -1620,6 +1621,90 @@ public class TestRemoveSnapshots extends TestBase {
     assertThat(deletedFiles).isEqualTo(expectedDeletes);
   }
 
+  @TestTemplate
+  public void testRemoveSpecDuringExpiration() {
+    DataFile file =
+        DataFiles.builder(table.spec())
+            .withPath("/path/to/data-0.parquet")
+            .withPartitionPath("data_bucket=0")
+            .withFileSizeInBytes(10)
+            .withRecordCount(100)
+            .build();
+    table.newAppend().appendFile(file).commit();
+    Snapshot append = table.currentSnapshot();
+    String appendManifest =
+        Iterables.getOnlyElement(
+            table.currentSnapshot().allManifests(table.io()).stream()
+                .map(ManifestFile::path)
+                .collect(Collectors.toList()));
+    table.newDelete().deleteFile(file).commit();
+    Snapshot delete = table.currentSnapshot();
+    String deleteManifest =
+        Iterables.getOnlyElement(
+            table.currentSnapshot().allManifests(table.io()).stream()
+                .map(ManifestFile::path)
+                .collect(Collectors.toList()));
+
+    table.updateSpec().addField("id_bucket", Expressions.bucket("id", 
16)).commit();
+    PartitionSpec idAndDataBucketSpec = table.spec();
+    DataFile bucketFile =
+        DataFiles.builder(table.spec())
+            .withPath("/path/to/data-0-id-0.parquet")
+            .withFileSizeInBytes(10)
+            .withRecordCount(100)
+            .withPartitionPath("data_bucket=0/id_bucket=0")
+            .build();
+    table.newAppend().appendFile(bucketFile).commit();
+
+    Set<String> deletedFiles = Sets.newHashSet();
+    // Expiring snapshots should remove the data_bucket partition
+    removeSnapshots(table)
+        .expireOlderThan(System.currentTimeMillis())
+        .cleanExpiredMetadata(true)
+        .deleteWith(deletedFiles::add)
+        .commit();
+
+    assertThat(deletedFiles)
+        .containsExactlyInAnyOrder(
+            appendManifest,
+            deleteManifest,
+            file.location(),
+            append.manifestListLocation(),
+            delete.manifestListLocation());
+    assertThat(table.specs().keySet())
+        .as("Only id_bucket + data_bucket transform should exist")
+        .containsExactly(idAndDataBucketSpec.specId());
+  }
+
+  @TestTemplate
+  public void testRemoveSpecsDoesntRemoveDefaultSpec() throws IOException {
+    // The default spec for table is bucketed on data, but write using 
unpartitioned
+    PartitionSpec dataBucketSpec = table.spec();
+    DataFile file =
+        DataFiles.builder(PartitionSpec.unpartitioned())
+            .withPath("/path/to/data-0.parquet")
+            .withFileSizeInBytes(10)
+            .withRecordCount(100)
+            .build();
+
+    table.newAppend().appendFile(file).commit();
+    Snapshot append = table.currentSnapshot();
+    table.newDelete().deleteFile(file).commit();
+
+    Set<String> deletedFiles = Sets.newHashSet();
+    // Expiring snapshots should remove only the unpartitioned spec
+    removeSnapshots(table)
+        .expireOlderThan(System.currentTimeMillis())
+        .cleanExpiredMetadata(true)
+        .deleteWith(deletedFiles::add)
+        .commit();
+
+    
assertThat(deletedFiles).containsExactlyInAnyOrder(append.manifestListLocation());
+    assertThat(table.specs().keySet())
+        .as("Only data_bucket transform should exist")
+        .containsExactly(dataBucketSpec.specId());
+  }
+
   private Set<String> manifestPaths(Snapshot snapshot, FileIO io) {
     return 
snapshot.allManifests(io).stream().map(ManifestFile::path).collect(Collectors.toSet());
   }
diff --git a/core/src/test/java/org/apache/iceberg/TestUpdateRequirements.java 
b/core/src/test/java/org/apache/iceberg/TestUpdateRequirements.java
index e5b3428508..7d8cc471ba 100644
--- a/core/src/test/java/org/apache/iceberg/TestUpdateRequirements.java
+++ b/core/src/test/java/org/apache/iceberg/TestUpdateRequirements.java
@@ -424,6 +424,121 @@ public class TestUpdateRequirements {
         .hasMessage("Requirement failed: default partition spec changed: 
expected id 0 != 1");
   }
 
+  @Test
+  public void removePartitionSpec() {
+    int defaultSpecId = 3;
+    when(metadata.defaultSpecId()).thenReturn(defaultSpecId);
+
+    List<UpdateRequirement> requirements =
+        UpdateRequirements.forUpdateTable(
+            metadata,
+            ImmutableList.of(new 
MetadataUpdate.RemovePartitionSpecs(Sets.newHashSet(1, 2))));
+    requirements.forEach(req -> req.validate(metadata));
+
+    assertThat(requirements)
+        .hasSize(2)
+        .hasOnlyElementsOfTypes(
+            UpdateRequirement.AssertTableUUID.class, 
UpdateRequirement.AssertDefaultSpecID.class);
+
+    assertTableUUID(requirements);
+
+    assertThat(requirements)
+        .element(1)
+        
.asInstanceOf(InstanceOfAssertFactories.type(UpdateRequirement.AssertDefaultSpecID.class))
+        .extracting(UpdateRequirement.AssertDefaultSpecID::specId)
+        .isEqualTo(defaultSpecId);
+  }
+
+  @Test
+  public void testRemovePartitionSpecsWithBranch() {
+    int defaultSpecId = 3;
+    long snapshotId = 42L;
+    when(metadata.defaultSpecId()).thenReturn(defaultSpecId);
+
+    String branch = "branch";
+    SnapshotRef snapshotRef = mock(SnapshotRef.class);
+    when(snapshotRef.snapshotId()).thenReturn(snapshotId);
+    when(snapshotRef.isBranch()).thenReturn(true);
+    when(metadata.refs()).thenReturn(ImmutableMap.of(branch, snapshotRef));
+    when(metadata.ref(branch)).thenReturn(snapshotRef);
+
+    List<UpdateRequirement> requirements =
+        UpdateRequirements.forUpdateTable(
+            metadata,
+            ImmutableList.of(new 
MetadataUpdate.RemovePartitionSpecs(Sets.newHashSet(1, 2))));
+    requirements.forEach(req -> req.validate(metadata));
+
+    assertThat(requirements)
+        .hasSize(3)
+        .hasOnlyElementsOfTypes(
+            UpdateRequirement.AssertTableUUID.class,
+            UpdateRequirement.AssertDefaultSpecID.class,
+            UpdateRequirement.AssertRefSnapshotID.class);
+
+    assertTableUUID(requirements);
+
+    assertThat(requirements)
+        .element(1)
+        
.asInstanceOf(InstanceOfAssertFactories.type(UpdateRequirement.AssertDefaultSpecID.class))
+        .extracting(UpdateRequirement.AssertDefaultSpecID::specId)
+        .isEqualTo(defaultSpecId);
+
+    assertThat(requirements)
+        .element(2)
+        
.asInstanceOf(InstanceOfAssertFactories.type(UpdateRequirement.AssertRefSnapshotID.class))
+        .extracting(UpdateRequirement.AssertRefSnapshotID::snapshotId)
+        .isEqualTo(snapshotId);
+  }
+
+  @Test
+  public void testRemovePartitionSpecsFailure() {
+    int defaultSpecId = 3;
+    when(metadata.defaultSpecId()).thenReturn(defaultSpecId);
+    when(updated.defaultSpecId()).thenReturn(defaultSpecId + 1);
+
+    List<UpdateRequirement> requirements =
+        UpdateRequirements.forUpdateTable(
+            metadata,
+            ImmutableList.of(new 
MetadataUpdate.RemovePartitionSpecs(Sets.newHashSet(1, 2))));
+
+    assertThatThrownBy(() -> requirements.forEach(req -> 
req.validate(updated)))
+        .isInstanceOf(CommitFailedException.class)
+        .hasMessage(
+            "Requirement failed: default partition spec changed: expected id 
%s != %s",
+            defaultSpecId, defaultSpecId + 1);
+  }
+
+  @Test
+  public void testRemovePartitionSpecsWithBranchFailure() {
+    int defaultSpecId = 3;
+    long snapshotId = 42L;
+    when(metadata.defaultSpecId()).thenReturn(defaultSpecId);
+    when(updated.defaultSpecId()).thenReturn(defaultSpecId);
+
+    String branch = "test";
+    SnapshotRef snapshotRef = mock(SnapshotRef.class);
+    when(snapshotRef.snapshotId()).thenReturn(snapshotId);
+    when(snapshotRef.isBranch()).thenReturn(true);
+    when(metadata.refs()).thenReturn(ImmutableMap.of(branch, snapshotRef));
+    when(metadata.ref(branch)).thenReturn(snapshotRef);
+
+    SnapshotRef updatedRef = mock(SnapshotRef.class);
+    when(updatedRef.snapshotId()).thenReturn(snapshotId + 1);
+    when(updatedRef.isBranch()).thenReturn(true);
+    when(updated.ref(branch)).thenReturn(updatedRef);
+
+    List<UpdateRequirement> requirements =
+        UpdateRequirements.forUpdateTable(
+            metadata,
+            ImmutableList.of(new 
MetadataUpdate.RemovePartitionSpecs(Sets.newHashSet(1, 2))));
+
+    assertThatThrownBy(() -> requirements.forEach(req -> 
req.validate(updated)))
+        .isInstanceOf(CommitFailedException.class)
+        .hasMessage(
+            "Requirement failed: branch %s has changed: expected id %s != %s",
+            branch, snapshotId, snapshotId + 1);
+  }
+
   @Test
   public void addSortOrder() {
     List<UpdateRequirement> requirements =
diff --git a/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java 
b/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java
index b1eef4c015..c5e91a6a9e 100644
--- a/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java
+++ b/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java
@@ -1293,6 +1293,62 @@ public abstract class CatalogTests<C extends Catalog & 
SupportsNamespaces> {
     assertThat(table.spec()).as("Loaded table should have expected 
spec").isEqualTo(TABLE_SPEC);
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testRemoveUnusedSpec(boolean withBranch) {
+    String branch = "test";
+    C catalog = catalog();
+
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table =
+        catalog
+            .buildTable(TABLE, SCHEMA)
+            .withPartitionSpec(SPEC)
+            .withProperty(TableProperties.GC_ENABLED, "true")
+            .create();
+    PartitionSpec spec = table.spec();
+    // added a file to trigger snapshot expiration
+    table.newFastAppend().appendFile(FILE_A).commit();
+    if (withBranch) {
+      table.manageSnapshots().createBranch(branch).commit();
+    }
+    table.updateSpec().addField(Expressions.bucket("data", 16)).commit();
+    table.updateSpec().removeField(Expressions.bucket("data", 16)).commit();
+    table.updateSpec().addField("data").commit();
+    assertThat(table.specs()).as("Should have 3 total specs").hasSize(3);
+    PartitionSpec current = table.spec();
+    table.expireSnapshots().cleanExpiredMetadata(true).commit();
+
+    Table loaded = catalog.loadTable(TABLE);
+    assertThat(loaded.specs().values()).containsExactlyInAnyOrder(spec, 
current);
+
+    // add a data file with current spec and remove the old data file
+    table.newDelete().deleteFile(FILE_A).commit();
+    DataFile anotherFile =
+        DataFiles.builder(current)
+            .withPath("/path/to/data-b.parquet")
+            .withFileSizeInBytes(10)
+            .withPartitionPath("id_bucket=0/data=123") // easy way to set 
partition data for now
+            .withRecordCount(2) // needs at least one record or else metrics 
will filter it out
+            .build();
+    table.newAppend().appendFile(anotherFile).commit();
+    table
+        .expireSnapshots()
+        .cleanExpiredFiles(false)
+        .expireOlderThan(table.currentSnapshot().timestampMillis())
+        .cleanExpiredMetadata(true)
+        .commit();
+    loaded = catalog.loadTable(TABLE);
+    if (withBranch) {
+      assertThat(loaded.specs().values()).containsExactlyInAnyOrder(spec, 
current);
+    } else {
+      assertThat(loaded.specs().values()).containsExactlyInAnyOrder(current);
+    }
+  }
+
   @Test
   public void testUpdateTableSortOrder() {
     C catalog = catalog();

Reply via email to