This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 4bba18669e [core] Validate schema after applying schema changes (#8063)
4bba18669e is described below
commit 4bba18669e3aaa16256ffb535d2cbaa04e6e4134
Author: YeJunHao <[email protected]>
AuthorDate: Tue Jun 9 13:51:44 2026 +0800
[core] Validate schema after applying schema changes (#8063)
---
.../org/apache/paimon/schema/SchemaManager.java | 19 ++++----
.../apache/paimon/schema/SchemaManagerTest.java | 55 ++++++++++++++++++++++
.../org/apache/paimon/spark/SparkReadITCase.java | 27 +++++++----
3 files changed, 85 insertions(+), 16 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index 905f307048..bfe3b7de84 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -609,14 +609,17 @@ public class SchemaManager implements Serializable {
applyRenameColumnsToOptions(newOptions, changes),
newComment);
- return new TableSchema(
- oldTableSchema.id() + 1,
- newSchema.fields(),
- highestFieldId.get(),
- newSchema.partitionKeys(),
- newSchema.primaryKeys(),
- newSchema.options(),
- newSchema.comment());
+ TableSchema newTableSchema =
+ new TableSchema(
+ oldTableSchema.id() + 1,
+ newSchema.fields(),
+ highestFieldId.get(),
+ newSchema.partitionKeys(),
+ newSchema.primaryKeys(),
+ newSchema.options(),
+ newSchema.comment());
+ SchemaValidation.validateTableSchema(newTableSchema);
+ return newTableSchema;
}
// gets the rootType at the defined depth
diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
index 9a15cab361..4b7e6f3b31 100644
--- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
@@ -92,6 +92,8 @@ public class SchemaManagerTest {
private final List<String> primaryKeys = Arrays.asList("f0", "f1");
private final Map<String, String> options =
Collections.singletonMap("key", "value");
private final RowType rowType = RowType.of(new IntType(), new BigIntType(), new VarCharType());
+ private final RowType rowTypeWithSequenceField =
+ RowType.of(new IntType(), new BigIntType(), new VarCharType(), new BigIntType());
private final Schema schema =
new Schema(rowType.getFields(), partitionKeys, primaryKeys, options, "");
@@ -168,6 +170,59 @@ public class SchemaManagerTest {
assertThat(latest.get().options()).containsEntry("new_k", "new_v");
}
+ @Test
+ public void testResetSequenceGroupForAggregateFunction() throws Exception {
+ Map<String, String> options = new HashMap<>();
+ options.put(CoreOptions.MERGE_ENGINE.key(), "partial-update");
+ options.put(CoreOptions.BUCKET.key(), "1");
+ options.put("fields.f2.aggregate-function", "sum");
+ options.put("fields.f3.sequence-group", "f2");
+ Schema schema =
+ new Schema(
+ rowTypeWithSequenceField.getFields(),
+ partitionKeys,
+ primaryKeys,
+ options,
+ "");
+
+ retryArtificialException(() -> manager.createTable(schema));
+
+ assertThatThrownBy(
+ () ->
+ retryArtificialException(
+ () ->
+ manager.commitChanges(
+ SchemaChange.removeOption(
+ "fields.f3.sequence-group"))))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining(
+ "Must use sequence group for aggregation functions but not found for field f2.");
+ }
+
+ @Test
+ public void testResetSequenceGroupForLastNonNullAggregateFunction() throws Exception {
+ Map<String, String> options = new HashMap<>();
+ options.put(CoreOptions.MERGE_ENGINE.key(), "partial-update");
+ options.put(CoreOptions.BUCKET.key(), "1");
+ options.put("fields.f2.aggregate-function", "last_non_null_value");
+ options.put("fields.f3.sequence-group", "f2");
+ Schema schema =
+ new Schema(
+ rowTypeWithSequenceField.getFields(),
+ partitionKeys,
+ primaryKeys,
+ options,
+ "");
+
+ retryArtificialException(() -> manager.createTable(schema));
+ retryArtificialException(
+ () -> manager.commitChanges(SchemaChange.removeOption("fields.f3.sequence-group")));
+
+ Optional<TableSchema> latest = retryArtificialException(() -> manager.latest());
+ assertThat(latest.isPresent()).isTrue();
+ assertThat(latest.get().options()).doesNotContainKey("fields.f3.sequence-group");
+ }
+
@Test
public void testConcurrentCommit() throws Exception {
retryArtificialException(
diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkReadITCase.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
index 5ed50328ec..fbc2f660d4 100644
--- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
+++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
@@ -51,6 +51,9 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** ITCase for spark reader. */
public class SparkReadITCase extends SparkReadTestBase {
+ private static final String CHANGELOG_PRODUCER_WITHOUT_PRIMARY_KEYS =
+ "Can not set changelog-producer on table without primary keys";
+
@Test
public void testNormal() {
innerTestSimpleType(spark.table("t1"));
@@ -311,10 +314,7 @@ public class SparkReadITCase extends SparkReadTestBase {
() ->
spark.sql(
"CREATE TABLE T (a INT) TBLPROPERTIES ('changelog-producer' = 'input')"))
- .rootCause()
- .isInstanceOf(RuntimeException.class)
- .hasMessageContaining(
- "Can not set changelog-producer on table without primary keys");
+ .satisfies(SparkReadITCase::assertChangelogProducerWithoutPrimaryKeys);
spark.sql("CREATE TABLE T (a INT)");
@@ -322,10 +322,21 @@ public class SparkReadITCase extends SparkReadTestBase {
() ->
spark.sql(
"ALTER TABLE T SET TBLPROPERTIES('changelog-producer' 'input')"))
- .rootCause()
- .isInstanceOf(RuntimeException.class)
- .hasMessageContaining(
- "Can not set changelog-producer on table without primary keys");
+ .satisfies(SparkReadITCase::assertChangelogProducerWithoutPrimaryKeys);
+ }
+
+ private static void assertChangelogProducerWithoutPrimaryKeys(Throwable throwable) {
+ Throwable current = throwable;
+ while (current != null) {
+ if (current instanceof RuntimeException
+ && current.getMessage() != null
+ && current.getMessage().contains(CHANGELOG_PRODUCER_WITHOUT_PRIMARY_KEYS)) {
+ return;
+ }
+ current = current.getCause();
+ }
+
+ assertThat(throwable).hasMessageContaining(CHANGELOG_PRODUCER_WITHOUT_PRIMARY_KEYS);
}
@Test