This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new deeed7efab [core] Preserving old Iceberg Metadata files (#5228)
deeed7efab is described below

commit deeed7efabd9c08c23c243de99f2b49d2791f585
Author: junmuz <[email protected]>
AuthorDate: Thu Mar 13 09:49:04 2025 +0000

    [core] Preserving old Iceberg Metadata files (#5228)
---
 .../generated/iceberg_configuration.html           | 16 ++++++++++++++--
 .../paimon/iceberg/IcebergCommitCallback.java      | 22 ++++++++++++++++++++--
 .../org/apache/paimon/iceberg/IcebergOptions.java  | 20 +++++++++++++++++---
 .../paimon/iceberg/IcebergCompatibilityTest.java   | 12 ++++++++++--
 4 files changed, 61 insertions(+), 9 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/iceberg_configuration.html 
b/docs/layouts/shortcodes/generated/iceberg_configuration.html
index a71cabb679..caa243dc42 100644
--- a/docs/layouts/shortcodes/generated/iceberg_configuration.html
+++ b/docs/layouts/shortcodes/generated/iceberg_configuration.html
@@ -30,13 +30,13 @@ under the License.
             <td><h5>metadata.iceberg.compaction.max.file-num</h5></td>
             <td style="word-wrap: break-word;">50</td>
             <td>Integer</td>
-            <td>If number of small Iceberg metadata files exceeds this limit, 
always trigger metadata compaction regardless of their total size.</td>
+            <td>If number of small Iceberg manifest metadata files exceeds 
this limit, always trigger manifest metadata compaction regardless of their 
total size.</td>
         </tr>
         <tr>
             <td><h5>metadata.iceberg.compaction.min.file-num</h5></td>
             <td style="word-wrap: break-word;">10</td>
             <td>Integer</td>
-            <td>Minimum number of Iceberg metadata files to trigger metadata 
compaction.</td>
+            <td>Minimum number of Iceberg manifest metadata files to trigger 
manifest metadata compaction.</td>
         </tr>
         <tr>
             <td><h5>metadata.iceberg.database</h5></td>
@@ -44,6 +44,12 @@ under the License.
             <td>String</td>
             <td>Metastore database name for Iceberg Catalog. Set this as an 
iceberg database alias if using a centralized Catalog.</td>
         </tr>
+        <tr>
+            <td><h5>metadata.iceberg.delete-after-commit.enabled</h5></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Boolean</td>
+            <td>Whether to delete old metadata files after each table 
commit</td>
+        </tr>
         <tr>
             <td><h5>metadata.iceberg.glue.skip-archive</h5></td>
             <td style="word-wrap: break-word;">false</td>
@@ -80,6 +86,12 @@ under the License.
             <td>Boolean</td>
             <td>Should use the legacy manifest version to generate Iceberg's 
1.4 manifest files.</td>
         </tr>
+        <tr>
+            <td><h5>metadata.iceberg.previous-versions-max</h5></td>
+            <td style="word-wrap: break-word;">0</td>
+            <td>Integer</td>
+            <td>The number of old metadata files to keep after each table 
commit</td>
+        </tr>
         <tr>
             <td><h5>metadata.iceberg.storage</h5></td>
             <td style="word-wrap: break-word;">disabled</td>
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
index e1b205d9b0..c20d07b9a4 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
@@ -424,7 +424,7 @@ public class IcebergCommitCallback implements 
CommitCallback {
                         new Path(pathFactory.metadataDirectory(), 
VERSION_HINT_FILENAME),
                         String.valueOf(snapshotId));
 
-        table.fileIO().deleteQuietly(baseMetadataPath);
+        deleteApplicableMetadataFiles(snapshotId);
         for (int i = 0; i + 1 < toExpireExceptLast.size(); i++) {
             expireManifestList(
                     new 
Path(toExpireExceptLast.get(i).manifestList()).getName(),
@@ -752,7 +752,25 @@ public class IcebergCommitCallback implements 
CommitCallback {
                 }
                 table.fileIO().deleteQuietly(listPath);
             }
-            table.fileIO().deleteQuietly(path);
+            deleteApplicableMetadataFiles(snapshotId);
+        }
+    }
+
+    private void deleteApplicableMetadataFiles(long snapshotId) throws 
IOException {
+        Options options = new Options(table.options());
+        if (options.get(IcebergOptions.METADATA_DELETE_AFTER_COMMIT)) {
+            long earliestMetadataId =
+                    snapshotId - 
options.get(IcebergOptions.METADATA_PREVIOUS_VERSIONS_MAX);
+            if (earliestMetadataId > 0) {
+                Iterator<Path> it =
+                        pathFactory
+                                .getAllMetadataPathBefore(table.fileIO(), 
earliestMetadataId)
+                                .iterator();
+                while (it.hasNext()) {
+                    Path path = it.next();
+                    table.fileIO().deleteQuietly(path);
+                }
+            }
         }
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java
index b538a7a606..6835440d84 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java
@@ -42,15 +42,29 @@ public class IcebergOptions {
                     .intType()
                     .defaultValue(10)
                     .withDescription(
-                            "Minimum number of Iceberg metadata files to 
trigger metadata compaction.");
+                            "Minimum number of Iceberg manifest metadata files 
to trigger manifest metadata compaction.");
 
     public static final ConfigOption<Integer> COMPACT_MAX_FILE_NUM =
             ConfigOptions.key("metadata.iceberg.compaction.max.file-num")
                     .intType()
                     .defaultValue(50)
                     .withDescription(
-                            "If number of small Iceberg metadata files exceeds 
this limit, "
-                                    + "always trigger metadata compaction 
regardless of their total size.");
+                            "If number of small Iceberg manifest metadata 
files exceeds this limit, "
+                                    + "always trigger manifest metadata 
compaction regardless of their total size.");
+
+    public static final ConfigOption<Boolean> METADATA_DELETE_AFTER_COMMIT =
+            key("metadata.iceberg.delete-after-commit.enabled")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "Whether to delete old metadata files after each 
table commit");
+
+    public static final ConfigOption<Integer> METADATA_PREVIOUS_VERSIONS_MAX =
+            key("metadata.iceberg.previous-versions-max")
+                    .intType()
+                    .defaultValue(0)
+                    .withDescription(
+                            "The number of old metadata files to keep after 
each table commit");
 
     public static final ConfigOption<String> URI =
             key("metadata.iceberg.uri")
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
index 8cd37bdd1e..679529b9b4 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
@@ -440,8 +440,9 @@ public class IcebergCompatibilityTest {
         write.close();
         commit.close();
 
-        // The old metadata.json is removed when the new metadata.json is 
created.
-        for (int i = 1; i <= 4; i++) {
+        // The old metadata.json is removed when the new metadata.json is 
created
+        // depending on the old metadata retention configuration.
+        for (int i = 1; i <= 3; i++) {
             unusedFiles.add(pathFactory.toMetadataPath(i).toString());
         }
 
@@ -449,6 +450,11 @@ public class IcebergCompatibilityTest {
             assertThat(fileIO.exists(new Path(path))).isFalse();
         }
 
+        // Check existence of retained Iceberg metadata.json files
+        for (int i = 4; i <= 5; i++) {
+            assertThat(fileIO.exists(new 
Path(pathFactory.toMetadataPath(i).toString()))).isTrue();
+        }
+
         // Test all existing Iceberg snapshots are valid.
         assertThat(getIcebergResult())
                 .containsExactlyInAnyOrder("Record(1, 11)", "Record(2, 21)", 
"Record(3, 31)");
@@ -961,6 +967,8 @@ public class IcebergCompatibilityTest {
         options.set(CoreOptions.TARGET_FILE_SIZE, MemorySize.ofKibiBytes(32));
         options.set(IcebergOptions.COMPACT_MIN_FILE_NUM, 4);
         options.set(IcebergOptions.COMPACT_MIN_FILE_NUM, 8);
+        options.set(IcebergOptions.METADATA_DELETE_AFTER_COMMIT, true);
+        options.set(IcebergOptions.METADATA_PREVIOUS_VERSIONS_MAX, 1);
         options.set(CoreOptions.MANIFEST_TARGET_FILE_SIZE, 
MemorySize.ofKibiBytes(8));
         Schema schema =
                 new Schema(rowType.getFields(), partitionKeys, primaryKeys, 
options.toMap(), "");

Reply via email to