This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push: new eb340b59de [core] Primary key types should not be changed (#6264) eb340b59de is described below commit eb340b59de7c9fb3066e209c1e6a9b88a495cede Author: tsreaper <tsreape...@gmail.com> AuthorDate: Wed Sep 17 17:35:46 2025 +0800 [core] Primary key types should not be changed (#6264) --- .../org/apache/paimon/schema/SchemaManager.java | 33 ++++++++++++++-------- .../apache/paimon/table/SchemaEvolutionTest.java | 16 +++++++++++ .../cdc/CdcRecordStoreMultiWriteOperatorTest.java | 5 ++-- .../FilterPushdownWithSchemaChangeITCase.java | 5 ++-- 4 files changed, 43 insertions(+), 16 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java index f2f3d89064..cbfe81b3a9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java @@ -372,7 +372,7 @@ public class SchemaManager implements Serializable { }.updateIntermediateColumn(newFields, 0); } else if (change instanceof RenameColumn) { RenameColumn rename = (RenameColumn) change; - assertNotUpdatingPrimaryKeys(oldTableSchema, rename.fieldNames(), "rename"); + assertNotUpdatingPartitionKeys(oldTableSchema, rename.fieldNames(), "rename"); new NestedColumnModifier(rename.fieldNames(), lazyIdentifier) { @Override protected void updateLastColumn( @@ -416,6 +416,7 @@ public class SchemaManager implements Serializable { }.updateIntermediateColumn(newFields, 0); } else if (change instanceof UpdateColumnType) { UpdateColumnType update = (UpdateColumnType) change; + assertNotUpdatingPartitionKeys(oldTableSchema, update.fieldNames(), "update"); assertNotUpdatingPrimaryKeys(oldTableSchema, update.fieldNames(), "update"); updateNestedColumn( newFields, @@ -458,11 +459,9 @@ public class SchemaManager implements Serializable { lazyIdentifier); } else if (change instanceof UpdateColumnNullability) { UpdateColumnNullability update = (UpdateColumnNullability) change; - if (update.fieldNames().length == 1 - && update.newNullability() - && oldTableSchema.primaryKeys().contains(update.fieldNames()[0])) { - throw new UnsupportedOperationException( - "Cannot change nullability of primary key"); + if (update.newNullability()) { + assertNotUpdatingPrimaryKeys( + oldTableSchema, update.fieldNames(), "change nullability of"); } updateNestedColumn( newFields, @@ -839,17 +838,29 @@ public class SchemaManager implements Serializable { } } - private static void assertNotUpdatingPrimaryKeys( + private static void assertNotUpdatingPartitionKeys( TableSchema schema, String[] fieldNames, String operation) { // partition keys can't be nested columns if (fieldNames.length > 1) { return; } - String columnToRename = fieldNames[0]; - if (schema.partitionKeys().contains(columnToRename)) { + String fieldName = fieldNames[0]; + if (schema.partitionKeys().contains(fieldName)) { throw new UnsupportedOperationException( - String.format( - "Cannot " + operation + " partition column: [%s]", columnToRename)); + String.format("Cannot %s partition column: [%s]", operation, fieldName)); + } + } + + private static void assertNotUpdatingPrimaryKeys( + TableSchema schema, String[] fieldNames, String operation) { + // primary keys can't be nested columns + if (fieldNames.length > 1) { + return; + } + String fieldName = fieldNames[0]; + if (schema.primaryKeys().contains(fieldName)) { + throw new UnsupportedOperationException( + String.format("Cannot %s primary key", operation)); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java index ad540d58b4..8b118e0562 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java @@ -227,6 +227,22 @@ public class SchemaEvolutionTest { assertThat(tableSchema.fields().get(0).type()).isEqualTo(DataTypes.STRING()); } + @Test + public void testUpdatePrimaryKeyType() throws Exception { + Schema schema = + Schema.newBuilder() + .column("k", DataTypes.INT()) + .column("v", DataTypes.BIGINT()) + .primaryKey("k") + .build(); + schemaManager.createTable(schema); + + List<SchemaChange> changes = + Collections.singletonList(SchemaChange.updateColumnType("k", DataTypes.STRING())); + assertThatThrownBy(() -> schemaManager.commitChanges(changes)) + .hasMessageContaining("Cannot update primary key"); + } + @Test public void testRenameField() throws Exception { Schema schema = diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java index 4436aa392d..ee64162ad9 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java @@ -516,8 +516,9 @@ public class CdcRecordStoreMultiWriteOperatorTest { // first table data = new HashMap<>(); data.put("pt", "1"); - data.put("k", "123456789876543211"); + data.put("k", "2"); data.put("v", "varchar"); + data.put("v2", "hello"); expected = CdcMultiplexRecord.fromCdcRecord( databaseName, @@ -528,7 +529,7 @@ public class CdcRecordStoreMultiWriteOperatorTest { assertThat(actual).isNull(); schemaManager = new SchemaManager(table1.fileIO(), table1.location()); - schemaManager.commitChanges(SchemaChange.updateColumnType("k", DataTypes.BIGINT())); + schemaManager.commitChanges(SchemaChange.addColumn("v2", DataTypes.STRING())); actual = runner.take(); assertThat(actual).isEqualTo(expected); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FilterPushdownWithSchemaChangeITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FilterPushdownWithSchemaChangeITCase.java index 73ea8f0ae1..317d574194 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FilterPushdownWithSchemaChangeITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FilterPushdownWithSchemaChangeITCase.java @@ -247,13 +247,12 @@ public class FilterPushdownWithSchemaChangeITCase extends CatalogITCaseBase { sql("INSERT INTO T VALUES ('9', '9'), ('10', '10'), ('11', '11')"); // key filter - sql("ALTER TABLE T MODIFY (pk INT)"); assertThat(sql("SELECT * FROM T WHERE pk > 9")) - .containsExactlyInAnyOrder(Row.of(10, "10"), Row.of(11, "11")); + .containsExactlyInAnyOrder(Row.of("10", "10"), Row.of("11", "11")); // value filter sql("ALTER TABLE T MODIFY (v INT)"); assertThat(sql("SELECT * FROM T WHERE v > 9")) - .containsExactlyInAnyOrder(Row.of(10, 10), Row.of(11, 11)); + .containsExactlyInAnyOrder(Row.of("10", 10), Row.of("11", 11)); } }