This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new eb340b59de [core] Primary key types should not be changed (#6264)
eb340b59de is described below

commit eb340b59de7c9fb3066e209c1e6a9b88a495cede
Author: tsreaper <tsreape...@gmail.com>
AuthorDate: Wed Sep 17 17:35:46 2025 +0800

    [core] Primary key types should not be changed (#6264)
---
 .../org/apache/paimon/schema/SchemaManager.java    | 33 ++++++++++++++--------
 .../apache/paimon/table/SchemaEvolutionTest.java   | 16 +++++++++++
 .../cdc/CdcRecordStoreMultiWriteOperatorTest.java  |  5 ++--
 .../FilterPushdownWithSchemaChangeITCase.java      |  5 ++--
 4 files changed, 43 insertions(+), 16 deletions(-)
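
For readers who want to try the new behavior locally, the sketch below mirrors the added SchemaEvolutionTest case: it creates a table whose primary key is "k" and then attempts to change that column's type, which the patched SchemaManager now rejects with "Cannot update primary key". This is a minimal sketch, not part of the patch; the class name and warehouse path are made up, and constructing a SchemaManager directly via LocalFileIO is an assumption made for a quick local experiment.

    // Minimal sketch (not part of this patch) showing the new guard in action.
    // The class name and warehouse path are made up; building a SchemaManager
    // directly with LocalFileIO is assumed here for a quick local experiment.
    import java.util.Collections;
    import java.util.List;

    import org.apache.paimon.fs.Path;
    import org.apache.paimon.fs.local.LocalFileIO;
    import org.apache.paimon.schema.Schema;
    import org.apache.paimon.schema.SchemaChange;
    import org.apache.paimon.schema.SchemaManager;
    import org.apache.paimon.types.DataTypes;

    public class PrimaryKeyTypeChangeSketch {
        public static void main(String[] args) throws Exception {
            // Table with primary key "k", mirroring the new testUpdatePrimaryKeyType.
            SchemaManager schemaManager =
                    new SchemaManager(LocalFileIO.create(), new Path("/tmp/pk-type-change-demo"));
            schemaManager.createTable(
                    Schema.newBuilder()
                            .column("k", DataTypes.INT())
                            .column("v", DataTypes.BIGINT())
                            .primaryKey("k")
                            .build());

            // Changing the type of a primary key column is now rejected.
            List<SchemaChange> changes =
                    Collections.singletonList(SchemaChange.updateColumnType("k", DataTypes.STRING()));
            try {
                schemaManager.commitChanges(changes);
            } catch (UnsupportedOperationException e) {
                System.out.println(e.getMessage()); // prints: Cannot update primary key
            }
        }
    }

As the SchemaManager diff below shows, the same guard path now also covers enabling nullability on primary key columns.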

diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index f2f3d89064..cbfe81b3a9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -372,7 +372,7 @@ public class SchemaManager implements Serializable {
                 }.updateIntermediateColumn(newFields, 0);
             } else if (change instanceof RenameColumn) {
                 RenameColumn rename = (RenameColumn) change;
-                assertNotUpdatingPrimaryKeys(oldTableSchema, rename.fieldNames(), "rename");
+                assertNotUpdatingPartitionKeys(oldTableSchema, rename.fieldNames(), "rename");
                 new NestedColumnModifier(rename.fieldNames(), lazyIdentifier) {
                     @Override
                     protected void updateLastColumn(
@@ -416,6 +416,7 @@ public class SchemaManager implements Serializable {
                 }.updateIntermediateColumn(newFields, 0);
             } else if (change instanceof UpdateColumnType) {
                 UpdateColumnType update = (UpdateColumnType) change;
+                assertNotUpdatingPartitionKeys(oldTableSchema, update.fieldNames(), "update");
                 assertNotUpdatingPrimaryKeys(oldTableSchema, update.fieldNames(), "update");
                 updateNestedColumn(
                         newFields,
@@ -458,11 +459,9 @@ public class SchemaManager implements Serializable {
                         lazyIdentifier);
             } else if (change instanceof UpdateColumnNullability) {
                 UpdateColumnNullability update = (UpdateColumnNullability) change;
-                if (update.fieldNames().length == 1
-                        && update.newNullability()
-                        && oldTableSchema.primaryKeys().contains(update.fieldNames()[0])) {
-                    throw new UnsupportedOperationException(
-                            "Cannot change nullability of primary key");
+                if (update.newNullability()) {
+                    assertNotUpdatingPrimaryKeys(
+                            oldTableSchema, update.fieldNames(), "change nullability of");
                 }
                 updateNestedColumn(
                         newFields,
@@ -839,17 +838,29 @@ public class SchemaManager implements Serializable {
         }
     }
 
-    private static void assertNotUpdatingPrimaryKeys(
+    private static void assertNotUpdatingPartitionKeys(
             TableSchema schema, String[] fieldNames, String operation) {
         // partition keys can't be nested columns
         if (fieldNames.length > 1) {
             return;
         }
-        String columnToRename = fieldNames[0];
-        if (schema.partitionKeys().contains(columnToRename)) {
+        String fieldName = fieldNames[0];
+        if (schema.partitionKeys().contains(fieldName)) {
             throw new UnsupportedOperationException(
-                    String.format(
-                            "Cannot " + operation + " partition column: [%s]", columnToRename));
+                    String.format("Cannot %s partition column: [%s]", operation, fieldName));
+        }
+    }
+
+    private static void assertNotUpdatingPrimaryKeys(
+            TableSchema schema, String[] fieldNames, String operation) {
+        // primary keys can't be nested columns
+        if (fieldNames.length > 1) {
+            return;
+        }
+        String fieldName = fieldNames[0];
+        if (schema.primaryKeys().contains(fieldName)) {
+            throw new UnsupportedOperationException(
+                    String.format("Cannot %s primary key", operation));
         }
     }
 
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
index ad540d58b4..8b118e0562 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
@@ -227,6 +227,22 @@ public class SchemaEvolutionTest {
         assertThat(tableSchema.fields().get(0).type()).isEqualTo(DataTypes.STRING());
     }
 
+    @Test
+    public void testUpdatePrimaryKeyType() throws Exception {
+        Schema schema =
+                Schema.newBuilder()
+                        .column("k", DataTypes.INT())
+                        .column("v", DataTypes.BIGINT())
+                        .primaryKey("k")
+                        .build();
+        schemaManager.createTable(schema);
+
+        List<SchemaChange> changes =
+                Collections.singletonList(SchemaChange.updateColumnType("k", DataTypes.STRING()));
+        assertThatThrownBy(() -> schemaManager.commitChanges(changes))
+                .hasMessageContaining("Cannot update primary key");
+    }
+
     @Test
     public void testRenameField() throws Exception {
         Schema schema =
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
index 4436aa392d..ee64162ad9 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
@@ -516,8 +516,9 @@ public class CdcRecordStoreMultiWriteOperatorTest {
         // first table
         data = new HashMap<>();
         data.put("pt", "1");
-        data.put("k", "123456789876543211");
+        data.put("k", "2");
         data.put("v", "varchar");
+        data.put("v2", "hello");
         expected =
                 CdcMultiplexRecord.fromCdcRecord(
                         databaseName,
@@ -528,7 +529,7 @@ public class CdcRecordStoreMultiWriteOperatorTest {
         assertThat(actual).isNull();
 
         schemaManager = new SchemaManager(table1.fileIO(), table1.location());
-        schemaManager.commitChanges(SchemaChange.updateColumnType("k", DataTypes.BIGINT()));
+        schemaManager.commitChanges(SchemaChange.addColumn("v2", DataTypes.STRING()));
         actual = runner.take();
         assertThat(actual).isEqualTo(expected);
 
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FilterPushdownWithSchemaChangeITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FilterPushdownWithSchemaChangeITCase.java
index 73ea8f0ae1..317d574194 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FilterPushdownWithSchemaChangeITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FilterPushdownWithSchemaChangeITCase.java
@@ -247,13 +247,12 @@ public class FilterPushdownWithSchemaChangeITCase extends CatalogITCaseBase {
         sql("INSERT INTO T VALUES ('9', '9'), ('10', '10'), ('11', '11')");
 
         // key filter
-        sql("ALTER TABLE T MODIFY (pk INT)");
         assertThat(sql("SELECT * FROM T WHERE pk > 9"))
-                .containsExactlyInAnyOrder(Row.of(10, "10"), Row.of(11, "11"));
+                .containsExactlyInAnyOrder(Row.of("10", "10"), Row.of("11", "11"));
 
         // value filter
         sql("ALTER TABLE T MODIFY (v INT)");
         assertThat(sql("SELECT * FROM T WHERE v > 9"))
-                .containsExactlyInAnyOrder(Row.of(10, 10), Row.of(11, 11));
+                .containsExactlyInAnyOrder(Row.of("10", 10), Row.of("11", 11));
     }
 }
