This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new bd9317e74b [core] Check that all fields with aggregate functions in 
partial-update should be protected by sequence-group (#5034)
bd9317e74b is described below

commit bd9317e74be3755226ad5d92f4e207a1c1185722
Author: yuzelin <[email protected]>
AuthorDate: Sun Feb 9 19:56:10 2025 +0800

    [core] Check that all fields with aggregate functions in partial-update 
should be protected by sequence-group (#5034)
---
 .../compact/PartialUpdateMergeFunction.java        |  9 ++++--
 .../compact/PartialUpdateMergeFunctionTest.java    | 37 +++++++++++++++++++---
 2 files changed, 39 insertions(+), 7 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
index a28ac52df4..3ce51127b1 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
@@ -24,6 +24,7 @@ import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator;
 import 
org.apache.paimon.mergetree.compact.aggregate.factory.FieldAggregatorFactory;
+import 
org.apache.paimon.mergetree.compact.aggregate.factory.FieldLastNonNullValueAggFactory;
 import 
org.apache.paimon.mergetree.compact.aggregate.factory.FieldPrimaryKeyAggFactory;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.types.DataField;
@@ -548,9 +549,13 @@ public class PartialUpdateMergeFunction implements 
MergeFunction<KeyValue> {
 
                 String aggFuncName = getAggFuncName(options, fieldName);
                 if (aggFuncName != null) {
+                    // last_non_null_value doesn't require sequence group
                     checkArgument(
-                            !fieldSeqComparators.isEmpty(),
-                            "Must use sequence group for aggregation 
functions.");
+                            
aggFuncName.equals(FieldLastNonNullValueAggFactory.NAME)
+                                    || fieldSeqComparators.containsKey(
+                                            fieldNames.indexOf(fieldName)),
+                            "Must use sequence group for aggregation functions 
but not found for field %s.",
+                            fieldName);
                     fieldAggregators.put(
                             i,
                             () ->
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunctionTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunctionTest.java
index 529110cabc..28625a9bf3 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunctionTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunctionTest.java
@@ -21,6 +21,7 @@ package org.apache.paimon.mergetree.compact;
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.types.RowType;
@@ -31,6 +32,7 @@ import 
org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
 import org.junit.jupiter.api.Test;
 
 import static org.apache.paimon.CoreOptions.FIELDS_DEFAULT_AGG_FUNC;
+import static 
org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -820,14 +822,39 @@ public class PartialUpdateMergeFunctionTest {
 
     @Test
     public void testAggregationWithoutSequenceGroup() {
-        Options options = new Options();
-        options.set("fields.f1.aggregate-function", "listagg");
-        RowType rowType = RowType.of(DataTypes.INT(), DataTypes.INT());
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT(),
+                            DataTypes.INT(),
+                            DataTypes.INT(),
+                            DataTypes.INT(),
+                            DataTypes.INT()
+                        },
+                        new String[] {"pk", "f0", "g0", "f1", "g1"});
+
+        Options options1 = new Options();
+        options1.set("fields.f0.aggregate-function", "listagg");
+        options1.set("fields.f1.aggregate-function", "listagg");
         assertThatThrownBy(
                         () ->
                                 PartialUpdateMergeFunction.factory(
-                                        options, rowType, 
ImmutableList.of("f0")))
-                .hasMessageContaining("Must use sequence group for aggregation 
functions");
+                                        options1, rowType, 
ImmutableList.of("pk")))
+                .satisfies(
+                        anyCauseMatches(
+                                IllegalArgumentException.class,
+                                "Must use sequence group for aggregation 
functions but not found for field f0."));
+
+        Options options2 = new Options(options1.toMap());
+        options2.set("fields.g0.sequence-group", "f0");
+        assertThatThrownBy(
+                        () ->
+                                PartialUpdateMergeFunction.factory(
+                                        options2, rowType, 
ImmutableList.of("pk")))
+                .satisfies(
+                        anyCauseMatches(
+                                IllegalArgumentException.class,
+                                "Must use sequence group for aggregation 
functions but not found for field f1."));
     }
 
     private void add(MergeFunction<KeyValue> function, Integer... f) {

Reply via email to