This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new f3c2dfe576 [core] aggregation options are not verified when executing
DDL (#5276)
f3c2dfe576 is described below
commit f3c2dfe5763237cfe53f9306f04cb1ed93c80298
Author: JennyChen <[email protected]>
AuthorDate: Mon Jul 7 15:36:19 2025 +0800
[core] aggregation options are not verified when executing DDL (#5276)
---
.../apache/paimon/table/PrimaryKeyTableUtils.java | 18 ++++++++++++
.../paimon/flink/AbstractFlinkTableFactory.java | 3 +-
.../apache/paimon/flink/PartialUpdateITCase.java | 19 ++++++++++++
.../apache/paimon/flink/PreAggregationITCase.java | 34 ++++++++++++++++++++++
4 files changed, 73 insertions(+), 1 deletion(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java
index a47e44718e..2d152e958e 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java
@@ -20,11 +20,14 @@ package org.apache.paimon.table;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
+import org.apache.paimon.factories.FactoryUtil;
import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction;
import org.apache.paimon.mergetree.compact.FirstRowMergeFunction;
import org.apache.paimon.mergetree.compact.MergeFunctionFactory;
import org.apache.paimon.mergetree.compact.PartialUpdateMergeFunction;
import org.apache.paimon.mergetree.compact.aggregate.AggregateMergeFunction;
+import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator;
+import
org.apache.paimon.mergetree.compact.aggregate.factory.FieldAggregatorFactory;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.KeyValueFieldsExtractor;
import org.apache.paimon.schema.TableSchema;
@@ -75,6 +78,21 @@ public class PrimaryKeyTableUtils {
}
}
+ public static void validateMergeFunctionFactory(TableSchema tableSchema) {
+ CoreOptions options = new CoreOptions(tableSchema.options());
+ for (int i = 0; i <
tableSchema.logicalRowType().getFieldNames().size(); i++) {
+ String fieldName =
tableSchema.logicalRowType().getFieldNames().get(i);
+ String aggFuncName = options.fieldAggFunc(fieldName);
+ aggFuncName = aggFuncName == null ? options.fieldsDefaultFunc() :
aggFuncName;
+ if (aggFuncName != null) {
+ FactoryUtil.discoverFactory(
+ FieldAggregator.class.getClassLoader(),
+ FieldAggregatorFactory.class,
+ aggFuncName);
+ }
+ }
+ }
+
/** Primary key fields extractor. */
public static class PrimaryKeyFieldsExtractor implements
KeyValueFieldsExtractor {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
index 4568234894..36d8525f04 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
@@ -35,6 +35,7 @@ import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.PrimaryKeyTableUtils;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.object.ObjectTable;
import org.apache.paimon.utils.Preconditions;
@@ -226,7 +227,7 @@ public abstract class AbstractFlinkTableFactory
throw new RuntimeException(e);
}
}
-
+
PrimaryKeyTableUtils.validateMergeFunctionFactory(fileStoreTable.schema());
FileStoreTable table =
fileStoreTable.copyWithoutTimeTravel(newOptions);
if
(Options.fromMap(table.options()).get(FILESYSTEM_JOB_LEVEL_SETTINGS_ENABLED)) {
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
index 2da837d2f4..59ecc65a36 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
@@ -22,6 +22,7 @@ import org.apache.paimon.utils.BlockingIterator;
import org.apache.paimon.utils.CommonTestUtils;
import org.apache.flink.configuration.RestartStrategyOptions;
+import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.types.Row;
@@ -772,4 +773,22 @@ public class PartialUpdateITCase extends CatalogITCaseBase
{
assertThat(sql("SELECT * FROM
seq_default_agg")).containsExactly(Row.of(0, 2, 3));
}
+
+ @Test
+ public void testSequenceGroupWithNotExistAgg() {
+ sql(
+ "CREATE TABLE seq_not_exist_agg ("
+ + " pk INT PRIMARY KEY NOT ENFORCED,"
+ + " seq INT,"
+ + " v INT) WITH ("
+ + " 'merge-engine'='partial-update',"
+ + " 'fields.seq.sequence-group'='v',"
+ + " 'fields.default-aggregate-function'='not_exist'"
+ + ")");
+
+ assertThatThrownBy(() -> sql("INSERT INTO seq_not_exist_agg VALUES (0,
1, 1)"))
+ .hasCauseInstanceOf(ValidationException.class)
+ .hasMessageContaining(
+ "Unable to create a sink for writing table
'PAIMON.default.seq_not_exist_agg'.");
+ }
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java
index 7b8ce3904e..e37d18ddb8 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java
@@ -26,6 +26,7 @@ import org.apache.paimon.utils.RoaringBitmap32;
import org.apache.paimon.utils.RoaringBitmap64;
import org.apache.commons.codec.binary.Hex;
+import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.types.Row;
@@ -2133,4 +2134,37 @@ public class PreAggregationITCase {
assertThat(row.getField(1)).isEqualTo(expected);
}
}
+
+ /** ITCase for testing the aggregation merge engine with not exist
aggregation function. */
+ public static class NotExistAggregationFunctionITCase extends
CatalogITCaseBase {
+
+ @Override
+ protected List<String> ddl() {
+ return Collections.singletonList(
+ "CREATE TABLE test_not_exist("
+ + " id INT PRIMARY KEY NOT ENFORCED,"
+ + " f0 INT"
+ + ") WITH ("
+ + " 'merge-engine' = 'aggregation',"
+ + " 'fields.f0.aggregate-function' = 'not_exist'"
+ + ")");
+ }
+
+ @Test
+ public void testInsert() {
+ assertThatThrownBy(
+ () -> sql("INSERT INTO test_not_exist VALUES (1,
1), (2, 2), (3, 3)"))
+ .hasCauseInstanceOf(ValidationException.class)
+ .hasMessageContaining(
+ "Unable to create a sink for writing table
'PAIMON.default.test_not_exist'.");
+ }
+
+ @Test
+ public void testSelect() {
+ assertThatThrownBy(() -> sql("select * from test_not_exist"))
+ .hasCauseInstanceOf(ValidationException.class)
+ .hasMessageContaining(
+ "Unable to create a source for reading table
'PAIMON.default.test_not_exist'.");
+ }
+ }
}