This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 1b5f090f5 [core] Support multiple fields in sequence-group (#3461)
1b5f090f5 is described below

commit 1b5f090f57432f3ad7f936735c00c4d2eadad140
Author: xiangyu0xf <[email protected]>
AuthorDate: Wed Jun 12 10:58:00 2024 +0800

    [core] Support multiple fields in sequence-group (#3461)
---
 .../main/java/org/apache/paimon/CoreOptions.java   |   2 +
 .../compact/PartialUpdateMergeFunction.java        | 406 +++++++++------------
 .../org/apache/paimon/schema/SchemaValidation.java |  57 ++-
 .../paimon/utils/UserDefinedSeqComparator.java     |  14 +-
 .../mergetree/SortBufferWriteBufferTestBase.java   |   2 +-
 .../compact/PartialUpdateMergeFunctionTest.java    | 371 +++++++++++++++++++
 .../apache/paimon/flink/PartialUpdateITCase.java   |  92 ++++-
 7 files changed, 682 insertions(+), 262 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 5faee52b3..484a20517 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -65,6 +65,8 @@ public class CoreOptions implements Serializable {
 
     public static final String FIELDS_PREFIX = "fields";
 
+    public static final String FIELDS_SEPARATOR = ",";
+
     public static final String AGG_FUNCTION = "aggregate-function";
     public static final String DEFAULT_AGG_FUNCTION = 
"default-aggregate-function";
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
index 737fc5284..154595859 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
@@ -24,29 +24,20 @@ import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator;
 import org.apache.paimon.options.Options;
-import org.apache.paimon.types.BigIntType;
-import org.apache.paimon.types.CharType;
+import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
-import org.apache.paimon.types.DataTypeDefaultVisitor;
-import org.apache.paimon.types.DateType;
-import org.apache.paimon.types.DecimalType;
-import org.apache.paimon.types.DoubleType;
-import org.apache.paimon.types.FloatType;
-import org.apache.paimon.types.IntType;
-import org.apache.paimon.types.LocalZonedTimestampType;
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.types.RowType;
-import org.apache.paimon.types.SmallIntType;
-import org.apache.paimon.types.TimestampType;
-import org.apache.paimon.types.TinyIntType;
-import org.apache.paimon.types.VarCharType;
-import org.apache.paimon.utils.InternalRowUtils;
+import org.apache.paimon.utils.FieldsComparator;
 import org.apache.paimon.utils.Projection;
+import org.apache.paimon.utils.UserDefinedSeqComparator;
 
 import javax.annotation.Nullable;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
@@ -56,6 +47,7 @@ import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
 import static org.apache.paimon.CoreOptions.FIELDS_PREFIX;
+import static org.apache.paimon.CoreOptions.FIELDS_SEPARATOR;
 import static org.apache.paimon.utils.InternalRowUtils.createFieldGetters;
 
 /**
@@ -68,7 +60,7 @@ public class PartialUpdateMergeFunction implements 
MergeFunction<KeyValue> {
 
     private final InternalRow.FieldGetter[] getters;
     private final boolean ignoreDelete;
-    private final Map<Integer, SequenceGenerator> fieldSequences;
+    private final Map<Integer, FieldsComparator> fieldSeqComparators;
     private final boolean fieldSequenceEnabled;
     private final Map<Integer, FieldAggregator> fieldAggregators;
 
@@ -80,12 +72,12 @@ public class PartialUpdateMergeFunction implements 
MergeFunction<KeyValue> {
     protected PartialUpdateMergeFunction(
             InternalRow.FieldGetter[] getters,
             boolean ignoreDelete,
-            Map<Integer, SequenceGenerator> fieldSequences,
+            Map<Integer, FieldsComparator> fieldSeqComparators,
             Map<Integer, FieldAggregator> fieldAggregators,
             boolean fieldSequenceEnabled) {
         this.getters = getters;
         this.ignoreDelete = ignoreDelete;
-        this.fieldSequences = fieldSequences;
+        this.fieldSeqComparators = fieldSeqComparators;
         this.fieldAggregators = fieldAggregators;
         this.fieldSequenceEnabled = fieldSequenceEnabled;
     }
@@ -126,7 +118,7 @@ public class PartialUpdateMergeFunction implements 
MergeFunction<KeyValue> {
         }
 
         latestSequenceNumber = kv.sequenceNumber();
-        if (fieldSequences.isEmpty()) {
+        if (fieldSeqComparators.isEmpty()) {
             updateNonNullFields(kv);
         } else {
             updateWithSequenceGroup(kv);
@@ -145,64 +137,94 @@ public class PartialUpdateMergeFunction implements 
MergeFunction<KeyValue> {
     private void updateWithSequenceGroup(KeyValue kv) {
         for (int i = 0; i < getters.length; i++) {
             Object field = getters[i].getFieldOrNull(kv.value());
-            SequenceGenerator sequenceGen = fieldSequences.get(i);
+            FieldsComparator seqComparator = fieldSeqComparators.get(i);
             FieldAggregator aggregator = fieldAggregators.get(i);
             Object accumulator = getters[i].getFieldOrNull(row);
-            if (sequenceGen == null) {
+            if (seqComparator == null) {
                 if (aggregator != null) {
                     row.setField(i, aggregator.agg(accumulator, field));
                 } else if (field != null) {
                     row.setField(i, field);
                 }
             } else {
-                Long currentSeq = sequenceGen.generate(kv.value());
-                if (currentSeq != null) {
-                    Long previousSeq = sequenceGen.generate(row);
-                    if (previousSeq == null || currentSeq >= previousSeq) {
-                        row.setField(
-                                i, aggregator == null ? field : 
aggregator.agg(accumulator, field));
-                    } else if (aggregator != null) {
-                        row.setField(i, aggregator.agg(field, accumulator));
+                if (isEmptySequenceGroup(kv, seqComparator)) {
+                    // skip null sequence group
+                    continue;
+                }
+
+                if (seqComparator.compare(kv.value(), row) >= 0) {
+                    int index = i;
+
+                    // Multiple sequence fields should be updated at once.
+                    if (Arrays.stream(seqComparator.compareFields())
+                            .anyMatch(seqIndex -> seqIndex == index)) {
+                        for (int fieldIndex : seqComparator.compareFields()) {
+                            row.setField(
+                                    fieldIndex, 
getters[fieldIndex].getFieldOrNull(kv.value()));
+                        }
                     }
+                    row.setField(
+                            i, aggregator == null ? field : 
aggregator.agg(accumulator, field));
+                } else if (aggregator != null) {
+                    row.setField(i, aggregator.agg(field, accumulator));
                 }
             }
         }
     }
 
+    private boolean isEmptySequenceGroup(KeyValue kv, FieldsComparator 
comparator) {
+        for (int fieldIndex : comparator.compareFields()) {
+            if (getters[fieldIndex].getFieldOrNull(kv.value()) != null) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
     private void retractWithSequenceGroup(KeyValue kv) {
+        Set<Integer> updatedSequenceFields = new HashSet<>();
+
         for (int i = 0; i < getters.length; i++) {
-            SequenceGenerator sequenceGen = fieldSequences.get(i);
-            if (sequenceGen != null) {
-                Long currentSeq = sequenceGen.generate(kv.value());
-                if (currentSeq != null) {
-                    Long previousSeq = sequenceGen.generate(row);
-                    FieldAggregator aggregator = fieldAggregators.get(i);
-                    if (previousSeq == null || currentSeq >= previousSeq) {
-                        if (sequenceGen.index() == i) {
-                            // update sequence field
-                            row.setField(i, 
getters[i].getFieldOrNull(kv.value()));
-                        } else {
-                            // retract normal field
-                            if (aggregator == null) {
-                                row.setField(i, null);
-                            } else {
-                                // retract agg field
-                                Object accumulator = 
getters[i].getFieldOrNull(row);
-                                row.setField(
-                                        i,
-                                        aggregator.retract(
-                                                accumulator,
-                                                
getters[i].getFieldOrNull(kv.value())));
+            FieldsComparator seqComparator = fieldSeqComparators.get(i);
+            if (seqComparator != null) {
+                FieldAggregator aggregator = fieldAggregators.get(i);
+                if (isEmptySequenceGroup(kv, seqComparator)) {
+                    // skip null sequence group
+                    continue;
+                }
+
+                if (seqComparator.compare(kv.value(), row) >= 0) {
+                    int index = i;
+
+                    // Multiple sequence fields should be updated at once.
+                    if (Arrays.stream(seqComparator.compareFields())
+                            .anyMatch(field -> field == index)) {
+                        for (int field : seqComparator.compareFields()) {
+                            if (!updatedSequenceFields.contains(field)) {
+                                row.setField(field, 
getters[field].getFieldOrNull(kv.value()));
+                                updatedSequenceFields.add(field);
                             }
                         }
-                    } else if (aggregator != null) {
-                        // retract agg field for old sequence
-                        Object accumulator = getters[i].getFieldOrNull(row);
-                        row.setField(
-                                i,
-                                aggregator.retract(
-                                        accumulator, 
getters[i].getFieldOrNull(kv.value())));
+                    } else {
+                        // retract normal field
+                        if (aggregator == null) {
+                            row.setField(i, null);
+                        } else {
+                            // retract agg field
+                            Object accumulator = 
getters[i].getFieldOrNull(row);
+                            row.setField(
+                                    i,
+                                    aggregator.retract(
+                                            accumulator, 
getters[i].getFieldOrNull(kv.value())));
+                        }
                     }
+                } else if (aggregator != null) {
+                    // retract agg field for old sequence
+                    Object accumulator = getters[i].getFieldOrNull(row);
+                    row.setField(
+                            i,
+                            aggregator.retract(accumulator, 
getters[i].getFieldOrNull(kv.value())));
                 }
             }
         }
@@ -226,57 +248,65 @@ public class PartialUpdateMergeFunction implements 
MergeFunction<KeyValue> {
         private static final long serialVersionUID = 1L;
 
         private final boolean ignoreDelete;
+        private final RowType rowType;
+
         private final List<DataType> tableTypes;
-        private final Map<Integer, SequenceGenerator> fieldSequences;
+
+        private final Map<Integer, FieldsComparator> fieldSeqComparators;
 
         private final Map<Integer, FieldAggregator> fieldAggregators;
 
         private Factory(Options options, RowType rowType, List<String> 
primaryKeys) {
             this.ignoreDelete = options.get(CoreOptions.IGNORE_DELETE);
+            this.rowType = rowType;
             this.tableTypes = rowType.getFieldTypes();
 
             List<String> fieldNames = rowType.getFieldNames();
-            this.fieldSequences = new HashMap<>();
+            this.fieldSeqComparators = new HashMap<>();
             for (Map.Entry<String, String> entry : options.toMap().entrySet()) 
{
                 String k = entry.getKey();
                 String v = entry.getValue();
                 if (k.startsWith(FIELDS_PREFIX) && k.endsWith(SEQUENCE_GROUP)) 
{
-                    String sequenceFieldName =
-                            k.substring(
-                                    FIELDS_PREFIX.length() + 1,
-                                    k.length() - SEQUENCE_GROUP.length() - 1);
-                    SequenceGenerator sequenceGen =
-                            new SequenceGenerator(sequenceFieldName, rowType);
-                    Arrays.stream(v.split(","))
+                    List<String> sequenceFields =
+                            Arrays.stream(
+                                            k.substring(
+                                                            
FIELDS_PREFIX.length() + 1,
+                                                            k.length()
+                                                                    - 
SEQUENCE_GROUP.length()
+                                                                    - 1)
+                                                    .split(FIELDS_SEPARATOR))
+                                    .map(fieldName -> 
validateFieldName(fieldName, fieldNames))
+                                    .collect(Collectors.toList());
+
+                    UserDefinedSeqComparator userDefinedSeqComparator =
+                            UserDefinedSeqComparator.create(rowType, 
sequenceFields);
+                    Arrays.stream(v.split(FIELDS_SEPARATOR))
                             .map(
-                                    fieldName -> {
-                                        int field = 
fieldNames.indexOf(fieldName);
-                                        if (field == -1) {
-                                            throw new IllegalArgumentException(
-                                                    String.format(
-                                                            "Field %s can not 
be found in table schema",
-                                                            fieldName));
-                                        }
-                                        return field;
-                                    })
+                                    fieldName ->
+                                            fieldNames.indexOf(
+                                                    
validateFieldName(fieldName, fieldNames)))
                             .forEach(
                                     field -> {
-                                        if (fieldSequences.containsKey(field)) 
{
+                                        if 
(fieldSeqComparators.containsKey(field)) {
                                             throw new IllegalArgumentException(
                                                     String.format(
                                                             "Field %s is 
defined repeatedly by multiple groups: %s",
                                                             
fieldNames.get(field), k));
                                         }
-                                        fieldSequences.put(field, sequenceGen);
+                                        fieldSeqComparators.put(field, 
userDefinedSeqComparator);
                                     });
 
                     // add self
-                    fieldSequences.put(sequenceGen.index(), sequenceGen);
+                    sequenceFields.forEach(
+                            fieldName -> {
+                                int index = fieldNames.indexOf(fieldName);
+                                fieldSeqComparators.put(index, 
userDefinedSeqComparator);
+                            });
                 }
             }
             this.fieldAggregators =
                     createFieldAggregators(rowType, primaryKeys, new 
CoreOptions(options));
-            if (fieldAggregators.size() > 0 && fieldSequences.isEmpty()) {
+            if (!fieldAggregators.isEmpty() && fieldSeqComparators.isEmpty()) {
                 throw new IllegalArgumentException(
                         "Must use sequence group for aggregation functions.");
             }
@@ -285,29 +315,46 @@ public class PartialUpdateMergeFunction implements 
MergeFunction<KeyValue> {
         @Override
         public MergeFunction<KeyValue> create(@Nullable int[][] projection) {
             if (projection != null) {
-                Map<Integer, SequenceGenerator> projectedSequences = new 
HashMap<>();
+                Map<Integer, FieldsComparator> projectedSeqComparators = new 
HashMap<>();
                 Map<Integer, FieldAggregator> projectedAggregators = new 
HashMap<>();
                 int[] projects = Projection.of(projection).toTopLevelIndexes();
                 Map<Integer, Integer> indexMap = new HashMap<>();
+                List<DataField> dataFields = rowType.getFields();
+                List<DataType> newDataTypes = new ArrayList<>();
+
                 for (int i = 0; i < projects.length; i++) {
                     indexMap.put(projects[i], i);
+                    newDataTypes.add(dataFields.get(projects[i]).type());
                 }
-                fieldSequences.forEach(
-                        (field, sequence) -> {
+                RowType newRowType = 
RowType.builder().fields(newDataTypes).build();
+
+                fieldSeqComparators.forEach(
+                        (field, comparator) -> {
                             int newField = indexMap.getOrDefault(field, -1);
                             if (newField != -1) {
-                                int newSequenceId = 
indexMap.getOrDefault(sequence.index(), -1);
-                                if (newSequenceId == -1) {
-                                    throw new RuntimeException(
-                                            String.format(
-                                                    "Can not find new sequence 
field for new field. new field index is %s",
-                                                    newField));
-                                } else {
-                                    projectedSequences.put(
-                                            newField,
-                                            new SequenceGenerator(
-                                                    newSequenceId, 
sequence.fieldType()));
-                                }
+                                int[] newSequenceFields =
+                                        
Arrays.stream(comparator.compareFields())
+                                                .map(
+                                                        index -> {
+                                                            int newIndex =
+                                                                    
indexMap.getOrDefault(
+                                                                            
index, -1);
+                                                            if (newIndex == 
-1) {
+                                                                throw new 
RuntimeException(
+                                                                        
String.format(
+                                                                               
 "Can not find new sequence field "
+                                                                               
         + "for new field. new field "
+                                                                               
         + "index is %s",
+                                                                               
 newField));
+                                                            } else {
+                                                                return 
newIndex;
+                                                            }
+                                                        })
+                                                .toArray();
+                                projectedSeqComparators.put(
+                                        newField,
+                                        UserDefinedSeqComparator.create(
+                                                newRowType, 
newSequenceFields));
                             }
                         });
                 for (int i = 0; i < projects.length; i++) {
@@ -319,22 +366,22 @@ public class PartialUpdateMergeFunction implements 
MergeFunction<KeyValue> {
                 return new PartialUpdateMergeFunction(
                         
createFieldGetters(Projection.of(projection).project(tableTypes)),
                         ignoreDelete,
-                        projectedSequences,
+                        projectedSeqComparators,
                         projectedAggregators,
-                        !fieldSequences.isEmpty());
+                        !fieldSeqComparators.isEmpty());
             } else {
                 return new PartialUpdateMergeFunction(
                         createFieldGetters(tableTypes),
                         ignoreDelete,
-                        fieldSequences,
+                        fieldSeqComparators,
                         fieldAggregators,
-                        !fieldSequences.isEmpty());
+                        !fieldSeqComparators.isEmpty());
             }
         }
 
         @Override
         public AdjustedProjection adjustProjection(@Nullable int[][] 
projection) {
-            if (fieldSequences.isEmpty()) {
+            if (fieldSeqComparators.isEmpty()) {
                 return new AdjustedProjection(projection, null);
             }
 
@@ -345,9 +392,15 @@ public class PartialUpdateMergeFunction implements 
MergeFunction<KeyValue> {
             int[] topProjects = Projection.of(projection).toTopLevelIndexes();
             Set<Integer> indexSet = 
Arrays.stream(topProjects).boxed().collect(Collectors.toSet());
             for (int index : topProjects) {
-                SequenceGenerator generator = fieldSequences.get(index);
-                if (generator != null && 
!indexSet.contains(generator.index())) {
-                    extraFields.add(generator.index());
+                FieldsComparator comparator = fieldSeqComparators.get(index);
+                if (comparator == null) {
+                    continue;
+                }
+
+                for (int field : comparator.compareFields()) {
+                    if (!indexSet.contains(field)) {
+                        extraFields.add(field);
+                    }
                 }
             }
 
@@ -356,11 +409,21 @@ public class PartialUpdateMergeFunction implements 
MergeFunction<KeyValue> {
                             .mapToInt(Integer::intValue)
                             .toArray();
 
-            int[][] pushdown = Projection.of(allProjects).toNestedIndexes();
+            int[][] pushDown = Projection.of(allProjects).toNestedIndexes();
             int[][] outer =
                     Projection.of(IntStream.range(0, 
topProjects.length).toArray())
                             .toNestedIndexes();
-            return new AdjustedProjection(pushdown, outer);
+            return new AdjustedProjection(pushDown, outer);
+        }
+
+        private String validateFieldName(String fieldName, List<String> 
fieldNames) {
+            int field = fieldNames.indexOf(fieldName);
+            if (field == -1) {
+                throw new IllegalArgumentException(
+                        String.format("Field %s can not be found in table 
schema", fieldName));
+            }
+
+            return fieldName;
         }
 
         /**
@@ -408,137 +471,4 @@ public class PartialUpdateMergeFunction implements 
MergeFunction<KeyValue> {
             return fieldAggregators;
         }
     }
-
-    private static class SequenceGenerator {
-
-        private final int index;
-
-        private final Generator generator;
-        private final DataType fieldType;
-
-        private SequenceGenerator(String field, RowType rowType) {
-            index = rowType.getFieldNames().indexOf(field);
-            if (index == -1) {
-                throw new RuntimeException(
-                        String.format(
-                                "Can not find sequence field %s in table 
schema: %s",
-                                field, rowType));
-            }
-            fieldType = rowType.getTypeAt(index);
-            generator = fieldType.accept(new SequenceGeneratorVisitor());
-        }
-
-        public SequenceGenerator(int index, DataType dataType) {
-            this.index = index;
-
-            this.fieldType = dataType;
-            if (index == -1) {
-                throw new RuntimeException(String.format("Index : %s is 
invalid", index));
-            }
-            generator = fieldType.accept(new SequenceGeneratorVisitor());
-        }
-
-        public int index() {
-            return index;
-        }
-
-        public DataType fieldType() {
-            return fieldType;
-        }
-
-        @Nullable
-        public Long generate(InternalRow row) {
-            return generator.generateNullable(row, index);
-        }
-
-        private interface Generator {
-            long generate(InternalRow row, int i);
-
-            @Nullable
-            default Long generateNullable(InternalRow row, int i) {
-                if (row.isNullAt(i)) {
-                    return null;
-                }
-                return generate(row, i);
-            }
-        }
-
-        private static class SequenceGeneratorVisitor extends 
DataTypeDefaultVisitor<Generator> {
-
-            @Override
-            public Generator visit(CharType charType) {
-                return stringGenerator();
-            }
-
-            @Override
-            public Generator visit(VarCharType varCharType) {
-                return stringGenerator();
-            }
-
-            private Generator stringGenerator() {
-                return (row, i) -> Long.parseLong(row.getString(i).toString());
-            }
-
-            @Override
-            public Generator visit(DecimalType decimalType) {
-                return (row, i) ->
-                        InternalRowUtils.castToIntegral(
-                                row.getDecimal(
-                                        i, decimalType.getPrecision(), 
decimalType.getScale()));
-            }
-
-            @Override
-            public Generator visit(TinyIntType tinyIntType) {
-                return InternalRow::getByte;
-            }
-
-            @Override
-            public Generator visit(SmallIntType smallIntType) {
-                return InternalRow::getShort;
-            }
-
-            @Override
-            public Generator visit(IntType intType) {
-                return InternalRow::getInt;
-            }
-
-            @Override
-            public Generator visit(BigIntType bigIntType) {
-                return InternalRow::getLong;
-            }
-
-            @Override
-            public Generator visit(FloatType floatType) {
-                return (row, i) -> (long) row.getFloat(i);
-            }
-
-            @Override
-            public Generator visit(DoubleType doubleType) {
-                return (row, i) -> (long) row.getDouble(i);
-            }
-
-            @Override
-            public Generator visit(DateType dateType) {
-                return InternalRow::getInt;
-            }
-
-            @Override
-            public Generator visit(TimestampType timestampType) {
-                return (row, i) ->
-                        row.getTimestamp(i, 
timestampType.getPrecision()).getMillisecond();
-            }
-
-            @Override
-            public Generator visit(LocalZonedTimestampType 
localZonedTimestampType) {
-                return (row, i) ->
-                        row.getTimestamp(i, 
localZonedTimestampType.getPrecision())
-                                .getMillisecond();
-            }
-
-            @Override
-            protected Generator defaultMethod(DataType dataType) {
-                throw new UnsupportedOperationException("Unsupported type: " + 
dataType);
-            }
-        }
-    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java 
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index 8b97c0911..05f77e09f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -35,6 +35,7 @@ import org.apache.paimon.types.MultisetType;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.types.VarCharType;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -51,6 +52,7 @@ import static 
org.apache.paimon.CoreOptions.CHANGELOG_NUM_RETAINED_MIN;
 import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
 import static org.apache.paimon.CoreOptions.DEFAULT_AGG_FUNCTION;
 import static org.apache.paimon.CoreOptions.FIELDS_PREFIX;
+import static org.apache.paimon.CoreOptions.FIELDS_SEPARATOR;
 import static org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS;
 import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN;
 import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP;
@@ -349,13 +351,15 @@ public class SchemaValidation {
                 .forEach(
                         k -> {
                             if (k.startsWith(FIELDS_PREFIX)) {
-                                String fieldName = k.split("\\.")[1];
-                                checkArgument(
-                                        DEFAULT_AGG_FUNCTION.equals(fieldName)
-                                                || 
fieldNames.contains(fieldName),
-                                        String.format(
-                                                "Field %s can not be found in 
table schema.",
-                                                fieldName));
+                                String[] fields = 
k.split("\\.")[1].split(FIELDS_SEPARATOR);
+                                for (String field : fields) {
+                                    checkArgument(
+                                            DEFAULT_AGG_FUNCTION.equals(field)
+                                                    || 
fieldNames.contains(field),
+                                            String.format(
+                                                    "Field %s can not be found 
in table schema.",
+                                                    field));
+                                }
                             }
                         });
     }
@@ -367,29 +371,42 @@ public class SchemaValidation {
             String v = entry.getValue();
             List<String> fieldNames = schema.fieldNames();
             if (k.startsWith(FIELDS_PREFIX) && k.endsWith(SEQUENCE_GROUP)) {
-                String sequenceFieldName =
+                String[] sequenceFieldNames =
                         k.substring(
-                                FIELDS_PREFIX.length() + 1,
-                                k.length() - SEQUENCE_GROUP.length() - 1);
-                if (!fieldNames.contains(sequenceFieldName)) {
-                    throw new IllegalArgumentException(
-                            String.format(
-                                    "The sequence field group: %s can not be 
found in table schema.",
-                                    sequenceFieldName));
-                }
+                                        FIELDS_PREFIX.length() + 1,
+                                        k.length() - SEQUENCE_GROUP.length() - 
1)
+                                .split(FIELDS_SEPARATOR);
 
-                for (String field : v.split(",")) {
+                for (String field : v.split(FIELDS_SEPARATOR)) {
                     if (!fieldNames.contains(field)) {
                         throw new IllegalArgumentException(
                                 String.format("Field %s can not be found in 
table schema.", field));
                     }
-                    Set<String> group = fields2Group.computeIfAbsent(field, p 
-> new HashSet<>());
-                    if (group.add(sequenceFieldName) && group.size() > 1) {
+
+                    List<String> sequenceFieldsList = new ArrayList<>();
+                    for (String sequenceFieldName : sequenceFieldNames) {
+                        if (!fieldNames.contains(sequenceFieldName)) {
+                            throw new IllegalArgumentException(
+                                    String.format(
+                                            "The sequence field group: %s can 
not be found in table schema.",
+                                            sequenceFieldName));
+                        }
+                        sequenceFieldsList.add(sequenceFieldName);
+                    }
+
+                    if (fields2Group.containsKey(field)) {
+                        List<List<String>> sequenceGroups = new ArrayList<>();
+                        sequenceGroups.add(new 
ArrayList<>(fields2Group.get(field)));
+                        sequenceGroups.add(sequenceFieldsList);
+
                         throw new IllegalArgumentException(
                                 String.format(
                                         "Field %s is defined repeatedly by 
multiple groups: %s.",
-                                        field, group));
+                                        field, sequenceGroups));
                     }
+
+                    Set<String> group = fields2Group.computeIfAbsent(field, p 
-> new HashSet<>());
+                    group.addAll(sequenceFieldsList);
                 }
             }
         }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/UserDefinedSeqComparator.java
 
b/paimon-core/src/main/java/org/apache/paimon/utils/UserDefinedSeqComparator.java
index 35fa7a66d..ec7a00bcb 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/utils/UserDefinedSeqComparator.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/utils/UserDefinedSeqComparator.java
@@ -62,8 +62,18 @@ public class UserDefinedSeqComparator implements 
FieldsComparator {
 
         List<String> fieldNames = rowType.getFieldNames();
         int[] fields = 
sequenceFields.stream().mapToInt(fieldNames::indexOf).toArray();
+
+        return create(rowType, fields);
+    }
+
+    @Nullable
+    public static UserDefinedSeqComparator create(RowType rowType, int[] 
sequenceFields) {
+        if (sequenceFields.length == 0) {
+            return null;
+        }
+
         RecordComparator comparator =
-                CodeGenUtils.newRecordComparator(rowType.getFieldTypes(), 
fields);
-        return new UserDefinedSeqComparator(fields, comparator);
+                CodeGenUtils.newRecordComparator(rowType.getFieldTypes(), 
sequenceFields);
+        return new UserDefinedSeqComparator(sequenceFields, comparator);
     }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
index 9315fb15d..dea0268ec 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
@@ -179,7 +179,7 @@ public abstract class SortBufferWriteBufferTestBase {
         protected MergeFunction<KeyValue> createMergeFunction() {
             Options options = new Options();
             return PartialUpdateMergeFunction.factory(
-                            options, RowType.of(DataTypes.BIGINT()), 
ImmutableList.of("key"))
+                            options, RowType.of(DataTypes.BIGINT()), 
ImmutableList.of("f0"))
                     .create();
         }
     }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunctionTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunctionTest.java
index fa41607d1..a6e1b5f90 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunctionTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunctionTest.java
@@ -95,6 +95,49 @@ public class PartialUpdateMergeFunctionTest {
         validate(func, 1, null, null, 6, null, null, 6);
     }
 
+    @Test
+    public void testMultiSequenceFields() {
+        Options options = new Options();
+        options.set("fields.f3,f4.sequence-group", "f1,f2");
+        options.set("fields.f7,f8.sequence-group", "f5,f6");
+        RowType rowType =
+                RowType.of(
+                        DataTypes.INT(),
+                        DataTypes.INT(),
+                        DataTypes.INT(),
+                        DataTypes.INT(),
+                        DataTypes.INT(),
+                        DataTypes.INT(),
+                        DataTypes.INT(),
+                        DataTypes.INT(),
+                        DataTypes.INT());
+        MergeFunction<KeyValue> func =
+                PartialUpdateMergeFunction.factory(options, rowType, 
ImmutableList.of("f0"))
+                        .create();
+        func.reset();
+        // test null sequence field
+        add(func, 1, null, null, null, null, 1, 1, 1, 3);
+        add(func, 1, 2, 2, null, null, 2, 2, 1, 3);
+        validate(func, 1, null, null, null, null, 2, 2, 1, 3);
+        func.reset();
+
+        add(func, 1, 1, 1, 1, 1, 1, 1, 1, 3);
+        add(func, 1, 2, 2, 2, 2, 2, 1, 1, null);
+        validate(func, 1, 2, 2, 2, 2, 1, 1, 1, 3);
+        add(func, 1, 1, 3, 1, 3, 3, 3, 3, 2);
+        validate(func, 1, 2, 2, 2, 2, 3, 3, 3, 2);
+
+        // delete
+        add(func, RowKind.DELETE, 1, 1, 1, 3, 3, 1, 1, null, null);
+        validate(func, 1, null, null, 3, 3, 3, 3, 3, 2);
+        add(func, RowKind.DELETE, 1, 1, 1, 3, 1, 1, 1, 4, 4);
+        validate(func, 1, null, null, 3, 3, null, null, 4, 4);
+        add(func, 1, 4, 4, 4, 4, 5, 5, 5, 5);
+        validate(func, 1, 4, 4, 4, 4, 5, 5, 5, 5);
+        add(func, RowKind.DELETE, 1, 1, 1, 6, 1, 1, 1, 6, 1);
+        validate(func, 1, null, null, 6, 1, null, null, 6, 1);
+    }
+
     @Test
     public void testSequenceGroupDefaultAggFunc() {
         Options options = new Options();
@@ -123,6 +166,36 @@ public class PartialUpdateMergeFunctionTest {
         validate(func, 1, 4, 2, 4, 5, 3, 5);
     }
 
+    @Test
+    public void testMultiSequenceFieldsDefaultAggFunc() {
+        Options options = new Options();
+        options.set("fields.f3,f4.sequence-group", "f1,f2");
+        options.set("fields.f7,f8.sequence-group", "f5,f6");
+        options.set(FIELDS_DEFAULT_AGG_FUNC, "last_non_null_value");
+        RowType rowType =
+                RowType.of(
+                        DataTypes.INT(),
+                        DataTypes.INT(),
+                        DataTypes.INT(),
+                        DataTypes.INT(),
+                        DataTypes.INT(),
+                        DataTypes.INT(),
+                        DataTypes.INT(),
+                        DataTypes.INT(),
+                        DataTypes.INT());
+        MergeFunction<KeyValue> func =
+                PartialUpdateMergeFunction.factory(options, rowType, 
ImmutableList.of("f0"))
+                        .create();
+        func.reset();
+        add(func, 1, 1, 1, 1, 1, 1, 1, 1, 1);
+        add(func, 1, 2, 2, 2, 2, 2, 2, null, null);
+        validate(func, 1, 2, 2, 2, 2, 1, 1, 1, 1);
+        add(func, 1, 3, 3, 1, 1, 3, 3, 3, 3);
+        validate(func, 1, 2, 2, 2, 2, 3, 3, 3, 3);
+        add(func, 1, 4, null, 4, 4, 5, null, 5, 5);
+        validate(func, 1, 4, 2, 4, 4, 5, 3, 5, 5);
+    }
+
     @Test
     public void testSequenceGroupDefinedNoField() {
         Options options = new Options();
@@ -144,6 +217,27 @@ public class PartialUpdateMergeFunctionTest {
                 .hasMessageContaining("can not be found in table schema");
     }
 
+    @Test
+    public void testMultiSequenceFieldsDefinedNoField() {
+        Options options = new Options();
+        options.set("fields.f2,f3.sequence-group", "f1,f7");
+        options.set("fields.f5,f6.sequence-group", "f4");
+        RowType rowType =
+                RowType.of(
+                        DataTypes.INT(),
+                        DataTypes.INT(),
+                        DataTypes.INT(),
+                        DataTypes.INT(),
+                        DataTypes.INT(),
+                        DataTypes.INT(),
+                        DataTypes.INT());
+        assertThatThrownBy(
+                        () ->
+                                PartialUpdateMergeFunction.factory(
+                                        options, rowType, 
ImmutableList.of("f0")))
+                .hasMessageContaining("can not be found in table schema");
+    }
+
     @Test
     public void testSequenceGroupRepeatDefine() {
         Options options = new Options();
@@ -163,6 +257,27 @@ public class PartialUpdateMergeFunctionTest {
                 .hasMessageContaining("is defined repeatedly by multiple 
groups");
     }
 
+    @Test
+    public void testMultiSequenceFieldsRepeatDefine() {
+        Options options = new Options();
+        options.set("fields.f3,f4.sequence-group", "f1,f2");
+        options.set("fields.f5,f6.sequence-group", "f1,f2");
+        RowType rowType =
+                RowType.of(
+                        DataTypes.INT(),
+                        DataTypes.INT(),
+                        DataTypes.INT(),
+                        DataTypes.INT(),
+                        DataTypes.INT(),
+                        DataTypes.INT(),
+                        DataTypes.INT());
+        assertThatThrownBy(
+                        () ->
+                                PartialUpdateMergeFunction.factory(
+                                        options, rowType, 
ImmutableList.of("f0")))
+                .hasMessageContaining("is defined repeatedly by multiple 
groups");
+    }
+
     @Test
     public void testAdjustProjectionRepeatProject() {
         Options options = new Options();
@@ -198,6 +313,45 @@ public class PartialUpdateMergeFunctionTest {
         validate(func, 3, 3, null, 2, 4, 2);
     }
 
+    @Test
+    public void testMultiSequenceFieldsAdjustProjectionRepeatProject() {
+        Options options = new Options();
+        options.set("fields.f2,f4.sequence-group", "f1,f3");
+        options.set("fields.f5,f6.sequence-group", "f7");
+        RowType rowType =
+                RowType.of(
+                        DataTypes.INT(),
+                        DataTypes.INT(),
+                        DataTypes.INT(),
+                        DataTypes.INT(),
+                        DataTypes.INT(),
+                        DataTypes.INT(),
+                        DataTypes.INT(),
+                        DataTypes.INT());
+        // the field 'f1' is projected twice
+        int[][] projection = new int[][] {{1}, {1}, {3}, {7}};
+        MergeFunctionFactory<KeyValue> factory =
+                PartialUpdateMergeFunction.factory(options, rowType, 
ImmutableList.of("f0"));
+        MergeFunctionFactory.AdjustedProjection adjustedProjection =
+                factory.adjustProjection(projection);
+
+        validate(adjustedProjection, new int[] {1, 1, 3, 7, 2, 4, 5, 6}, new 
int[] {0, 1, 2, 3});
+
+        MergeFunction<KeyValue> func = 
factory.create(adjustedProjection.pushdownProjection);
+        func.reset();
+        add(func, 1, 1, 1, 1, 1, 1, 1, 1);
+        add(func, 2, 2, 6, 2, 2, 2, 2, 6);
+        validate(func, 2, 2, 6, 2, 2, 2, 2, 6);
+
+        // update first sequence group
+        add(func, 3, 3, null, 7, 4, null, 1, 8);
+        validate(func, 3, 3, null, 2, 4, null, 2, 6);
+
+        // update second sequence group
+        add(func, 5, 5, 3, 3, 3, 5, 5, 6);
+        validate(func, 5, 3, null, 3, 4, null, 5, 6);
+    }
+
     @Test
     public void testAdjustProjectionSequenceFieldsProject() {
         Options options = new Options();
@@ -230,6 +384,38 @@ public class PartialUpdateMergeFunctionTest {
         validate(func, 1, 1, 1, 2, 2);
     }
 
+    @Test
+    public void testMultiSequenceFieldsAdjustProjectionProject() {
+        Options options = new Options();
+        options.set("fields.f2,f4.sequence-group", "f1,f3");
+        options.set("fields.f5,f6.sequence-group", "f7");
+        RowType rowType =
+                RowType.of(
+                        DataTypes.INT(),
+                        DataTypes.INT(),
+                        DataTypes.INT(),
+                        DataTypes.INT(),
+                        DataTypes.INT(),
+                        DataTypes.INT(),
+                        DataTypes.INT(),
+                        DataTypes.INT());
+        // the sequence field 'f4' is projected too
+        int[][] projection = new int[][] {{1}, {4}, {3}, {7}};
+        MergeFunctionFactory<KeyValue> factory =
+                PartialUpdateMergeFunction.factory(options, rowType, 
ImmutableList.of("f0"));
+        MergeFunctionFactory.AdjustedProjection adjustedProjection =
+                factory.adjustProjection(projection);
+
+        validate(adjustedProjection, new int[] {1, 4, 3, 7, 2, 5, 6}, new 
int[] {0, 1, 2, 3});
+
+        MergeFunction<KeyValue> func = 
factory.create(adjustedProjection.pushdownProjection);
+        func.reset();
+        // if sequence field is null, the related fields should not be updated
+        add(func, 1, 1, 1, 1, 1, 1, 1);
+        add(func, 1, null, 1, 3, 2, 2, 2);
+        validate(func, 1, null, 1, 3, 2, 2, 2);
+    }
+
     @Test
     public void testAdjustProjectionAllFieldsProject() {
         Options options = new Options();
@@ -265,6 +451,41 @@ public class PartialUpdateMergeFunctionTest {
         validate(func, 4, 2, 4, 2, 2, 1, 1, 1);
     }
 
+    @Test
+    public void testMultiSequenceFieldsAdjustProjectionAllFieldsProject() {
+        Options options = new Options();
+        options.set("fields.f2,f4.sequence-group", "f1,f3");
+        options.set("fields.f5,f6.sequence-group", "f7");
+        RowType rowType =
+                RowType.of(
+                        DataTypes.INT(),
+                        DataTypes.INT(),
+                        DataTypes.INT(),
+                        DataTypes.INT(),
+                        DataTypes.INT(),
+                        DataTypes.INT(),
+                        DataTypes.INT(),
+                        DataTypes.INT());
+        // all fields are projected
+        int[][] projection = new int[][] {{0}, {1}, {2}, {3}, {4}, {5}, {6}, 
{7}};
+        MergeFunctionFactory<KeyValue> factory =
+                PartialUpdateMergeFunction.factory(options, rowType, 
ImmutableList.of("f0"));
+        MergeFunctionFactory.AdjustedProjection adjustedProjection =
+                factory.adjustProjection(projection);
+
+        validate(
+                adjustedProjection,
+                new int[] {0, 1, 2, 3, 4, 5, 6, 7},
+                new int[] {0, 1, 2, 3, 4, 5, 6, 7});
+
+        MergeFunction<KeyValue> func = 
factory.create(adjustedProjection.pushdownProjection);
+        func.reset();
+        // 'f6' has no sequence group, it should not be updated by null
+        add(func, 1, 1, 1, 1, 1, 1, 1, 1);
+        add(func, 4, 2, 4, 2, 2, 0, null, 3);
+        validate(func, 4, 2, 4, 2, 2, 1, 1, 1);
+    }
+
     @Test
     public void testAdjustProjectionNonProject() {
         Options options = new Options();
@@ -372,6 +593,33 @@ public class PartialUpdateMergeFunctionTest {
         validate(func, 1, 2, 3, 2);
     }
 
+    @Test
+    public void testMultiSequenceFieldsFirstValue() {
+        Options options = new Options();
+        options.set("fields.f1,f2.sequence-group", "f3,f4");
+        options.set("fields.f3.aggregate-function", "first_value");
+        options.set("fields.f4.aggregate-function", "last_value");
+        RowType rowType =
+                RowType.of(
+                        DataTypes.INT(),
+                        DataTypes.INT(),
+                        DataTypes.INT(),
+                        DataTypes.INT(),
+                        DataTypes.INT());
+        MergeFunction<KeyValue> func =
+                PartialUpdateMergeFunction.factory(options, rowType, 
ImmutableList.of("f0"))
+                        .create();
+
+        func.reset();
+
+        // f7 sequence group 2
+        add(func, 1, 1, 1, 1, 1);
+        add(func, 1, 2, 2, 2, 2);
+        validate(func, 1, 2, 2, 1, 2);
+        add(func, 1, 0, 1, 3, 3);
+        validate(func, 1, 2, 2, 3, 2);
+    }
+
     @Test
     public void testPartialUpdateWithAggregation() {
         Options options = new Options();
@@ -431,6 +679,73 @@ public class PartialUpdateMergeFunctionTest {
         validate(func, 1, 3, -3, null, 1, 1, 1, 3);
     }
 
+    @Test
+    public void testMultiSequenceFieldsPartialUpdateWithAggregation() {
+        Options options = new Options();
+        options.set("fields.f1,f2.sequence-group", "f3,f4,f5");
+        options.set("fields.f7,f8.sequence-group", "f6");
+        options.set("fields.f0.aggregate-function", "listagg");
+        options.set("fields.f3.aggregate-function", "sum");
+        options.set("fields.f4.aggregate-function", "first_value");
+        options.set("fields.f5.aggregate-function", "last_value");
+        options.set("fields.f6.aggregate-function", "last_non_null_value");
+        options.set("fields.f4.ignore-retract", "true");
+        options.set("fields.f6.ignore-retract", "true");
+        RowType rowType =
+                RowType.of(
+                        DataTypes.INT(),
+                        DataTypes.INT(),
+                        DataTypes.INT(),
+                        DataTypes.INT(),
+                        DataTypes.INT(),
+                        DataTypes.INT(),
+                        DataTypes.INT(),
+                        DataTypes.INT(),
+                        DataTypes.INT());
+        MergeFunction<KeyValue> func =
+                PartialUpdateMergeFunction.factory(options, rowType, 
ImmutableList.of("f0"))
+                        .create();
+
+        func.reset();
+        // f0 pk
+        // f1, f2 sequence group 1
+        // f3 in f1, f2 group with sum agg
+        // f4 in f1, f2 group with first_value agg
+        // f5 in f1, f2 group with last_value agg
+        // f6 in f7, f8 group with last_not_null agg
+        // f7, f8 sequence group 2
+
+        // test null retract
+        add(func, 1, null, null, 1, 1, 1, 1, 1, 1);
+        validate(func, 1, null, null, null, null, null, 1, 1, 1);
+
+        add(func, RowKind.DELETE, 1, null, null, 1, 1, 1, 0, 1, 1);
+        validate(func, 1, null, null, null, null, null, 1, 1, 1);
+
+        add(func, 1, 1, 1, 1, 1, 1, 1, 1, 1);
+        add(func, 1, 1, 2, 1, 2, 2, null, 2, 0);
+        validate(func, 1, 1, 2, 2, 1, 2, 1, 2, 0);
+
+        // sequence group not advanced
+        add(func, 1, 1, 1, 1, 3, 1, 1, 2, 0);
+        validate(func, 1, 1, 2, 3, 3, 2, 1, 2, 0);
+
+        // test null
+        add(func, 1, 1, 3, null, null, null, null, 4, 2);
+        validate(func, 1, 1, 3, 3, 3, null, 1, 4, 2);
+
+        // test retract
+        add(func, 1, 2, 3, 1, 1, 1, 1, 4, 3);
+        validate(func, 1, 2, 3, 4, 3, 1, 1, 4, 3);
+        add(func, RowKind.UPDATE_BEFORE, 1, 2, 3, 2, 1, 2, 1, 4, 3);
+        validate(func, 1, 2, 3, 2, 3, null, 1, 4, 3);
+        add(func, RowKind.DELETE, 1, 3, 2, 3, 1, 1, 4, 3);
+        validate(func, 1, 3, 2, -1, 3, null, 1, 4, 3);
+        // retract for old sequence
+        add(func, RowKind.DELETE, 1, 2, 2, 2, 1, 1, 1, 1, 3);
+        validate(func, 1, 3, 2, -3, 3, null, 1, 4, 3);
+    }
+
     @Test
     public void testPartialUpdateWithAggregationProjectPushDown() {
         Options options = new Options();
@@ -485,6 +800,62 @@ public class PartialUpdateMergeFunctionTest {
         validate(func, null, -2, 2, 3);
     }
 
+    @Test
+    public void 
testMultiSequenceFieldsPartialUpdateWithAggregationProjectPushDown() {
+        Options options = new Options();
+        options.set("fields.f1,f8.sequence-group", "f2,f3,f4");
+        options.set("fields.f7,f9.sequence-group", "f6");
+        options.set("fields.f0.aggregate-function", "listagg");
+        options.set("fields.f2.aggregate-function", "sum");
+        options.set("fields.f4.aggregate-function", "last_value");
+        options.set("fields.f6.aggregate-function", "last_non_null_value");
+        options.set("fields.f4.ignore-retract", "true");
+        options.set("fields.f6.ignore-retract", "true");
+        RowType rowType =
+                RowType.of(
+                        DataTypes.INT(),
+                        DataTypes.INT(),
+                        DataTypes.INT(),
+                        DataTypes.INT(),
+                        DataTypes.INT(),
+                        DataTypes.INT(),
+                        DataTypes.INT(),
+                        DataTypes.INT(),
+                        DataTypes.INT(),
+                        DataTypes.INT());
+        MergeFunctionFactory<KeyValue> factory =
+                PartialUpdateMergeFunction.factory(options, rowType, 
ImmutableList.of("f0"));
+
+        MergeFunctionFactory.AdjustedProjection adjustedProjection =
+                factory.adjustProjection(new int[][] {{3}, {2}, {5}});
+
+        validate(adjustedProjection, new int[] {3, 2, 5, 1, 8}, new int[] {0, 
1, 2});
+
+        MergeFunction<KeyValue> func = 
factory.create(adjustedProjection.pushdownProjection);
+
+        func.reset();
+        // f0 pk
+        // f1, f8 sequence group
+        // f2 in f1, f8 group with sum agg
+        // f3 in f1, f8 group without agg
+        // f4 in f1, f8 group with last_value agg
+        // f5 not in group
+        // f6 in f7, f9 group with last_not_null agg
+        // f7, f9 sequence group 2
+        add(func, 1, 1, 1, 1, 1);
+        add(func, 2, 1, 2, 2, 2);
+        validate(func, 2, 2, 2, 2, 2);
+
+        add(func, RowKind.INSERT, null, null, null, 3, 3);
+        validate(func, null, 2, 2, 3, 3);
+
+        // test retract
+        add(func, RowKind.UPDATE_BEFORE, 1, 2, 1, 3, 3);
+        validate(func, null, 0, 2, 3, 3);
+        add(func, RowKind.DELETE, 1, 2, 1, 3, 3);
+        validate(func, null, -2, 2, 3, 3);
+    }
+
     @Test
     public void testAggregationWithoutSequenceGroup() {
         Options options = new Options();
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
index e41df196b..f65a50081 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
@@ -215,6 +215,43 @@ public class PartialUpdateITCase extends CatalogITCaseBase 
{
         assertThat(sql("SELECT c, d FROM 
SG")).containsExactlyInAnyOrder(Row.of(5, null));
     }
 
+    @Test
+    public void testMultiFieldsSequenceGroup() {
+        sql(
+                "CREATE TABLE SG ("
+                        + "k INT, a INT, b INT, g_1 INT, c INT, d INT, g_2 
INT, g_3 INT, PRIMARY KEY (k) NOT ENFORCED)"
+                        + " WITH ("
+                        + "'merge-engine'='partial-update', "
+                        + "'fields.g_1.sequence-group'='a,b', "
+                        + "'fields.g_2,g_3.sequence-group'='c,d');");
+
+        sql("INSERT INTO SG VALUES (1, 1, 1, 1, 1, 1, 1, 1)");
+
+        // g_2, g_3 should not be updated
+        sql("INSERT INTO SG VALUES (1, 2, 2, 2, 2, 2, 1, CAST(NULL AS INT))");
+
+        // select *
+        assertThat(sql("SELECT * FROM SG"))
+                .containsExactlyInAnyOrder(Row.of(1, 2, 2, 2, 1, 1, 1, 1));
+
+        // projection
+        assertThat(sql("SELECT c, d FROM 
SG")).containsExactlyInAnyOrder(Row.of(1, 1));
+
+        // g_1 should not be updated
+        sql("INSERT INTO SG VALUES (1, 3, 3, 1, 3, 3, 3, 1)");
+
+        assertThat(sql("SELECT * FROM SG"))
+                .containsExactlyInAnyOrder(Row.of(1, 2, 2, 2, 3, 3, 3, 1));
+
+        // d should be updated by null
+        sql("INSERT INTO SG VALUES (1, 3, 3, 3, 2, 2, CAST(NULL AS INT), 1)");
+        sql("INSERT INTO SG VALUES (1, 4, 4, 4, 2, 2, CAST(NULL AS INT), 1)");
+        sql("INSERT INTO SG VALUES (1, 5, 5, 3, 5, CAST(NULL AS INT), 4, 1)");
+
+        assertThat(sql("SELECT a, b FROM 
SG")).containsExactlyInAnyOrder(Row.of(4, 4));
+        assertThat(sql("SELECT c, d FROM 
SG")).containsExactlyInAnyOrder(Row.of(5, null));
+    }
+
     @Test
     public void testSequenceGroupWithDefaultAggFunc() {
         sql(
@@ -285,7 +322,19 @@ public class PartialUpdateITCase extends CatalogITCaseBase 
{
                                                 + 
"'fields.g_1.sequence-group'='a,b', "
                                                 + 
"'fields.g_2.sequence-group'='a,d');"))
                 .hasRootCauseMessage(
-                        "Field a is defined repeatedly by multiple groups: 
[g_1, g_2].");
+                        "Field a is defined repeatedly by multiple groups: 
[[g_1], [g_2]].");
+
+        Assertions.assertThatThrownBy(
+                        () ->
+                                sql(
+                                        "CREATE TABLE SG ("
+                                                + "k INT, a INT, b INT, g_1 
INT, c INT, d INT, g_2 INT, g_3 INT, PRIMARY KEY (k) NOT ENFORCED)"
+                                                + " WITH ("
+                                                + 
"'merge-engine'='partial-update', "
+                                                + 
"'fields.g_1.sequence-group'='a,b', "
+                                                + 
"'fields.g_2,g_3.sequence-group'='a,d');"))
+                .hasRootCauseMessage(
+                        "Field a is defined repeatedly by multiple groups: 
[[g_1], [g_2, g_3]].");
     }
 
     @Test
@@ -355,6 +404,47 @@ public class PartialUpdateITCase extends CatalogITCaseBase 
{
         assertThat(sql("SELECT a, b, c FROM 
AGG")).containsExactlyInAnyOrder(Row.of(6, 3, null));
     }
 
+    @Test
+    public void testMultiFieldsSequencePartialUpdateWithAggregation() {
+        sql(
+                "CREATE TABLE AGG ("
+                        + "k INT, a INT, b INT, g_1 INT, c VARCHAR, g_2 INT, 
g_3 INT, PRIMARY KEY (k) NOT ENFORCED)"
+                        + " WITH ("
+                        + "'merge-engine'='partial-update', "
+                        + "'fields.a.aggregate-function'='sum', "
+                        + "'fields.g_1,g_3.sequence-group'='a', "
+                        + "'fields.g_2.sequence-group'='c');");
+        // a in group g_1, g_3 with sum agg
+        // b not in group
+        // c in group g_2 without agg
+
+        sql("INSERT INTO AGG VALUES (1, 1, 1, 1, '1', 1, 1)");
+
+        // g_2 should not be updated
+        sql("INSERT INTO AGG VALUES (1, 2, 2, 2, '2', CAST(NULL AS INT), 2)");
+
+        // select *
+        assertThat(sql("SELECT * FROM AGG"))
+                .containsExactlyInAnyOrder(Row.of(1, 3, 2, 2, "1", 1, 2));
+
+        // projection
+        assertThat(sql("SELECT a, c FROM 
AGG")).containsExactlyInAnyOrder(Row.of(3, "1"));
+
+        // g_1 should not be updated
+        sql("INSERT INTO AGG VALUES (1, 3, 3, 2, '3', 3, 1)");
+
+        assertThat(sql("SELECT * FROM AGG"))
+                .containsExactlyInAnyOrder(Row.of(1, 6, 3, 2, "3", 3, 2));
+
+        sql(
+                "INSERT INTO AGG VALUES (1, CAST(NULL AS INT), CAST(NULL AS 
INT), 2, CAST(NULL AS VARCHAR), 4, 2)");
+
+        // a keep the last accumulator
+        // b is not updated to null
+        // c updated to null
+        assertThat(sql("SELECT a, b, c FROM 
AGG")).containsExactlyInAnyOrder(Row.of(6, 3, null));
+    }
+
     @Test
     public void testPartialUpdateWithDefaultAndFieldAggregation() {
         sql(

Reply via email to