This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 4bba18669e [core] Validate schema after applying schema changes (#8063)
4bba18669e is described below

commit 4bba18669e3aaa16256ffb535d2cbaa04e6e4134
Author: YeJunHao <[email protected]>
AuthorDate: Tue Jun 9 13:51:44 2026 +0800

    [core] Validate schema after applying schema changes (#8063)
---
 .../org/apache/paimon/schema/SchemaManager.java    | 19 ++++----
 .../apache/paimon/schema/SchemaManagerTest.java    | 55 ++++++++++++++++++++++
 .../org/apache/paimon/spark/SparkReadITCase.java   | 27 +++++++----
 3 files changed, 85 insertions(+), 16 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index 905f307048..bfe3b7de84 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -609,14 +609,17 @@ public class SchemaManager implements Serializable {
                         applyRenameColumnsToOptions(newOptions, changes),
                         newComment);
 
-        return new TableSchema(
-                oldTableSchema.id() + 1,
-                newSchema.fields(),
-                highestFieldId.get(),
-                newSchema.partitionKeys(),
-                newSchema.primaryKeys(),
-                newSchema.options(),
-                newSchema.comment());
+        TableSchema newTableSchema =
+                new TableSchema(
+                        oldTableSchema.id() + 1,
+                        newSchema.fields(),
+                        highestFieldId.get(),
+                        newSchema.partitionKeys(),
+                        newSchema.primaryKeys(),
+                        newSchema.options(),
+                        newSchema.comment());
+        SchemaValidation.validateTableSchema(newTableSchema);
+        return newTableSchema;
     }
 
     // gets the rootType at the defined depth
diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
index 9a15cab361..4b7e6f3b31 100644
--- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
@@ -92,6 +92,8 @@ public class SchemaManagerTest {
     private final List<String> primaryKeys = Arrays.asList("f0", "f1");
     private final Map<String, String> options = Collections.singletonMap("key", "value");
     private final RowType rowType = RowType.of(new IntType(), new BigIntType(), new VarCharType());
+    private final RowType rowTypeWithSequenceField =
+            RowType.of(new IntType(), new BigIntType(), new VarCharType(), new BigIntType());
     private final Schema schema =
             new Schema(rowType.getFields(), partitionKeys, primaryKeys, options, "");
 
@@ -168,6 +170,59 @@ public class SchemaManagerTest {
         assertThat(latest.get().options()).containsEntry("new_k", "new_v");
     }
 
+    @Test
+    public void testResetSequenceGroupForAggregateFunction() throws Exception {
+        Map<String, String> options = new HashMap<>();
+        options.put(CoreOptions.MERGE_ENGINE.key(), "partial-update");
+        options.put(CoreOptions.BUCKET.key(), "1");
+        options.put("fields.f2.aggregate-function", "sum");
+        options.put("fields.f3.sequence-group", "f2");
+        Schema schema =
+                new Schema(
+                        rowTypeWithSequenceField.getFields(),
+                        partitionKeys,
+                        primaryKeys,
+                        options,
+                        "");
+
+        retryArtificialException(() -> manager.createTable(schema));
+
+        assertThatThrownBy(
+                        () ->
+                                retryArtificialException(
+                                        () ->
+                                                manager.commitChanges(
+                                                        SchemaChange.removeOption(
+                                                                "fields.f3.sequence-group"))))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining(
+                        "Must use sequence group for aggregation functions but not found for field f2.");
+    }
+
+    @Test
+    public void testResetSequenceGroupForLastNonNullAggregateFunction() throws Exception {
+        Map<String, String> options = new HashMap<>();
+        options.put(CoreOptions.MERGE_ENGINE.key(), "partial-update");
+        options.put(CoreOptions.BUCKET.key(), "1");
+        options.put("fields.f2.aggregate-function", "last_non_null_value");
+        options.put("fields.f3.sequence-group", "f2");
+        Schema schema =
+                new Schema(
+                        rowTypeWithSequenceField.getFields(),
+                        partitionKeys,
+                        primaryKeys,
+                        options,
+                        "");
+
+        retryArtificialException(() -> manager.createTable(schema));
+        retryArtificialException(
+                () -> manager.commitChanges(SchemaChange.removeOption("fields.f3.sequence-group")));
+
+        Optional<TableSchema> latest = retryArtificialException(() -> manager.latest());
+        assertThat(latest.isPresent()).isTrue();
+        assertThat(latest.get().options()).doesNotContainKey("fields.f3.sequence-group");
+    }
+
     @Test
     public void testConcurrentCommit() throws Exception {
         retryArtificialException(
diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkReadITCase.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
index 5ed50328ec..fbc2f660d4 100644
--- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
+++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
@@ -51,6 +51,9 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
 /** ITCase for spark reader. */
 public class SparkReadITCase extends SparkReadTestBase {
 
+    private static final String CHANGELOG_PRODUCER_WITHOUT_PRIMARY_KEYS =
+            "Can not set changelog-producer on table without primary keys";
+
     @Test
     public void testNormal() {
         innerTestSimpleType(spark.table("t1"));
@@ -311,10 +314,7 @@ public class SparkReadITCase extends SparkReadTestBase {
                         () ->
                                 spark.sql(
                                         "CREATE TABLE T (a INT) TBLPROPERTIES ('changelog-producer' = 'input')"))
-                .rootCause()
-                .isInstanceOf(RuntimeException.class)
-                .hasMessageContaining(
-                        "Can not set changelog-producer on table without primary keys");
+                .satisfies(SparkReadITCase::assertChangelogProducerWithoutPrimaryKeys);
 
         spark.sql("CREATE TABLE T (a INT)");
 
@@ -322,10 +322,21 @@ public class SparkReadITCase extends SparkReadTestBase {
                         () ->
                                 spark.sql(
                                         "ALTER TABLE T SET TBLPROPERTIES('changelog-producer' 'input')"))
-                .rootCause()
-                .isInstanceOf(RuntimeException.class)
-                .hasMessageContaining(
-                        "Can not set changelog-producer on table without primary keys");
+                .satisfies(SparkReadITCase::assertChangelogProducerWithoutPrimaryKeys);
+    }
+
+    private static void assertChangelogProducerWithoutPrimaryKeys(Throwable throwable) {
+        Throwable current = throwable;
+        while (current != null) {
+            if (current instanceof RuntimeException
+                    && current.getMessage() != null
+                    && current.getMessage().contains(CHANGELOG_PRODUCER_WITHOUT_PRIMARY_KEYS)) {
+                return;
+            }
+            current = current.getCause();
+        }
+
+        assertThat(throwable).hasMessageContaining(CHANGELOG_PRODUCER_WITHOUT_PRIMARY_KEYS);
     }
 
     @Test

Reply via email to