This is an automated email from the ASF dual-hosted git repository.
czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 7210cf618f [iceberg] support migrating iceberg table which had suffered schema evolution (#5083)
7210cf618f is described below
commit 7210cf618f17036f24c111293fcdd69ba350afb1
Author: LsomeYeah <[email protected]>
AuthorDate: Thu Feb 27 10:58:52 2025 +0800
[iceberg] support migrating iceberg table which had suffered schema evolution (#5083)
This closes #5083.
---
.../iceberg/manifest/IcebergDataFileMeta.java | 12 +
.../paimon/iceberg/migrate/IcebergMigrator.java | 111 +++++-
.../org/apache/paimon/migrate/FileMetaUtils.java | 67 +++-
.../org/apache/paimon/schema/SchemaManager.java | 2 +-
.../paimon/iceberg/migrate/IcebergMigrateTest.java | 442 +++++++++++++++++----
.../paimon/flink/utils/TableMigrationUtils.java | 3 +-
6 files changed, 534 insertions(+), 103 deletions(-)
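This change lets IcebergMigrator handle Iceberg source tables whose schema evolved across snapshots: every historical Iceberg schema is converted and committed to the new Paimon table, and each migrated data file is tagged with the schema id it was written under. A minimal sketch of driving the migrator through the new constructor; the database/table names and option values below are placeholders, not values from this patch:

    import org.apache.paimon.catalog.Catalog;
    import org.apache.paimon.iceberg.migrate.IcebergMigrator;
    import org.apache.paimon.options.Options;
    import java.util.Collections;
    import java.util.Map;

    // Illustrative driver; the target table must stay unaware-bucket (bucket = -1)
    // and must not define a primary key.
    public class MigrateIcebergExample {
        public static void migrate(Catalog paimonCatalog, Options icebergOptions) throws Exception {
            Map<String, String> targetTableOptions = Collections.emptyMap();
            IcebergMigrator migrator =
                    new IcebergMigrator(
                            paimonCatalog,
                            "target_db",
                            "target_table",
                            "iceberg_db",
                            "iceberg_table",
                            icebergOptions,
                            1, // parallelism
                            targetTableOptions);
            migrator.executeMigrate();   // creates the table, replays all Iceberg schemas, commits data files
            migrator.renameTable(false); // optionally take over the source table name
        }
    }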
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergDataFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergDataFileMeta.java
index d171962bec..cb78c3c646 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergDataFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergDataFileMeta.java
@@ -83,6 +83,9 @@ public class IcebergDataFileMeta {
private final InternalMap lowerBounds;
private final InternalMap upperBounds;
+ // only used for iceberg migrate
+ private long schemaId = 0;
+
IcebergDataFileMeta(
Content content,
String filePath,
@@ -201,6 +204,15 @@ public class IcebergDataFileMeta {
return upperBounds;
}
+ public long schemaId() {
+ return schemaId;
+ }
+
+ public IcebergDataFileMeta withSchemaId(long schemaId) {
+ this.schemaId = schemaId;
+ return this;
+ }
+
public static RowType schema(RowType partitionType) {
List<DataField> fields = new ArrayList<>();
fields.add(new DataField(134, "content", DataTypes.INT().notNull()));
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrator.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrator.java
index 9e91fa2d18..4aff0ded5f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrator.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrator.java
@@ -18,6 +18,7 @@
package org.apache.paimon.iceberg.migrate;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryRow;
@@ -43,19 +44,26 @@ import org.apache.paimon.migrate.FileMetaUtils;
import org.apache.paimon.migrate.Migrator;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.types.DataField;
+import org.apache.paimon.utils.JsonSerdeUtil;
import org.apache.paimon.utils.Preconditions;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
@@ -64,6 +72,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.stream.Collectors;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.ThreadPoolUtils.createCachedThreadPool;
/** migrate iceberg table to paimon table. */
@@ -75,6 +84,7 @@ public class IcebergMigrator implements Migrator {
private final Catalog paimonCatalog;
private final String paimonDatabaseName;
private final String paimonTableName;
+ private final CoreOptions coreOptions;
private final String icebergDatabaseName;
private final String icebergTableName;
@@ -97,10 +107,18 @@ public class IcebergMigrator implements Migrator {
String icebergDatabaseName,
String icebergTableName,
Options icebergOptions,
- Integer parallelism) {
+ Integer parallelism,
+ Map<String, String> options) {
this.paimonCatalog = paimonCatalog;
this.paimonDatabaseName = paimonDatabaseName;
this.paimonTableName = paimonTableName;
+ this.coreOptions = new CoreOptions(options);
+ checkArgument(
+ coreOptions.bucket() == -1,
+ "Iceberg migrator only support unaware-bucket target table, bucket should be -1");
+ checkArgument(
+ !options.containsKey(CoreOptions.PRIMARY_KEY.key()),
+ "Iceberg migrator does not support define primary key for target table.");
this.icebergDatabaseName = icebergDatabaseName;
this.icebergTableName = icebergTableName;
@@ -136,15 +154,28 @@ public class IcebergMigrator implements Migrator {
@Override
public void executeMigrate() throws Exception {
- Schema paimonSchema = icebergSchemaToPaimonSchema(icebergMetadata);
+ List<TableSchema> paimonSchemas = icebergSchemasToPaimonSchemas(icebergMetadata);
+ Preconditions.checkArgument(
+ !paimonSchemas.isEmpty(),
+ "paimon schemas transformed from iceberg table is empty.");
Identifier paimonIdentifier = Identifier.create(paimonDatabaseName, paimonTableName);
paimonCatalog.createDatabase(paimonDatabaseName, true);
- paimonCatalog.createTable(paimonIdentifier, paimonSchema, false);
+ TableSchema firstSchema = paimonSchemas.get(0);
+ Preconditions.checkArgument(firstSchema.id() == 0, "Unexpected, first schema id is not 0.");
+ paimonCatalog.createTable(paimonIdentifier, firstSchema.toSchema(), false);
try {
FileStoreTable paimonTable = (FileStoreTable) paimonCatalog.getTable(paimonIdentifier);
FileIO fileIO = paimonTable.fileIO();
+ SchemaManager schemaManager = paimonTable.schemaManager();
+ // commit all the iceberg schemas
+ for (int i = 1; i < paimonSchemas.size(); i++) {
+ LOG.info(
+ "commit new schema from iceberg, new schema id:{}",
+ paimonSchemas.get(i).id());
+ schemaManager.commit(paimonSchemas.get(i));
+ }
IcebergManifestFile manifestFile =
IcebergManifestFile.create(paimonTable, icebergMetaPathFactory);
@@ -157,25 +188,36 @@ public class IcebergMigrator implements Migrator {
// check manifest file with 'DELETE' kind
checkAndFilterManifestFiles(icebergManifestFileMetas);
- // get all live iceberg entries
- List<IcebergManifestEntry> icebergEntries =
- icebergManifestFileMetas.stream()
- .flatMap(fileMeta -> manifestFile.read(fileMeta).stream())
- .filter(IcebergManifestEntry::isLive)
- .collect(Collectors.toList());
- if (icebergEntries.isEmpty()) {
+ Map<Long, List<IcebergManifestEntry>> icebergEntries = new HashMap<>();
+ for (IcebergManifestFileMeta icebergManifestFileMeta : icebergManifestFileMetas) {
+ long schemaId =
+ getSchemaIdFromIcebergManifestFile(
+ new Path(icebergManifestFileMeta.manifestPath()), fileIO);
+ List<IcebergManifestEntry> entries = manifestFile.read(icebergManifestFileMeta);
+ icebergEntries
+ .computeIfAbsent(schemaId, v -> new ArrayList<>())
+ .addAll(
+ entries.stream()
+ .filter(IcebergManifestEntry::isLive)
+ .collect(Collectors.toList()));
+ }
+
+ List<IcebergDataFileMeta> icebergDataFileMetas = new ArrayList<>();
+ // write schema id to IcebergDataFileMeta
+ for (Map.Entry<Long, List<IcebergManifestEntry>> kv : icebergEntries.entrySet()) {
+ icebergDataFileMetas.addAll(
+ kv.getValue().stream()
+ .map(entry -> entry.file().withSchemaId(kv.getKey()))
+ .collect(Collectors.toList()));
+ }
+
+ if (icebergDataFileMetas.isEmpty()) {
LOG.info(
- "No live manifest entry in iceberg table for snapshot
{}, iceberg table meta path is {}.",
+ "No live iceberg data files in iceberg table for
snapshot {}, iceberg table meta path is {}.",
icebergMetadata.currentSnapshotId(),
icebergLatestMetadataLocation);
return;
}
-
- List<IcebergDataFileMeta> icebergDataFileMetas =
- icebergEntries.stream()
- .map(IcebergManifestEntry::file)
- .collect(Collectors.toList());
-
// Again, check if delete File exists
checkAndFilterDataFiles(icebergDataFileMetas);
@@ -246,10 +288,21 @@ public class IcebergMigrator implements Migrator {
paimonCatalog.renameTable(targetTableId, sourceTableId, ignoreIfNotExists);
}
- public Schema icebergSchemaToPaimonSchema(IcebergMetadata icebergMetadata) {
- // get iceberg current schema
- IcebergSchema icebergSchema =
- icebergMetadata.schemas().get(icebergMetadata.currentSchemaId());
+ private List<TableSchema> icebergSchemasToPaimonSchemas(IcebergMetadata icebergMetadata) {
+ return icebergMetadata.schemas().stream()
+ .map(
+ icebergSchema -> {
+ LOG.info(
+ "Convert iceberg schema to paimon schema,
iceberg schema id: {}",
+ icebergSchema.schemaId());
+ return TableSchema.create(
+ icebergSchema.schemaId(),
+
icebergSchemaToPaimonSchema(icebergSchema));
+ })
+ .collect(Collectors.toList());
+ }
+
+ private Schema icebergSchemaToPaimonSchema(IcebergSchema icebergSchema) {
// get iceberg current partition spec
int currentPartitionSpecId = icebergMetadata.defaultSpecId();
@@ -289,6 +342,18 @@ public class IcebergMigrator implements Migrator {
}
}
+ private long getSchemaIdFromIcebergManifestFile(Path manifestPath, FileIO fileIO) {
+
+ try (DataFileStream<GenericRecord> dataFileStream =
+ new DataFileStream<>(
+ fileIO.newInputStream(manifestPath), new GenericDatumReader<>())) {
+ String schema = dataFileStream.getMetaString("schema");
+ return JsonSerdeUtil.fromJson(schema, IcebergSchema.class).schemaId();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
private static List<DataFileMeta> construct(
List<IcebergDataFileMeta> icebergDataFileMetas,
FileIO fileIO,
@@ -318,7 +383,9 @@ public class IcebergMigrator implements Migrator {
e);
}
String format = icebergDataFileMeta.fileFormat();
- return FileMetaUtils.constructFileMeta(format, status, fileIO, table, dir, rollback);
+ long schemaId = icebergDataFileMeta.schemaId();
+ return FileMetaUtils.constructFileMeta(
+ format, status, fileIO, table, dir, rollback, schemaId);
}
private MigrateTask importUnPartitionedTable(
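To know which schema produced a given data file, the migrator now reads the schema id of each Iceberg manifest from the manifest's Avro file metadata (the "schema" key) and groups live entries by that id before tagging data files via withSchemaId. A minimal standalone sketch of that lookup, assuming a local manifest file instead of Paimon's FileIO and eliding the JSON-to-IcebergSchema parsing that the patch does with JsonSerdeUtil:

    import org.apache.avro.file.DataFileStream;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericRecord;
    import java.io.FileInputStream;
    import java.io.InputStream;

    public class ManifestSchemaPeek {
        // Returns the schema JSON embedded in an Iceberg manifest's Avro metadata;
        // the migrator parses this JSON and extracts its schema id.
        static String manifestSchemaJson(String manifestPath) throws Exception {
            try (InputStream in = new FileInputStream(manifestPath);
                    DataFileStream<GenericRecord> stream =
                            new DataFileStream<>(in, new GenericDatumReader<>())) {
                return stream.getMetaString("schema");
            }
        }
    }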
diff --git a/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java b/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
index 366f8afcfd..405870d5fa 100644
--- a/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
@@ -129,6 +129,47 @@ public class FileMetaUtils {
}
}
+ public static DataFileMeta constructFileMeta(
+ String format,
+ FileStatus fileStatus,
+ FileIO fileIO,
+ Table table,
+ Path dir,
+ Map<Path, Path> rollback,
+ long schemaId) {
+
+ try {
+ RowType rowTypeWithSchemaId =
+ ((FileStoreTable) table).schemaManager().schema(schemaId).logicalRowType();
+ SimpleColStatsCollector.Factory[] factories =
+ StatsCollectorFactories.createStatsFactories(
+ ((FileStoreTable) table).coreOptions(),
+ rowTypeWithSchemaId.getFieldNames());
+
+ SimpleStatsExtractor simpleStatsExtractor =
+ FileFormat.fromIdentifier(
+ format,
+ ((FileStoreTable) table).coreOptions().toConfiguration())
+ .createStatsExtractor(rowTypeWithSchemaId, factories)
+ .orElseThrow(
+ () ->
+ new RuntimeException(
+ "Can't get table stats
extractor for format "
+ + format));
+ Path newPath = renameFile(fileIO, fileStatus.getPath(), dir,
format, rollback);
+ return constructFileMeta(
+ newPath.getName(),
+ fileStatus.getLen(),
+ newPath,
+ simpleStatsExtractor,
+ fileIO,
+ table,
+ schemaId);
+ } catch (IOException e) {
+ throw new RuntimeException("error when construct file meta", e);
+ }
+ }
+
// -----------------------------private method---------------------------------------------
private static Path renameFile(
@@ -152,7 +193,29 @@ public class FileMetaUtils {
FileIO fileIO,
Table table)
throws IOException {
- SimpleStatsConverter statsArraySerializer = new SimpleStatsConverter(table.rowType());
+ return constructFileMeta(
+ fileName,
+ fileSize,
+ path,
+ simpleStatsExtractor,
+ fileIO,
+ table,
+ ((FileStoreTable) table).schema().id());
+ }
+
+ private static DataFileMeta constructFileMeta(
+ String fileName,
+ long fileSize,
+ Path path,
+ SimpleStatsExtractor simpleStatsExtractor,
+ FileIO fileIO,
+ Table table,
+ long schemaId)
+ throws IOException {
+ RowType rowTypeWithSchemaId =
+ ((FileStoreTable) table).schemaManager().schema(schemaId).logicalRowType();
+
+ SimpleStatsConverter statsArraySerializer = new SimpleStatsConverter(rowTypeWithSchemaId);
Pair<SimpleColStats[], SimpleStatsExtractor.FileInfo> fileInfo =
simpleStatsExtractor.extractWithFileInfo(fileIO, path);
@@ -165,7 +228,7 @@ public class FileMetaUtils {
stats,
0,
0,
- ((FileStoreTable) table).schema().id(),
+ schemaId,
Collections.emptyList(),
null,
FileSource.APPEND,
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index 6213503a47..58c5d2b984 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -771,7 +771,7 @@ public class SchemaManager implements Serializable {
}
@VisibleForTesting
- boolean commit(TableSchema newSchema) throws Exception {
+ public boolean commit(TableSchema newSchema) throws Exception {
SchemaValidation.validateTableSchema(newSchema);
SchemaValidation.validateFallbackBranch(this, newSchema);
Path schemaPath = toSchemaPath(newSchema.id());
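Making SchemaManager#commit(TableSchema) public is what allows the migrator to replay the converted historical schemas into the target table before importing files, so each DataFileMeta can reference the schema id it was written with. A sketch of that replay, mirroring the loop in IcebergMigrator.executeMigrate; paimonTable and paimonSchemas are assumed to come from the migrator as shown above:

    import org.apache.paimon.schema.SchemaManager;
    import org.apache.paimon.schema.TableSchema;
    import org.apache.paimon.table.FileStoreTable;
    import java.util.List;

    public class SchemaReplayExample {
        // Schema 0 is written by createTable, so only the later schemas are committed here.
        static void replaySchemas(FileStoreTable paimonTable, List<TableSchema> paimonSchemas) throws Exception {
            SchemaManager schemaManager = paimonTable.schemaManager();
            for (int i = 1; i < paimonSchemas.size(); i++) {
                schemaManager.commit(paimonSchemas.get(i));
            }
        }
    }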
diff --git a/paimon-core/src/test/java/org/apache/paimon/iceberg/migrate/IcebergMigrateTest.java b/paimon-core/src/test/java/org/apache/paimon/iceberg/migrate/IcebergMigrateTest.java
index aadaca0c38..40b0611481 100644
--- a/paimon-core/src/test/java/org/apache/paimon/iceberg/migrate/IcebergMigrateTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/iceberg/migrate/IcebergMigrateTest.java
@@ -64,8 +64,10 @@ import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -118,27 +120,7 @@ public class IcebergMigrateTest {
public void testMigrateOnlyAdd(boolean isPartitioned) throws Exception {
Table icebergTable = createIcebergTable(isPartitioned);
String format = "parquet";
- List<GenericRecord> records1 =
- Stream.of(
- toIcebergRecord(1, 1, "20240101", "00"),
- toIcebergRecord(2, 2, "20240101", "00"))
- .collect(Collectors.toList());
- if (isPartitioned) {
- writeRecordsToIceberg(icebergTable, format, records1, "20240101", "00");
- } else {
- writeRecordsToIceberg(icebergTable, format, records1);
- }
-
- List<GenericRecord> records2 =
- Stream.of(
- toIcebergRecord(1, 1, "20240101", "01"),
- toIcebergRecord(2, 2, "20240101", "01"))
- .collect(Collectors.toList());
- if (isPartitioned) {
- writeRecordsToIceberg(icebergTable, format, records2, "20240101", "01");
- } else {
- writeRecordsToIceberg(icebergTable, format, records2);
- }
+ writeInitialData(icebergTable, format, isPartitioned);
IcebergMigrator icebergMigrator =
new IcebergMigrator(
@@ -148,7 +130,8 @@ public class IcebergMigrateTest {
iceDatabase,
iceTable,
new Options(icebergProperties),
- 1);
+ 1,
+ Collections.emptyMap());
icebergMigrator.executeMigrate();
icebergMigrator.renameTable(false);
@@ -160,8 +143,11 @@ public class IcebergMigrateTest {
.map(row -> String.format("Record(%s)", row))
.collect(Collectors.toList()))
.hasSameElementsAs(
- Stream.concat(records1.stream(), records2.stream())
- .map(GenericRecord::toString)
+ Stream.of(
+ "Record(1, 1, 20240101, 00)",
+ "Record(2, 2, 20240101, 00)",
+ "Record(1, 1, 20240101, 01)",
+ "Record(2, 2, 20240101, 01)")
.collect(Collectors.toList()));
// verify iceberg table has been deleted
@@ -173,27 +159,7 @@ public class IcebergMigrateTest {
public void testMigrateAddAndDelete(boolean isPartitioned) throws Exception {
Table icebergTable = createIcebergTable(isPartitioned);
String format = "parquet";
- List<GenericRecord> records1 =
- Stream.of(
- toIcebergRecord(1, 1, "20240101", "00"),
- toIcebergRecord(2, 2, "20240101", "00"))
- .collect(Collectors.toList());
- if (isPartitioned) {
- writeRecordsToIceberg(icebergTable, format, records1, "20240101", "00");
- } else {
- writeRecordsToIceberg(icebergTable, format, records1);
- }
-
- List<GenericRecord> records2 =
- Stream.of(
- toIcebergRecord(1, 1, "20240101", "01"),
- toIcebergRecord(2, 2, "20240101", "01"))
- .collect(Collectors.toList());
- if (isPartitioned) {
- writeRecordsToIceberg(icebergTable, format, records2, "20240101", "01");
- } else {
- writeRecordsToIceberg(icebergTable, format, records2);
- }
+ writeInitialData(icebergTable, format, isPartitioned);
// the file written with records2 will be deleted and generate a delete manifest entry, not
// a delete file
@@ -207,7 +173,8 @@ public class IcebergMigrateTest {
iceDatabase,
iceTable,
new Options(icebergProperties),
- 1);
+ 1,
+ Collections.emptyMap());
icebergMigrator.executeMigrate();
FileStoreTable paimonTable =
@@ -218,8 +185,7 @@ public class IcebergMigrateTest {
.map(row -> String.format("Record(%s)", row))
.collect(Collectors.toList()))
.hasSameElementsAs(
- records2.stream()
- .map(GenericRecord::toString)
+ Stream.of("Record(1, 1, 20240101, 01)", "Record(2, 2,
20240101, 01)")
.collect(Collectors.toList()));
}
@@ -264,7 +230,8 @@ public class IcebergMigrateTest {
iceDatabase,
iceTable,
new Options(icebergProperties),
- 1);
+ 1,
+ Collections.emptyMap());
assertThatThrownBy(icebergMigrator::executeMigrate)
.rootCause()
@@ -306,7 +273,8 @@ public class IcebergMigrateTest {
iceDatabase,
iceTable,
new Options(icebergProperties),
- 1);
+ 1,
+ Collections.emptyMap());
icebergMigrator.executeMigrate();
FileStoreTable paimonTable =
@@ -324,45 +292,117 @@ public class IcebergMigrateTest {
@ParameterizedTest(name = "isPartitioned = {0}")
@ValueSource(booleans = {true, false})
- public void testMigrateWithSchemaEvolution(boolean isPartitioned) throws Exception {
+ public void testDeleteColumn(boolean isPartitioned) throws Exception {
Table icebergTable = createIcebergTable(isPartitioned);
String format = "parquet";
+ writeInitialData(icebergTable, format, isPartitioned);
- // write base data
- List<GenericRecord> records1 =
+ icebergTable.updateSchema().deleteColumn("v").commit();
+ Schema newIceSchema = icebergTable.schema();
+ List<GenericRecord> addedRecords =
Stream.of(
- toIcebergRecord(1, 1, "20240101", "00"),
- toIcebergRecord(2, 2, "20240101", "00"))
+ toIcebergRecord(3, "20240101", "00", newIceSchema),
+ toIcebergRecord(4, "20240101", "00", newIceSchema))
.collect(Collectors.toList());
if (isPartitioned) {
- writeRecordsToIceberg(icebergTable, format, records1, "20240101", "00");
+ writeRecordsToIceberg(icebergTable, format, addedRecords, "20240101", "00");
} else {
- writeRecordsToIceberg(icebergTable, format, records1);
+ writeRecordsToIceberg(icebergTable, format, addedRecords);
}
- List<GenericRecord> records2 =
+ IcebergMigrator icebergMigrator =
+ new IcebergMigrator(
+ paiCatalog,
+ paiDatabase,
+ paiTable,
+ iceDatabase,
+ iceTable,
+ new Options(icebergProperties),
+ 1,
+ Collections.emptyMap());
+ icebergMigrator.executeMigrate();
+
+ FileStoreTable paimonTable =
+ (FileStoreTable) paiCatalog.getTable(Identifier.create(paiDatabase, paiTable));
+ List<String> paiResults = getPaimonResult(paimonTable);
+ assertThat(
+ paiResults.stream()
+ .map(row -> String.format("Record(%s)", row))
+ .collect(Collectors.toList()))
+ .hasSameElementsAs(
+ Stream.of(
+ "Record(1, 20240101, 00)",
+ "Record(2, 20240101, 00)",
+ "Record(1, 20240101, 01)",
+ "Record(2, 20240101, 01)",
+ "Record(3, 20240101, 00)",
+ "Record(4, 20240101, 00)")
+ .collect(Collectors.toList()));
+ }
+
+ @ParameterizedTest(name = "isPartitioned = {0}")
+ @ValueSource(booleans = {true, false})
+ public void testRenameColumn(boolean isPartitioned) throws Exception {
+ Table icebergTable = createIcebergTable(isPartitioned);
+ String format = "parquet";
+ writeInitialData(icebergTable, format, isPartitioned);
+
+ icebergTable.updateSchema().renameColumn("v", "v2").commit();
+ Schema newIceSchema = icebergTable.schema();
+ List<GenericRecord> addedRecords =
Stream.of(
- toIcebergRecord(1, 1, "20240101", "01"),
- toIcebergRecord(2, 2, "20240101", "01"))
+ toIcebergRecord(newIceSchema, 3, 3, "20240101", "00"),
+ toIcebergRecord(newIceSchema, 4, 4, "20240101", "00"))
.collect(Collectors.toList());
if (isPartitioned) {
- writeRecordsToIceberg(icebergTable, format, records2, "20240101", "01");
+ writeRecordsToIceberg(icebergTable, format, addedRecords, "20240101", "00");
} else {
- writeRecordsToIceberg(icebergTable, format, records2);
+ writeRecordsToIceberg(icebergTable, format, addedRecords);
}
- // TODO: currently only support schema evolution of deleting columns
- testDeleteColumn(icebergTable, format, isPartitioned);
+ IcebergMigrator icebergMigrator =
+ new IcebergMigrator(
+ paiCatalog,
+ paiDatabase,
+ paiTable,
+ iceDatabase,
+ iceTable,
+ new Options(icebergProperties),
+ 1,
+ Collections.emptyMap());
+ icebergMigrator.executeMigrate();
+
+ FileStoreTable paimonTable =
+ (FileStoreTable) paiCatalog.getTable(Identifier.create(paiDatabase, paiTable));
+ List<String> paiResults = getPaimonResult(paimonTable);
+ assertThat(
+ paiResults.stream()
+ .map(row -> String.format("Record(%s)", row))
+ .collect(Collectors.toList()))
+ .hasSameElementsAs(
+ Stream.of(
+ "Record(1, 1, 20240101, 00)",
+ "Record(2, 2, 20240101, 00)",
+ "Record(1, 1, 20240101, 01)",
+ "Record(2, 2, 20240101, 01)",
+ "Record(3, 3, 20240101, 00)",
+ "Record(4, 4, 20240101, 00)")
+ .collect(Collectors.toList()));
}
- private void testDeleteColumn(Table icebergTable, String format, boolean isPartitioned)
- throws Exception {
- icebergTable.updateSchema().deleteColumn("v").commit();
+ @ParameterizedTest(name = "isPartitioned = {0}")
+ @ValueSource(booleans = {true, false})
+ public void testAddColumn(boolean isPartitioned) throws Exception {
+ Table icebergTable = createIcebergTable(isPartitioned);
+ String format = "parquet";
+ writeInitialData(icebergTable, format, isPartitioned);
+
+ icebergTable.updateSchema().addColumn("v2",
Types.IntegerType.get()).commit();
Schema newIceSchema = icebergTable.schema();
List<GenericRecord> addedRecords =
Stream.of(
- toIcebergRecord(3, "20240101", "00", newIceSchema),
- toIcebergRecord(4, "20240101", "00", newIceSchema))
+ toIcebergRecord(newIceSchema, 3, 3, "20240101", "00", 3),
+ toIcebergRecord(newIceSchema, 4, 4, "20240101", "00", 4))
.collect(Collectors.toList());
if (isPartitioned) {
writeRecordsToIceberg(icebergTable, format, addedRecords, "20240101", "00");
@@ -378,7 +418,8 @@ public class IcebergMigrateTest {
iceDatabase,
iceTable,
new Options(icebergProperties),
- 1);
+ 1,
+ Collections.emptyMap());
icebergMigrator.executeMigrate();
FileStoreTable paimonTable =
@@ -390,15 +431,236 @@ public class IcebergMigrateTest {
.collect(Collectors.toList()))
.hasSameElementsAs(
Stream.of(
- "Record(1, 20240101, 00)",
- "Record(2, 20240101, 00)",
- "Record(1, 20240101, 01)",
- "Record(2, 20240101, 01)",
- "Record(3, 20240101, 00)",
- "Record(4, 20240101, 00)")
+ "Record(1, 1, 20240101, 00, NULL)",
+ "Record(2, 2, 20240101, 00, NULL)",
+ "Record(1, 1, 20240101, 01, NULL)",
+ "Record(2, 2, 20240101, 01, NULL)",
+ "Record(3, 3, 20240101, 00, 3)",
+ "Record(4, 4, 20240101, 00, 4)")
+ .collect(Collectors.toList()));
+ }
+
+ @ParameterizedTest(name = "isPartitioned = {0}")
+ @ValueSource(booleans = {true, false})
+ public void testReorderColumn(boolean isPartitioned) throws Exception {
+ Table icebergTable = createIcebergTable(isPartitioned);
+ String format = "parquet";
+ writeInitialData(icebergTable, format, isPartitioned);
+
+ icebergTable.updateSchema().moveAfter("v", "hh").commit();
+ Schema newIceSchema = icebergTable.schema();
+ List<GenericRecord> addedRecords =
+ Stream.of(
+ toIcebergRecord(newIceSchema, 3, "20240101",
"00", 3),
+ toIcebergRecord(newIceSchema, 4, "20240101",
"00", 4))
+ .collect(Collectors.toList());
+ if (isPartitioned) {
+ writeRecordsToIceberg(icebergTable, format, addedRecords, "20240101", "00");
+ } else {
+ writeRecordsToIceberg(icebergTable, format, addedRecords);
+ }
+
+ IcebergMigrator icebergMigrator =
+ new IcebergMigrator(
+ paiCatalog,
+ paiDatabase,
+ paiTable,
+ iceDatabase,
+ iceTable,
+ new Options(icebergProperties),
+ 1,
+ Collections.emptyMap());
+ icebergMigrator.executeMigrate();
+
+ FileStoreTable paimonTable =
+ (FileStoreTable) paiCatalog.getTable(Identifier.create(paiDatabase, paiTable));
+ List<String> paiResults = getPaimonResult(paimonTable);
+ assertThat(
+ paiResults.stream()
+ .map(row -> String.format("Record(%s)", row))
+ .collect(Collectors.toList()))
+ .hasSameElementsAs(
+ Stream.of(
+ "Record(1, 20240101, 00, 1)",
+ "Record(2, 20240101, 00, 2)",
+ "Record(1, 20240101, 01, 1)",
+ "Record(2, 20240101, 01, 2)",
+ "Record(3, 20240101, 00, 3)",
+ "Record(4, 20240101, 00, 4)")
+ .collect(Collectors.toList()));
+ }
+
+ @ParameterizedTest(name = "isPartitioned = {0}")
+ @ValueSource(booleans = {true, false})
+ public void testUpdateColumn(boolean isPartitioned) throws Exception {
+ Table icebergTable = createIcebergTable(isPartitioned);
+ String format = "parquet";
+ writeInitialData(icebergTable, format, isPartitioned);
+
+ icebergTable.updateSchema().updateColumn("v",
Types.LongType.get()).commit();
+ Schema newIceSchema = icebergTable.schema();
+ List<GenericRecord> addedRecords =
+ Stream.of(
+ toIcebergRecord(newIceSchema, 3, 3L, "20240101", "00"),
+ toIcebergRecord(newIceSchema, 4, 3L, "20240101", "00"))
+ .collect(Collectors.toList());
+ if (isPartitioned) {
+ writeRecordsToIceberg(icebergTable, format, addedRecords, "20240101", "00");
+ } else {
+ writeRecordsToIceberg(icebergTable, format, addedRecords);
+ }
+
+ IcebergMigrator icebergMigrator =
+ new IcebergMigrator(
+ paiCatalog,
+ paiDatabase,
+ paiTable,
+ iceDatabase,
+ iceTable,
+ new Options(icebergProperties),
+ 1,
+ Collections.emptyMap());
+ icebergMigrator.executeMigrate();
+
+ FileStoreTable paimonTable =
+ (FileStoreTable) paiCatalog.getTable(Identifier.create(paiDatabase, paiTable));
+ List<String> paiResults = getPaimonResult(paimonTable);
+ assertThat(
+ paiResults.stream()
+ .map(row -> String.format("Record(%s)", row))
+ .collect(Collectors.toList()))
+ .hasSameElementsAs(
+ Stream.of(
+ "Record(1, 1, 20240101, 00)",
+ "Record(2, 2, 20240101, 00)",
+ "Record(1, 1, 20240101, 01)",
+ "Record(2, 2, 20240101, 01)",
+ "Record(3, 3, 20240101, 00)",
+ "Record(4, 3, 20240101, 00)")
.collect(Collectors.toList()));
}
+ @ParameterizedTest(name = "isPartitioned = {0}")
+ @ValueSource(booleans = {true, false})
+ public void testMigrateWithRandomIcebergEvolution(boolean isPartitioned) throws Exception {
+ Table icebergTable = createIcebergTable(isPartitioned);
+ icebergTable.updateSchema().addColumn("v2",
Types.IntegerType.get()).commit();
+ String format = "parquet";
+ List<String> index = new LinkedList<>(Arrays.asList("k", "v", "dt", "hh", "v2"));
+
+ int numRounds = 20;
+ int numRecords = 10;
+ List<Integer> ops = new ArrayList<>(Arrays.asList(1, 2, 3, 4, 5));
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ boolean isTypeChange = false;
+ List<List<String>> expectRecords = new ArrayList<>();
+
+ for (int i = 0; i < numRounds; i++) {
+ List<GenericRecord> records = new ArrayList<>();
+ String dt = Integer.toString(random.nextInt(20240101, 20240104));
+ String hh = Integer.toString(random.nextInt(3));
+
+ if ((i + 1) % 4 == 0 && !ops.isEmpty()) {
+ switch (ops.remove(random.nextInt(ops.size()))) {
+ case 1:
+ icebergTable
+ .updateSchema()
+ .addColumn("v3", Types.IntegerType.get())
+ .commit();
+ for (List<String> record : expectRecords) {
+ record.add("NULL");
+ }
+ index.add("v3");
+ break;
+ case 2:
+ icebergTable.updateSchema().renameColumn("v",
"vv").commit();
+ break;
+ case 3:
+ icebergTable.updateSchema().deleteColumn("v2").commit();
+ int v2Idx = index.indexOf("v2");
+ for (List<String> record : expectRecords) {
+ record.remove(v2Idx);
+ }
+ index.remove(v2Idx);
+ break;
+ case 4:
+ icebergTable.updateSchema().moveAfter("k",
"hh").commit();
+ int kIdx = index.indexOf("k");
+ int hhIdx = index.indexOf("hh");
+ for (List<String> record : expectRecords) {
+ String k = record.remove(kIdx);
+ record.add(hhIdx, k);
+ }
+ index.remove(kIdx);
+ index.add(hhIdx, "k");
+ break;
+ case 5:
+ icebergTable
+ .updateSchema()
+ .updateColumn("k", Types.LongType.get())
+ .commit();
+ isTypeChange = true;
+ break;
+ default:
+ throw new IllegalStateException("Unknown operation");
+ }
+ }
+ for (int j = 0; j < numRecords; j++) {
+ List<String> recordString = new ArrayList<>();
+ GenericRecord record = GenericRecord.create(icebergTable.schema());
+ for (int idx = 0; idx < index.size(); idx++) {
+ String field = index.get(idx);
+ if (field.equals("dt")) {
+ record.set(idx, dt);
+ recordString.add(dt);
+ } else if (field.equals("hh")) {
+ record.set(idx, hh);
+ recordString.add(hh);
+ } else {
+ int value = random.nextInt(100);
+ if (field.equals("k") && isTypeChange) {
+ record.set(idx, (long) value);
+ } else {
+ record.set(idx, value);
+ }
+ recordString.add(String.valueOf(value));
+ }
+ }
+ records.add(record);
+ expectRecords.add(recordString);
+ }
+
+ if (isPartitioned) {
+ writeRecordsToIceberg(icebergTable, format, records, dt, hh);
+ } else {
+ writeRecordsToIceberg(icebergTable, format, records);
+ }
+ }
+
+ IcebergMigrator icebergMigrator =
+ new IcebergMigrator(
+ paiCatalog,
+ paiDatabase,
+ paiTable,
+ iceDatabase,
+ iceTable,
+ new Options(icebergProperties),
+ 1,
+ Collections.emptyMap());
+ icebergMigrator.executeMigrate();
+
+ FileStoreTable paimonTable =
+ (FileStoreTable) paiCatalog.getTable(Identifier.create(paiDatabase, paiTable));
+ List<String> paiResults = getPaimonResult(paimonTable);
+ System.out.println();
+ assertThat(
+ paiResults.stream()
+ .map(row -> String.format("[%s]", row))
+ .collect(Collectors.toList()))
+ .hasSameElementsAs(
+ expectRecords.stream().map(List::toString).collect(Collectors.toList()));
+ }
+
@Test
public void testAllDataTypes() throws Exception {
Schema iceAllTypesSchema =
@@ -444,7 +706,8 @@ public class IcebergMigrateTest {
iceDatabase,
iceTable,
new Options(icebergProperties),
- 1);
+ 1,
+ Collections.emptyMap());
icebergMigrator.executeMigrate();
FileStoreTable paimonTable =
@@ -490,6 +753,31 @@ public class IcebergMigrateTest {
}
}
+ private void writeInitialData(Table icebergTable, String format, boolean isPartitioned)
+ throws IOException {
+ List<GenericRecord> records1 =
+ Stream.of(
+ toIcebergRecord(1, 1, "20240101", "00"),
+ toIcebergRecord(2, 2, "20240101", "00"))
+ .collect(Collectors.toList());
+ if (isPartitioned) {
+ writeRecordsToIceberg(icebergTable, format, records1, "20240101", "00");
+ } else {
+ writeRecordsToIceberg(icebergTable, format, records1);
+ }
+
+ List<GenericRecord> records2 =
+ Stream.of(
+ toIcebergRecord(1, 1, "20240101", "01"),
+ toIcebergRecord(2, 2, "20240101", "01"))
+ .collect(Collectors.toList());
+ if (isPartitioned) {
+ writeRecordsToIceberg(icebergTable, format, records2, "20240101", "01");
+ } else {
+ writeRecordsToIceberg(icebergTable, format, records2);
+ }
+ }
+
private GenericRecord toIcebergRecord(Schema icebergSchema, Object... values) {
GenericRecord record = GenericRecord.create(icebergSchema);
for (int i = 0; i < values.length; i++) {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableMigrationUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableMigrationUtils.java
index 4e7268c6f1..5c0338f893 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableMigrationUtils.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableMigrationUtils.java
@@ -80,7 +80,8 @@ public class TableMigrationUtils {
sourceDatabase,
sourceTableName,
icebergConf,
- parallelism);
+ parallelism,
+ options);
}
public static List<Migrator> getImporters(