This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 65846f8ee6 [core] Add validation to prevent primary key in sequence-group (#7052) (#7656)
65846f8ee6 is described below

commit 65846f8ee6bfcb5eaeb0653469ed768a841af879
Author: Rhett CfZhuang <[email protected]>
AuthorDate: Thu Jun 4 12:18:54 2026 +0800

    [core] Add validation to prevent primary key in sequence-group (#7052) (#7656)
    
    When a primary key field is configured in a sequence-group of the
    partial-update merge engine, compaction can fail with Parquet decoding
    errors because the key field may be set to null. This commit adds
    early validation at configuration-parsing time that rejects such
    invalid configurations with a clear error message.
---
 .../compact/PartialUpdateMergeFunction.java        | 19 ++++++-
 .../compact/PartialUpdateMergeFunctionTest.java    | 63 ++++++++++++++++++++++
 .../paimon/table/PrimaryKeySimpleTableTest.java    | 28 +++++-----
 .../apache/paimon/flink/PartialUpdateITCase.java   |  2 +-
 4 files changed, 95 insertions(+), 17 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
index 5d04440c99..6cc551baec 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
@@ -65,6 +65,9 @@ import static 
org.apache.paimon.utils.Preconditions.checkArgument;
 public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> {
 
     public static final String SEQUENCE_GROUP = "sequence-group";
+    private static final String SEQUENCE_GROUP_PK_ERROR =
+            "The sequence-group '%s' contains primary key field '%s', "
+                    + "which is not allowed. Primary key columns cannot be put 
in sequence-group.";
 
     private final InternalRow.FieldGetter[] getters;
     private final boolean ignoreDelete;
@@ -428,6 +431,14 @@ public class PartialUpdateMergeFunction implements 
MergeFunction<KeyValue> {
                             .map(fieldName -> requireField(fieldName, 
fieldNames))
                             .forEach(
                                     field -> {
+                                        String protectedFieldName = 
fieldNames.get(field);
+                                        if 
(primaryKeys.contains(protectedFieldName)) {
+                                            throw new IllegalArgumentException(
+                                                    String.format(
+                                                            
SEQUENCE_GROUP_PK_ERROR,
+                                                            k,
+                                                            
protectedFieldName));
+                                        }
                                         if 
(fieldSeqComparators.containsKey(field)) {
                                             throw new IllegalArgumentException(
                                                     String.format(
@@ -435,13 +446,17 @@ public class PartialUpdateMergeFunction implements 
MergeFunction<KeyValue> {
                                                             
fieldNames.get(field), k));
                                         }
                                         fieldSeqComparators.put(field, 
userDefinedSeqComparator);
-                                        
fieldsProtectedBySequenceGroup.add(fieldNames.get(field));
+                                        
fieldsProtectedBySequenceGroup.add(protectedFieldName);
                                     });
 
                     // add self
                     for (int index : sequenceFields) {
-                        allSequenceFields.add(fieldNames.get(index));
                         String fieldName = fieldNames.get(index);
+                        if (primaryKeys.contains(fieldName)) {
+                            throw new IllegalArgumentException(
+                                    String.format(SEQUENCE_GROUP_PK_ERROR, k, 
fieldName));
+                        }
+                        allSequenceFields.add(fieldName);
                         fieldSeqComparators.put(index, 
userDefinedSeqComparator);
                         sequenceGroupMap.put(fieldName, index);
                     }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunctionTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunctionTest.java
index f4f2d28f75..050f4b855b 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunctionTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunctionTest.java
@@ -887,6 +887,69 @@ public class PartialUpdateMergeFunctionTest {
                                 "Must use sequence group for aggregation 
functions but not found for field f1."));
     }
 
+    @Test
+    public void testSequenceGroupCannotContainPrimaryKey() {
+        // Issue #7052: Putting a primary key column in sequence-group should 
be forbidden
+        // as it causes Parquet decoding failures during compaction
+        Options options = new Options();
+        options.set("fields.f0.sequence-group", "f1,f2");
+        RowType rowType =
+                RowType.of(DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), 
DataTypes.INT());
+        assertThatThrownBy(
+                        () ->
+                                PartialUpdateMergeFunction.factory(
+                                        options, rowType, 
ImmutableList.of("f0")))
+                .hasMessageContaining(
+                        "The sequence-group 'fields.f0.sequence-group' 
contains primary key field 'f0', "
+                                + "which is not allowed. Primary key columns 
cannot be put in sequence-group.");
+    }
+
+    @Test
+    public void testMultiSequenceFieldsCannotContainPrimaryKey() {
+        // Issue #7052: Multi-field sequence-group also cannot contain primary 
key columns
+        // The sequence fields (f2,f3) are the "self" part, they must not 
contain PKs
+        Options options = new Options();
+        options.set("fields.f2,f3.sequence-group", "f0,f4");
+        RowType rowType =
+                RowType.of(
+                        DataTypes.INT(),
+                        DataTypes.INT(),
+                        DataTypes.INT(),
+                        DataTypes.INT(),
+                        DataTypes.INT(),
+                        DataTypes.INT());
+        assertThatThrownBy(
+                        () ->
+                                PartialUpdateMergeFunction.factory(
+                                        options, rowType, 
ImmutableList.of("f2")))
+                .hasMessageContaining(
+                        "The sequence-group 'fields.f2,f3.sequence-group' 
contains primary key field 'f2', "
+                                + "which is not allowed. Primary key columns 
cannot be put in sequence-group.");
+    }
+
+    @Test
+    public void testPrimaryKeyCannotBeInSequenceGroupValue() {
+        // Issue #7052: A primary key column appearing in the value part of 
sequence-group
+        // is forbidden — f2 is the PK and appears in the sequence-group's 
value list
+        Options options = new Options();
+        options.set("fields.f4.sequence-group", "f1,f2");
+        RowType rowType =
+                RowType.of(
+                        DataTypes.INT(),
+                        DataTypes.INT(),
+                        DataTypes.INT(),
+                        DataTypes.INT(),
+                        DataTypes.INT(),
+                        DataTypes.INT());
+        assertThatThrownBy(
+                        () ->
+                                PartialUpdateMergeFunction.factory(
+                                        options, rowType, 
ImmutableList.of("f2")))
+                .hasMessageContaining(
+                        "The sequence-group 'fields.f4.sequence-group' 
contains primary key field 'f2', "
+                                + "which is not allowed. Primary key columns 
cannot be put in sequence-group.");
+    }
+
     @Test
     public void testDeleteReproduceCorrectSequenceNumber() {
         Options options = new Options();
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
index 82b097d894..ebc88e7a8e 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
@@ -1786,7 +1786,7 @@ public class PrimaryKeySimpleTableTest extends 
SimpleTableTestBase {
                 createFileStoreTable(
                         options -> {
                             options.set("merge-engine", "partial-update");
-                            options.set("fields.a.sequence-group", "c");
+                            options.set("fields.b.sequence-group", "c");
                             options.set("fields.c.aggregate-function", "sum");
                         },
                         rowType);
@@ -1795,32 +1795,32 @@ public class PrimaryKeySimpleTableTest extends 
SimpleTableTestBase {
         TableRead read = table.newRead();
         StreamTableWrite write = table.newWrite("");
         StreamTableCommit commit = table.newCommit("");
-        // 1. inserts
+        // 1. inserts (b=3 is the sequence field, all rows have same b=3 so 
all accepted)
         write.write(GenericRow.of(1, 1, 3, 3));
-        write.write(GenericRow.of(1, 1, 1, 1));
-        write.write(GenericRow.of(1, 1, 2, 2));
+        write.write(GenericRow.of(1, 1, 3, 1));
+        write.write(GenericRow.of(1, 1, 3, 2));
         commit.commit(0, write.prepareCommit(true, 0));
         List<String> result =
                 getResult(read, toSplits(snapshotReader.read().dataSplits()), 
rowToString);
-        assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 6]");
+        assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 3, 6]");
 
-        // 2. Update Before
-        write.write(GenericRow.ofKind(RowKind.UPDATE_BEFORE, 1, 1, 2, 2));
+        // 2. Update Before (b=3, same sequence)
+        write.write(GenericRow.ofKind(RowKind.UPDATE_BEFORE, 1, 1, 3, 2));
         commit.commit(1, write.prepareCommit(true, 1));
         result = getResult(read, toSplits(snapshotReader.read().dataSplits()), 
rowToString);
-        assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 4]");
+        assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 3, 4]");
 
-        // 3. Update After
-        write.write(GenericRow.ofKind(RowKind.UPDATE_AFTER, 1, 1, 2, 3));
+        // 3. Update After (b=3, same sequence)
+        write.write(GenericRow.ofKind(RowKind.UPDATE_AFTER, 1, 1, 3, 3));
         commit.commit(2, write.prepareCommit(true, 2));
         result = getResult(read, toSplits(snapshotReader.read().dataSplits()), 
rowToString);
-        assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 7]");
+        assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 3, 7]");
 
-        // 4. Retracts
-        write.write(GenericRow.ofKind(RowKind.DELETE, 1, 1, 2, 3));
+        // 4. Retracts (b=3, same sequence)
+        write.write(GenericRow.ofKind(RowKind.DELETE, 1, 1, 3, 3));
         commit.commit(3, write.prepareCommit(true, 3));
         result = getResult(read, toSplits(snapshotReader.read().dataSplits()), 
rowToString);
-        assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 4]");
+        assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 3, 4]");
         write.close();
         commit.close();
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
index 01e0b6c11b..c1fcc2ca09 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
@@ -337,7 +337,7 @@ public class PartialUpdateITCase extends CatalogITCaseBase {
                 "CREATE TABLE IF NOT EXISTS T_P ("
                         + "j INT, k INT, a INT, b INT, c STRING, PRIMARY KEY 
(j,k) NOT ENFORCED)"
                         + " WITH ('merge-engine'='partial-update', 
'changelog-producer' = 'lookup', "
-                        + "'fields.a.sequence-group'='j', 
'fields.b.sequence-group'='c');");
+                        + "'fields.a.sequence-group'='b,c');");
         batchSql("INSERT INTO T_P VALUES (1, 1, 1, 1, '1')");
         assertThat(sql("SELECT k, c FROM 
T_P")).containsExactlyInAnyOrder(Row.of(1, "1"));
     }

Reply via email to