This is an automated email from the ASF dual-hosted git repository.

czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 7210cf618f [iceberg] support migrating iceberg table which had suffered schema evolution (#5083)
7210cf618f is described below

commit 7210cf618f17036f24c111293fcdd69ba350afb1
Author: LsomeYeah <[email protected]>
AuthorDate: Thu Feb 27 10:58:52 2025 +0800

    [iceberg] support migrating iceberg table which had suffered schema evolution (#5083)
    
    This closes #5083.
---
 .../iceberg/manifest/IcebergDataFileMeta.java      |  12 +
 .../paimon/iceberg/migrate/IcebergMigrator.java    | 111 +++++-
 .../org/apache/paimon/migrate/FileMetaUtils.java   |  67 +++-
 .../org/apache/paimon/schema/SchemaManager.java    |   2 +-
 .../paimon/iceberg/migrate/IcebergMigrateTest.java | 442 +++++++++++++++++----
 .../paimon/flink/utils/TableMigrationUtils.java    |   3 +-
 6 files changed, 534 insertions(+), 103 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergDataFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergDataFileMeta.java
index d171962bec..cb78c3c646 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergDataFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergDataFileMeta.java
@@ -83,6 +83,9 @@ public class IcebergDataFileMeta {
     private final InternalMap lowerBounds;
     private final InternalMap upperBounds;
 
+    // only used for iceberg migrate
+    private long schemaId = 0;
+
     IcebergDataFileMeta(
             Content content,
             String filePath,
@@ -201,6 +204,15 @@ public class IcebergDataFileMeta {
         return upperBounds;
     }
 
+    public long schemaId() {
+        return schemaId;
+    }
+
+    public IcebergDataFileMeta withSchemaId(long schemaId) {
+        this.schemaId = schemaId;
+        return this;
+    }
+
     public static RowType schema(RowType partitionType) {
         List<DataField> fields = new ArrayList<>();
         fields.add(new DataField(134, "content", DataTypes.INT().notNull()));
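
For context, a minimal sketch of how the new withSchemaId hook is meant to be used (the variable names here are assumptions for illustration, not code from this commit): every data file meta read from a manifest is tagged with the schema id that manifest was written under, so later stats collection can use the matching historical schema.

    // Illustrative only: tag each live entry's data file meta with the schema id
    // recovered from the manifest it was read from.
    List<IcebergDataFileMeta> taggedFiles =
            entries.stream()
                    .filter(IcebergManifestEntry::isLive)
                    .map(entry -> entry.file().withSchemaId(manifestSchemaId))
                    .collect(Collectors.toList());
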
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrator.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrator.java
index 9e91fa2d18..4aff0ded5f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrator.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrator.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.iceberg.migrate;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.BinaryRow;
@@ -43,19 +44,26 @@ import org.apache.paimon.migrate.FileMetaUtils;
 import org.apache.paimon.migrate.Migrator;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.sink.BatchTableCommit;
 import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.types.DataField;
+import org.apache.paimon.utils.JsonSerdeUtil;
 import org.apache.paimon.utils.Preconditions;
 
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
@@ -64,6 +72,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.stream.Collectors;
 
+import static org.apache.paimon.utils.Preconditions.checkArgument;
 import static org.apache.paimon.utils.ThreadPoolUtils.createCachedThreadPool;
 
 /** migrate iceberg table to paimon table. */
@@ -75,6 +84,7 @@ public class IcebergMigrator implements Migrator {
     private final Catalog paimonCatalog;
     private final String paimonDatabaseName;
     private final String paimonTableName;
+    private final CoreOptions coreOptions;
 
     private final String icebergDatabaseName;
     private final String icebergTableName;
@@ -97,10 +107,18 @@ public class IcebergMigrator implements Migrator {
             String icebergDatabaseName,
             String icebergTableName,
             Options icebergOptions,
-            Integer parallelism) {
+            Integer parallelism,
+            Map<String, String> options) {
         this.paimonCatalog = paimonCatalog;
         this.paimonDatabaseName = paimonDatabaseName;
         this.paimonTableName = paimonTableName;
+        this.coreOptions = new CoreOptions(options);
+        checkArgument(
+                coreOptions.bucket() == -1,
+                "Iceberg migrator only support unaware-bucket target table, bucket should be -1");
+        checkArgument(
+                !options.containsKey(CoreOptions.PRIMARY_KEY.key()),
+                "Iceberg migrator does not support define primary key for target table.");
 
         this.icebergDatabaseName = icebergDatabaseName;
         this.icebergTableName = icebergTableName;
@@ -136,15 +154,28 @@ public class IcebergMigrator implements Migrator {
 
     @Override
     public void executeMigrate() throws Exception {
-        Schema paimonSchema = icebergSchemaToPaimonSchema(icebergMetadata);
+        List<TableSchema> paimonSchemas = icebergSchemasToPaimonSchemas(icebergMetadata);
+        Preconditions.checkArgument(
+                !paimonSchemas.isEmpty(),
+                "paimon schemas transformed from iceberg table is empty.");
         Identifier paimonIdentifier = Identifier.create(paimonDatabaseName, paimonTableName);
 
         paimonCatalog.createDatabase(paimonDatabaseName, true);
-        paimonCatalog.createTable(paimonIdentifier, paimonSchema, false);
+        TableSchema firstSchema = paimonSchemas.get(0);
+        Preconditions.checkArgument(firstSchema.id() == 0, "Unexpected, first schema id is not 0.");
+        paimonCatalog.createTable(paimonIdentifier, firstSchema.toSchema(), false);
 
         try {
             FileStoreTable paimonTable = (FileStoreTable) paimonCatalog.getTable(paimonIdentifier);
             FileIO fileIO = paimonTable.fileIO();
+            SchemaManager schemaManager = paimonTable.schemaManager();
+            // commit all the iceberg schemas
+            for (int i = 1; i < paimonSchemas.size(); i++) {
+                LOG.info(
+                        "commit new schema from iceberg, new schema id:{}",
+                        paimonSchemas.get(i).id());
+                schemaManager.commit(paimonSchemas.get(i));
+            }
 
             IcebergManifestFile manifestFile =
                     IcebergManifestFile.create(paimonTable, icebergMetaPathFactory);
@@ -157,25 +188,36 @@ public class IcebergMigrator implements Migrator {
             // check manifest file with 'DELETE' kind
             checkAndFilterManifestFiles(icebergManifestFileMetas);
 
-            // get all live iceberg entries
-            List<IcebergManifestEntry> icebergEntries =
-                    icebergManifestFileMetas.stream()
-                            .flatMap(fileMeta -> manifestFile.read(fileMeta).stream())
-                            .filter(IcebergManifestEntry::isLive)
-                            .collect(Collectors.toList());
-            if (icebergEntries.isEmpty()) {
+            Map<Long, List<IcebergManifestEntry>> icebergEntries = new HashMap<>();
+            for (IcebergManifestFileMeta icebergManifestFileMeta : icebergManifestFileMetas) {
+                long schemaId =
+                        getSchemaIdFromIcebergManifestFile(
+                                new Path(icebergManifestFileMeta.manifestPath()), fileIO);
+                List<IcebergManifestEntry> entries = manifestFile.read(icebergManifestFileMeta);
+                icebergEntries
+                        .computeIfAbsent(schemaId, v -> new ArrayList<>())
+                        .addAll(
+                                entries.stream()
+                                        .filter(IcebergManifestEntry::isLive)
+                                        .collect(Collectors.toList()));
+            }
+
+            List<IcebergDataFileMeta> icebergDataFileMetas = new ArrayList<>();
+            // write schema id to IcebergDataFileMeta
+            for (Map.Entry<Long, List<IcebergManifestEntry>> kv : icebergEntries.entrySet()) {
+                icebergDataFileMetas.addAll(
+                        kv.getValue().stream()
+                                .map(entry -> entry.file().withSchemaId(kv.getKey()))
+                                .collect(Collectors.toList()));
+            }
+
+            if (icebergDataFileMetas.isEmpty()) {
                 LOG.info(
-                        "No live manifest entry in iceberg table for snapshot {}, iceberg table meta path is {}.",
+                        "No live iceberg data files in iceberg table for snapshot {}, iceberg table meta path is {}.",
                         icebergMetadata.currentSnapshotId(),
                         icebergLatestMetadataLocation);
                 return;
             }
-
-            List<IcebergDataFileMeta> icebergDataFileMetas =
-                    icebergEntries.stream()
-                            .map(IcebergManifestEntry::file)
-                            .collect(Collectors.toList());
-
             // Again, check if delete File exists
             checkAndFilterDataFiles(icebergDataFileMetas);
 
@@ -246,10 +288,21 @@ public class IcebergMigrator implements Migrator {
         paimonCatalog.renameTable(targetTableId, sourceTableId, ignoreIfNotExists);
     }
 
-    public Schema icebergSchemaToPaimonSchema(IcebergMetadata icebergMetadata) {
-        // get iceberg current schema
-        IcebergSchema icebergSchema =
-                icebergMetadata.schemas().get(icebergMetadata.currentSchemaId());
+    private List<TableSchema> icebergSchemasToPaimonSchemas(IcebergMetadata icebergMetadata) {
+        return icebergMetadata.schemas().stream()
+                .map(
+                        icebergSchema -> {
+                            LOG.info(
+                                    "Convert iceberg schema to paimon schema, iceberg schema id: {}",
+                                    icebergSchema.schemaId());
+                            return TableSchema.create(
+                                    icebergSchema.schemaId(),
+                                    icebergSchemaToPaimonSchema(icebergSchema));
+                        })
+                .collect(Collectors.toList());
+    }
+
+    private Schema icebergSchemaToPaimonSchema(IcebergSchema icebergSchema) {
 
         // get iceberg current partition spec
         int currentPartitionSpecId = icebergMetadata.defaultSpecId();
@@ -289,6 +342,18 @@ public class IcebergMigrator implements Migrator {
         }
     }
 
+    private long getSchemaIdFromIcebergManifestFile(Path manifestPath, FileIO fileIO) {
+
+        try (DataFileStream<GenericRecord> dataFileStream =
+                new DataFileStream<>(
+                        fileIO.newInputStream(manifestPath), new GenericDatumReader<>())) {
+            String schema = dataFileStream.getMetaString("schema");
+            return JsonSerdeUtil.fromJson(schema, IcebergSchema.class).schemaId();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
     private static List<DataFileMeta> construct(
             List<IcebergDataFileMeta> icebergDataFileMetas,
             FileIO fileIO,
@@ -318,7 +383,9 @@ public class IcebergMigrator implements Migrator {
                     e);
         }
         String format = icebergDataFileMeta.fileFormat();
-        return FileMetaUtils.constructFileMeta(format, status, fileIO, table, dir, rollback);
+        long schemaId = icebergDataFileMeta.schemaId();
+        return FileMetaUtils.constructFileMeta(
+                format, status, fileIO, table, dir, rollback, schemaId);
     }
 
     private MigrateTask importUnPartitionedTable(
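
The schema id lookup above relies on Iceberg writing the table schema JSON into the manifest's Avro file metadata under the "schema" key. A standalone sketch of the same idea using only Avro and Jackson (the local file path and the "schema-id" JSON field are assumptions based on the Iceberg manifest layout, not code from this commit):

    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.avro.file.DataFileStream;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericRecord;

    import java.io.FileInputStream;
    import java.io.IOException;

    public class ManifestSchemaId {
        // Reads the "schema" metadata entry of an Iceberg manifest Avro file and
        // returns its "schema-id" field (assumed present, as in format v2 metadata).
        static long schemaIdOf(String manifestFile) throws IOException {
            try (DataFileStream<GenericRecord> stream =
                    new DataFileStream<>(
                            new FileInputStream(manifestFile), new GenericDatumReader<>())) {
                String schemaJson = stream.getMetaString("schema");
                return new ObjectMapper().readTree(schemaJson).get("schema-id").asLong();
            }
        }
    }
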
diff --git a/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java b/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
index 366f8afcfd..405870d5fa 100644
--- a/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
@@ -129,6 +129,47 @@ public class FileMetaUtils {
         }
     }
 
+    public static DataFileMeta constructFileMeta(
+            String format,
+            FileStatus fileStatus,
+            FileIO fileIO,
+            Table table,
+            Path dir,
+            Map<Path, Path> rollback,
+            long schemaId) {
+
+        try {
+            RowType rowTypeWithSchemaId =
+                    ((FileStoreTable) table).schemaManager().schema(schemaId).logicalRowType();
+            SimpleColStatsCollector.Factory[] factories =
+                    StatsCollectorFactories.createStatsFactories(
+                            ((FileStoreTable) table).coreOptions(),
+                            rowTypeWithSchemaId.getFieldNames());
+
+            SimpleStatsExtractor simpleStatsExtractor =
+                    FileFormat.fromIdentifier(
+                                    format,
+                                    ((FileStoreTable) table).coreOptions().toConfiguration())
+                            .createStatsExtractor(rowTypeWithSchemaId, factories)
+                            .orElseThrow(
+                                    () ->
+                                            new RuntimeException(
+                                                    "Can't get table stats extractor for format "
+                                                            + format));
+            Path newPath = renameFile(fileIO, fileStatus.getPath(), dir, format, rollback);
+            return constructFileMeta(
+                    newPath.getName(),
+                    fileStatus.getLen(),
+                    newPath,
+                    simpleStatsExtractor,
+                    fileIO,
+                    table,
+                    schemaId);
+        } catch (IOException e) {
+            throw new RuntimeException("error when construct file meta", e);
+        }
+    }
+
     // -----------------------------private method---------------------------------------------
 
     private static Path renameFile(
@@ -152,7 +193,29 @@ public class FileMetaUtils {
             FileIO fileIO,
             Table table)
             throws IOException {
-        SimpleStatsConverter statsArraySerializer = new SimpleStatsConverter(table.rowType());
+        return constructFileMeta(
+                fileName,
+                fileSize,
+                path,
+                simpleStatsExtractor,
+                fileIO,
+                table,
+                ((FileStoreTable) table).schema().id());
+    }
+
+    private static DataFileMeta constructFileMeta(
+            String fileName,
+            long fileSize,
+            Path path,
+            SimpleStatsExtractor simpleStatsExtractor,
+            FileIO fileIO,
+            Table table,
+            long schemaId)
+            throws IOException {
+        RowType rowTypeWithSchemaId =
+                ((FileStoreTable) table).schemaManager().schema(schemaId).logicalRowType();
+
+        SimpleStatsConverter statsArraySerializer = new SimpleStatsConverter(rowTypeWithSchemaId);
 
         Pair<SimpleColStats[], SimpleStatsExtractor.FileInfo> fileInfo =
                 simpleStatsExtractor.extractWithFileInfo(fileIO, path);
@@ -165,7 +228,7 @@ public class FileMetaUtils {
                 stats,
                 0,
                 0,
-                ((FileStoreTable) table).schema().id(),
+                schemaId,
                 Collections.emptyList(),
                 null,
                 FileSource.APPEND,
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index 6213503a47..58c5d2b984 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -771,7 +771,7 @@ public class SchemaManager implements Serializable {
     }
 
     @VisibleForTesting
-    boolean commit(TableSchema newSchema) throws Exception {
+    public boolean commit(TableSchema newSchema) throws Exception {
         SchemaValidation.validateTableSchema(newSchema);
         SchemaValidation.validateFallbackBranch(this, newSchema);
         Path schemaPath = toSchemaPath(newSchema.id());
diff --git a/paimon-core/src/test/java/org/apache/paimon/iceberg/migrate/IcebergMigrateTest.java b/paimon-core/src/test/java/org/apache/paimon/iceberg/migrate/IcebergMigrateTest.java
index aadaca0c38..40b0611481 100644
--- a/paimon-core/src/test/java/org/apache/paimon/iceberg/migrate/IcebergMigrateTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/iceberg/migrate/IcebergMigrateTest.java
@@ -64,8 +64,10 @@ import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.OffsetDateTime;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -118,27 +120,7 @@ public class IcebergMigrateTest {
     public void testMigrateOnlyAdd(boolean isPartitioned) throws Exception {
         Table icebergTable = createIcebergTable(isPartitioned);
         String format = "parquet";
-        List<GenericRecord> records1 =
-                Stream.of(
-                                toIcebergRecord(1, 1, "20240101", "00"),
-                                toIcebergRecord(2, 2, "20240101", "00"))
-                        .collect(Collectors.toList());
-        if (isPartitioned) {
-            writeRecordsToIceberg(icebergTable, format, records1, "20240101", "00");
-        } else {
-            writeRecordsToIceberg(icebergTable, format, records1);
-        }
-
-        List<GenericRecord> records2 =
-                Stream.of(
-                                toIcebergRecord(1, 1, "20240101", "01"),
-                                toIcebergRecord(2, 2, "20240101", "01"))
-                        .collect(Collectors.toList());
-        if (isPartitioned) {
-            writeRecordsToIceberg(icebergTable, format, records2, "20240101", "01");
-        } else {
-            writeRecordsToIceberg(icebergTable, format, records2);
-        }
+        writeInitialData(icebergTable, format, isPartitioned);
 
         IcebergMigrator icebergMigrator =
                 new IcebergMigrator(
@@ -148,7 +130,8 @@ public class IcebergMigrateTest {
                         iceDatabase,
                         iceTable,
                         new Options(icebergProperties),
-                        1);
+                        1,
+                        Collections.emptyMap());
         icebergMigrator.executeMigrate();
         icebergMigrator.renameTable(false);
 
@@ -160,8 +143,11 @@ public class IcebergMigrateTest {
                                 .map(row -> String.format("Record(%s)", row))
                                 .collect(Collectors.toList()))
                 .hasSameElementsAs(
-                        Stream.concat(records1.stream(), records2.stream())
-                                .map(GenericRecord::toString)
+                        Stream.of(
+                                        "Record(1, 1, 20240101, 00)",
+                                        "Record(2, 2, 20240101, 00)",
+                                        "Record(1, 1, 20240101, 01)",
+                                        "Record(2, 2, 20240101, 01)")
                                 .collect(Collectors.toList()));
 
         // verify iceberg table has been deleted
@@ -173,27 +159,7 @@ public class IcebergMigrateTest {
     public void testMigrateAddAndDelete(boolean isPartitioned) throws Exception {
         Table icebergTable = createIcebergTable(isPartitioned);
         String format = "parquet";
-        List<GenericRecord> records1 =
-                Stream.of(
-                                toIcebergRecord(1, 1, "20240101", "00"),
-                                toIcebergRecord(2, 2, "20240101", "00"))
-                        .collect(Collectors.toList());
-        if (isPartitioned) {
-            writeRecordsToIceberg(icebergTable, format, records1, "20240101", "00");
-        } else {
-            writeRecordsToIceberg(icebergTable, format, records1);
-        }
-
-        List<GenericRecord> records2 =
-                Stream.of(
-                                toIcebergRecord(1, 1, "20240101", "01"),
-                                toIcebergRecord(2, 2, "20240101", "01"))
-                        .collect(Collectors.toList());
-        if (isPartitioned) {
-            writeRecordsToIceberg(icebergTable, format, records2, "20240101", "01");
-        } else {
-            writeRecordsToIceberg(icebergTable, format, records2);
-        }
+        writeInitialData(icebergTable, format, isPartitioned);
 
         // the file written with records2 will be deleted and generate a delete manifest entry, not
         // a delete file
@@ -207,7 +173,8 @@ public class IcebergMigrateTest {
                         iceDatabase,
                         iceTable,
                         new Options(icebergProperties),
-                        1);
+                        1,
+                        Collections.emptyMap());
         icebergMigrator.executeMigrate();
 
         FileStoreTable paimonTable =
@@ -218,8 +185,7 @@ public class IcebergMigrateTest {
                                 .map(row -> String.format("Record(%s)", row))
                                 .collect(Collectors.toList()))
                 .hasSameElementsAs(
-                        records2.stream()
-                                .map(GenericRecord::toString)
+                        Stream.of("Record(1, 1, 20240101, 01)", "Record(2, 2, 20240101, 01)")
                                 .collect(Collectors.toList()));
     }
 
@@ -264,7 +230,8 @@ public class IcebergMigrateTest {
                         iceDatabase,
                         iceTable,
                         new Options(icebergProperties),
-                        1);
+                        1,
+                        Collections.emptyMap());
 
         assertThatThrownBy(icebergMigrator::executeMigrate)
                 .rootCause()
@@ -306,7 +273,8 @@ public class IcebergMigrateTest {
                         iceDatabase,
                         iceTable,
                         new Options(icebergProperties),
-                        1);
+                        1,
+                        Collections.emptyMap());
         icebergMigrator.executeMigrate();
 
         FileStoreTable paimonTable =
@@ -324,45 +292,117 @@ public class IcebergMigrateTest {
 
     @ParameterizedTest(name = "isPartitioned = {0}")
     @ValueSource(booleans = {true, false})
-    public void testMigrateWithSchemaEvolution(boolean isPartitioned) throws Exception {
+    public void testDeleteColumn(boolean isPartitioned) throws Exception {
         Table icebergTable = createIcebergTable(isPartitioned);
         String format = "parquet";
+        writeInitialData(icebergTable, format, isPartitioned);
 
-        // write base data
-        List<GenericRecord> records1 =
+        icebergTable.updateSchema().deleteColumn("v").commit();
+        Schema newIceSchema = icebergTable.schema();
+        List<GenericRecord> addedRecords =
                 Stream.of(
-                                toIcebergRecord(1, 1, "20240101", "00"),
-                                toIcebergRecord(2, 2, "20240101", "00"))
+                                toIcebergRecord(3, "20240101", "00", newIceSchema),
+                                toIcebergRecord(4, "20240101", "00", newIceSchema))
                         .collect(Collectors.toList());
         if (isPartitioned) {
-            writeRecordsToIceberg(icebergTable, format, records1, "20240101", "00");
+            writeRecordsToIceberg(icebergTable, format, addedRecords, "20240101", "00");
         } else {
-            writeRecordsToIceberg(icebergTable, format, records1);
+            writeRecordsToIceberg(icebergTable, format, addedRecords);
         }
 
-        List<GenericRecord> records2 =
+        IcebergMigrator icebergMigrator =
+                new IcebergMigrator(
+                        paiCatalog,
+                        paiDatabase,
+                        paiTable,
+                        iceDatabase,
+                        iceTable,
+                        new Options(icebergProperties),
+                        1,
+                        Collections.emptyMap());
+        icebergMigrator.executeMigrate();
+
+        FileStoreTable paimonTable =
+                (FileStoreTable) paiCatalog.getTable(Identifier.create(paiDatabase, paiTable));
+        List<String> paiResults = getPaimonResult(paimonTable);
+        assertThat(
+                        paiResults.stream()
+                                .map(row -> String.format("Record(%s)", row))
+                                .collect(Collectors.toList()))
+                .hasSameElementsAs(
+                        Stream.of(
+                                        "Record(1, 20240101, 00)",
+                                        "Record(2, 20240101, 00)",
+                                        "Record(1, 20240101, 01)",
+                                        "Record(2, 20240101, 01)",
+                                        "Record(3, 20240101, 00)",
+                                        "Record(4, 20240101, 00)")
+                                .collect(Collectors.toList()));
+    }
+
+    @ParameterizedTest(name = "isPartitioned = {0}")
+    @ValueSource(booleans = {true, false})
+    public void testRenameColumn(boolean isPartitioned) throws Exception {
+        Table icebergTable = createIcebergTable(isPartitioned);
+        String format = "parquet";
+        writeInitialData(icebergTable, format, isPartitioned);
+
+        icebergTable.updateSchema().renameColumn("v", "v2").commit();
+        Schema newIceSchema = icebergTable.schema();
+        List<GenericRecord> addedRecords =
                 Stream.of(
-                                toIcebergRecord(1, 1, "20240101", "01"),
-                                toIcebergRecord(2, 2, "20240101", "01"))
+                                toIcebergRecord(newIceSchema, 3, 3, "20240101", "00"),
+                                toIcebergRecord(newIceSchema, 4, 4, "20240101", "00"))
                         .collect(Collectors.toList());
         if (isPartitioned) {
-            writeRecordsToIceberg(icebergTable, format, records2, "20240101", "01");
+            writeRecordsToIceberg(icebergTable, format, addedRecords, "20240101", "00");
         } else {
-            writeRecordsToIceberg(icebergTable, format, records2);
+            writeRecordsToIceberg(icebergTable, format, addedRecords);
         }
 
-        // TODO: currently only support schema evolution of deleting columns
-        testDeleteColumn(icebergTable, format, isPartitioned);
+        IcebergMigrator icebergMigrator =
+                new IcebergMigrator(
+                        paiCatalog,
+                        paiDatabase,
+                        paiTable,
+                        iceDatabase,
+                        iceTable,
+                        new Options(icebergProperties),
+                        1,
+                        Collections.emptyMap());
+        icebergMigrator.executeMigrate();
+
+        FileStoreTable paimonTable =
+                (FileStoreTable) paiCatalog.getTable(Identifier.create(paiDatabase, paiTable));
+        List<String> paiResults = getPaimonResult(paimonTable);
+        assertThat(
+                        paiResults.stream()
+                                .map(row -> String.format("Record(%s)", row))
+                                .collect(Collectors.toList()))
+                .hasSameElementsAs(
+                        Stream.of(
+                                        "Record(1, 1, 20240101, 00)",
+                                        "Record(2, 2, 20240101, 00)",
+                                        "Record(1, 1, 20240101, 01)",
+                                        "Record(2, 2, 20240101, 01)",
+                                        "Record(3, 3, 20240101, 00)",
+                                        "Record(4, 4, 20240101, 00)")
+                                .collect(Collectors.toList()));
     }
 
-    private void testDeleteColumn(Table icebergTable, String format, boolean isPartitioned)
-            throws Exception {
-        icebergTable.updateSchema().deleteColumn("v").commit();
+    @ParameterizedTest(name = "isPartitioned = {0}")
+    @ValueSource(booleans = {true, false})
+    public void testAddColumn(boolean isPartitioned) throws Exception {
+        Table icebergTable = createIcebergTable(isPartitioned);
+        String format = "parquet";
+        writeInitialData(icebergTable, format, isPartitioned);
+
+        icebergTable.updateSchema().addColumn("v2", Types.IntegerType.get()).commit();
         Schema newIceSchema = icebergTable.schema();
         List<GenericRecord> addedRecords =
                 Stream.of(
-                                toIcebergRecord(3, "20240101", "00", newIceSchema),
-                                toIcebergRecord(4, "20240101", "00", newIceSchema))
+                                toIcebergRecord(newIceSchema, 3, 3, "20240101", "00", 3),
+                                toIcebergRecord(newIceSchema, 4, 4, "20240101", "00", 4))
                         .collect(Collectors.toList());
         if (isPartitioned) {
             writeRecordsToIceberg(icebergTable, format, addedRecords, "20240101", "00");
@@ -378,7 +418,8 @@ public class IcebergMigrateTest {
                         iceDatabase,
                         iceTable,
                         new Options(icebergProperties),
-                        1);
+                        1,
+                        Collections.emptyMap());
         icebergMigrator.executeMigrate();
 
         FileStoreTable paimonTable =
@@ -390,15 +431,236 @@ public class IcebergMigrateTest {
                                 .collect(Collectors.toList()))
                 .hasSameElementsAs(
                         Stream.of(
-                                        "Record(1, 20240101, 00)",
-                                        "Record(2, 20240101, 00)",
-                                        "Record(1, 20240101, 01)",
-                                        "Record(2, 20240101, 01)",
-                                        "Record(3, 20240101, 00)",
-                                        "Record(4, 20240101, 00)")
+                                        "Record(1, 1, 20240101, 00, NULL)",
+                                        "Record(2, 2, 20240101, 00, NULL)",
+                                        "Record(1, 1, 20240101, 01, NULL)",
+                                        "Record(2, 2, 20240101, 01, NULL)",
+                                        "Record(3, 3, 20240101, 00, 3)",
+                                        "Record(4, 4, 20240101, 00, 4)")
+                                .collect(Collectors.toList()));
+    }
+
+    @ParameterizedTest(name = "isPartitioned = {0}")
+    @ValueSource(booleans = {true, false})
+    public void testReorderColumn(boolean isPartitioned) throws Exception {
+        Table icebergTable = createIcebergTable(isPartitioned);
+        String format = "parquet";
+        writeInitialData(icebergTable, format, isPartitioned);
+
+        icebergTable.updateSchema().moveAfter("v", "hh").commit();
+        Schema newIceSchema = icebergTable.schema();
+        List<GenericRecord> addedRecords =
+                Stream.of(
+                                toIcebergRecord(newIceSchema, 3, "20240101", "00", 3),
+                                toIcebergRecord(newIceSchema, 4, "20240101", "00", 4))
+                        .collect(Collectors.toList());
+        if (isPartitioned) {
+            writeRecordsToIceberg(icebergTable, format, addedRecords, "20240101", "00");
+        } else {
+            writeRecordsToIceberg(icebergTable, format, addedRecords);
+        }
+
+        IcebergMigrator icebergMigrator =
+                new IcebergMigrator(
+                        paiCatalog,
+                        paiDatabase,
+                        paiTable,
+                        iceDatabase,
+                        iceTable,
+                        new Options(icebergProperties),
+                        1,
+                        Collections.emptyMap());
+        icebergMigrator.executeMigrate();
+
+        FileStoreTable paimonTable =
+                (FileStoreTable) paiCatalog.getTable(Identifier.create(paiDatabase, paiTable));
+        List<String> paiResults = getPaimonResult(paimonTable);
+        assertThat(
+                        paiResults.stream()
+                                .map(row -> String.format("Record(%s)", row))
+                                .collect(Collectors.toList()))
+                .hasSameElementsAs(
+                        Stream.of(
+                                        "Record(1, 20240101, 00, 1)",
+                                        "Record(2, 20240101, 00, 2)",
+                                        "Record(1, 20240101, 01, 1)",
+                                        "Record(2, 20240101, 01, 2)",
+                                        "Record(3, 20240101, 00, 3)",
+                                        "Record(4, 20240101, 00, 4)")
+                                .collect(Collectors.toList()));
+    }
+
+    @ParameterizedTest(name = "isPartitioned = {0}")
+    @ValueSource(booleans = {true, false})
+    public void testUpdateColumn(boolean isPartitioned) throws Exception {
+        Table icebergTable = createIcebergTable(isPartitioned);
+        String format = "parquet";
+        writeInitialData(icebergTable, format, isPartitioned);
+
+        icebergTable.updateSchema().updateColumn("v", Types.LongType.get()).commit();
+        Schema newIceSchema = icebergTable.schema();
+        List<GenericRecord> addedRecords =
+                Stream.of(
+                                toIcebergRecord(newIceSchema, 3, 3L, "20240101", "00"),
+                                toIcebergRecord(newIceSchema, 4, 3L, "20240101", "00"))
+                        .collect(Collectors.toList());
+        if (isPartitioned) {
+            writeRecordsToIceberg(icebergTable, format, addedRecords, "20240101", "00");
+        } else {
+            writeRecordsToIceberg(icebergTable, format, addedRecords);
+        }
+
+        IcebergMigrator icebergMigrator =
+                new IcebergMigrator(
+                        paiCatalog,
+                        paiDatabase,
+                        paiTable,
+                        iceDatabase,
+                        iceTable,
+                        new Options(icebergProperties),
+                        1,
+                        Collections.emptyMap());
+        icebergMigrator.executeMigrate();
+
+        FileStoreTable paimonTable =
+                (FileStoreTable) paiCatalog.getTable(Identifier.create(paiDatabase, paiTable));
+        List<String> paiResults = getPaimonResult(paimonTable);
+        assertThat(
+                        paiResults.stream()
+                                .map(row -> String.format("Record(%s)", row))
+                                .collect(Collectors.toList()))
+                .hasSameElementsAs(
+                        Stream.of(
+                                        "Record(1, 1, 20240101, 00)",
+                                        "Record(2, 2, 20240101, 00)",
+                                        "Record(1, 1, 20240101, 01)",
+                                        "Record(2, 2, 20240101, 01)",
+                                        "Record(3, 3, 20240101, 00)",
+                                        "Record(4, 3, 20240101, 00)")
                                 .collect(Collectors.toList()));
     }
 
+    @ParameterizedTest(name = "isPartitioned = {0}")
+    @ValueSource(booleans = {true, false})
+    public void testMigrateWithRandomIcebergEvolution(boolean isPartitioned) throws Exception {
+        Table icebergTable = createIcebergTable(isPartitioned);
+        icebergTable.updateSchema().addColumn("v2", Types.IntegerType.get()).commit();
+        String format = "parquet";
+        List<String> index = new LinkedList<>(Arrays.asList("k", "v", "dt", "hh", "v2"));
+
+        int numRounds = 20;
+        int numRecords = 10;
+        List<Integer> ops = new ArrayList<>(Arrays.asList(1, 2, 3, 4, 5));
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        boolean isTypeChange = false;
+        List<List<String>> expectRecords = new ArrayList<>();
+
+        for (int i = 0; i < numRounds; i++) {
+            List<GenericRecord> records = new ArrayList<>();
+            String dt = Integer.toString(random.nextInt(20240101, 20240104));
+            String hh = Integer.toString(random.nextInt(3));
+
+            if ((i + 1) % 4 == 0 && !ops.isEmpty()) {
+                switch (ops.remove(random.nextInt(ops.size()))) {
+                    case 1:
+                        icebergTable
+                                .updateSchema()
+                                .addColumn("v3", Types.IntegerType.get())
+                                .commit();
+                        for (List<String> record : expectRecords) {
+                            record.add("NULL");
+                        }
+                        index.add("v3");
+                        break;
+                    case 2:
+                        icebergTable.updateSchema().renameColumn("v", "vv").commit();
+                        break;
+                    case 3:
+                        icebergTable.updateSchema().deleteColumn("v2").commit();
+                        int v2Idx = index.indexOf("v2");
+                        for (List<String> record : expectRecords) {
+                            record.remove(v2Idx);
+                        }
+                        index.remove(v2Idx);
+                        break;
+                    case 4:
+                        icebergTable.updateSchema().moveAfter("k", "hh").commit();
+                        int kIdx = index.indexOf("k");
+                        int hhIdx = index.indexOf("hh");
+                        for (List<String> record : expectRecords) {
+                            String k = record.remove(kIdx);
+                            record.add(hhIdx, k);
+                        }
+                        index.remove(kIdx);
+                        index.add(hhIdx, "k");
+                        break;
+                    case 5:
+                        icebergTable
+                                .updateSchema()
+                                .updateColumn("k", Types.LongType.get())
+                                .commit();
+                        isTypeChange = true;
+                        break;
+                    default:
+                        throw new IllegalStateException("Unknown operation");
+                }
+            }
+            for (int j = 0; j < numRecords; j++) {
+                List<String> recordString = new ArrayList<>();
+                GenericRecord record = GenericRecord.create(icebergTable.schema());
+                for (int idx = 0; idx < index.size(); idx++) {
+                    String field = index.get(idx);
+                    if (field.equals("dt")) {
+                        record.set(idx, dt);
+                        recordString.add(dt);
+                    } else if (field.equals("hh")) {
+                        record.set(idx, hh);
+                        recordString.add(hh);
+                    } else {
+                        int value = random.nextInt(100);
+                        if (field.equals("k") && isTypeChange) {
+                            record.set(idx, (long) value);
+                        } else {
+                            record.set(idx, value);
+                        }
+                        recordString.add(String.valueOf(value));
+                    }
+                }
+                records.add(record);
+                expectRecords.add(recordString);
+            }
+
+            if (isPartitioned) {
+                writeRecordsToIceberg(icebergTable, format, records, dt, hh);
+            } else {
+                writeRecordsToIceberg(icebergTable, format, records);
+            }
+        }
+
+        IcebergMigrator icebergMigrator =
+                new IcebergMigrator(
+                        paiCatalog,
+                        paiDatabase,
+                        paiTable,
+                        iceDatabase,
+                        iceTable,
+                        new Options(icebergProperties),
+                        1,
+                        Collections.emptyMap());
+        icebergMigrator.executeMigrate();
+
+        FileStoreTable paimonTable =
+                (FileStoreTable) paiCatalog.getTable(Identifier.create(paiDatabase, paiTable));
+        List<String> paiResults = getPaimonResult(paimonTable);
+        System.out.println();
+        assertThat(
+                        paiResults.stream()
+                                .map(row -> String.format("[%s]", row))
+                                .collect(Collectors.toList()))
+                .hasSameElementsAs(
+                        expectRecords.stream().map(List::toString).collect(Collectors.toList()));
+    }
+
     @Test
     public void testAllDataTypes() throws Exception {
         Schema iceAllTypesSchema =
@@ -444,7 +706,8 @@ public class IcebergMigrateTest {
                         iceDatabase,
                         iceTable,
                         new Options(icebergProperties),
-                        1);
+                        1,
+                        Collections.emptyMap());
         icebergMigrator.executeMigrate();
 
         FileStoreTable paimonTable =
@@ -490,6 +753,31 @@ public class IcebergMigrateTest {
         }
     }
 
+    private void writeInitialData(Table icebergTable, String format, boolean isPartitioned)
+            throws IOException {
+        List<GenericRecord> records1 =
+                Stream.of(
+                                toIcebergRecord(1, 1, "20240101", "00"),
+                                toIcebergRecord(2, 2, "20240101", "00"))
+                        .collect(Collectors.toList());
+        if (isPartitioned) {
+            writeRecordsToIceberg(icebergTable, format, records1, "20240101", "00");
+        } else {
+            writeRecordsToIceberg(icebergTable, format, records1);
+        }
+
+        List<GenericRecord> records2 =
+                Stream.of(
+                                toIcebergRecord(1, 1, "20240101", "01"),
+                                toIcebergRecord(2, 2, "20240101", "01"))
+                        .collect(Collectors.toList());
+        if (isPartitioned) {
+            writeRecordsToIceberg(icebergTable, format, records2, "20240101", "01");
+        } else {
+            writeRecordsToIceberg(icebergTable, format, records2);
+        }
+    }
+
     private GenericRecord toIcebergRecord(Schema icebergSchema, Object... values) {
         GenericRecord record = GenericRecord.create(icebergSchema);
         for (int i = 0; i < values.length; i++) {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableMigrationUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableMigrationUtils.java
index 4e7268c6f1..5c0338f893 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableMigrationUtils.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableMigrationUtils.java
@@ -80,7 +80,8 @@ public class TableMigrationUtils {
                 sourceDatabase,
                 sourceTableName,
                 icebergConf,
-                parallelism);
+                parallelism,
+                options);
     }
 
     public static List<Migrator> getImporters(

