This is an automated email from the ASF dual-hosted git repository.

czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new efe28410fd [iceberg] Introduce feature to migrate table from iceberg to paimon (#4639)
efe28410fd is described below

commit efe28410fd0ccc17300093f4d832b0d019dc8594
Author: LsomeYeah <[email protected]>
AuthorDate: Thu Jan 9 18:35:00 2025 +0800

    [iceberg] Introduce feature to migrate table from iceberg to paimon (#4639)
    
    This closes #4639.
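    
    For reference, a minimal usage sketch of the new migrator, modeled on the
    IcebergMigrateTest added below. The catalog setup, option values and paths
    are illustrative assumptions, not part of this commit:
    
        // imports assumed: java.util.HashMap, java.util.Map, org.apache.paimon.options.Options
        // assumes "paimonCatalog" is an existing org.apache.paimon.catalog.Catalog and the
        // Iceberg table lives in a hadoop catalog at <iceberg_warehouse>/ice_db/ice_t
        Map<String, String> icebergConf = new HashMap<>();
        icebergConf.put("metadata.iceberg.storage", "hadoop-catalog");
        icebergConf.put("iceberg_warehouse", "/path/to/iceberg/warehouse"); // hypothetical path
    
        IcebergMigrator migrator =
                new IcebergMigrator(
                        paimonCatalog,
                        "pai_db", "pai_t",  // target Paimon database and table
                        "ice_db", "ice_t",  // source Iceberg database and table
                        new Options(icebergConf),
                        1);                 // parallelism of the migrate tasks
        migrator.deleteOriginTable(false);  // keep the Iceberg table (it is dropped by default)
        migrator.executeMigrate();          // creates the Paimon table and commits the data files
        migrator.renameTable(false);        // optionally rename the Paimon table to the Iceberg name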
---
 paimon-core/pom.xml                                |   7 +
 .../apache/paimon/iceberg/IcebergPathFactory.java  |   4 +
 .../manifest/IcebergManifestEntrySerializer.java   |  28 +
 .../iceberg/manifest/IcebergManifestFile.java      |  50 ++
 .../paimon/iceberg/metadata/IcebergDataField.java  |  79 ++-
 .../paimon/iceberg/metadata/IcebergMetadata.java   |   8 +-
 .../migrate/IcebergMigrateHadoopMetadata.java      | 110 ++++
 .../IcebergMigrateHadoopMetadataFactory.java       |  39 ++
 .../iceberg/migrate/IcebergMigrateMetadata.java    |  34 ++
 .../migrate/IcebergMigrateMetadataFactory.java     |  31 ++
 .../paimon/iceberg/migrate/IcebergMigrator.java    | 396 +++++++++++++
 .../org/apache/paimon/migrate/FileMetaUtils.java   |   6 +-
 .../services/org.apache.paimon.factories.Factory   |   1 +
 .../paimon/iceberg/migrate/IcebergMigrateTest.java | 619 +++++++++++++++++++++
 14 files changed, 1404 insertions(+), 8 deletions(-)

diff --git a/paimon-core/pom.xml b/paimon-core/pom.xml
index 6e9dfa716a..6cdb9a9c93 100644
--- a/paimon-core/pom.xml
+++ b/paimon-core/pom.xml
@@ -227,6 +227,13 @@ under the License.
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-parquet</artifactId>
+            <version>${iceberg.version}</version>
+            <scope>test</scope>
+        </dependency>
+
     </dependencies>
 
     <build>
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergPathFactory.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergPathFactory.java
index 74d2e8e48f..9025dbe87a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergPathFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergPathFactory.java
@@ -68,6 +68,10 @@ public class IcebergPathFactory {
         return new Path(metadataDirectory(), String.format("v%d.metadata.json", snapshotId));
     }
 
+    public Path toMetadataPath(String metadataName) {
+        return new Path(metadataDirectory(), metadataName);
+    }
+
     public Stream<Path> getAllMetadataPathBefore(FileIO fileIO, long snapshotId)
             throws IOException {
         return FileUtils.listVersionedFileStatus(fileIO, metadataDirectory, "v")
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestEntrySerializer.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestEntrySerializer.java
index d93456c3fe..b9d2c271b5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestEntrySerializer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestEntrySerializer.java
@@ -54,4 +54,32 @@ public class IcebergManifestEntrySerializer extends ObjectSerializer<IcebergMani
                 row.getLong(3),
                 fileSerializer.fromRow(row.getRow(4, fileSerializer.numFields())));
     }
+
+    public IcebergManifestEntry fromRow(InternalRow row, IcebergManifestFileMeta meta) {
+        IcebergManifestEntry.Status status = IcebergManifestEntry.Status.fromId(row.getInt(0));
+        long snapshotId = row.isNullAt(1) ? meta.addedSnapshotId() : row.getLong(1);
+        long sequenceNumber = getOrInherit(row, meta, 2, status);
+        long fileSequenceNumber = getOrInherit(row, meta, 3, status);
+
+        return new IcebergManifestEntry(
+                status,
+                snapshotId,
+                sequenceNumber,
+                fileSequenceNumber,
+                fileSerializer.fromRow(row.getRow(4, fileSerializer.numFields())));
+    }
+
+    private long getOrInherit(
+            InternalRow row,
+            IcebergManifestFileMeta meta,
+            int pos,
+            IcebergManifestEntry.Status status) {
+        long sequenceNumber = meta.sequenceNumber();
+        if (row.isNullAt(pos)
+                && (sequenceNumber == 0 || status == IcebergManifestEntry.Status.ADDED)) {
+            return sequenceNumber;
+        } else {
+            return row.getLong(pos);
+        }
+    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java
index 5955da6220..4553a1c850 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.iceberg.manifest;
 
 import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.format.FileFormat;
 import org.apache.paimon.format.FormatReaderFactory;
 import org.apache.paimon.format.FormatWriterFactory;
@@ -38,9 +39,14 @@ import org.apache.paimon.options.Options;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.CloseableIterator;
+import org.apache.paimon.utils.FileUtils;
+import org.apache.paimon.utils.Filter;
 import org.apache.paimon.utils.ObjectsFile;
 import org.apache.paimon.utils.PathFactory;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
@@ -109,6 +115,50 @@ public class IcebergManifestFile extends ObjectsFile<IcebergManifestEntry> {
                 table.coreOptions().manifestTargetSize());
     }
 
+    public List<IcebergManifestEntry> read(IcebergManifestFileMeta meta) {
+        return read(meta, null);
+    }
+
+    public List<IcebergManifestEntry> read(IcebergManifestFileMeta meta, @Nullable Long fileSize) {
+        String fileName = new Path(meta.manifestPath()).getName();
+        try {
+            Path path = pathFactory.toPath(fileName);
+
+            return readFromIterator(
+                    meta,
+                    createIterator(path, fileSize),
+                    (IcebergManifestEntrySerializer) serializer,
+                    Filter.alwaysTrue());
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to read " + fileName, e);
+        }
+    }
+
+    private CloseableIterator<InternalRow> createIterator(Path file, @Nullable Long fileSize)
+            throws IOException {
+        return FileUtils.createFormatReader(fileIO, readerFactory, file, fileSize)
+                .toCloseableIterator();
+    }
+
+    private static List<IcebergManifestEntry> readFromIterator(
+            IcebergManifestFileMeta meta,
+            CloseableIterator<InternalRow> inputIterator,
+            IcebergManifestEntrySerializer serializer,
+            Filter<InternalRow> readFilter) {
+        try (CloseableIterator<InternalRow> iterator = inputIterator) {
+            List<IcebergManifestEntry> result = new ArrayList<>();
+            while (iterator.hasNext()) {
+                InternalRow row = iterator.next();
+                if (readFilter.test(row)) {
+                    result.add(serializer.fromRow(row, meta));
+                }
+            }
+            return result;
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
     public List<IcebergManifestFileMeta> rollingWrite(
             Iterator<IcebergManifestEntry> entries, long sequenceNumber) {
         RollingFileWriter<IcebergManifestEntry, IcebergManifestFileMeta> writer =
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergDataField.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergDataField.java
index 4ecc77a135..a310e64f64 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergDataField.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergDataField.java
@@ -20,13 +20,22 @@ package org.apache.paimon.iceberg.metadata;
 
 import org.apache.paimon.table.SpecialFields;
 import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.BinaryType;
+import org.apache.paimon.types.BooleanType;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DateType;
 import org.apache.paimon.types.DecimalType;
+import org.apache.paimon.types.DoubleType;
+import org.apache.paimon.types.FloatType;
+import org.apache.paimon.types.IntType;
 import org.apache.paimon.types.LocalZonedTimestampType;
 import org.apache.paimon.types.MapType;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.types.TimestampType;
+import org.apache.paimon.types.VarBinaryType;
+import org.apache.paimon.types.VarCharType;
 import org.apache.paimon.utils.Preconditions;
 
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
@@ -64,7 +73,7 @@ public class IcebergDataField {
     @JsonProperty(FIELD_TYPE)
     private final Object type;
 
-    @JsonIgnore private final DataType dataType;
+    @JsonIgnore private DataType dataType;
 
     @JsonProperty(FIELD_DOC)
     private final String doc;
@@ -126,6 +135,10 @@ public class IcebergDataField {
 
     @JsonIgnore
     public DataType dataType() {
+        if (dataType != null) {
+            return dataType;
+        }
+        dataType = getDataTypeFromType();
         return Preconditions.checkNotNull(dataType);
     }
 
@@ -190,6 +203,70 @@ public class IcebergDataField {
         }
     }
 
+    private DataType getDataTypeFromType() {
+        String simpleType = type.toString();
+        String delimiter = "(";
+        if (simpleType.contains("[")) {
+            delimiter = "[";
+        }
+        String typePrefix =
+                !simpleType.contains(delimiter)
+                        ? simpleType
+                        : simpleType.substring(0, simpleType.indexOf(delimiter));
+        switch (typePrefix) {
+            case "boolean":
+                return new BooleanType(!required);
+            case "int":
+                return new IntType(!required);
+            case "long":
+                return new BigIntType(!required);
+            case "float":
+                return new FloatType(!required);
+            case "double":
+                return new DoubleType(!required);
+            case "date":
+                return new DateType(!required);
+            case "string":
+                return new VarCharType(!required, VarCharType.MAX_LENGTH);
+            case "binary":
+                return new VarBinaryType(!required, VarBinaryType.MAX_LENGTH);
+            case "fixed":
+                int fixedLength =
+                        Integer.parseInt(
+                                simpleType.substring(
+                                        simpleType.indexOf("[") + 1, simpleType.indexOf("]")));
+                return new BinaryType(!required, fixedLength);
+            case "uuid":
+                // https://iceberg.apache.org/spec/?h=vector#primitive-types
+                // uuid should use 16-byte fixed
+                return new BinaryType(!required, 16);
+            case "decimal":
+                int precision =
+                        Integer.parseInt(
+                                simpleType.substring(
+                                        simpleType.indexOf("(") + 1, simpleType.indexOf(",")));
+                int scale =
+                        Integer.parseInt(
+                                simpleType.substring(
+                                        simpleType.indexOf(",") + 2, simpleType.indexOf(")")));
+                return new DecimalType(!required, precision, scale);
+            case "timestamp":
+                return new TimestampType(!required, 6);
+            case "timestamptz":
+                return new LocalZonedTimestampType(!required, 6);
+            case "timestamp_ns": // iceberg v3 format
+                return new TimestampType(!required, 9);
+            case "timestamptz_ns": // iceberg v3 format
+                return new LocalZonedTimestampType(!required, 9);
+            default:
+                throw new UnsupportedOperationException("Unsupported data type: " + type);
+        }
+    }
+
+    public DataField toDatafield() {
+        return new DataField(id, name, dataType(), doc);
+    }
+
     @Override
     public int hashCode() {
         return Objects.hash(id, name, required, type, doc);
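
Note: the "decimal" branch above assumes Iceberg renders decimals as "decimal(P, S)" with a
space after the comma (hence the indexOf(",") + 2). A tiny standalone sketch of that parsing,
illustrative only and not part of this commit:

    public class DecimalTypeStringParse {
        // e.g. "decimal(10, 2)" -> precision 10, scale 2
        static int[] parsePrecisionAndScale(String simpleType) {
            int precision =
                    Integer.parseInt(
                            simpleType.substring(simpleType.indexOf("(") + 1, simpleType.indexOf(",")));
            int scale =
                    Integer.parseInt(
                            simpleType.substring(simpleType.indexOf(",") + 2, simpleType.indexOf(")")));
            return new int[] {precision, scale};
        }

        public static void main(String[] args) {
            int[] ps = parsePrecisionAndScale("decimal(10, 2)");
            System.out.println(ps[0] + ", " + ps[1]); // prints: 10, 2
        }
    }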
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergMetadata.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergMetadata.java
index 86fb4a5df7..fbaf806002 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergMetadata.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergMetadata.java
@@ -106,7 +106,7 @@ public class IcebergMetadata {
     private final List<IcebergSnapshot> snapshots;
 
     @JsonProperty(FIELD_CURRENT_SNAPSHOT_ID)
-    private final int currentSnapshotId;
+    private final long currentSnapshotId;
 
     @JsonProperty(FIELD_PROPERTIES)
     @Nullable
@@ -122,7 +122,7 @@ public class IcebergMetadata {
             List<IcebergPartitionSpec> partitionSpecs,
             int lastPartitionId,
             List<IcebergSnapshot> snapshots,
-            int currentSnapshotId) {
+            long currentSnapshotId) {
         this(
                 CURRENT_FORMAT_VERSION,
                 tableUuid,
@@ -158,7 +158,7 @@ public class IcebergMetadata {
             @JsonProperty(FIELD_SORT_ORDERS) List<IcebergSortOrder> sortOrders,
             @JsonProperty(FIELD_DEFAULT_SORT_ORDER_ID) int defaultSortOrderId,
             @JsonProperty(FIELD_SNAPSHOTS) List<IcebergSnapshot> snapshots,
-            @JsonProperty(FIELD_CURRENT_SNAPSHOT_ID) int currentSnapshotId,
+            @JsonProperty(FIELD_CURRENT_SNAPSHOT_ID) long currentSnapshotId,
             @JsonProperty(FIELD_PROPERTIES) @Nullable Map<String, String> properties) {
         this.formatVersion = formatVersion;
         this.tableUuid = tableUuid;
@@ -249,7 +249,7 @@ public class IcebergMetadata {
     }
 
     @JsonGetter(FIELD_CURRENT_SNAPSHOT_ID)
-    public int currentSnapshotId() {
+    public long currentSnapshotId() {
         return currentSnapshotId;
     }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHadoopMetadata.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHadoopMetadata.java
new file mode 100644
index 0000000000..a6c5fb027b
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHadoopMetadata.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg.migrate;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.iceberg.IcebergPathFactory;
+import org.apache.paimon.iceberg.metadata.IcebergMetadata;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.utils.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/** Get iceberg table latest snapshot metadata in hadoop. */
+public class IcebergMigrateHadoopMetadata implements IcebergMigrateMetadata {
+    private static final Logger LOG = LoggerFactory.getLogger(IcebergMigrateHadoopMetadata.class);
+
+    private static final String VERSION_HINT_FILENAME = "version-hint.text";
+    private static final String ICEBERG_WAREHOUSE = "iceberg_warehouse";
+
+    private final FileIO fileIO;
+    private final Identifier icebergIdentifier;
+    private final Options icebergOptions;
+
+    private Path icebergLatestMetaVersionPath;
+    private IcebergPathFactory icebergMetaPathFactory;
+
+    public IcebergMigrateHadoopMetadata(
+            Identifier icebergIdentifier, FileIO fileIO, Options icebergOptions) {
+        this.fileIO = fileIO;
+        this.icebergIdentifier = icebergIdentifier;
+        this.icebergOptions = icebergOptions;
+    }
+
+    @Override
+    public IcebergMetadata icebergMetadata() {
+        Preconditions.checkArgument(
+                icebergOptions.get(ICEBERG_WAREHOUSE) != null,
+                "'iceberg_warehouse' is null. "
+                        + "In hadoop-catalog, you should explicitly set this argument for finding iceberg metadata.");
+        this.icebergMetaPathFactory =
+                new IcebergPathFactory(
+                        new Path(
+                                icebergOptions.get(ICEBERG_WAREHOUSE),
+                                new Path(
+                                        String.format(
+                                                "%s/%s/metadata",
+                                                icebergIdentifier.getDatabaseName(),
+                                                icebergIdentifier.getTableName()))));
+        long icebergLatestMetaVersion = getIcebergLatestMetaVersion();
+
+        this.icebergLatestMetaVersionPath =
+                icebergMetaPathFactory.toMetadataPath(icebergLatestMetaVersion);
+        LOG.info(
+                "iceberg latest snapshot metadata file location: {}", icebergLatestMetaVersionPath);
+
+        return IcebergMetadata.fromPath(fileIO, icebergLatestMetaVersionPath);
+    }
+
+    @Override
+    public String icebergLatestMetadataLocation() {
+        return icebergLatestMetaVersionPath.toString();
+    }
+
+    @Override
+    public void deleteOriginTable() {
+        Path tablePath = icebergMetaPathFactory.metadataDirectory().getParent();
+        LOG.info("Iceberg table path to be deleted:{}", tablePath);
+        try {
+            if (fileIO.isDir(tablePath)) {
+                fileIO.deleteDirectoryQuietly(tablePath);
+            }
+        } catch (IOException e) {
+            LOG.warn("exception occurred when deleting origin table.", e);
+        }
+    }
+
+    private long getIcebergLatestMetaVersion() {
+        Path versionHintPath =
+                new Path(icebergMetaPathFactory.metadataDirectory(), VERSION_HINT_FILENAME);
+        try {
+            return Integer.parseInt(fileIO.readFileUtf8(versionHintPath));
+        } catch (IOException e) {
+            throw new RuntimeException(
+                    "read iceberg version-hint.text failed. Iceberg metadata path: "
+                            + icebergMetaPathFactory.metadataDirectory(),
+                    e);
+        }
+    }
+}
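
For context, IcebergMigrateHadoopMetadata above assumes the usual Iceberg hadoop-catalog
layout under 'iceberg_warehouse' (the paths below are illustrative, not prescribed by this
commit):

    <iceberg_warehouse>/<database>/<table>/
        metadata/
            version-hint.text     # latest metadata version number, e.g. "3"
            v3.metadata.json      # resolved via IcebergPathFactory#toMetadataPath
        data/                     # data files referenced by the manifests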
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHadoopMetadataFactory.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHadoopMetadataFactory.java
new file mode 100644
index 0000000000..6666301014
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHadoopMetadataFactory.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg.migrate;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.iceberg.IcebergOptions;
+import org.apache.paimon.options.Options;
+
+/** Factory to create {@link IcebergMigrateHadoopMetadata}. */
+public class IcebergMigrateHadoopMetadataFactory implements IcebergMigrateMetadataFactory {
+
+    @Override
+    public String identifier() {
+        return IcebergOptions.StorageType.HADOOP_CATALOG.toString() + "_migrate";
+    }
+
+    @Override
+    public IcebergMigrateHadoopMetadata create(
+            Identifier icebergIdentifier, FileIO fileIO, Options icebergOptions) {
+        return new IcebergMigrateHadoopMetadata(icebergIdentifier, fileIO, icebergOptions);
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateMetadata.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateMetadata.java
new file mode 100644
index 0000000000..58648c537f
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateMetadata.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg.migrate;
+
+import org.apache.paimon.iceberg.metadata.IcebergMetadata;
+
+/**
+ * Get iceberg metadata for migrate. Each kind of iceberg catalog should have its own
+ * implementation.
+ */
+public interface IcebergMigrateMetadata {
+
+    IcebergMetadata icebergMetadata();
+
+    String icebergLatestMetadataLocation();
+
+    void deleteOriginTable();
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateMetadataFactory.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateMetadataFactory.java
new file mode 100644
index 0000000000..f727088f5d
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateMetadataFactory.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg.migrate;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.factories.Factory;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.options.Options;
+
+/** Factory to create {@link IcebergMigrateMetadata}. */
+public interface IcebergMigrateMetadataFactory extends Factory {
+
+    IcebergMigrateMetadata create(
+            Identifier icebergIdentifier, FileIO fileIO, Options icebergOptions);
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrator.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrator.java
new file mode 100644
index 0000000000..44162dea7f
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrator.java
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg.migrate;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.factories.FactoryException;
+import org.apache.paimon.factories.FactoryUtil;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.FileStatus;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.iceberg.IcebergOptions;
+import org.apache.paimon.iceberg.IcebergPathFactory;
+import org.apache.paimon.iceberg.manifest.IcebergDataFileMeta;
+import org.apache.paimon.iceberg.manifest.IcebergManifestEntry;
+import org.apache.paimon.iceberg.manifest.IcebergManifestFile;
+import org.apache.paimon.iceberg.manifest.IcebergManifestFileMeta;
+import org.apache.paimon.iceberg.manifest.IcebergManifestList;
+import org.apache.paimon.iceberg.metadata.IcebergDataField;
+import org.apache.paimon.iceberg.metadata.IcebergMetadata;
+import org.apache.paimon.iceberg.metadata.IcebergPartitionField;
+import org.apache.paimon.iceberg.metadata.IcebergPartitionSpec;
+import org.apache.paimon.iceberg.metadata.IcebergSchema;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.migrate.FileMetaUtils;
+import org.apache.paimon.migrate.Migrator;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.utils.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.utils.ThreadPoolUtils.createCachedThreadPool;
+
+/** migrate iceberg table to paimon table. */
+public class IcebergMigrator implements Migrator {
+    private static final Logger LOG = LoggerFactory.getLogger(IcebergMigrator.class);
+
+    private final ThreadPoolExecutor executor;
+
+    private final Catalog paimonCatalog;
+    private final FileIO paimonFileIO;
+    private final String paimonDatabaseName;
+    private final String paimonTableName;
+
+    private final String icebergDatabaseName;
+    private final String icebergTableName;
+    private final Options icebergOptions;
+
+    private final IcebergMigrateMetadata icebergMigrateMetadata;
+    // metadata path factory for iceberg metadata
+    private final IcebergPathFactory icebergMetaPathFactory;
+    // latest metadata file path
+    private final String icebergLatestMetadataLocation;
+    // metadata for newest iceberg snapshot
+    private final IcebergMetadata icebergMetadata;
+
+    private Boolean deleteOriginTable = true;
+
+    public IcebergMigrator(
+            Catalog paimonCatalog,
+            String paimonDatabaseName,
+            String paimonTableName,
+            String icebergDatabaseName,
+            String icebergTableName,
+            Options icebergOptions,
+            Integer parallelism) {
+        this.paimonCatalog = paimonCatalog;
+        this.paimonFileIO = paimonCatalog.fileIO();
+        this.paimonDatabaseName = paimonDatabaseName;
+        this.paimonTableName = paimonTableName;
+
+        this.icebergDatabaseName = icebergDatabaseName;
+        this.icebergTableName = icebergTableName;
+        this.icebergOptions = icebergOptions;
+
+        Preconditions.checkArgument(
+                icebergOptions.containsKey(IcebergOptions.METADATA_ICEBERG_STORAGE.key()),
+                "'metadata.iceberg.storage' is required, please make sure it has been set.");
+
+        IcebergMigrateMetadataFactory icebergMigrateMetadataFactory;
+        try {
+            icebergMigrateMetadataFactory =
+                    FactoryUtil.discoverFactory(
+                            IcebergMigrator.class.getClassLoader(),
+                            IcebergMigrateMetadataFactory.class,
+                            icebergOptions.get(IcebergOptions.METADATA_ICEBERG_STORAGE).toString()
+                                    + "_migrate");
+        } catch (FactoryException e) {
+            throw new RuntimeException("create IcebergMigrateMetadataFactory failed.", e);
+        }
+
+        icebergMigrateMetadata =
+                icebergMigrateMetadataFactory.create(
+                        Identifier.create(icebergDatabaseName, icebergTableName),
+                        paimonFileIO,
+                        icebergOptions);
+
+        this.icebergMetadata = icebergMigrateMetadata.icebergMetadata();
+        this.icebergLatestMetadataLocation = icebergMigrateMetadata.icebergLatestMetadataLocation();
+        this.icebergMetaPathFactory =
+                new IcebergPathFactory(new Path(icebergLatestMetadataLocation).getParent());
+
+        this.executor = createCachedThreadPool(parallelism, "ICEBERG_MIGRATOR");
+    }
+
+    @Override
+    public void executeMigrate() throws Exception {
+        Schema paimonSchema = icebergSchemaToPaimonSchema(icebergMetadata);
+        Identifier paimonIdentifier = Identifier.create(paimonDatabaseName, paimonTableName);
+
+        paimonCatalog.createDatabase(paimonDatabaseName, true);
+        paimonCatalog.createTable(paimonIdentifier, paimonSchema, false);
+
+        try {
+            FileStoreTable paimonTable = (FileStoreTable) paimonCatalog.getTable(paimonIdentifier);
+
+            IcebergManifestFile manifestFile =
+                    IcebergManifestFile.create(paimonTable, icebergMetaPathFactory);
+            IcebergManifestList manifestList =
+                    IcebergManifestList.create(paimonTable, icebergMetaPathFactory);
+
+            List<IcebergManifestFileMeta> icebergManifestFileMetas =
+                    manifestList.read(icebergMetadata.currentSnapshot().manifestList());
+
+            // check manifest file with 'DELETE' kind
+            checkAndFilterManifestFiles(icebergManifestFileMetas);
+
+            // get all live iceberg entries
+            List<IcebergManifestEntry> icebergEntries =
+                    icebergManifestFileMetas.stream()
+                            .flatMap(fileMeta -> manifestFile.read(fileMeta).stream())
+                            .filter(IcebergManifestEntry::isLive)
+                            .collect(Collectors.toList());
+            if (icebergEntries.isEmpty()) {
+                LOG.info(
+                        "No live manifest entry in iceberg table for snapshot {}, iceberg table meta path is {}.",
+                        icebergMetadata.currentSnapshotId(),
+                        icebergLatestMetadataLocation);
+                return;
+            }
+
+            List<IcebergDataFileMeta> icebergDataFileMetas =
+                    icebergEntries.stream()
+                            .map(IcebergManifestEntry::file)
+                            .collect(Collectors.toList());
+
+            // Again, check if delete File exists
+            checkAndFilterDataFiles(icebergDataFileMetas);
+
+            LOG.info(
+                    "Begin to create Migrate Task, the number of iceberg data files is {}",
+                    icebergDataFileMetas.size());
+
+            List<MigrateTask> tasks = new ArrayList<>();
+            Map<Path, Path> rollback = new ConcurrentHashMap<>();
+            if (paimonTable.partitionKeys().isEmpty()) {
+                tasks.add(importUnPartitionedTable(icebergDataFileMetas, paimonTable, rollback));
+            } else {
+                tasks.addAll(importPartitionedTable(icebergDataFileMetas, paimonTable, rollback));
+            }
+
+            List<Future<CommitMessage>> futures =
+                    tasks.stream().map(executor::submit).collect(Collectors.toList());
+            List<CommitMessage> commitMessages = new ArrayList<>();
+            try {
+                for (Future<CommitMessage> future : futures) {
+                    commitMessages.add(future.get());
+                }
+            } catch (Exception e) {
+                futures.forEach(f -> f.cancel(true));
+                for (Future<?> future : futures) {
+                    // wait all task cancelled or finished
+                    while (!future.isDone()) {
+                        //noinspection BusyWait
+                        Thread.sleep(100);
+                    }
+                }
+                // roll back all renamed path
+                for (Map.Entry<Path, Path> entry : rollback.entrySet()) {
+                    Path newPath = entry.getKey();
+                    Path origin = entry.getValue();
+                    if (paimonFileIO.exists(newPath)) {
+                        paimonFileIO.rename(newPath, origin);
+                    }
+                }
+
+                throw new RuntimeException("Migrating failed because exception happens", e);
+            }
+            try (BatchTableCommit commit = paimonTable.newBatchWriteBuilder().newCommit()) {
+                commit.commit(new ArrayList<>(commitMessages));
+                LOG.info("paimon commit success! Iceberg data files have been migrated to paimon.");
+            }
+        } catch (Exception e) {
+            paimonCatalog.dropTable(paimonIdentifier, true);
+            throw new RuntimeException("Migrating failed", e);
+        }
+
+        // if all success, drop the origin table according the delete field
+        if (deleteOriginTable) {
+            icebergMigrateMetadata.deleteOriginTable();
+        }
+    }
+
+    @Override
+    public void deleteOriginTable(boolean delete) throws Exception {
+        this.deleteOriginTable = delete;
+    }
+
+    @Override
+    public void renameTable(boolean ignoreIfNotExists) throws Exception {
+        Identifier targetTableId = Identifier.create(paimonDatabaseName, paimonTableName);
+        Identifier sourceTableId = Identifier.create(icebergDatabaseName, icebergTableName);
+        LOG.info("Last step: rename {} to {}.", targetTableId, sourceTableId);
+        paimonCatalog.renameTable(targetTableId, sourceTableId, ignoreIfNotExists);
+    }
+
+    public Schema icebergSchemaToPaimonSchema(IcebergMetadata icebergMetadata) {
+        // get iceberg current schema
+        IcebergSchema icebergSchema =
+                icebergMetadata.schemas().get(icebergMetadata.currentSchemaId());
+
+        // get iceberg current partition spec
+        int currentPartitionSpecId = icebergMetadata.defaultSpecId();
+        IcebergPartitionSpec currentIcebergPartitionSpec =
+                icebergMetadata.partitionSpecs().get(currentPartitionSpecId);
+
+        List<DataField> dataFields =
+                icebergSchema.fields().stream()
+                        .map(IcebergDataField::toDatafield)
+                        .collect(Collectors.toList());
+
+        List<String> partitionKeys =
+                currentIcebergPartitionSpec.fields().stream()
+                        .map(IcebergPartitionField::name)
+                        .collect(Collectors.toList());
+
+        return new Schema(
+                dataFields, partitionKeys, Collections.emptyList(), Collections.emptyMap(), null);
+    }
+
+    private void checkAndFilterManifestFiles(
+            List<IcebergManifestFileMeta> icebergManifestFileMetas) {
+
+        for (IcebergManifestFileMeta meta : icebergManifestFileMetas) {
+            Preconditions.checkArgument(
+                    meta.content() != IcebergManifestFileMeta.Content.DELETES,
+                    "IcebergMigrator don't support analyzing manifest file with 'DELETE' content.");
+        }
+    }
+
+    private void checkAndFilterDataFiles(List<IcebergDataFileMeta> icebergDataFileMetas) {
+
+        for (IcebergDataFileMeta meta : icebergDataFileMetas) {
+            Preconditions.checkArgument(
+                    meta.content() == IcebergDataFileMeta.Content.DATA,
+                    "IcebergMigrator don't support analyzing iceberg delete file.");
+        }
+    }
+
+    private static List<DataFileMeta> construct(
+            List<IcebergDataFileMeta> icebergDataFileMetas,
+            FileIO fileIO,
+            Table paimonTable,
+            Path newDir,
+            Map<Path, Path> rollback) {
+        return icebergDataFileMetas.stream()
+                .map(
+                        icebergDataFileMeta ->
+                                constructFileMeta(
+                                        icebergDataFileMeta, fileIO, paimonTable, newDir, rollback))
+                .collect(Collectors.toList());
+    }
+
+    private static DataFileMeta constructFileMeta(
+            IcebergDataFileMeta icebergDataFileMeta,
+            FileIO fileIO,
+            Table table,
+            Path dir,
+            Map<Path, Path> rollback) {
+        FileStatus status;
+        try {
+            status = fileIO.getFileStatus(new Path(icebergDataFileMeta.filePath()));
+        } catch (IOException e) {
+            throw new RuntimeException(
+                    "error when get file status. file path is " + icebergDataFileMeta.filePath(),
+                    e);
+        }
+        String format = icebergDataFileMeta.fileFormat();
+        return FileMetaUtils.constructFileMeta(format, status, fileIO, table, dir, rollback);
+    }
+
+    private MigrateTask importUnPartitionedTable(
+            List<IcebergDataFileMeta> icebergDataFileMetas,
+            FileStoreTable paimonTable,
+            Map<Path, Path> rollback) {
+        BinaryRow partitionRow = BinaryRow.EMPTY_ROW;
+        Path newDir = paimonTable.store().pathFactory().bucketPath(partitionRow, 0);
+
+        return new MigrateTask(
+                icebergDataFileMetas, paimonFileIO, paimonTable, partitionRow, newDir, rollback);
+    }
+
+    private List<MigrateTask> importPartitionedTable(
+            List<IcebergDataFileMeta> icebergDataFileMetas,
+            FileStoreTable paimonTable,
+            Map<Path, Path> rollback) {
+        Map<BinaryRow, List<IcebergDataFileMeta>> dataInPartition =
+                icebergDataFileMetas.stream()
+                        .collect(Collectors.groupingBy(IcebergDataFileMeta::partition));
+        List<MigrateTask> migrateTasks = new ArrayList<>();
+        for (Map.Entry<BinaryRow, List<IcebergDataFileMeta>> entry : dataInPartition.entrySet()) {
+            BinaryRow partitionRow = entry.getKey();
+            Path newDir = paimonTable.store().pathFactory().bucketPath(partitionRow, 0);
+            migrateTasks.add(
+                    new MigrateTask(
+                            entry.getValue(),
+                            paimonFileIO,
+                            paimonTable,
+                            partitionRow,
+                            newDir,
+                            rollback));
+        }
+        return migrateTasks;
+    }
+
+    /** One import task for one partition. */
+    public static class MigrateTask implements Callable<CommitMessage> {
+
+        private final List<IcebergDataFileMeta> icebergDataFileMetas;
+        private final FileIO fileIO;
+        private final FileStoreTable paimonTable;
+        private final BinaryRow partitionRow;
+        private final Path newDir;
+        private final Map<Path, Path> rollback;
+
+        public MigrateTask(
+                List<IcebergDataFileMeta> icebergDataFileMetas,
+                FileIO fileIO,
+                FileStoreTable paimonTable,
+                BinaryRow partitionRow,
+                Path newDir,
+                Map<Path, Path> rollback) {
+            this.icebergDataFileMetas = icebergDataFileMetas;
+            this.fileIO = fileIO;
+            this.paimonTable = paimonTable;
+            this.partitionRow = partitionRow;
+            this.newDir = newDir;
+            this.rollback = rollback;
+        }
+
+        @Override
+        public CommitMessage call() throws Exception {
+            if (!fileIO.exists(newDir)) {
+                fileIO.mkdirs(newDir);
+            }
+            List<DataFileMeta> fileMetas =
+                    construct(icebergDataFileMetas, fileIO, paimonTable, newDir, rollback);
+            return FileMetaUtils.commitFile(partitionRow, fileMetas);
+        }
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java b/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
index 51a0b5e2a9..366f8afcfd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
@@ -92,9 +92,7 @@ public class FileMetaUtils {
                         Collections.emptyList(), Collections.emptyList(), Collections.emptyList()));
     }
 
-    // -----------------------------private method---------------------------------------------
-
-    private static DataFileMeta constructFileMeta(
+    public static DataFileMeta constructFileMeta(
             String format,
             FileStatus fileStatus,
             FileIO fileIO,
@@ -131,6 +129,8 @@ public class FileMetaUtils {
         }
     }
 
+    // -----------------------------private method---------------------------------------------
+
     private static Path renameFile(
             FileIO fileIO, Path originPath, Path newDir, String format, Map<Path, Path> rollback)
             throws IOException {
diff --git a/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index 6416edd720..ff423bffd8 100644
--- a/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ b/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -39,3 +39,4 @@ org.apache.paimon.mergetree.compact.aggregate.factory.FieldThetaSketchAggFactory
 org.apache.paimon.rest.RESTCatalogFactory
 org.apache.paimon.rest.auth.BearTokenCredentialsProviderFactory
 org.apache.paimon.rest.auth.BearTokenFileCredentialsProviderFactory
+org.apache.paimon.iceberg.migrate.IcebergMigrateHadoopMetadataFactory
diff --git a/paimon-core/src/test/java/org/apache/paimon/iceberg/migrate/IcebergMigrateTest.java b/paimon-core/src/test/java/org/apache/paimon/iceberg/migrate/IcebergMigrateTest.java
new file mode 100644
index 0000000000..aadaca0c38
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/iceberg/migrate/IcebergMigrateTest.java
@@ -0,0 +1,619 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg.migrate;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.DataFormatTestUtil;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.iceberg.IcebergOptions;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.TableRead;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.deletes.EqualityDeleteWriter;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.DataWriter;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.OffsetDateTime;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.paimon.options.CatalogOptions.CACHE_ENABLED;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
+
+/** Tests for {@link IcebergMigrator}. */
+public class IcebergMigrateTest {
+    @TempDir java.nio.file.Path iceTempDir;
+    @TempDir java.nio.file.Path paiTempDir;
+
+    Catalog paiCatalog;
+
+    String iceDatabase = "ice_db";
+    String iceTable = "ice_t";
+
+    String paiDatabase = "pai_db";
+    String paiTable = "pai_t";
+
+    Schema iceSchema =
+            new Schema(
+                    Types.NestedField.required(1, "k", Types.IntegerType.get()),
+                    Types.NestedField.required(2, "v", Types.IntegerType.get()),
+                    Types.NestedField.required(3, "dt", Types.StringType.get()),
+                    Types.NestedField.required(4, "hh", Types.StringType.get()));
+    Schema iceDeleteSchema =
+            new Schema(
+                    Types.NestedField.required(1, "k", Types.IntegerType.get()),
+                    Types.NestedField.optional(2, "v", Types.IntegerType.get()));
+
+    PartitionSpec icePartitionSpec =
+            PartitionSpec.builderFor(iceSchema).identity("dt").identity("hh").build();
+
+    Map<String, String> icebergProperties = new HashMap<>();
+
+    @BeforeEach
+    public void beforeEach() throws Exception {
+        paiCatalog = createPaimonCatalog();
+        icebergProperties.put(IcebergOptions.METADATA_ICEBERG_STORAGE.key(), "hadoop-catalog");
+        icebergProperties.put("iceberg_warehouse", iceTempDir.toString());
+    }
+
+    @ParameterizedTest(name = "isPartitioned = {0}")
+    @ValueSource(booleans = {true, false})
+    public void testMigrateOnlyAdd(boolean isPartitioned) throws Exception {
+        Table icebergTable = createIcebergTable(isPartitioned);
+        String format = "parquet";
+        List<GenericRecord> records1 =
+                Stream.of(
+                                toIcebergRecord(1, 1, "20240101", "00"),
+                                toIcebergRecord(2, 2, "20240101", "00"))
+                        .collect(Collectors.toList());
+        if (isPartitioned) {
+            writeRecordsToIceberg(icebergTable, format, records1, "20240101", "00");
+        } else {
+            writeRecordsToIceberg(icebergTable, format, records1);
+        }
+
+        List<GenericRecord> records2 =
+                Stream.of(
+                                toIcebergRecord(1, 1, "20240101", "01"),
+                                toIcebergRecord(2, 2, "20240101", "01"))
+                        .collect(Collectors.toList());
+        if (isPartitioned) {
+            writeRecordsToIceberg(icebergTable, format, records2, "20240101", "01");
+        } else {
+            writeRecordsToIceberg(icebergTable, format, records2);
+        }
+
+        IcebergMigrator icebergMigrator =
+                new IcebergMigrator(
+                        paiCatalog,
+                        paiDatabase,
+                        paiTable,
+                        iceDatabase,
+                        iceTable,
+                        new Options(icebergProperties),
+                        1);
+        icebergMigrator.executeMigrate();
+        icebergMigrator.renameTable(false);
+
+        FileStoreTable paimonTable =
+                (FileStoreTable) paiCatalog.getTable(Identifier.create(iceDatabase, iceTable));
+        List<String> paiResults = getPaimonResult(paimonTable);
+        assertThat(
+                        paiResults.stream()
+                                .map(row -> String.format("Record(%s)", row))
+                                .collect(Collectors.toList()))
+                .hasSameElementsAs(
+                        Stream.concat(records1.stream(), records2.stream())
+                                .map(GenericRecord::toString)
+                                .collect(Collectors.toList()));
+
+        // verify iceberg table has been deleted
+        assertThat(paimonTable.fileIO().exists(new Path(icebergTable.location()))).isFalse();
+    }
+
+    @ParameterizedTest(name = "isPartitioned = {0}")
+    @ValueSource(booleans = {true, false})
+    public void testMigrateAddAndDelete(boolean isPartitioned) throws Exception {
+        Table icebergTable = createIcebergTable(isPartitioned);
+        String format = "parquet";
+        List<GenericRecord> records1 =
+                Stream.of(
+                                toIcebergRecord(1, 1, "20240101", "00"),
+                                toIcebergRecord(2, 2, "20240101", "00"))
+                        .collect(Collectors.toList());
+        if (isPartitioned) {
+            writeRecordsToIceberg(icebergTable, format, records1, "20240101", "00");
+        } else {
+            writeRecordsToIceberg(icebergTable, format, records1);
+        }
+
+        List<GenericRecord> records2 =
+                Stream.of(
+                                toIcebergRecord(1, 1, "20240101", "01"),
+                                toIcebergRecord(2, 2, "20240101", "01"))
+                        .collect(Collectors.toList());
+        if (isPartitioned) {
+            writeRecordsToIceberg(icebergTable, format, records2, "20240101", "01");
+        } else {
+            writeRecordsToIceberg(icebergTable, format, records2);
+        }
+
+        // the file written with records1 will be deleted and generate a delete manifest
+        // entry, not a delete file
+        icebergTable.newDelete().deleteFromRowFilter(Expressions.equal("hh", "00")).commit();
+
+        IcebergMigrator icebergMigrator =
+                new IcebergMigrator(
+                        paiCatalog,
+                        paiDatabase,
+                        paiTable,
+                        iceDatabase,
+                        iceTable,
+                        new Options(icebergProperties),
+                        1);
+        icebergMigrator.executeMigrate();
+
+        FileStoreTable paimonTable =
+                (FileStoreTable) paiCatalog.getTable(Identifier.create(paiDatabase, paiTable));
+        List<String> paiResults = getPaimonResult(paimonTable);
+        assertThat(
+                        paiResults.stream()
+                                .map(row -> String.format("Record(%s)", row))
+                                .collect(Collectors.toList()))
+                .hasSameElementsAs(
+                        records2.stream()
+                                .map(GenericRecord::toString)
+                                .collect(Collectors.toList()));
+    }
+
+    @ParameterizedTest(name = "isPartitioned = {0}")
+    @ValueSource(booleans = {true, false})
+    public void testMigrateWithDeleteFile(boolean isPartitioned) throws Exception {
+        // only support create delete file with parquet format
+        Table icebergTable = createIcebergTable(isPartitioned);
+        String format = "parquet";
+        List<GenericRecord> records1 =
+                Stream.of(
+                                toIcebergRecord(1, 1, "20240101", "00"),
+                                toIcebergRecord(2, 2, "20240101", "00"))
+                        .collect(Collectors.toList());
+        List<GenericRecord> deleteRecords1 =
+                Collections.singletonList(toIcebergRecord(1, 1, iceDeleteSchema));
+
+        if (isPartitioned) {
+            writeRecordsToIceberg(icebergTable, format, records1, "20240101", "00");
+            writeEqualityDeleteFile(icebergTable, deleteRecords1, "20240101", "00");
+        } else {
+            writeRecordsToIceberg(icebergTable, format, records1);
+            writeEqualityDeleteFile(icebergTable, deleteRecords1);
+        }
+
+        List<GenericRecord> records2 =
+                Stream.of(
+                                toIcebergRecord(1, 1, "20240101", "01"),
+                                toIcebergRecord(2, 2, "20240101", "01"))
+                        .collect(Collectors.toList());
+        if (isPartitioned) {
+            writeRecordsToIceberg(icebergTable, format, records2, "20240101", "01");
+        } else {
+            writeRecordsToIceberg(icebergTable, format, records2);
+        }
+
+        IcebergMigrator icebergMigrator =
+                new IcebergMigrator(
+                        paiCatalog,
+                        paiDatabase,
+                        paiTable,
+                        iceDatabase,
+                        iceTable,
+                        new Options(icebergProperties),
+                        1);
+
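+        // migration is expected to fail: the snapshot references an equality delete file,
+        // which IcebergMigrator cannot analyze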
+        assertThatThrownBy(icebergMigrator::executeMigrate)
+                .rootCause()
+                .isInstanceOf(RuntimeException.class)
+                .hasMessage(
+                        "IcebergMigrator don't support analyzing manifest file with 'DELETE' content.");
+    }
+
+    @ParameterizedTest(name = "isPartitioned = {0}")
+    @ValueSource(booleans = {true, false})
+    public void testMigrateWithRandomIcebergData(boolean isPartitioned) throws Exception {
+        Table icebergTable = createIcebergTable(isPartitioned);
+        String format = "parquet";
+
+        int numRounds = 50;
+        int numRecords = 20;
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        List<GenericRecord> expectRecords = new ArrayList<>();
+        for (int i = 0; i < numRounds; i++) {
+            List<GenericRecord> records = new ArrayList<>();
+            String dt = Integer.toString(random.nextInt(20240101, 20240104));
+            String hh = Integer.toString(random.nextInt(3));
+            for (int j = 0; j < numRecords; j++) {
+                records.add(toIcebergRecord(random.nextInt(100), random.nextInt(100), dt, hh));
+            }
+            expectRecords.addAll(records);
+            if (isPartitioned) {
+                writeRecordsToIceberg(icebergTable, format, records, dt, hh);
+            } else {
+                writeRecordsToIceberg(icebergTable, format, records);
+            }
+        }
+
+        IcebergMigrator icebergMigrator =
+                new IcebergMigrator(
+                        paiCatalog,
+                        paiDatabase,
+                        paiTable,
+                        iceDatabase,
+                        iceTable,
+                        new Options(icebergProperties),
+                        1);
+        icebergMigrator.executeMigrate();
+
+        FileStoreTable paimonTable =
+                (FileStoreTable) paiCatalog.getTable(Identifier.create(paiDatabase, paiTable));
+        List<String> paiResults = getPaimonResult(paimonTable);
+        assertThat(
+                        paiResults.stream()
+                                .map(row -> String.format("Record(%s)", row))
+                                .collect(Collectors.toList()))
+                .hasSameElementsAs(
+                        expectRecords.stream()
+                                .map(GenericRecord::toString)
+                                .collect(Collectors.toList()));
+    }
+
+    @ParameterizedTest(name = "isPartitioned = {0}")
+    @ValueSource(booleans = {true, false})
+    public void testMigrateWithSchemaEvolution(boolean isPartitioned) throws Exception {
+        Table icebergTable = createIcebergTable(isPartitioned);
+        String format = "parquet";
+
+        // write base data
+        List<GenericRecord> records1 =
+                Stream.of(
+                                toIcebergRecord(1, 1, "20240101", "00"),
+                                toIcebergRecord(2, 2, "20240101", "00"))
+                        .collect(Collectors.toList());
+        if (isPartitioned) {
+            writeRecordsToIceberg(icebergTable, format, records1, "20240101", "00");
+        } else {
+            writeRecordsToIceberg(icebergTable, format, records1);
+        }
+
+        List<GenericRecord> records2 =
+                Stream.of(
+                                toIcebergRecord(1, 1, "20240101", "01"),
+                                toIcebergRecord(2, 2, "20240101", "01"))
+                        .collect(Collectors.toList());
+        if (isPartitioned) {
+            writeRecordsToIceberg(icebergTable, format, records2, "20240101", "01");
+        } else {
+            writeRecordsToIceberg(icebergTable, format, records2);
+        }
+
+        // TODO: currently only schema evolution by deleting columns is supported
+        testDeleteColumn(icebergTable, format, isPartitioned);
+    }
+
+    private void testDeleteColumn(Table icebergTable, String format, boolean isPartitioned)
+            throws Exception {
+        icebergTable.updateSchema().deleteColumn("v").commit();
+        Schema newIceSchema = icebergTable.schema();
+        List<GenericRecord> addedRecords =
+                Stream.of(
+                                toIcebergRecord(3, "20240101", "00", newIceSchema),
+                                toIcebergRecord(4, "20240101", "00", newIceSchema))
+                        .collect(Collectors.toList());
+        if (isPartitioned) {
+            writeRecordsToIceberg(icebergTable, format, addedRecords, "20240101", "00");
+        } else {
+            writeRecordsToIceberg(icebergTable, format, addedRecords);
+        }
+
+        IcebergMigrator icebergMigrator =
+                new IcebergMigrator(
+                        paiCatalog,
+                        paiDatabase,
+                        paiTable,
+                        iceDatabase,
+                        iceTable,
+                        new Options(icebergProperties),
+                        1);
+        icebergMigrator.executeMigrate();
+
+        FileStoreTable paimonTable =
+                (FileStoreTable) paiCatalog.getTable(Identifier.create(paiDatabase, paiTable));
+        List<String> paiResults = getPaimonResult(paimonTable);
+        assertThat(
+                        paiResults.stream()
+                                .map(row -> String.format("Record(%s)", row))
+                                .collect(Collectors.toList()))
+                .hasSameElementsAs(
+                        Stream.of(
+                                        "Record(1, 20240101, 00)",
+                                        "Record(2, 20240101, 00)",
+                                        "Record(1, 20240101, 01)",
+                                        "Record(2, 20240101, 01)",
+                                        "Record(3, 20240101, 00)",
+                                        "Record(4, 20240101, 00)")
+                                .collect(Collectors.toList()));
+    }
+
+    @Test
+    public void testAllDataTypes() throws Exception {
+        Schema iceAllTypesSchema =
+                new Schema(
+                        Types.NestedField.required(1, "c1", Types.BooleanType.get()),
+                        Types.NestedField.required(2, "c2", Types.IntegerType.get()),
+                        Types.NestedField.required(3, "c3", Types.LongType.get()),
+                        Types.NestedField.required(4, "c4", Types.FloatType.get()),
+                        Types.NestedField.required(5, "c5", Types.DoubleType.get()),
+                        Types.NestedField.required(6, "c6", Types.DateType.get()),
+                        Types.NestedField.required(7, "c7", Types.StringType.get()),
+                        Types.NestedField.required(8, "c9", Types.BinaryType.get()),
+                        Types.NestedField.required(9, "c11", Types.DecimalType.of(10, 2)),
+                        Types.NestedField.required(10, "c13", Types.TimestampType.withoutZone()),
+                        Types.NestedField.required(11, "c14", Types.TimestampType.withZone()));
+        Table icebergTable = createIcebergTable(false, iceAllTypesSchema);
+        String format = "parquet";
+        GenericRecord record =
+                toIcebergRecord(
+                        iceAllTypesSchema,
+                        true,
+                        1,
+                        1L,
+                        1.0F,
+                        1.0D,
+                        LocalDate.of(2023, 10, 18),
+                        "test",
+                        ByteBuffer.wrap(new byte[] {1, 2, 3}),
+                        new BigDecimal("122.50"),
+                        LocalDateTime.now(),
+                        OffsetDateTime.now());
+
+        writeRecordsToIceberg(icebergTable, format, Collections.singletonList(record));
+
+        CatalogContext context = CatalogContext.create(new Path(paiTempDir.toString()));
+        context.options().set(CACHE_ENABLED, false);
+        Catalog catalog = CatalogFactory.createCatalog(context);
+        IcebergMigrator icebergMigrator =
+                new IcebergMigrator(
+                        paiCatalog,
+                        paiDatabase,
+                        paiTable,
+                        iceDatabase,
+                        iceTable,
+                        new Options(icebergProperties),
+                        1);
+        icebergMigrator.executeMigrate();
+
+        FileStoreTable paimonTable =
+                (FileStoreTable) catalog.getTable(Identifier.create(paiDatabase, paiTable));
+        List<String> paiResults = getPaimonResult(paimonTable);
+        assertThat(paiResults.size()).isEqualTo(1);
+    }
+
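+    // builds a Hadoop-type Iceberg catalog whose warehouse is the temporary Iceberg directory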
+    private org.apache.iceberg.catalog.Catalog createIcebergCatalog() {
+        Map<String, String> icebergCatalogOptions = new HashMap<>();
+        icebergCatalogOptions.put("type", "hadoop");
+        icebergCatalogOptions.put("warehouse", iceTempDir.toString());
+
+        return CatalogUtil.buildIcebergCatalog(
+                "iceberg_catalog", icebergCatalogOptions, new Configuration());
+    }
+
+    private Catalog createPaimonCatalog() {
+        CatalogContext context = CatalogContext.create(new Path(paiTempDir.toString()));
+        context.options().set(CACHE_ENABLED, false);
+        return CatalogFactory.createCatalog(context);
+    }
+
+    private Table createIcebergTable(boolean isPartitioned) {
+        return createIcebergTable(isPartitioned, iceSchema);
+    }
+
+    private Table createIcebergTable(boolean isPartitioned, Schema icebergSchema) {
+
+        org.apache.iceberg.catalog.Catalog icebergCatalog = createIcebergCatalog();
+        TableIdentifier icebergIdentifier = TableIdentifier.of(iceDatabase, iceTable);
+
+        if (!isPartitioned) {
+            return icebergCatalog
+                    .buildTable(icebergIdentifier, icebergSchema)
+                    .withPartitionSpec(PartitionSpec.unpartitioned())
+                    .create();
+        } else {
+            return icebergCatalog
+                    .buildTable(icebergIdentifier, icebergSchema)
+                    .withPartitionSpec(icePartitionSpec)
+                    .create();
+        }
+    }
+
+    private GenericRecord toIcebergRecord(Schema icebergSchema, Object... values) {
+        GenericRecord record = GenericRecord.create(icebergSchema);
+        for (int i = 0; i < values.length; i++) {
+            record.set(i, values[i]);
+        }
+        return record;
+    }
+
+    private GenericRecord toIcebergRecord(Object... values) {
+        return toIcebergRecord(iceSchema, values);
+    }
+
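+    // creates a Parquet data writer; when partitionValues are given they are interpreted as (dt, hh)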
+    private DataWriter<GenericRecord> createIcebergDataWriter(
+            Table icebergTable, String format, OutputFile file, String... partitionValues)
+            throws IOException {
+        Schema schema = icebergTable.schema();
+        PartitionSpec partitionSpec = icebergTable.spec();
+        PartitionKey partitionKey =
+                partitionValues.length == 0
+                        ? null
+                        : partitionKey(
+                                icePartitionSpec,
+                                icebergTable,
+                                partitionValues[0],
+                                partitionValues[1]);
+        // currently only the "parquet" format is supported
+        switch (format) {
+            case "parquet":
+                return Parquet.writeData(file)
+                        .schema(schema)
+                        .createWriterFunc(GenericParquetWriter::buildWriter)
+                        .overwrite()
+                        .withSpec(partitionSpec)
+                        .withPartition(partitionKey)
+                        .build();
+            default:
+                throw new IllegalArgumentException("Unsupported format: " + format);
+        }
+    }
+
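+    // writes the records into a new data file and commits it to the Iceberg table as an append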
+    private void writeRecordsToIceberg(
+            Table icebergTable,
+            String format,
+            List<GenericRecord> records,
+            String... partitionValues)
+            throws IOException {
+        String filepath = icebergTable.location() + "/" + UUID.randomUUID();
+        OutputFile file = icebergTable.io().newOutputFile(filepath);
+
+        DataWriter<GenericRecord> dataWriter =
+                createIcebergDataWriter(icebergTable, format, file, partitionValues);
+        try {
+            for (GenericRecord r : records) {
+                dataWriter.write(r);
+            }
+        } finally {
+            dataWriter.close();
+        }
+        DataFile dataFile = dataWriter.toDataFile();
+        icebergTable.newAppend().appendFile(dataFile).commit();
+    }
+
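+    // writes an equality delete file keyed on field id 1 and commits it to the Iceberg table as a row delta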
+    private void writeEqualityDeleteFile(
+            Table icebergTable, List<GenericRecord> deleteRecords, String... partitionValues)
+            throws IOException {
+        String filepath = icebergTable.location() + "/" + UUID.randomUUID();
+        OutputFile file = icebergTable.io().newOutputFile(filepath);
+
+        EqualityDeleteWriter<GenericRecord> deleteWriter =
+                Parquet.writeDeletes(file)
+                        .createWriterFunc(GenericParquetWriter::buildWriter)
+                        .overwrite()
+                        .rowSchema(iceDeleteSchema)
+                        .withSpec(PartitionSpec.unpartitioned())
+                        .equalityFieldIds(1)
+                        .buildEqualityWriter();
+        if (partitionValues.length != 0) {
+            deleteWriter =
+                    Parquet.writeDeletes(file)
+                            .createWriterFunc(GenericParquetWriter::buildWriter)
+                            .overwrite()
+                            .rowSchema(iceDeleteSchema)
+                            .withSpec(icePartitionSpec)
+                            .withPartition(
+                                    partitionKey(
+                                            icePartitionSpec,
+                                            icebergTable,
+                                            partitionValues[0],
+                                            partitionValues[1]))
+                            .equalityFieldIds(1)
+                            .buildEqualityWriter();
+        }
+
+        try (EqualityDeleteWriter<GenericRecord> closableWriter = deleteWriter) {
+            closableWriter.write(deleteRecords);
+        }
+
+        DeleteFile deleteFile = deleteWriter.toDeleteFile();
+        icebergTable.newRowDelta().addDeletes(deleteFile).commit();
+    }
+
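+    // builds an Iceberg PartitionKey from the given (dt, hh) values against the table schema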
+    private PartitionKey partitionKey(
+            PartitionSpec spec, Table icebergTable, String... partitionValues) {
+        Record record =
+                GenericRecord.create(icebergTable.schema())
+                        .copy(ImmutableMap.of("dt", partitionValues[0], "hh", partitionValues[1]));
+
+        PartitionKey partitionKey = new PartitionKey(spec, icebergTable.schema());
+        partitionKey.partition(record);
+
+        return partitionKey;
+    }
+
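+    // reads all rows of the migrated Paimon table and renders each row as a string (row kind excluded)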
+    private List<String> getPaimonResult(FileStoreTable paimonTable) throws Exception {
+        List<Split> splits = paimonTable.newReadBuilder().newScan().plan().splits();
+        TableRead read = paimonTable.newReadBuilder().newRead();
+        try (RecordReader<InternalRow> recordReader = read.createReader(splits)) {
+            List<String> result = new ArrayList<>();
+            recordReader.forEachRemaining(
+                    row ->
+                            result.add(
+                                    DataFormatTestUtil.toStringNoRowKind(
+                                            row, paimonTable.rowType())));
+            return result;
+        }
+    }
+}
