This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 01a85d18e6 [iceberg] support iceberg rest catalog for iceberg 
compatibility (#5817)
01a85d18e6 is described below

commit 01a85d18e659da246a6eb2de7bce4db9c952c508
Author: LsomeYeah <[email protected]>
AuthorDate: Mon Jul 14 13:50:15 2025 +0800

    [iceberg] support iceberg rest catalog for iceberg compatibility (#5817)
---
 docs/content/iceberg/rest-catalog.md               | 100 ++++
 .../generated/iceberg_configuration.html           |   4 +-
 .../paimon/iceberg/IcebergCommitCallback.java      |  45 +-
 .../paimon/iceberg/IcebergMetadataCommitter.java   |   6 +
 .../org/apache/paimon/iceberg/IcebergOptions.java  |  51 +-
 .../paimon/iceberg/metadata/IcebergSchema.java     |   5 +
 .../iceberg/IcebergHiveMetadataCommitter.java      |  12 +
 paimon-iceberg/pom.xml                             | 133 ++++-
 .../org/apache/iceberg/IcebergSnapshotRefType.java |  20 +-
 .../IcebergRESTMetadataCommitterFactory.java       |  20 +-
 .../iceberg/IcebergRestMetadataCommitter.java      | 430 ++++++++++++++
 .../services/org.apache.paimon.factories.Factory   |  16 +
 .../flink/IcebergRestMetadataCommitterITCase.java  | 128 ++++
 .../iceberg/IcebergRestMetadataCommitterTest.java  | 641 +++++++++++++++++++++
 14 files changed, 1557 insertions(+), 54 deletions(-)

diff --git a/docs/content/iceberg/rest-catalog.md 
b/docs/content/iceberg/rest-catalog.md
new file mode 100644
index 0000000000..177292f9e8
--- /dev/null
+++ b/docs/content/iceberg/rest-catalog.md
@@ -0,0 +1,100 @@
+---
+title: "Rest Catalog"
+weight: 5
+type: docs
+aliases:
+- /iceberg/rest-catalog.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Rest Catalog
+
+When creating a Paimon table, set `'metadata.iceberg.storage' = 'rest-catalog'`.
+With this option, Paimon will not only store Iceberg metadata as hadoop-catalog does, but also create the table in an [Iceberg REST catalog](https://iceberg.apache.org/terms/#decoupling-using-the-rest-catalog).
+The Paimon table can then be accessed through the Iceberg REST catalog.
+
+You need to provide the REST catalog configuration through options prefixed with `'metadata.iceberg.rest.'`, such as
+`'metadata.iceberg.rest.uri' = 'https://localhost/'`. Paimon uses these options to initialize an Iceberg REST catalog,
+and commits metadata through that catalog.
+
+**Dependency:**
+
+This feature requires the following dependency: [paimon-iceberg-{{< version >}}.jar](https://repository.apache.org/snapshots/org/apache/paimon/paimon-iceberg/{{< version >}}/),
+and JDK 11 or higher.
+
+You can also build the jar manually from the source code (JDK 11+ is required).
+
+To build from source code, [clone the git repository]({{< github_repo >}}).
+
+Build the bundled jar with the following command:
+- `mvn clean install -DskipTests`
+
+You can find the jar in `./paimon-iceberg/target/paimon-iceberg-{{< version 
>}}.jar`.
+
+**Example:**
+
+Here is an example using Flink SQL:
+```sql
+-- create a paimon table
+CREATE TABLE `paimon`.`default`.`T` (
+  pt INT,
+  k INT,
+  v INT,
+  PRIMARY KEY (pt, k) NOT ENFORCED
+) PARTITIONED BY (pt) WITH (
+  'metadata.iceberg.storage' = 'rest-catalog',
+  'metadata.iceberg.rest.uri' = 'http://localhost:55807/',
+  'metadata.iceberg.rest.warehouse' = 'rck_warehouse',
+  'metadata.iceberg.rest.clients' = '1'
+);
+
+-- insert some data
+INSERT INTO `paimon`.`default`.`T` VALUES(1, 9, 90),(1, 10, 100),(1, 11, 
110),(2, 20, 200);
+
+-- create an Iceberg REST catalog
+CREATE CATALOG `iceberg` WITH (
+  'type' = 'iceberg',
+  'catalog-type' = 'rest',
+  'uri' = 'http://localhost:55807/',
+  'clients' = '1',
+  'cache-enabled' = 'false'
+);
+
+-- verify the data in the Iceberg REST catalog
+SELECT v, k, pt FROM `iceberg`.`default`.`T` ORDER BY pt, k;
+/*
+the query results:
+ 90,  9, 1
+100, 10, 1
+110, 11, 1
+200, 20, 2
+*/
+```
+**Note:**
+
+Paimon first writes Iceberg metadata to a separate directory (as with hadoop-catalog), and then commits the metadata to the Iceberg REST catalog.
+If the two are incompatible, the metadata stored in the separate directory is taken as the reference.
+
+When committing to the Iceberg REST catalog, there are several cases (see the sketch after this list):
+1. The table does not exist in the REST catalog. Paimon creates the table in the REST catalog first, then commits the metadata.
+2. The table exists in the REST catalog and is compatible with the base metadata stored in the separate directory. Paimon loads the table and commits the metadata directly.
+3. The table exists, its last-sequence-number is 0 and its current-snapshot-id is -1. Paimon treats it as a new table, loads it and commits the metadata directly.
+4. The table exists but is not compatible with the base metadata stored in the separate directory. Paimon **drops and recreates the table**, then commits the metadata.
+
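For orientation, here is a minimal, self-contained sketch of the decision flow described in the note above. The class and helper names are illustrative placeholders rather than actual Paimon or Iceberg APIs; the real logic lives in `IcebergRestMetadataCommitter` later in this patch.

```java
/** Illustrative only: the four REST-catalog commit cases from the note above. */
public class RestCommitDecisionSketch {

    /** Hypothetical entry point; the boolean flags stand in for the real catalog checks. */
    public void commit(boolean tableExists, boolean looksNewlyCreated, boolean compatibleWithBase) {
        if (!tableExists) {
            createTable();   // case 1: create the table in the REST catalog first
        } else if (looksNewlyCreated) {
            // case 3: last-sequence-number == 0 and current-snapshot-id == -1,
            // treat the existing table as a new table and commit directly
        } else if (!compatibleWithBase) {
            dropTable();     // case 4: drop and recreate the table
            createTable();
        }
        // case 2 (table exists and is compatible) falls through unchanged
        commitMetadata();    // every case ends by committing the metadata
    }

    private void createTable() { /* create the table in the REST catalog */ }

    private void dropTable() { /* drop the table without purging data files */ }

    private void commitMetadata() { /* commit the adjusted Iceberg metadata */ }
}
```
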
diff --git a/docs/layouts/shortcodes/generated/iceberg_configuration.html 
b/docs/layouts/shortcodes/generated/iceberg_configuration.html
index 30c05ef9b5..cf57763779 100644
--- a/docs/layouts/shortcodes/generated/iceberg_configuration.html
+++ b/docs/layouts/shortcodes/generated/iceberg_configuration.html
@@ -102,13 +102,13 @@ under the License.
             <td><h5>metadata.iceberg.previous-versions-max</h5></td>
             <td style="word-wrap: break-word;">0</td>
             <td>Integer</td>
-            <td>The number of old metadata files to keep after each table 
commit</td>
+            <td>The number of old metadata files to keep after each table 
commit. For rest-catalog, at least 1 old metadata file will be kept.</td>
         </tr>
         <tr>
             <td><h5>metadata.iceberg.storage</h5></td>
             <td style="word-wrap: break-word;">disabled</td>
             <td><p>Enum</p></td>
-            <td>When set, produce Iceberg metadata after a snapshot is 
committed, so that Iceberg readers can read Paimon's raw data files.<br /><br 
/>Possible values:<ul><li>"disabled": Disable Iceberg compatibility 
support.</li><li>"table-location": Store Iceberg metadata in each table's 
directory.</li><li>"hadoop-catalog": Store Iceberg metadata in a separate 
directory. This directory can be specified as the warehouse directory of an 
Iceberg Hadoop catalog.</li><li>"hive-catalog": Not [...]
+            <td>When set, produce Iceberg metadata after a snapshot is 
committed, so that Iceberg readers can read Paimon's raw data files.<br /><br 
/>Possible values:<ul><li>"disabled": Disable Iceberg compatibility 
support.</li><li>"table-location": Store Iceberg metadata in each table's 
directory.</li><li>"hadoop-catalog": Store Iceberg metadata in a separate 
directory. This directory can be specified as the warehouse directory of an 
Iceberg Hadoop catalog.</li><li>"hive-catalog": Not [...]
         </tr>
         <tr>
             <td><h5>metadata.iceberg.storage-location</h5></td>
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
index a53f132ead..ccff0fd162 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
@@ -90,6 +90,7 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.function.Function;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
 import static 
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
@@ -208,6 +209,7 @@ public class IcebergCommitCallback implements 
CommitCallback, TagCallback {
                 return IcebergOptions.StorageLocation.TABLE_LOCATION;
             case HIVE_CATALOG:
             case HADOOP_CATALOG:
+            case REST_CATALOG:
                 return IcebergOptions.StorageLocation.CATALOG_STORAGE;
             default:
                 throw new UnsupportedOperationException(
@@ -349,6 +351,11 @@ public class IcebergCommitCallback implements 
CommitCallback, TagCallback {
                                         entry -> new 
IcebergRef(entry.getKey().id())));
 
         String tableUuid = UUID.randomUUID().toString();
+
+        List<IcebergSchema> allSchemas =
+                IntStream.rangeClosed(0, schemaId)
+                        .mapToObj(schemaCache::get)
+                        .collect(Collectors.toList());
         IcebergMetadata metadata =
                 new IcebergMetadata(
                         formatVersion,
@@ -356,7 +363,7 @@ public class IcebergCommitCallback implements 
CommitCallback, TagCallback {
                         table.location().toString(),
                         snapshotId,
                         icebergSchema.highestFieldId(),
-                        Collections.singletonList(icebergSchema),
+                        allSchemas,
                         schemaId,
                         Collections.singletonList(new 
IcebergPartitionSpec(partitionFields)),
                         partitionFields.stream()
@@ -379,7 +386,17 @@ public class IcebergCommitCallback implements 
CommitCallback, TagCallback {
         expireAllBefore(snapshotId);
 
         if (metadataCommitter != null) {
-            metadataCommitter.commitMetadata(metadataPath, null);
+            switch (metadataCommitter.identifier()) {
+                case "hive":
+                    metadataCommitter.commitMetadata(metadataPath, null);
+                    break;
+                case "rest":
+                    metadataCommitter.commitMetadata(metadata, null);
+                    break;
+                default:
+                    throw new UnsupportedOperationException(
+                            "Unsupported metadata committer: " + 
metadataCommitter.identifier());
+            }
         }
     }
 
@@ -557,14 +574,22 @@ public class IcebergCommitCallback implements 
CommitCallback, TagCallback {
                                         newDVManifestFileMetas.stream())
                                 .collect(Collectors.toList()));
 
-        // add new schema if needed
+        // add new schemas if needed
         SchemaCache schemaCache = new SchemaCache();
         int schemaId = (int) schemaCache.getLatestSchemaId();
         IcebergSchema icebergSchema = schemaCache.get(schemaId);
         List<IcebergSchema> schemas = baseMetadata.schemas();
         if (baseMetadata.currentSchemaId() != schemaId) {
+            Preconditions.checkArgument(
+                    schemaId > baseMetadata.currentSchemaId(),
+                    "currentSchemaId{%s} in paimon should be greater than 
currentSchemaId{%s} in base metadata.",
+                    schemaId,
+                    baseMetadata.currentSchemaId());
             schemas = new ArrayList<>(schemas);
-            schemas.add(icebergSchema);
+            schemas.addAll(
+                    IntStream.rangeClosed(baseMetadata.currentSchemaId() + 1, 
schemaId)
+                            .mapToObj(schemaCache::get)
+                            .collect(Collectors.toList()));
         }
 
         List<IcebergSnapshot> snapshots = new 
ArrayList<>(baseMetadata.snapshots());
@@ -619,7 +644,17 @@ public class IcebergCommitCallback implements 
CommitCallback, TagCallback {
         }
 
         if (metadataCommitter != null) {
-            metadataCommitter.commitMetadata(metadataPath, baseMetadataPath);
+            switch (metadataCommitter.identifier()) {
+                case "hive":
+                    metadataCommitter.commitMetadata(metadataPath, 
baseMetadataPath);
+                    break;
+                case "rest":
+                    metadataCommitter.commitMetadata(metadata, baseMetadata);
+                    break;
+                default:
+                    throw new UnsupportedOperationException(
+                            "Unsupported metadata committer: " + 
metadataCommitter.identifier());
+            }
         }
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergMetadataCommitter.java
 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergMetadataCommitter.java
index 2fe1314983..64ecb96062 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergMetadataCommitter.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergMetadataCommitter.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.iceberg;
 
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.iceberg.metadata.IcebergMetadata;
 
 import javax.annotation.Nullable;
 
@@ -28,5 +29,10 @@ import javax.annotation.Nullable;
  */
 public interface IcebergMetadataCommitter {
 
+    String identifier();
+
     void commitMetadata(Path newMetadataPath, @Nullable Path baseMetadataPath);
+
+    void commitMetadata(
+            IcebergMetadata newIcebergMetadata, @Nullable IcebergMetadata 
baseIcebergMetadata);
 }
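
For context, a minimal hypothetical implementation of the extended interface might look as follows. As the Hive and REST committers in this patch show, each committer supports one of the two `commitMetadata` overloads and throws `UnsupportedOperationException` for the other; the class name and method bodies below are illustrative only, not part of the patch.

```java
import org.apache.paimon.fs.Path;
import org.apache.paimon.iceberg.IcebergMetadataCommitter;
import org.apache.paimon.iceberg.metadata.IcebergMetadata;

import javax.annotation.Nullable;

/** Illustrative committer that only supports the path-based overload. */
public class PathBasedCommitterSketch implements IcebergMetadataCommitter {

    @Override
    public String identifier() {
        // IcebergCommitCallback dispatches on this identifier ("hive" or "rest" in this patch)
        return "sketch";
    }

    @Override
    public void commitMetadata(Path newMetadataPath, @Nullable Path baseMetadataPath) {
        // e.g. register newMetadataPath in an external metastore
    }

    @Override
    public void commitMetadata(
            IcebergMetadata newIcebergMetadata, @Nullable IcebergMetadata baseIcebergMetadata) {
        // this committer does not handle the metadata-object overload
        throw new UnsupportedOperationException();
    }
}
```
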
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java
index 2fe9abba26..f371db825c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java
@@ -20,15 +20,22 @@ package org.apache.paimon.iceberg;
 
 import org.apache.paimon.options.ConfigOption;
 import org.apache.paimon.options.ConfigOptions;
+import org.apache.paimon.options.Options;
 import org.apache.paimon.options.description.DescribedEnum;
 import org.apache.paimon.options.description.InlineElement;
 import org.apache.paimon.options.description.TextElement;
+import org.apache.paimon.utils.Preconditions;
+
+import java.util.HashMap;
+import java.util.Map;
 
 import static org.apache.paimon.options.ConfigOptions.key;
 
 /** Config options for Paimon Iceberg compatibility. */
 public class IcebergOptions {
 
+    public static final String REST_CONFIG_PREFIX = "metadata.iceberg.rest.";
+
     public static final ConfigOption<StorageType> METADATA_ICEBERG_STORAGE =
             key("metadata.iceberg.storage")
                     .enumType(StorageType.class)
@@ -79,7 +86,8 @@ public class IcebergOptions {
                     .intType()
                     .defaultValue(0)
                     .withDescription(
-                            "The number of old metadata files to keep after 
each table commit");
+                            "The number of old metadata files to keep after 
each table commit. "
+                                    + "For rest-catalog, at least 1 old metadata file will be kept.");
 
     public static final ConfigOption<String> URI =
             key("metadata.iceberg.uri")
@@ -147,6 +155,41 @@ public class IcebergOptions {
                     .defaultValue(false)
                     .withDescription("Skip updating Hive stats.");
 
+    private final Options options;
+
+    public IcebergOptions(Map<String, String> options) {
+        this(Options.fromMap(options));
+    }
+
+    public IcebergOptions(Options options) {
+        this.options = options;
+    }
+
+    public Map<String, String> icebergRestConfig() {
+        Map<String, String> restConfig = new HashMap<>();
+        options.keySet()
+                .forEach(
+                        key -> {
+                            if (key.startsWith(REST_CONFIG_PREFIX)) {
+                                String restConfigKey = 
key.substring(REST_CONFIG_PREFIX.length());
+                                Preconditions.checkArgument(
+                                        !restConfigKey.isEmpty(),
+                                        "config key '%s' for iceberg rest 
catalog is empty!",
+                                        key);
+                                restConfig.put(restConfigKey, 
options.get(key));
+                            }
+                        });
+        return restConfig;
+    }
+
+    public boolean deleteAfterCommitEnabled() {
+        return options.get(METADATA_DELETE_AFTER_COMMIT);
+    }
+
+    public int previousVersionsMax() {
+        return options.get(METADATA_PREVIOUS_VERSIONS_MAX);
+    }
+
     /** Where to store Iceberg metadata. */
     public enum StorageType implements DescribedEnum {
         DISABLED("disabled", "Disable Iceberg compatibility support."),
@@ -158,7 +201,11 @@ public class IcebergOptions {
         HIVE_CATALOG(
                 "hive-catalog",
                 "Not only store Iceberg metadata like hadoop-catalog, "
-                        + "but also create Iceberg external table in Hive.");
+                        + "but also create Iceberg external table in Hive."),
+        REST_CATALOG(
+                "rest-catalog",
+                "Store Iceberg metadata in a REST catalog. "
+                        + "This allows integration with Iceberg REST catalog 
services.");
 
         private final String value;
         private final String description;
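
As a quick illustration (a sketch, not part of the patch) of the new `IcebergOptions` accessor: `icebergRestConfig()` keeps only the options with the `metadata.iceberg.rest.` prefix and strips that prefix, so the resulting map can be handed to the Iceberg REST catalog as-is.

```java
import org.apache.paimon.iceberg.IcebergOptions;

import java.util.HashMap;
import java.util.Map;

public class IcebergRestConfigExample {

    public static void main(String[] args) {
        Map<String, String> tableOptions = new HashMap<>();
        tableOptions.put("metadata.iceberg.storage", "rest-catalog");
        tableOptions.put("metadata.iceberg.rest.uri", "http://localhost:55807/");
        tableOptions.put("metadata.iceberg.rest.warehouse", "rck_warehouse");

        IcebergOptions icebergOptions = new IcebergOptions(tableOptions);

        // prints the stripped keys, e.g. {uri=http://localhost:55807/, warehouse=rck_warehouse}
        // (map ordering may vary); the option without the rest prefix is ignored
        System.out.println(icebergOptions.icebergRestConfig());
    }
}
```
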
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSchema.java
 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSchema.java
index ff28a3bfd2..6c18eed7ab 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSchema.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSchema.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.iceberg.metadata;
 
 import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.utils.JsonSerdeUtil;
 
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
@@ -93,6 +94,10 @@ public class IcebergSchema {
         return fields.stream().mapToInt(IcebergDataField::id).max().orElse(0);
     }
 
+    public String toJson() {
+        return JsonSerdeUtil.toJson(this);
+    }
+
     @Override
     public int hashCode() {
         return Objects.hash(type, schemaId, fields);
diff --git 
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/IcebergHiveMetadataCommitter.java
 
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/IcebergHiveMetadataCommitter.java
index 6bfe86ad29..944cc94398 100644
--- 
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/IcebergHiveMetadataCommitter.java
+++ 
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/IcebergHiveMetadataCommitter.java
@@ -24,6 +24,7 @@ import org.apache.paimon.fs.Path;
 import org.apache.paimon.hive.HiveCatalog;
 import org.apache.paimon.hive.HiveTypeUtils;
 import org.apache.paimon.hive.pool.CachedClientPool;
+import org.apache.paimon.iceberg.metadata.IcebergMetadata;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.types.DataField;
@@ -117,6 +118,11 @@ public class IcebergHiveMetadataCommitter implements 
IcebergMetadataCommitter {
                         hiveConf, options, 
options.getString(IcebergOptions.HIVE_CLIENT_CLASS));
     }
 
+    @Override
+    public String identifier() {
+        return "hive";
+    }
+
     @Override
     public void commitMetadata(Path newMetadataPath, @Nullable Path 
baseMetadataPath) {
         try {
@@ -126,6 +132,12 @@ public class IcebergHiveMetadataCommitter implements 
IcebergMetadataCommitter {
         }
     }
 
+    @Override
+    public void commitMetadata(
+            IcebergMetadata icebergMetadata, @Nullable IcebergMetadata 
baseIcebergMetadata) {
+        throw new UnsupportedOperationException();
+    }
+
     private void commitMetadataImpl(Path newMetadataPath, @Nullable Path 
baseMetadataPath)
             throws Exception {
         if (!databaseExists(icebergHiveDatabase)) {
diff --git a/paimon-iceberg/pom.xml b/paimon-iceberg/pom.xml
index f9df0801d0..c3a7c3aa45 100644
--- a/paimon-iceberg/pom.xml
+++ b/paimon-iceberg/pom.xml
@@ -45,7 +45,13 @@ under the License.
             <groupId>org.apache.paimon</groupId>
             <artifactId>paimon-bundle</artifactId>
             <version>${project.version}</version>
-            <scope>test</scope>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.orc</groupId>
+                    <artifactId>orc-core</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
 
         <dependency>
@@ -76,6 +82,51 @@ under the License.
             <scope>test</scope>
         </dependency>
 
+        <!-- iceberg dependency -->
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-core</artifactId>
+            <version>${iceberg.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-core</artifactId>
+            <version>${iceberg.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-open-api</artifactId>
+            <version>${iceberg.version}</version>
+            <classifier>test-fixtures</classifier>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-data</artifactId>
+            <version>${iceberg.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-parquet</artifactId>
+            <version>${iceberg.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-flink-${iceberg.flink.version}</artifactId>
+            <version>${iceberg.version}</version>
+            <scope>test</scope>
+        </dependency>
+
         <!-- hadoop dependency -->
         <dependency>
             <groupId>org.apache.hadoop</groupId>
@@ -88,7 +139,7 @@ under the License.
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-common</artifactId>
             <version>${hadoop.version}</version>
-            <scope>test</scope>
+            <scope>provided</scope>
             <exclusions>
                 <exclusion>
                     <groupId>org.apache.avro</groupId>
@@ -118,37 +169,46 @@ under the License.
         </dependency>
 
         <dependency>
-            <groupId>org.apache.commons</groupId>
-            <artifactId>commons-compress</artifactId>
-            <version>1.21</version>
-            <scope>test</scope>
-        </dependency>
-
-        <!-- iceberg dependency -->
-        <dependency>
-            <groupId>org.apache.iceberg</groupId>
-            <artifactId>iceberg-core</artifactId>
-            <version>${iceberg.version}</version>
-            <scope>test</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.iceberg</groupId>
-            <artifactId>iceberg-data</artifactId>
-            <version>${iceberg.version}</version>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-mapreduce-client-core</artifactId>
+            <version>${hadoop.version}</version>
             <scope>test</scope>
             <exclusions>
                 <exclusion>
-                    <artifactId>parquet-hadoop</artifactId>
-                    <groupId>org.apache.parquet</groupId>
+                    <groupId>com.google.protobuf</groupId>
+                    <artifactId>protobuf-java</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>ch.qos.reload4j</groupId>
+                    <artifactId>reload4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-reload4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>jdk.tools</groupId>
+                    <artifactId>jdk.tools</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-io</groupId>
+                    <artifactId>commons-io</artifactId>
                 </exclusion>
             </exclusions>
         </dependency>
 
         <dependency>
-            <groupId>org.apache.iceberg</groupId>
-            <artifactId>iceberg-flink-${iceberg.flink.version}</artifactId>
-            <version>${iceberg.version}</version>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-compress</artifactId>
+            <version>1.21</version>
             <scope>test</scope>
         </dependency>
 
@@ -182,6 +242,29 @@ under the License.
             <scope>test</scope>
         </dependency>
 
+        <!-- web test dependency -->
+
+        <dependency>
+            <groupId>org.eclipse.jetty</groupId>
+            <artifactId>jetty-server</artifactId>
+            <version>11.0.24</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.eclipse.jetty</groupId>
+            <artifactId>jetty-servlet</artifactId>
+            <version>11.0.24</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>jakarta.servlet</groupId>
+            <artifactId>jakarta.servlet-api</artifactId>
+            <version>6.1.0</version>
+            <scope>test</scope>
+        </dependency>
+
     </dependencies>
 
 </project>
\ No newline at end of file
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergMetadataCommitter.java
 b/paimon-iceberg/src/main/java/org/apache/iceberg/IcebergSnapshotRefType.java
similarity index 70%
copy from 
paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergMetadataCommitter.java
copy to 
paimon-iceberg/src/main/java/org/apache/iceberg/IcebergSnapshotRefType.java
index 2fe1314983..02343a1173 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergMetadataCommitter.java
+++ 
b/paimon-iceberg/src/main/java/org/apache/iceberg/IcebergSnapshotRefType.java
@@ -16,17 +16,15 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.iceberg;
+package org.apache.iceberg;
 
-import org.apache.paimon.fs.Path;
+/** Utility to access Iceberg's {@code SnapshotRefType} values from outside the {@code org.apache.iceberg} package. */
+public class IcebergSnapshotRefType {
+    public static SnapshotRefType branchType() {
+        return SnapshotRefType.BRANCH;
+    }
 
-import javax.annotation.Nullable;
-
-/**
- * Commit Iceberg metadata to metastore. Each kind of Iceberg catalog should 
have its own
- * implementation.
- */
-public interface IcebergMetadataCommitter {
-
-    void commitMetadata(Path newMetadataPath, @Nullable Path baseMetadataPath);
+    public static SnapshotRefType tagType() {
+        return SnapshotRefType.TAG;
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergMetadataCommitter.java
 
b/paimon-iceberg/src/main/java/org/apache/paimon/iceberg/IcebergRESTMetadataCommitterFactory.java
similarity index 67%
copy from 
paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergMetadataCommitter.java
copy to 
paimon-iceberg/src/main/java/org/apache/paimon/iceberg/IcebergRESTMetadataCommitterFactory.java
index 2fe1314983..e73a883f47 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergMetadataCommitter.java
+++ 
b/paimon-iceberg/src/main/java/org/apache/paimon/iceberg/IcebergRESTMetadataCommitterFactory.java
@@ -18,15 +18,17 @@
 
 package org.apache.paimon.iceberg;
 
-import org.apache.paimon.fs.Path;
+import org.apache.paimon.table.FileStoreTable;
 
-import javax.annotation.Nullable;
+/** Factory to create {@link IcebergRestMetadataCommitter} for the rest-catalog storage type. */
+public class IcebergRESTMetadataCommitterFactory implements 
IcebergMetadataCommitterFactory {
+    @Override
+    public String identifier() {
+        return IcebergOptions.StorageType.REST_CATALOG.toString();
+    }
 
-/**
- * Commit Iceberg metadata to metastore. Each kind of Iceberg catalog should 
have its own
- * implementation.
- */
-public interface IcebergMetadataCommitter {
-
-    void commitMetadata(Path newMetadataPath, @Nullable Path baseMetadataPath);
+    @Override
+    public IcebergMetadataCommitter create(FileStoreTable table) {
+        return new IcebergRestMetadataCommitter(table);
+    }
 }
diff --git 
a/paimon-iceberg/src/main/java/org/apache/paimon/iceberg/IcebergRestMetadataCommitter.java
 
b/paimon-iceberg/src/main/java/org/apache/paimon/iceberg/IcebergRestMetadataCommitter.java
new file mode 100644
index 0000000000..1fa0db2441
--- /dev/null
+++ 
b/paimon-iceberg/src/main/java/org/apache/paimon/iceberg/IcebergRestMetadataCommitter.java
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.iceberg.metadata.IcebergMetadata;
+import org.apache.paimon.iceberg.metadata.IcebergSchema;
+import org.apache.paimon.iceberg.metadata.IcebergSnapshot;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.Preconditions;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.MetadataUpdate;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotRef;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableMetadataParser;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.rest.RESTCatalog;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.iceberg.CatalogUtil.ICEBERG_CATALOG_TYPE;
+import static 
org.apache.iceberg.TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED;
+import static 
org.apache.iceberg.TableProperties.METADATA_PREVIOUS_VERSIONS_MAX;
+
+/**
+ * Commit Iceberg metadata to an Iceberg REST catalog, so that the table can be accessed through
+ * the REST catalog.
+ */
+public class IcebergRestMetadataCommitter implements IcebergMetadataCommitter {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(IcebergRestMetadataCommitter.class);
+
+    private static final String REST_CATALOG_NAME = "rest-catalog";
+
+    private final RESTCatalog restCatalog;
+    private final String icebergDatabaseName;
+    private final TableIdentifier icebergTableIdentifier;
+    private final IcebergOptions icebergOptions;
+
+    private Table icebergTable;
+
+    public IcebergRestMetadataCommitter(FileStoreTable table) {
+        Options options = new Options(table.options());
+        icebergOptions = new IcebergOptions(options);
+
+        Identifier identifier = 
Preconditions.checkNotNull(table.catalogEnvironment().identifier());
+        String icebergDatabase = 
options.get(IcebergOptions.METASTORE_DATABASE);
+        String icebergTable = options.get(IcebergOptions.METASTORE_TABLE);
+        this.icebergDatabaseName =
+                icebergDatabase != null && !icebergDatabase.isEmpty()
+                        ? icebergDatabase
+                        : identifier.getDatabaseName();
+        String icebergTableName =
+                icebergTable != null && !icebergTable.isEmpty()
+                        ? icebergTable
+                        : identifier.getTableName();
+        this.icebergTableIdentifier =
+                TableIdentifier.of(Namespace.of(icebergDatabaseName), 
icebergTableName);
+
+        Map<String, String> restConfigs = icebergOptions.icebergRestConfig();
+
+        try {
+            Configuration hadoopConf = new Configuration();
+            
hadoopConf.setClassLoader(IcebergRestMetadataCommitter.class.getClassLoader());
+
+            this.restCatalog = initRestCatalog(restConfigs, hadoopConf);
+        } catch (Exception e) {
+            throw new RuntimeException("Fail to initialize iceberg rest 
catalog.", e);
+        }
+    }
+
+    @Override
+    public String identifier() {
+        return "rest";
+    }
+
+    @Override
+    public void commitMetadata(Path newMetadataPath, @Nullable Path 
baseMetadataPath) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void commitMetadata(
+            IcebergMetadata newIcebergMetadata, @Nullable IcebergMetadata 
baseIcebergMetadata) {
+        try {
+            commitMetadataImpl(newIcebergMetadata, baseIcebergMetadata);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private void commitMetadataImpl(
+            IcebergMetadata newIcebergMetadata, @Nullable IcebergMetadata 
baseIcebergMetadata) {
+
+        newIcebergMetadata = adjustMetadataForRest(newIcebergMetadata);
+        TableMetadata newMetadata = 
TableMetadataParser.fromJson(newIcebergMetadata.toJson());
+
+        // updates to be committed
+        TableMetadata.Builder updateBuilder;
+
+        // create database if not exist
+        if (!databaseExists()) {
+            createDatabase();
+        }
+
+        try {
+            if (!tableExists()) {
+                LOG.info("Table {} does not exist, create it.", 
icebergTableIdentifier);
+                icebergTable = createTable();
+                updateBuilder =
+                        updatesForCorrectBase(
+                                ((BaseTable) 
icebergTable).operations().current(),
+                                newMetadata,
+                                true);
+            } else {
+                icebergTable = getTable();
+
+                TableMetadata metadata = ((BaseTable) 
icebergTable).operations().current();
+                boolean withBase = checkBase(metadata, newMetadata, 
baseIcebergMetadata);
+                if (withBase) {
+                    LOG.info("create updates with base metadata.");
+                    updateBuilder = updatesForCorrectBase(metadata, newMetadata, false);
+                } else {
+                    LOG.info(
+                            "create updates without base metadata. 
currentSnapshotId for base metadata: {}, for new metadata:{}",
+                            metadata.currentSnapshot().snapshotId(),
+                            newMetadata.currentSnapshot().snapshotId());
+                    updateBuilder = updatesForIncorrectBase(newMetadata);
+                }
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(
+                    "Fail to create table or get table: " + 
icebergTableIdentifier, e);
+        }
+
+        TableMetadata updatedForCommit = updateBuilder.build();
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("updates:{}", 
updatesToString(updatedForCommit.changes()));
+        }
+
+        try {
+            ((BaseTable) icebergTable)
+                    .operations()
+                    .commit(((BaseTable) icebergTable).operations().current(), 
updatedForCommit);
+        } catch (Exception e) {
+            throw new RuntimeException("Fail to commit metadata to rest 
catalog.", e);
+        }
+    }
+
+    private TableMetadata.Builder updatesForCorrectBase(
+            TableMetadata base, TableMetadata newMetadata, boolean isNewTable) 
{
+        TableMetadata.Builder updateBuilder = TableMetadata.buildFrom(base);
+
+        int schemaId = icebergTable.schema().schemaId();
+        if (isNewTable) {
+            Preconditions.checkArgument(
+                    schemaId == 0,
+                    "the schema id for a newly created iceberg table should be 0, but is %s",
+                    schemaId);
+            // add all schemas
+            addAndSetCurrentSchema(
+                    newMetadata.schemas(), newMetadata.currentSchemaId(), 
updateBuilder);
+            updateBuilder.addPartitionSpec(newMetadata.spec());
+            updateBuilder.setDefaultPartitionSpec(newMetadata.defaultSpecId());
+
+            // add snapshot
+            addNewSnapshot(newMetadata.currentSnapshot(), updateBuilder);
+
+        } else {
+            // add new schema if needed
+            Preconditions.checkArgument(
+                    newMetadata.currentSchemaId() >= schemaId,
+                    "the new metadata has correct base, but the schemaId(%s) 
in iceberg table "
+                            + "is greater than currentSchemaId(%s) in new 
metadata.",
+                    schemaId,
+                    newMetadata.currentSchemaId());
+            if (newMetadata.currentSchemaId() != schemaId) {
+                addAndSetCurrentSchema(
+                        newMetadata.schemas().stream()
+                                .filter(schema -> schema.schemaId() > schemaId)
+                                .collect(Collectors.toList()),
+                        newMetadata.currentSchemaId(),
+                        updateBuilder);
+            }
+
+            // add snapshot
+            addNewSnapshot(newMetadata.currentSnapshot(), updateBuilder);
+
+            // remove snapshots not in new metadata
+            Set<Long> snapshotIdsToRemove = new HashSet<>();
+            icebergTable
+                    .snapshots()
+                    .forEach(snapshot -> 
snapshotIdsToRemove.add(snapshot.snapshotId()));
+            Set<Long> snapshotIdsInNewMetadata =
+                    newMetadata.snapshots().stream()
+                            .map(Snapshot::snapshotId)
+                            .collect(Collectors.toSet());
+            snapshotIdsToRemove.removeAll(snapshotIdsInNewMetadata);
+            removeSnapshots(snapshotIdsToRemove, updateBuilder);
+        }
+
+        return updateBuilder;
+    }
+
+    private TableMetadata.Builder updatesForIncorrectBase(TableMetadata 
newMetadata) {
+        LOG.info("the base metadata is incorrect, we'll recreate the iceberg 
table.");
+        icebergTable = recreateTable();
+        return updatesForCorrectBase(
+                ((BaseTable) icebergTable).operations().current(), 
newMetadata, true);
+    }
+
+    private RESTCatalog initRestCatalog(Map<String, String> restConfigs, 
Configuration conf) {
+        restConfigs.put(ICEBERG_CATALOG_TYPE, "rest");
+        Catalog catalog = CatalogUtil.buildIcebergCatalog(REST_CATALOG_NAME, 
restConfigs, conf);
+        return (RESTCatalog) catalog;
+    }
+
+    // 
-------------------------------------------------------------------------------------
+    // rest catalog invoke
+    // 
-------------------------------------------------------------------------------------
+
+    private boolean databaseExists() {
+        return restCatalog.namespaceExists(Namespace.of(icebergDatabaseName));
+    }
+
+    private boolean tableExists() {
+        return restCatalog.tableExists(icebergTableIdentifier);
+    }
+
+    private void createDatabase() {
+        restCatalog.createNamespace(Namespace.of(icebergDatabaseName));
+    }
+
+    private Table createTable() {
+        /* Here we create the iceberg table with an empty schema. This is because:
+        when creating a table, iceberg forces field ids to start from 1, while field ids in paimon usually start from 0.
+        If we directly used the schema extracted from paimon to create the iceberg table, the field ids would be out of order,
+        which may cause incorrect results when reading with an Iceberg reader. So we use an empty schema here, and add the
+        corresponding schemas later.
+        */
+        Schema emptySchema = new Schema();
+        return restCatalog.createTable(icebergTableIdentifier, emptySchema);
+    }
+
+    private Table getTable() {
+        return restCatalog.loadTable(icebergTableIdentifier);
+    }
+
+    private void dropTable() {
+        // set purge to false, because we don't need to delete the data files
+        restCatalog.dropTable(icebergTableIdentifier, false);
+    }
+
+    private Table recreateTable() {
+        try {
+            dropTable();
+            return createTable();
+        } catch (Exception e) {
+            throw new RuntimeException("Fail to recreate iceberg table.", e);
+        }
+    }
+
+    // 
-------------------------------------------------------------------------------------
+    // metadata updates
+    // 
-------------------------------------------------------------------------------------
+
+    // add a new snapshot and set it as the current snapshot
+    private void addNewSnapshot(Snapshot newSnapshot, TableMetadata.Builder 
update) {
+        update.setBranchSnapshot(newSnapshot, SnapshotRef.MAIN_BRANCH);
+    }
+
+    // remove snapshots recorded in table metadata
+    private void removeSnapshots(Set<Long> snapshotIds, TableMetadata.Builder 
update) {
+        update.removeSnapshots(snapshotIds);
+    }
+
+    // add schemas and set the current schema id
+    private void addAndSetCurrentSchema(
+            List<Schema> schemas, int currentSchemaId, TableMetadata.Builder 
update) {
+        for (Schema schema : schemas) {
+            update.addSchema(schema);
+        }
+        update.setCurrentSchema(currentSchemaId);
+
+        // update properties
+        Map<String, String> properties = new HashMap<>();
+        properties.put(
+                METADATA_PREVIOUS_VERSIONS_MAX,
+                String.valueOf(icebergOptions.previousVersionsMax()));
+        properties.put(
+                METADATA_DELETE_AFTER_COMMIT_ENABLED,
+                String.valueOf(icebergOptions.deleteAfterCommitEnabled()));
+        update.setProperties(properties);
+    }
+
+    // 
-------------------------------------------------------------------------------------
+    // Utils
+    // 
-------------------------------------------------------------------------------------
+
+    /**
+     * @param currentMetadata the current metadata used by iceberg table
+     * @param newMetadata the new metadata to be committed
+     * @param baseIcebergMetadata the base metadata previously written by 
paimon
+     * @return whether the current metadata of the iceberg table is the correct base for the new metadata
+     */
+    private static boolean checkBase(
+            TableMetadata currentMetadata,
+            TableMetadata newMetadata,
+            @Nullable IcebergMetadata baseIcebergMetadata) {
+        // take the base metadata from IcebergCommitCallback as the first 
reference
+        if (baseIcebergMetadata == null) {
+            LOG.info(
+                    "committing new metadata without base metadata because the base metadata from upstream is null.");
+            return false;
+        }
+
+        // if the iceberg table exists, check whether the table's current metadata is the
+        // base of the new table metadata; we use the current snapshot id for the check
+        return currentMetadata.currentSnapshot().snapshotId()
+                == newMetadata.currentSnapshot().snapshotId() - 1;
+    }
+
+    private IcebergMetadata adjustMetadataForRest(IcebergMetadata 
newIcebergMetadata) {
+        // why this is needed:
+        // since the iceberg table in the rest catalog is created with an empty schema, schema id 0 is
+        // occupied by that empty schema, so there is a 1-unit offset between the schema ids in the
+        // metadata stored in the rest catalog and the schema ids in paimon.
+
+        List<IcebergSchema> schemas =
+                newIcebergMetadata.schemas().stream()
+                        .map(schema -> new IcebergSchema(schema.schemaId() + 
1, schema.fields()))
+                        .collect(Collectors.toList());
+        int currentSchemaId = newIcebergMetadata.currentSchemaId() + 1;
+        List<IcebergSnapshot> snapshots =
+                newIcebergMetadata.snapshots().stream()
+                        .map(
+                                snapshot ->
+                                        new IcebergSnapshot(
+                                                snapshot.sequenceNumber(),
+                                                snapshot.snapshotId(),
+                                                snapshot.timestampMs(),
+                                                snapshot.summary(),
+                                                snapshot.manifestList(),
+                                                snapshot.schemaId() + 1))
+                        .collect(Collectors.toList());
+        return new IcebergMetadata(
+                newIcebergMetadata.formatVersion(),
+                newIcebergMetadata.tableUuid(),
+                newIcebergMetadata.location(),
+                newIcebergMetadata.currentSnapshotId(),
+                newIcebergMetadata.lastColumnId(),
+                schemas,
+                currentSchemaId,
+                newIcebergMetadata.partitionSpecs(),
+                newIcebergMetadata.lastPartitionId(),
+                snapshots,
+                newIcebergMetadata.currentSnapshotId(),
+                newIcebergMetadata.refs());
+    }
+
+    private static String updateToString(MetadataUpdate update) {
+        if (update instanceof MetadataUpdate.AddSnapshot) {
+            return String.format(
+                    "AddSnapshot(%s)",
+                    ((MetadataUpdate.AddSnapshot) 
update).snapshot().snapshotId());
+        } else if (update instanceof MetadataUpdate.RemoveSnapshot) {
+            return String.format(
+                    "RemoveSnapshot(%s)", ((MetadataUpdate.RemoveSnapshot) 
update).snapshotId());
+        } else if (update instanceof MetadataUpdate.SetSnapshotRef) {
+            return String.format(
+                    "SetSnapshotRef(%s, %s, %s)",
+                    ((MetadataUpdate.SetSnapshotRef) update).name(),
+                    ((MetadataUpdate.SetSnapshotRef) update).type(),
+                    ((MetadataUpdate.SetSnapshotRef) update).snapshotId());
+        } else if (update instanceof MetadataUpdate.AddSchema) {
+            return String.format(
+                    "AddSchema(%s)", ((MetadataUpdate.AddSchema) 
update).schema().schemaId());
+        } else if (update instanceof MetadataUpdate.SetCurrentSchema) {
+            return String.format(
+                    "SetCurrentSchema(%s)", ((MetadataUpdate.SetCurrentSchema) 
update).schemaId());
+        } else if (update instanceof MetadataUpdate.SetProperties) {
+            return String.format(
+                    "SetProperties(%s)", ((MetadataUpdate.SetProperties) 
update).updated());
+        } else {
+            return update.toString();
+        }
+    }
+
+    private static String updatesToString(List<MetadataUpdate> updates) {
+        return updates.stream()
+                .map(IcebergRestMetadataCommitter::updateToString)
+                .collect(Collectors.joining(", "));
+    }
+}
diff --git 
a/paimon-iceberg/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
 
b/paimon-iceberg/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
new file mode 100644
index 0000000000..5820876fb0
--- /dev/null
+++ 
b/paimon-iceberg/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.paimon.iceberg.IcebergRESTMetadataCommitterFactory
diff --git 
a/paimon-iceberg/src/test/java/org/apache/paimon/flink/IcebergRestMetadataCommitterITCase.java
 
b/paimon-iceberg/src/test/java/org/apache/paimon/flink/IcebergRestMetadataCommitterITCase.java
new file mode 100644
index 0000000000..72de210ba7
--- /dev/null
+++ 
b/paimon-iceberg/src/test/java/org/apache/paimon/flink/IcebergRestMetadataCommitterITCase.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.paimon.flink.util.AbstractTestBase;
+import org.apache.paimon.iceberg.IcebergRestMetadataCommitter;
+
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.rest.RESTCatalog;
+import org.apache.iceberg.rest.RESTCatalogServer;
+import org.apache.iceberg.rest.RESTServerExtension;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT cases for {@link IcebergRestMetadataCommitter}. */
+public class IcebergRestMetadataCommitterITCase extends AbstractTestBase {
+
+    @RegisterExtension
+    private static final RESTServerExtension REST_SERVER_EXTENSION =
+            new RESTServerExtension(
+                    Map.of(
+                            RESTCatalogServer.REST_PORT,
+                            RESTServerExtension.FREE_PORT,
+                            CatalogProperties.CLIENT_POOL_SIZE,
+                            "1",
+                            CatalogProperties.CATALOG_IMPL,
+                            HadoopCatalog.class.getName()));
+
+    @ParameterizedTest
+    @ValueSource(strings = {"avro", "parquet", "orc"})
+    public void testCommitToRestCatalog(String fileFormat) throws Exception {
+        String warehouse = getTempDirPath();
+        TableEnvironment tEnv = 
tableEnvironmentBuilder().batchMode().parallelism(2).build();
+
+        RESTCatalog restCatalog = REST_SERVER_EXTENSION.client();
+        String restUri = restCatalog.properties().get(CatalogProperties.URI);
+        String restWarehouse = 
restCatalog.properties().get(CatalogProperties.WAREHOUSE_LOCATION);
+        String restClients = 
restCatalog.properties().get(CatalogProperties.CLIENT_POOL_SIZE);
+
+        tEnv.executeSql(
+                "CREATE CATALOG paimon WITH (\n"
+                        + "  'type' = 'paimon',\n"
+                        + "  'warehouse' = '"
+                        + warehouse
+                        + "'\n"
+                        + ")");
+        tEnv.executeSql(
+                String.format(
+                        "CREATE TABLE paimon.`default`.T (\n"
+                                + "  pt INT,\n"
+                                + "  k INT,\n"
+                                + "  v INT,\n"
+                                + "  PRIMARY KEY (pt, k) NOT ENFORCED\n"
+                                + ") PARTITIONED BY (pt) WITH (\n"
+                                + "  'metadata.iceberg.storage' = 
'rest-catalog',\n"
+                                + "  'metadata.iceberg.rest.uri' = '%s',\n"
+                                + "  'metadata.iceberg.rest.warehouse' = 
'%s',\n"
+                                + "  'metadata.iceberg.rest.clients' = '%s',\n"
+                                + "  'file.format' = '%s'\n"
+                                + ")",
+                        restUri, restWarehouse, restClients, fileFormat));
+        tEnv.executeSql(
+                        "INSERT INTO paimon.`default`.T VALUES "
+                                + "(1, 9, 90), "
+                                + "(1, 10, 100), "
+                                + "(1, 11, 110), "
+                                + "(2, 20, 200)")
+                .await();
+
+        tEnv.executeSql(
+                String.format(
+                        "CREATE CATALOG iceberg WITH (\n"
+                                + "  'type' = 'iceberg',\n"
+                                + "  'catalog-type' = 'rest',\n"
+                                + "  'uri' = '%s',\n"
+                                + "  'clients' = '%s',\n"
+                                + "  'cache-enabled' = 'false'\n"
+                                + ")",
+                        restUri, restClients));
+        assertThat(
+                        collect(
+                                tEnv.executeSql(
+                                        "SELECT v, k, pt FROM 
iceberg.`default`.T ORDER BY pt, k")))
+                .containsExactly(
+                        Row.of(90, 9, 1),
+                        Row.of(100, 10, 1),
+                        Row.of(110, 11, 1),
+                        Row.of(200, 20, 2));
+    }
+
+    private List<Row> collect(TableResult result) throws Exception {
+        List<Row> rows = new ArrayList<>();
+        try (CloseableIterator<Row> it = result.collect()) {
+            while (it.hasNext()) {
+                rows.add(it.next());
+            }
+        }
+        return rows;
+    }
+}
diff --git 
a/paimon-iceberg/src/test/java/org/apache/paimon/iceberg/IcebergRestMetadataCommitterTest.java
 
b/paimon-iceberg/src/test/java/org/apache/paimon/iceberg/IcebergRestMetadataCommitterTest.java
new file mode 100644
index 0000000000..a2e3f40bb9
--- /dev/null
+++ b/paimon-iceberg/src/test/java/org/apache/paimon/iceberg/IcebergRestMetadataCommitterTest.java
@@ -0,0 +1,641 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.FileSystemCatalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryRowWriter;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.iceberg.metadata.IcebergMetadata;
+import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.TableCommitImpl;
+import org.apache.paimon.table.sink.TableWriteImpl;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.IcebergGenerics;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.rest.RESTCatalog;
+import org.apache.iceberg.rest.RESTCatalogServer;
+import org.apache.iceberg.rest.RESTServerExtension;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.iceberg.IcebergCommitCallback.catalogTableMetadataPath;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link IcebergRestMetadataCommitter}. */
+public class IcebergRestMetadataCommitterTest {
+
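+    // Embedded Iceberg REST catalog server used by the tests. It binds to a free port,
+    // keeps a client pool of size 1 and delegates storage to a HadoopCatalog.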
+    @RegisterExtension
+    private static final RESTServerExtension REST_SERVER_EXTENSION =
+            new RESTServerExtension(
+                    Map.of(
+                            RESTCatalogServer.REST_PORT,
+                            RESTServerExtension.FREE_PORT,
+                            CatalogProperties.CLIENT_POOL_SIZE,
+                            "1",
+                            CatalogProperties.CATALOG_IMPL,
+                            HadoopCatalog.class.getName()));
+
+    @TempDir public java.nio.file.Path tempDir;
+
+    protected static RESTCatalog restCatalog;
+
+    @BeforeEach
+    public void setUp() {
+        restCatalog = REST_SERVER_EXTENSION.client();
+    }
+
+    @Test
+    public void testUnPartitionedPrimaryKeyTable() throws Exception {
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT(), DataTypes.STRING(), DataTypes.INT(), DataTypes.BIGINT()
+                        },
+                        new String[] {"k1", "k2", "v1", "v2"});
+
+        int numRounds = 20;
+        int numRecords = 1000;
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        List<List<TestRecord>> testRecords = new ArrayList<>();
+        List<List<String>> expected = new ArrayList<>();
+        Map<String, String> expectedMap = new LinkedHashMap<>();
+        for (int r = 0; r < numRounds; r++) {
+            List<TestRecord> round = new ArrayList<>();
+            for (int i = 0; i < numRecords; i++) {
+                int k1 = random.nextInt(0, 100);
+                String k2 = String.valueOf(random.nextInt(1000, 1010));
+                int v1 = random.nextInt();
+                long v2 = random.nextLong();
+                round.add(
+                        new TestRecord(
+                                BinaryRow.EMPTY_ROW,
+                                GenericRow.of(k1, BinaryString.fromString(k2), v1, v2)));
+                expectedMap.put(String.format("%d, %s", k1, k2), String.format("%d, %d", v1, v2));
+            }
+            testRecords.add(round);
+            expected.add(
+                    expectedMap.entrySet().stream()
+                            .map(e -> String.format("Record(%s, %s)", e.getKey(), e.getValue()))
+                            .collect(Collectors.toList()));
+        }
+
+        runCompatibilityTest(
+                rowType,
+                Collections.emptyList(),
+                Arrays.asList("k1", "k2"),
+                testRecords,
+                expected,
+                Record::toString);
+    }
+
+    @Test
+    public void testPartitionedPrimaryKeyTable() throws Exception {
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.INT(),
+                            DataTypes.BIGINT()
+                        },
+                        new String[] {"pt1", "pt2", "k", "v1", "v2"});
+
+        BiFunction<Integer, String, BinaryRow> binaryRow =
+                (pt1, pt2) -> {
+                    BinaryRow b = new BinaryRow(2);
+                    BinaryRowWriter writer = new BinaryRowWriter(b);
+                    writer.writeInt(0, pt1);
+                    writer.writeString(1, BinaryString.fromString(pt2));
+                    writer.complete();
+                    return b;
+                };
+
+        int numRounds = 20;
+        int numRecords = 500;
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        boolean samePartitionEachRound = random.nextBoolean();
+
+        List<List<TestRecord>> testRecords = new ArrayList<>();
+        List<List<String>> expected = new ArrayList<>();
+        Map<String, String> expectedMap = new LinkedHashMap<>();
+        for (int r = 0; r < numRounds; r++) {
+            List<TestRecord> round = new ArrayList<>();
+            for (int i = 0; i < numRecords; i++) {
+                int pt1 = (random.nextInt(0, samePartitionEachRound ? 1 : 2) + r) % 3;
+                String pt2 = String.valueOf(random.nextInt(10, 12));
+                String k = String.valueOf(random.nextInt(0, 100));
+                int v1 = random.nextInt();
+                long v2 = random.nextLong();
+                round.add(
+                        new TestRecord(
+                                binaryRow.apply(pt1, pt2),
+                                GenericRow.of(
+                                        pt1,
+                                        BinaryString.fromString(pt2),
+                                        BinaryString.fromString(k),
+                                        v1,
+                                        v2)));
+                expectedMap.put(
+                        String.format("%d, %s, %s", pt1, pt2, k), String.format("%d, %d", v1, v2));
+            }
+            testRecords.add(round);
+            expected.add(
+                    expectedMap.entrySet().stream()
+                            .map(e -> String.format("Record(%s, %s)", e.getKey(), e.getValue()))
+                            .collect(Collectors.toList()));
+        }
+
+        runCompatibilityTest(
+                rowType,
+                Arrays.asList("pt1", "pt2"),
+                Arrays.asList("pt1", "pt2", "k"),
+                testRecords,
+                expected,
+                Record::toString);
+    }
+
+    private void runCompatibilityTest(
+            RowType rowType,
+            List<String> partitionKeys,
+            List<String> primaryKeys,
+            List<List<TestRecord>> testRecords,
+            List<List<String>> expected,
+            Function<Record, String> icebergRecordToString)
+            throws Exception {
+        FileStoreTable table =
+                createPaimonTable(
+                        rowType,
+                        partitionKeys,
+                        primaryKeys,
+                        primaryKeys.isEmpty() ? -1 : 2,
+                        randomFormat(),
+                        Collections.emptyMap());
+
+        String commitUser = UUID.randomUUID().toString();
+        TableWriteImpl<?> write = table.newWrite(commitUser);
+        TableCommitImpl commit = table.newCommit(commitUser);
+
+        for (int r = 0; r < testRecords.size(); r++) {
+            List<TestRecord> round = testRecords.get(r);
+            for (TestRecord testRecord : round) {
+                write.write(testRecord.record);
+            }
+
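+            // For primary-key tables, trigger a full compaction of every written bucket so this
+            // round's updates are merged before the table is read back through Iceberg.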
+            if (!primaryKeys.isEmpty()) {
+                for (BinaryRow partition :
+                        round.stream().map(t -> t.partition).collect(Collectors.toSet())) {
+                    for (int b = 0; b < 2; b++) {
+                        write.compact(partition, b, true);
+                    }
+                }
+            }
+            commit.commit(r, write.prepareCommit(true, r));
+
+            assertThat(
+                            getIcebergResult(
+                                    icebergTable -> IcebergGenerics.read(icebergTable).build(),
+                                    icebergRecordToString))
+                    .hasSameElementsAs(expected.get(r));
+        }
+
+        write.close();
+        commit.close();
+    }
+
+    @Test
+    public void testSchemaAndPropertiesChange() throws Exception {
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {DataTypes.INT(), DataTypes.INT()}, new String[] {"k", "v"});
+        FileStoreTable table =
+                createPaimonTable(
+                        rowType,
+                        Collections.emptyList(),
+                        Collections.singletonList("k"),
+                        1,
+                        randomFormat(),
+                        Collections.emptyMap());
+
+        String commitUser = UUID.randomUUID().toString();
+        TableWriteImpl<?> write = table.newWrite(commitUser);
+        TableCommitImpl commit = table.newCommit(commitUser);
+
+        write.write(GenericRow.of(1, 10));
+        write.write(GenericRow.of(2, 20));
+        commit.commit(1, write.prepareCommit(false, 1));
+        assertThat(getIcebergResult()).containsExactlyInAnyOrder("Record(1, 10)", "Record(2, 20)");
+
+        SchemaManager schemaManager = new SchemaManager(table.fileIO(), table.location());
+        // change1: add a column
+        // change2: change 'metadata.iceberg.delete-after-commit.enabled' to false
+        // change3: change 'metadata.iceberg.previous-versions-max' to 10
+        schemaManager.commitChanges(
+                SchemaChange.addColumn("v2", DataTypes.STRING()),
+                SchemaChange.setOption(IcebergOptions.METADATA_DELETE_AFTER_COMMIT.key(), "false"),
+                SchemaChange.setOption(IcebergOptions.METADATA_PREVIOUS_VERSIONS_MAX.key(), "10"));
+        table = table.copy(table.schemaManager().latest().get());
+        write.close();
+        write = table.newWrite(commitUser);
+        commit.close();
+        commit = table.newCommit(commitUser);
+
+        write.write(GenericRow.of(1, 11, BinaryString.fromString("one")));
+        write.write(GenericRow.of(3, 30, BinaryString.fromString("three")));
+        write.compact(BinaryRow.EMPTY_ROW, 0, true);
+        commit.commit(2, write.prepareCommit(true, 2));
+        assertThat(getIcebergResult())
+                .containsExactlyInAnyOrder(
+                        "Record(1, 11, one)", "Record(2, 20, null)", "Record(3, 30, three)");
+
+        write.write(GenericRow.of(2, 21, BinaryString.fromString("two")));
+        write.compact(BinaryRow.EMPTY_ROW, 0, true);
+        commit.commit(3, write.prepareCommit(true, 3));
+        assertThat(getIcebergResult())
+                .containsExactlyInAnyOrder(
+                        "Record(1, 11, one)", "Record(2, 21, two)", "Record(3, 30, three)");
+
+        Table icebergTable = restCatalog.loadTable(TableIdentifier.of("mydb", "t"));
+        assertThat(icebergTable.currentSnapshot().snapshotId()).isEqualTo(5);
+        // 1 metadata for createTable + 4 history metadata
+        assertThat(((BaseTable) icebergTable).operations().current().previousFiles().size())
+                .isEqualTo(5);
+
+        write.close();
+        commit.close();
+    }
+
+    @Test
+    public void testSchemaChangeBeforeSync() throws Exception {
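+        // The schema evolves while Iceberg compatibility is disabled; once the storage is switched
+        // to 'rest-catalog', the committer should expose the latest schema through the REST catalog.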
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {DataTypes.INT(), DataTypes.INT()}, new String[] {"k", "v"});
+        Map<String, String> options = new HashMap<>();
+        options.put(IcebergOptions.METADATA_ICEBERG_STORAGE.key(), "disabled");
+        // disable iceberg compatibility
+        FileStoreTable table =
+                createPaimonTable(
+                                rowType,
+                                Collections.emptyList(),
+                                Collections.singletonList("k"),
+                                1,
+                                randomFormat(),
+                                Collections.emptyMap())
+                        .copy(options);
+
+        String commitUser = UUID.randomUUID().toString();
+        TableWriteImpl<?> write = table.newWrite(commitUser);
+        TableCommitImpl commit = table.newCommit(commitUser);
+
+        write.write(GenericRow.of(1, 10));
+        write.write(GenericRow.of(2, 20));
+        commit.commit(1, write.prepareCommit(false, 1));
+
+        // schema change
+        SchemaManager schemaManager = new SchemaManager(table.fileIO(), table.location());
+        schemaManager.commitChanges(SchemaChange.addColumn("v2", DataTypes.STRING()));
+        table = table.copyWithLatestSchema();
+        write.close();
+        write = table.newWrite(commitUser);
+        commit.close();
+        commit = table.newCommit(commitUser);
+
+        write.write(GenericRow.of(1, 11, BinaryString.fromString("one")));
+        write.write(GenericRow.of(3, 30, BinaryString.fromString("three")));
+        write.compact(BinaryRow.EMPTY_ROW, 0, true);
+        commit.commit(2, write.prepareCommit(true, 2));
+
+        // enable iceberg compatibility
+        options.put(IcebergOptions.METADATA_ICEBERG_STORAGE.key(), "rest-catalog");
+        table = table.copy(options);
+        write.close();
+        write = table.newWrite(commitUser);
+        commit.close();
+        commit = table.newCommit(commitUser);
+
+        write.write(GenericRow.of(4, 40, BinaryString.fromString("four")));
+        write.compact(BinaryRow.EMPTY_ROW, 0, true);
+        commit.commit(3, write.prepareCommit(true, 3));
+        assertThat(getIcebergResult())
+                .containsExactlyInAnyOrder(
+                        "Record(1, 11, one)",
+                        "Record(2, 20, null)",
+                        "Record(3, 30, three)",
+                        "Record(4, 40, four)");
+
+        write.close();
+        commit.close();
+    }
+
+    @Test
+    public void testIcebergSnapshotExpire() throws Exception {
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {DataTypes.INT(), DataTypes.INT()}, new String[] {"k", "v"});
+        Map<String, String> options = new HashMap<>();
+        options.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN.key(), "3");
+        options.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX.key(), "3");
+        FileStoreTable table =
+                createPaimonTable(
+                        rowType,
+                        Collections.emptyList(),
+                        Collections.singletonList("k"),
+                        1,
+                        randomFormat(),
+                        options);
+
+        String commitUser = UUID.randomUUID().toString();
+        TableWriteImpl<?> write = table.newWrite(commitUser);
+        TableCommitImpl commit = table.newCommit(commitUser);
+
+        write.write(GenericRow.of(1, 10));
+        write.write(GenericRow.of(2, 20));
+        commit.commit(1, write.prepareCommit(false, 1));
+        assertThat(table.snapshotManager().latestSnapshotId()).isEqualTo(1L);
+        FileIO fileIO = table.fileIO();
+        IcebergMetadata metadata =
+                IcebergMetadata.fromPath(
+                        fileIO, new Path(catalogTableMetadataPath(table), "v1.metadata.json"));
+        Table icebergTable = restCatalog.loadTable(TableIdentifier.of("mydb", "t"));
+        assertThat(metadata.snapshots()).hasSize(1);
+        assertThat(metadata.currentSnapshotId()).isEqualTo(1);
+        // check table in rest-catalog
+        assertThat(icebergTable.currentSnapshot().snapshotId()).isEqualTo(1);
+        assertThat(ImmutableList.copyOf(icebergTable.snapshots()).size()).isEqualTo(1);
+
+        write.write(GenericRow.of(1, 11));
+        write.write(GenericRow.of(3, 30));
+        write.compact(BinaryRow.EMPTY_ROW, 0, true);
+        commit.commit(2, write.prepareCommit(true, 2));
+        assertThat(table.snapshotManager().latestSnapshotId()).isEqualTo(3L);
+        icebergTable = restCatalog.loadTable(TableIdentifier.of("mydb", "t"));
+        metadata =
+                IcebergMetadata.fromPath(
+                        fileIO, new Path(catalogTableMetadataPath(table), "v3.metadata.json"));
+        assertThat(metadata.snapshots()).hasSize(3);
+        assertThat(metadata.currentSnapshotId()).isEqualTo(3);
+        // check table in rest-catalog
+        assertThat(icebergTable.currentSnapshot().snapshotId()).isEqualTo(3);
+        assertThat(ImmutableList.copyOf(icebergTable.snapshots()).size()).isEqualTo(3);
+
+        // Number of snapshots will become 5 with the next commit, however only 3 Iceberg snapshots
+        // are kept. So the first 2 Iceberg snapshots will be expired.
+
+        write.write(GenericRow.of(2, 21));
+        write.write(GenericRow.of(3, 31));
+        write.compact(BinaryRow.EMPTY_ROW, 0, true);
+        commit.commit(3, write.prepareCommit(true, 3));
+        assertThat(table.snapshotManager().latestSnapshotId()).isEqualTo(5L);
+        metadata =
+                IcebergMetadata.fromPath(
+                        fileIO, new Path(catalogTableMetadataPath(table), "v5.metadata.json"));
+        icebergTable = restCatalog.loadTable(TableIdentifier.of("mydb", "t"));
+        assertThat(metadata.snapshots()).hasSize(3);
+        assertThat(metadata.currentSnapshotId()).isEqualTo(5);
+        // check table in rest-catalog
+        assertThat(icebergTable.currentSnapshot().snapshotId()).isEqualTo(5);
+        assertThat(ImmutableList.copyOf(icebergTable.snapshots()).size()).isEqualTo(3);
+
+        write.close();
+        commit.close();
+
+        // Old metadata.json files are removed when a new metadata.json is created,
+        // according to the old-metadata retention configuration.
+        assertThat(((BaseTable) icebergTable).operations().current().previousFiles().size())
+                .isEqualTo(1);
+
+        assertThat(getIcebergResult())
+                .containsExactlyInAnyOrder("Record(1, 11)", "Record(2, 21)", "Record(3, 31)");
+        assertThat(
+                        getIcebergResult(
+                                t -> IcebergGenerics.read(t).useSnapshot(3).build(),
+                                Record::toString))
+                .containsExactlyInAnyOrder("Record(1, 11)", "Record(2, 20)", "Record(3, 30)");
+        assertThat(
+                        getIcebergResult(
+                                t -> IcebergGenerics.read(t).useSnapshot(4).build(),
+                                Record::toString))
+                .containsExactlyInAnyOrder("Record(1, 11)", "Record(2, 20)", "Record(3, 30)");
+    }
+
+    @Test
+    public void testWithIncorrectBase() throws Exception {
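+        // Toggles Iceberg compatibility off and on so the metadata in the REST catalog falls
+        // behind the Paimon table, then checks that later commits recover from the stale base.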
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {DataTypes.INT(), DataTypes.INT()}, new String[] {"k", "v"});
+        FileStoreTable table =
+                createPaimonTable(
+                        rowType,
+                        Collections.emptyList(),
+                        Collections.singletonList("k"),
+                        1,
+                        randomFormat(),
+                        Collections.emptyMap());
+
+        String commitUser = UUID.randomUUID().toString();
+        TableWriteImpl<?> write = table.newWrite(commitUser);
+        TableCommitImpl commit = table.newCommit(commitUser);
+
+        write.write(GenericRow.of(1, 10));
+        write.write(GenericRow.of(2, 20));
+        commit.commit(1, write.prepareCommit(false, 1));
+
+        write.write(GenericRow.of(1, 11));
+        write.write(GenericRow.of(3, 30));
+        write.compact(BinaryRow.EMPTY_ROW, 0, true);
+        commit.commit(2, write.prepareCommit(true, 2));
+        assertThat(getIcebergResult())
+                .containsExactlyInAnyOrder("Record(1, 11)", "Record(2, 20)", "Record(3, 30)");
+        Table icebergTable = restCatalog.loadTable(TableIdentifier.of("mydb", "t"));
+        // the commits above generated 3 metadata files in the iceberg table, and the current snapshot id is 3
+        assertThat(icebergTable.currentSnapshot().snapshotId()).isEqualTo(3);
+
+        // disable iceberg compatibility
+        Map<String, String> options = new HashMap<>();
+        options.put(IcebergOptions.METADATA_ICEBERG_STORAGE.key(), "disabled");
+        table = table.copy(options);
+        write.close();
+        write = table.newWrite(commitUser);
+        commit.close();
+        commit = table.newCommit(commitUser);
+
+        write.write(GenericRow.of(4, 40));
+        write.write(GenericRow.of(5, 50));
+        write.compact(BinaryRow.EMPTY_ROW, 0, true);
+        commit.commit(3, write.prepareCommit(true, 3));
+        assertThat(table.snapshotManager().latestSnapshotId()).isEqualTo(5L);
+
+        // enable iceberg compatibility
+        options.put(IcebergOptions.METADATA_ICEBERG_STORAGE.key(), "rest-catalog");
+        table = table.copy(options);
+        write.close();
+        write = table.newWrite(commitUser);
+        commit.close();
+        commit = table.newCommit(commitUser);
+
+        write.write(GenericRow.of(6, 60));
+        write.compact(BinaryRow.EMPTY_ROW, 0, true);
+        commit.commit(4, write.prepareCommit(true, 4));
+        assertThat(table.snapshotManager().latestSnapshotId()).isEqualTo(7L);
+        assertThat(getIcebergResult())
+                .containsExactlyInAnyOrder(
+                        "Record(1, 11)",
+                        "Record(2, 20)",
+                        "Record(3, 30)",
+                        "Record(4, 40)",
+                        "Record(5, 50)",
+                        "Record(6, 60)");
+        icebergTable = restCatalog.loadTable(TableIdentifier.of("mydb", "t"));
+        assertThat(icebergTable.currentSnapshot().snapshotId()).isEqualTo(7);
+        assertThat(ImmutableList.copyOf(icebergTable.snapshots()).size()).isEqualTo(2);
+
+        write.write(GenericRow.of(4, 41));
+        write.compact(BinaryRow.EMPTY_ROW, 0, true);
+        commit.commit(5, write.prepareCommit(true, 5));
+        assertThat(getIcebergResult())
+                .containsExactlyInAnyOrder(
+                        "Record(1, 11)",
+                        "Record(2, 20)",
+                        "Record(3, 30)",
+                        "Record(4, 41)",
+                        "Record(5, 50)",
+                        "Record(6, 60)");
+
+        write.close();
+        commit.close();
+    }
+
+    private static class TestRecord {
+        private final BinaryRow partition;
+        private final GenericRow record;
+
+        private TestRecord(BinaryRow partition, GenericRow record) {
+            this.partition = partition;
+            this.record = record;
+        }
+    }
+
+    private FileStoreTable createPaimonTable(
+            RowType rowType,
+            List<String> partitionKeys,
+            List<String> primaryKeys,
+            int numBuckets,
+            String fileFormat,
+            Map<String, String> customOptions)
+            throws Exception {
+        LocalFileIO fileIO = LocalFileIO.create();
+        org.apache.paimon.fs.Path path = new org.apache.paimon.fs.Path(tempDir.toString());
+
+        Options options = new Options(customOptions);
+        options.set(CoreOptions.BUCKET, numBuckets);
+        options.set(
+                IcebergOptions.METADATA_ICEBERG_STORAGE, IcebergOptions.StorageType.REST_CATALOG);
+        options.set(CoreOptions.FILE_FORMAT, fileFormat);
+        options.set(CoreOptions.TARGET_FILE_SIZE, MemorySize.ofKibiBytes(32));
+        options.set(IcebergOptions.COMPACT_MIN_FILE_NUM, 4);
+        options.set(IcebergOptions.COMPACT_MIN_FILE_NUM, 8);
+        options.set(IcebergOptions.METADATA_DELETE_AFTER_COMMIT, true);
+        options.set(IcebergOptions.METADATA_PREVIOUS_VERSIONS_MAX, 1);
+        options.set(CoreOptions.MANIFEST_TARGET_FILE_SIZE, MemorySize.ofKibiBytes(8));
+
+        // rest-catalog options
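+        // Reuse the URI, warehouse and client pool size of the embedded REST server so the
+        // committer connects to the same catalog that the assertions read from.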
+        options.set(
+                IcebergOptions.REST_CONFIG_PREFIX + CatalogProperties.URI,
+                restCatalog.properties().get(CatalogProperties.URI));
+        options.set(
+                IcebergOptions.REST_CONFIG_PREFIX + CatalogProperties.WAREHOUSE_LOCATION,
+                restCatalog.properties().get(CatalogProperties.WAREHOUSE_LOCATION));
+        options.set(
+                IcebergOptions.REST_CONFIG_PREFIX + CatalogProperties.CLIENT_POOL_SIZE,
+                restCatalog.properties().get(CatalogProperties.CLIENT_POOL_SIZE));
+
+        org.apache.paimon.schema.Schema schema =
+                new org.apache.paimon.schema.Schema(
+                        rowType.getFields(), partitionKeys, primaryKeys, options.toMap(), "");
+
+        try (FileSystemCatalog paimonCatalog = new FileSystemCatalog(fileIO, path)) {
+            paimonCatalog.createDatabase("mydb", false);
+            Identifier paimonIdentifier = Identifier.create("mydb", "t");
+            paimonCatalog.createTable(paimonIdentifier, schema, false);
+            return (FileStoreTable) paimonCatalog.getTable(paimonIdentifier);
+        }
+    }
+
+    private List<String> getIcebergResult() throws Exception {
+        return getIcebergResult(
+                icebergTable -> IcebergGenerics.read(icebergTable).build(), Record::toString);
+    }
+
+    private List<String> getIcebergResult(
+            Function<Table, CloseableIterable<Record>> query,
+            Function<Record, String> icebergRecordToString)
+            throws Exception {
+        Table icebergTable = restCatalog.loadTable(TableIdentifier.of("mydb", "t"));
+        CloseableIterable<Record> result = query.apply(icebergTable);
+        List<String> actual = new ArrayList<>();
+        for (Record record : result) {
+            actual.add(icebergRecordToString.apply(record));
+        }
+        result.close();
+        return actual;
+    }
+
+    private String randomFormat() {
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        int i = random.nextInt(3);
+        String[] formats = new String[] {"orc", "parquet", "avro"};
+        return formats[i];
+    }
+}
