This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 68cf3bca10 [core] Support snapshot-based sequence ordering for primary-key tables (#7832)
68cf3bca10 is described below

commit 68cf3bca10f2891cee2c1c9769669e0d1cd3765a
Author: Junrui Lee <[email protected]>
AuthorDate: Thu Jun 4 16:29:32 2026 +0800

    [core] Support snapshot-based sequence ordering for primary-key tables (#7832)
---
 docs/docs/primary-key-table/sequence-rowkind.mdx   |  47 ++
 docs/generated/core_configuration.html             |   6 +
 .../main/java/org/apache/paimon/CoreOptions.java   |  22 +
 .../src/main/java/org/apache/paimon/KeyValue.java  |   5 +
 .../java/org/apache/paimon/io/DataFileMeta.java    |   7 +
 .../paimon/io/KeyValueDataFileRecordReader.java    |  32 +-
 .../paimon/io/KeyValueFileReaderFactory.java       |  25 +-
 .../paimon/operation/FileStoreCommitImpl.java      |  31 ++
 .../org/apache/paimon/schema/SchemaValidation.java |  42 ++
 .../paimon/table/AbstractFileStoreTable.java       |   2 +-
 .../compact/SortMergeSnapshotOrderingTest.java     | 134 +++++
 .../apache/paimon/schema/SchemaValidationTest.java |  49 ++
 .../paimon/table/PrimaryKeySimpleTableTest.java    | 569 +++++++++++++++++++++
 13 files changed, 965 insertions(+), 6 deletions(-)
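
Note on the merge semantics changed by this commit: the sketch below is a minimal, hypothetical illustration and is not Paimon code. It assumes a deduplicating merge that keeps the record with the highest sequence number per key, and models the read-time override described in the diff, where an APPEND file's per-record sequence numbers are replaced by the snapshot id stamped on the file at commit time. The names `SnapshotOrderingSketch`, `Rec`, `CommittedFile` and `merge` are invented for this example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch; none of these types are Paimon classes. */
public class SnapshotOrderingSketch {

    /** One stored record: primary key, writer-local sequence number, value. */
    static final class Rec {
        final int key;
        final long sequence;
        final long value;

        Rec(int key, long sequence, long value) {
            this.key = key;
            this.sequence = sequence;
            this.value = value;
        }
    }

    /** One committed data file plus the snapshot id stamped on it at commit time. */
    static final class CommittedFile {
        final long snapshotId;
        final List<Rec> records;

        CommittedFile(long snapshotId, Rec... records) {
            this.snapshotId = snapshotId;
            this.records = Arrays.asList(records);
        }
    }

    /** Deduplicating merge: per key, keep the value of the record with the highest sequence. */
    static Map<Integer, Long> merge(List<CommittedFile> files, boolean snapshotOrdering) {
        Map<Integer, Rec> winners = new TreeMap<>();
        for (CommittedFile file : files) {
            for (Rec rec : file.records) {
                // With snapshot ordering, the on-disk sequence is ignored on read and
                // replaced by the snapshot id of the file the record came from.
                long effective = snapshotOrdering ? file.snapshotId : rec.sequence;
                Rec current = winners.get(rec.key);
                if (current == null || effective >= current.sequence) {
                    winners.put(rec.key, new Rec(rec.key, effective, rec.value));
                }
            }
        }
        Map<Integer, Long> result = new TreeMap<>();
        for (Rec rec : winners.values()) {
            result.put(rec.key, rec.value);
        }
        return result;
    }

    public static void main(String[] args) {
        // Snapshot 1 holds a record with a high writer-local sequence number;
        // snapshot 2 holds a record for the same key with a low one.
        CommittedFile first = new CommittedFile(1, new Rec(1, 99, 999L));
        CommittedFile second = new CommittedFile(2, new Rec(1, 0, 1L));
        List<CommittedFile> files = Arrays.asList(first, second);
        System.out.println(merge(files, false)); // prints {1=999}
        System.out.println(merge(files, true)); // prints {1=1}
    }
}
```

The scenario mirrors testSnapshotSequenceOrdering in the diff: without the override the earlier write wins on its higher per-record sequence number, and with it the record from the later snapshot wins.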

diff --git a/docs/docs/primary-key-table/sequence-rowkind.mdx b/docs/docs/primary-key-table/sequence-rowkind.mdx
index 96fcc19963..8a0f49fb73 100644
--- a/docs/docs/primary-key-table/sequence-rowkind.mdx
+++ b/docs/docs/primary-key-table/sequence-rowkind.mdx
@@ -71,3 +71,50 @@ By default, the primary key table determines the row kind according to the input
 `'rowkind.field'` to use a field to extract row kind.
 
 The valid row kind string should be `'+I'`, `'-U'`, `'+U'` or `'-D'`.
+
+## Snapshot Ordering
+
+In multi-writer scenarios where wall-clock sequence numbers cannot be globally ordered across writers,
+you can enable `'sequence.snapshot-ordering'` to use the commit snapshot id as the ordering key when
+merging records with the same primary key. Records from later snapshots are considered newer,
+regardless of their per-record sequence number.
+
+<Tabs>
+<TabItem value="flink" label="Flink">
+
+```sql
+CREATE TABLE my_table (
+    pk BIGINT PRIMARY KEY NOT ENFORCED,
+    v1 DOUBLE,
+    v2 BIGINT
+) WITH (
+    'sequence.snapshot-ordering' = 'true',
+    'write-only' = 'true'
+);
+```
+
+</TabItem>
+</Tabs>
+
+:::warning
+This option requires `'write-only' = 'true'`. Compaction must be performed by a separate dedicated
+compact job. This ensures that compaction correctly preserves the snapshot id of each record.
+:::
+
+:::info
+`'sequence.snapshot-ordering'` is mutually exclusive with `'sequence.field'`. You cannot enable both
+on the same table.
+:::
+
+:::info
+The ordering key is the commit snapshot id only; the order of records **within the same snapshot** is
+not guaranteed, and this is by design. Under the default configuration it is harmless: a writer
+buffers a commit's writes (`'write-buffer-spillable' = 'true'`) and runs them through the merge
+function before flushing, so at most one record per primary key is written per snapshot — the common
+case is fully covered. We therefore deliberately do not handle the case where the same key is spread
+across multiple files of one snapshot. That case only arises with `'write-buffer-spillable' = 'false'`,
+or when the spilled data exceeds `'write-buffer-spill-disk-size'`, where the buffer may be flushed
+mid-commit; the same key can then land in multiple files of the same snapshot with equal sequence
+numbers and their relative order becomes undefined. This affects only intra-snapshot order, never the
+cross-snapshot ordering this feature provides.
+:::
diff --git a/docs/generated/core_configuration.html b/docs/generated/core_configuration.html
index 1a66fb0ab0..9939b28b78 100644
--- a/docs/generated/core_configuration.html
+++ b/docs/generated/core_configuration.html
@@ -1361,6 +1361,12 @@ This config option does not affect the default filesystem metastore.</td>
             <td><p>Enum</p></td>
             <td>Specify the order of sequence.field.<br /><br />Possible values:<ul><li>"ascending": specifies sequence.field sort order is ascending.</li><li>"descending": specifies sequence.field sort order is descending.</li></ul></td>
         </tr>
+        <tr>
+            <td><h5>sequence.snapshot-ordering</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>When enabled, merge uses the commit snapshot id as the ordering key for primary-key conflicts: records from later snapshots always win. Designed for multi-writer scenarios on the same primary-key table where wall-clock sequence numbers cannot be globally ordered. The order of records within the same snapshot is not guaranteed. Mutually exclusive with sequence.field. Requires a primary-key table with write-only=true. Inline compaction is not allowed because snapshot ids ar [...]
+        </tr>
         <tr>
             <td><h5>sink.process-time-zone</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index da734fa938..ea325667aa 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1020,6 +1020,24 @@ public class CoreOptions implements Serializable {
                     .defaultValue(SortOrder.ASCENDING)
                     .withDescription("Specify the order of sequence.field.");
 
+    @Immutable
+    public static final ConfigOption<Boolean> SEQUENCE_SNAPSHOT_ORDERING =
+            key("sequence.snapshot-ordering")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "When enabled, merge uses the commit snapshot id as the ordering key "
+                                    + "for primary-key conflicts: records from later snapshots "
+                                    + "always win. Designed for multi-writer scenarios on the same "
+                                    + "primary-key table where wall-clock sequence numbers cannot "
+                                    + "be globally ordered. The order of records within the same "
+                                    + "snapshot is not guaranteed. Mutually exclusive with "
+                                    + "sequence.field. Requires a primary-key table with "
+                                    + "write-only=true. Inline compaction is not allowed because "
+                                    + "snapshot ids are assigned only after commit. To compact such "
+                                    + "tables, run a dedicated compaction job/action with "
+                                    + "write-only=false.");
+
     @Immutable
     public static final ConfigOption<Boolean> AGGREGATION_REMOVE_RECORD_ON_DELETE =
             key("aggregation.remove-record-on-delete")
@@ -3492,6 +3510,10 @@ public class CoreOptions implements Serializable {
         return options.get(SEQUENCE_FIELD_SORT_ORDER) == SortOrder.ASCENDING;
     }
 
+    public boolean snapshotSequenceOrdering() {
+        return options.get(SEQUENCE_SNAPSHOT_ORDERING);
+    }
+
     public Optional<String> rowkindField() {
         return options.getOptional(ROWKIND_FIELD);
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValue.java b/paimon-core/src/main/java/org/apache/paimon/KeyValue.java
index c314f21107..4c4ff61c24 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValue.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValue.java
@@ -89,6 +89,11 @@ public class KeyValue {
         return sequenceNumber;
     }
 
+    public KeyValue setSequenceNumber(long sequenceNumber) {
+        this.sequenceNumber = sequenceNumber;
+        return this;
+    }
+
     public RowKind valueKind() {
         return valueKind;
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
index 23b34947bc..a8cdc031e1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
@@ -277,8 +277,15 @@ public interface DataFileMeta {
 
     SimpleStats valueStats();
 
+    /**
+     * Minimum sequence number of records in this file. When {@code sequence.snapshot-ordering} is
+     * enabled for a primary-key table, this field is repurposed to carry the commit snapshot id
+     * instead of the per-record sequence number range (the snapshot id is stamped into it at commit
+     * time by {@code FileStoreCommitImpl}).
+     */
     long minSequenceNumber();
 
+    /** @see #minSequenceNumber() */
     long maxSequenceNumber();
 
     long schemaId();
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java
index 6cf0876970..2538c08a21 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java
@@ -36,12 +36,26 @@ public class KeyValueDataFileRecordReader implements FileRecordReader<KeyValue>
     private final FileRecordReader<InternalRow> reader;
     private final KeyValueSerializer serializer;
     private final int level;
+    private final boolean overrideSequenceWithSnapshotId;
+    private final long snapshotId;
 
     public KeyValueDataFileRecordReader(
             FileRecordReader<InternalRow> reader, RowType keyType, RowType valueType, int level) {
+        this(reader, keyType, valueType, level, false, KeyValue.UNKNOWN_SEQUENCE);
+    }
+
+    public KeyValueDataFileRecordReader(
+            FileRecordReader<InternalRow> reader,
+            RowType keyType,
+            RowType valueType,
+            int level,
+            boolean overrideSequenceWithSnapshotId,
+            long snapshotId) {
         this.reader = reader;
         this.serializer = new KeyValueSerializer(keyType, valueType);
         this.level = level;
+        this.overrideSequenceWithSnapshotId = overrideSequenceWithSnapshotId;
+        this.snapshotId = snapshotId;
     }
 
     @Nullable
@@ -53,10 +67,20 @@ public class KeyValueDataFileRecordReader implements FileRecordReader<KeyValue>
         }
 
         return iterator.transform(
-                internalRow ->
-                        internalRow == null
-                                ? null
-                                : serializer.fromRow(internalRow).setLevel(level));
+                internalRow -> {
+                    if (internalRow == null) {
+                        return null;
+                    }
+                    KeyValue kv = serializer.fromRow(internalRow).setLevel(level);
+                    // In snapshot-ordering mode, an APPEND file's on-disk per-record sequence
+                    // numbers are stale; we override them with the commit snapshot id so later
+                    // snapshots win during merge. Any read path bypassing this override would
+                    // order APPEND records incorrectly.
+                    if (overrideSequenceWithSnapshotId) {
+                        kv.setSequenceNumber(snapshotId);
+                    }
+                    return kv;
+                });
     }
 
     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
index fc505e19c2..5f7e374192 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
@@ -30,6 +30,7 @@ import org.apache.paimon.format.FormatReaderContext;
 import org.apache.paimon.format.OrcFormatReaderContext;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.manifest.FileSource;
 import org.apache.paimon.partition.PartitionUtils;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.reader.FileRecordReader;
@@ -42,6 +43,7 @@ import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.AsyncRecordReader;
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.FormatReaderMapping;
+import org.apache.paimon.utils.Preconditions;
 
 import javax.annotation.Nullable;
 
@@ -68,6 +70,7 @@ public class KeyValueFileReaderFactory implements FileReaderFactory<KeyValue> {
     private final long asyncThreshold;
     private final boolean ignoreCorruptFiles;
     private final boolean ignoreLostFiles;
+    private final boolean snapshotSequenceOrdering;
     private final Map<FormatKey, FormatReaderMapping> formatReaderMappings;
     private final BinaryRow partition;
     private final DeletionVector.Factory dvFactory;
@@ -93,6 +96,7 @@ public class KeyValueFileReaderFactory implements FileReaderFactory<KeyValue> {
         this.asyncThreshold = coreOptions.fileReaderAsyncThreshold().getBytes();
         this.ignoreCorruptFiles = coreOptions.scanIgnoreCorruptFile();
         this.ignoreLostFiles = coreOptions.scanIgnoreLostFile();
+        this.snapshotSequenceOrdering = coreOptions.snapshotSequenceOrdering();
         this.partition = partition;
         this.formatReaderMappings = new HashMap<>();
         this.dvFactory = dvFactory;
@@ -168,7 +172,26 @@ public class KeyValueFileReaderFactory implements FileReaderFactory<KeyValue> {
                     new ApplyDeletionVectorReader(fileRecordReader, deletionVector.get());
         }
 
-        return new KeyValueDataFileRecordReader(fileRecordReader, keyType, valueType, file.level());
+        // In snapshot-ordering mode, APPEND files carry the commit snapshot id in
+        // minSequenceNumber (stamped at commit time); override per-record sequence with it so
+        // later snapshots win during merge. COMPACT files already carry the snapshot id in their
+        // per-record _SEQUENCE_NUMBER and are left untouched.
+        boolean overrideSequenceWithSnapshotId = false;
+        if (snapshotSequenceOrdering) {
+            Preconditions.checkState(
+                    file.fileSource().isPresent(),
+                    "sequence.snapshot-ordering requires data files with fileSource metadata. "
+                            + "This option is only safe for newly-created tables or empty tables. "
+                            + "Legacy files without fileSource cannot be ordered by commit snapshot id.");
+            overrideSequenceWithSnapshotId = file.fileSource().get() == FileSource.APPEND;
+        }
+        return new KeyValueDataFileRecordReader(
+                fileRecordReader,
+                keyType,
+                valueType,
+                file.level(),
+                overrideSequenceWithSnapshotId,
+                file.minSequenceNumber());
     }
 
     public static Builder builder(
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 10c9b20a04..ddab7a0419 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -985,6 +985,10 @@ public class FileStoreCommitImpl implements FileStoreCommit {
                 deltaFiles = assigned.assignedEntries;
             }
 
+            if (options.snapshotSequenceOrdering()) {
+                deltaFiles = stampSequenceWithSnapshotId(newSnapshotId, commitKind, deltaFiles);
+            }
+
             // the added records subtract the deleted records from
             long deltaRecordCount = recordCountAdd(deltaFiles) - recordCountDelete(deltaFiles);
             long totalRecordCount = previousTotalRecordCount + deltaRecordCount;
@@ -1261,4 +1265,31 @@ public class FileStoreCommitImpl implements FileStoreCommit {
         IOUtils.closeAllQuietly(commitCallbacks);
         IOUtils.closeQuietly(snapshotCommit);
     }
+
+    /**
+     * Stamps the commit snapshot id into {@link DataFileMeta#minSequenceNumber()} / {@link
+     * DataFileMeta#maxSequenceNumber()} of APPEND files, reusing these fields instead of adding a
+     * new one (same pattern as {@link RowTrackingCommitUtils#assignRowTracking}). COMPACT files are
+     * returned unchanged: their input was read through the override path, so their per-record
+     * {@code _SEQUENCE_NUMBER} already carries the snapshot id.
+     *
+     * <p>All records of a snapshot share one id, so intra-snapshot order is not preserved. This is
+     * accepted: the default spillable writer collapses a commit's writes through the merge function
+     * to one record per key before flush, and the feature targets cross-snapshot ordering only.
+     */
+    private static List<ManifestEntry> stampSequenceWithSnapshotId(
+            long snapshotId, CommitKind commitKind, List<ManifestEntry> files) {
+        if (commitKind == CommitKind.COMPACT) {
+            return files;
+        }
+        List<ManifestEntry> result = new ArrayList<>(files.size());
+        for (ManifestEntry entry : files) {
+            if (entry.kind() == FileKind.ADD) {
+                result.add(entry.assignSequenceNumber(snapshotId, snapshotId));
+            } else {
+                result.add(entry);
+            }
+        }
+        return result;
+    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index 50228385a9..c600927af0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -105,6 +105,18 @@ public class SchemaValidation {
      * @param schema the schema to be validated
      */
     public static void validateTableSchema(TableSchema schema) {
+        validateTableSchema(schema, Collections.emptySet());
+    }
+
+    /**
+     * Validate the {@link TableSchema} and {@link CoreOptions}.
+     *
+     * @param schema the schema to be validated
+     * @param dynamicOptionKeys option keys that are overridden dynamically at runtime (e.g. by
+     *     dedicated compaction jobs) and should therefore be excluded from certain static
+     *     validations such as the {@code write-only} requirement for snapshot ordering
+     */
+    public static void validateTableSchema(TableSchema schema, Set<String> dynamicOptionKeys) {
         CoreOptions options = new CoreOptions(schema.options());
 
         validateOnlyContainPrimitiveType(schema.fields(), schema.primaryKeys(), "primary key");
@@ -288,6 +300,10 @@ public class SchemaValidation {
                     "deletion-vectors.merge-on-read requires deletion-vectors.enabled to be true.");
         }
 
+        if (options.snapshotSequenceOrdering()) {
+            validateSnapshotSequenceOrdering(schema, options, dynamicOptionKeys);
+        }
+
         // vector field names must point to vector type
         Set<String> fieldNamesSpecifiedAsVector = options.vectorField();
         schema.fields()
@@ -614,6 +630,32 @@ public class SchemaValidation {
         }
     }
 
+    private static void validateSnapshotSequenceOrdering(
+            TableSchema schema, CoreOptions options, Set<String> dynamicOptionKeys) {
+        checkArgument(
+                !schema.primaryKeys().isEmpty(),
+                "%s = true requires a primary-key table; append-only tables cannot use "
+                        + "snapshot-based sequence ordering.",
+                CoreOptions.SEQUENCE_SNAPSHOT_ORDERING.key());
+        checkArgument(
+                options.sequenceField().isEmpty(),
+                "%s = true is mutually exclusive with %s; the snapshot id is the sole tiebreaker.",
+                CoreOptions.SEQUENCE_SNAPSHOT_ORDERING.key(),
+                CoreOptions.SEQUENCE_FIELD.key());
+        // Skip writeOnly check when write-only is dynamically overridden (e.g. by dedicated
+        // compact jobs that override write-only=false at runtime).
+        if (!dynamicOptionKeys.contains(CoreOptions.WRITE_ONLY.key())) {
+            checkArgument(
+                    options.writeOnly(),
+                    "%s = true requires %s = true. Snapshot ordering relies on snapshot id to "
+                            + "determine record order, but inline compaction happens before "
+                            + "snapshot creation — files have not been stamped with the correct "
+                            + "snapshot id yet. Use dedicated compaction job instead.",
+                    CoreOptions.SEQUENCE_SNAPSHOT_ORDERING.key(),
+                    CoreOptions.WRITE_ONLY.key());
+        }
+    }
+
     private static void validateForDeletionVectors(CoreOptions options) {
         checkArgument(
                 options.changelogProducer() == ChangelogProducer.NONE
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 23214ff05f..0249a8e6b3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -374,7 +374,7 @@ abstract class AbstractFileStoreTable implements FileStoreTable {
         }
 
         // validate schema with new options
-        SchemaValidation.validateTableSchema(newTableSchema);
+        SchemaValidation.validateTableSchema(newTableSchema, dynamicOptions.keySet());
 
         return copy(newTableSchema);
     }
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeSnapshotOrderingTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeSnapshotOrderingTest.java
new file mode 100644
index 0000000000..0e29efd171
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeSnapshotOrderingTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.mergetree.compact;
+
+import org.apache.paimon.CoreOptions.SortEngine;
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests for snapshot-ordering in sort-merge readers. With {@code sequence.snapshot-ordering}, the
+ * commit snapshot id is carried in each record's {@code sequenceNumber} (stamped at read time for
+ * APPEND files), so the sort-merge readers need no snapshot-specific branch: comparing by {@code
+ * sequenceNumber} already makes records from later snapshots win.
+ */
+public class SortMergeSnapshotOrderingTest {
+
+    private static final Comparator<org.apache.paimon.data.InternalRow> KEY_COMPARATOR =
+            (a, b) -> Integer.compare(a.getInt(0), b.getInt(0));
+
+    private static final RowType VALUE_TYPE = RowType.of(DataTypes.INT());
+
+    @ParameterizedTest
+    @EnumSource(SortEngine.class)
+    public void testLaterSnapshotWins(SortEngine sortEngine) throws IOException {
+        // seq carries the snapshot id: snapshot 6 wins over snapshot 5.
+        KeyValue winner = merge(sortEngine, kv(1, 5, 999), kv(1, 6, 1));
+        assertThat(winner.value().getInt(0)).isEqualTo(1);
+        assertThat(winner.sequenceNumber()).isEqualTo(6);
+    }
+
+    @ParameterizedTest
+    @EnumSource(SortEngine.class)
+    public void testHigherSequenceWins(SortEngine sortEngine) throws IOException {
+        KeyValue winner = merge(sortEngine, kv(1, 100, 100), kv(1, 50, 50));
+        assertThat(winner.value().getInt(0)).isEqualTo(100);
+    }
+
+    private static KeyValue kv(int key, long seq, int value) {
+        return new KeyValue()
+                .replace(GenericRow.of(key), seq, RowKind.INSERT, GenericRow.of(value));
+    }
+
+    private static KeyValue merge(SortEngine sortEngine, KeyValue... kvs) throws IOException {
+        List<RecordReader<KeyValue>> readers = new ArrayList<>();
+        for (KeyValue kv : kvs) {
+            readers.add(new SingleKvReader(kv));
+        }
+
+        MergeFunctionWrapper<KeyValue> wrapper =
+                new ReducerMergeFunctionWrapper(DeduplicateMergeFunction.factory().create());
+
+        RecordReader<KeyValue> reader =
+                SortMergeReader.createSortMergeReader(
+                        readers, KEY_COMPARATOR, null, wrapper, sortEngine);
+
+        RecordReader.RecordIterator<KeyValue> batch = reader.readBatch();
+        assertThat(batch).isNotNull();
+        KeyValue result = batch.next();
+        assertThat(result).isNotNull();
+        assertThat(batch.next()).isNull();
+        batch.releaseBatch();
+        reader.close();
+        return result;
+    }
+
+    private static class SingleKvReader implements RecordReader<KeyValue> {
+        private KeyValue kv;
+
+        SingleKvReader(KeyValue kv) {
+            this.kv = kv;
+        }
+
+        @Nullable
+        @Override
+        public RecordIterator<KeyValue> readBatch() {
+            if (kv == null) {
+                return null;
+            }
+            KeyValue toReturn = kv;
+            kv = null;
+            return new RecordIterator<KeyValue>() {
+                private boolean returned = false;
+
+                @Nullable
+                @Override
+                public KeyValue next() {
+                    if (returned) {
+                        return null;
+                    }
+                    returned = true;
+                    return toReturn;
+                }
+
+                @Override
+                public void releaseBatch() {}
+            };
+        }
+
+        @Override
+        public void close() {}
+    }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java
index beb4bfd376..6bda437692 100644
--- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java
@@ -470,6 +470,55 @@ class SchemaValidationTest {
                 new TableSchema(1, fields, 10, emptyList(), singletonList("f1"), options, ""));
     }
 
+    @Test
+    public void testSnapshotSequenceOrderingHappyPath() {
+        Map<String, String> options = new HashMap<>();
+        options.put(CoreOptions.SEQUENCE_SNAPSHOT_ORDERING.key(), "true");
+        options.put(CoreOptions.WRITE_ONLY.key(), "true");
+        assertThatNoException().isThrownBy(() -> validateTableSchemaExec(options));
+    }
+
+    @Test
+    public void testSnapshotSequenceOrderingRejectsNonWriteOnly() {
+        Map<String, String> options = new HashMap<>();
+        options.put(CoreOptions.SEQUENCE_SNAPSHOT_ORDERING.key(), "true");
+        assertThatThrownBy(() -> validateTableSchemaExec(options))
+                .hasMessageContaining(CoreOptions.WRITE_ONLY.key());
+    }
+
+    @Test
+    public void testSnapshotSequenceOrderingRejectsSequenceField() {
+        Map<String, String> options = new HashMap<>();
+        options.put(CoreOptions.SEQUENCE_SNAPSHOT_ORDERING.key(), "true");
+        options.put(CoreOptions.WRITE_ONLY.key(), "true");
+        options.put(CoreOptions.SEQUENCE_FIELD.key(), "f2");
+        assertThatThrownBy(() -> validateTableSchemaExec(options))
+                .hasMessageContaining("sequence.field");
+    }
+
+    @Test
+    public void testSnapshotSequenceOrderingRejectsNonPkTable() {
+        List<DataField> fields =
+                Arrays.asList(
+                        new DataField(0, "f0", DataTypes.INT()),
+                        new DataField(1, "f1", DataTypes.INT()));
+        Map<String, String> options = new HashMap<>();
+        options.put(CoreOptions.SEQUENCE_SNAPSHOT_ORDERING.key(), "true");
+        options.put(BUCKET.key(), String.valueOf(-1));
+        assertThatThrownBy(
+                        () ->
+                                validateTableSchema(
+                                        new TableSchema(
+                                                1,
+                                                fields,
+                                                10,
+                                                emptyList(),
+                                                emptyList(),
+                                                options,
+                                                "")))
+                .hasMessageContaining("primary-key");
+    }
+
     @Test
     public void testFileFormatPerLevelRejectsIncompatibleSchema() {
         List<DataField> fields =
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
index ebc88e7a8e..4628710b9e 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
@@ -2688,4 +2688,573 @@ public class PrimaryKeySimpleTableTest extends SimpleTableTestBase {
         assertThatThrownBy(() -> table.mergeBranch(BRANCH_NAME, "main"))
                 .satisfies(anyCauseMatches(IllegalArgumentException.class, "append-only tables"));
     }
+
+    @Test
+    public void testSnapshotSequenceOrdering() throws Exception {
+        FileStoreTable table =
+                createFileStoreTable(
+                        conf -> {
+                            conf.set(CoreOptions.SEQUENCE_SNAPSHOT_ORDERING, 
true);
+                            conf.set(CoreOptions.WRITE_ONLY, true);
+                        });
+        StreamTableWrite write = table.newWrite(commitUser);
+        StreamTableCommit commit = table.newCommit(commitUser);
+
+        // Snapshot 1: write pk=(1,10) many times so that the per-record 
sequence number is high.
+        for (int i = 0; i < 100; i++) {
+            write.write(rowData(1, 10, 999L));
+        }
+        commit.commit(0, write.prepareCommit(false, 0));
+
+        // Snapshot 2: write pk=(1,10) once with a lower value. Because the snapshot id (2)
+        // is larger than snapshot 1, this record should win even though its per-record sequence
+        // number is much lower.
+        write.write(rowData(1, 10, 1L));
+        commit.commit(1, write.prepareCommit(false, 1));
+
+        List<Split> splits = toSplits(table.newSnapshotReader().read().dataSplits());
+        TableRead read = table.newReadBuilder().newRead();
+        Function<InternalRow, String> toString =
+                r -> r.getInt(0) + "|" + r.getInt(1) + "|" + r.getLong(2);
+        List<String> result = getResult(read, splits, toString);
+        assertThat(result).containsExactly("1|10|1");
+
+        write.close();
+        commit.close();
+    }
+
+    @Test
+    public void testSnapshotSequenceOrderingWithMinHeap() throws Exception {
+        FileStoreTable table =
+                createFileStoreTable(
+                        conf -> {
+                            conf.set(CoreOptions.SEQUENCE_SNAPSHOT_ORDERING, true);
+                            conf.set(CoreOptions.WRITE_ONLY, true);
+                            conf.set(CoreOptions.SORT_ENGINE, CoreOptions.SortEngine.MIN_HEAP);
+                        });
+        StreamTableWrite write = table.newWrite(commitUser);
+        StreamTableCommit commit = table.newCommit(commitUser);
+
+        for (int i = 0; i < 100; i++) {
+            write.write(rowData(1, 10, 999L));
+        }
+        commit.commit(0, write.prepareCommit(false, 0));
+
+        write.write(rowData(1, 10, 1L));
+        commit.commit(1, write.prepareCommit(false, 1));
+
+        List<Split> splits = toSplits(table.newSnapshotReader().read().dataSplits());
+        TableRead read = table.newReadBuilder().newRead();
+        Function<InternalRow, String> toString =
+                r -> r.getInt(0) + "|" + r.getInt(1) + "|" + r.getLong(2);
+        List<String> result = getResult(read, splits, toString);
+        assertThat(result).containsExactly("1|10|1");
+
+        write.close();
+        commit.close();
+    }
+
+    @Test
+    public void testSnapshotSequenceOrderingFallsBackToSequenceWithinSnapshot() throws Exception {
+        FileStoreTable table =
+                createFileStoreTable(
+                        conf -> {
+                            conf.set(CoreOptions.SEQUENCE_SNAPSHOT_ORDERING, true);
+                            conf.set(CoreOptions.WRITE_ONLY, true);
+                        });
+        StreamTableWrite write = table.newWrite(commitUser);
+        StreamTableCommit commit = table.newCommit(commitUser);
+
+        // Within a single snapshot, sequence number is the tiebreaker. The later write (999)
+        // gets a higher sequence number and should win.
+        write.write(rowData(1, 10, 1L));
+        write.write(rowData(1, 10, 999L));
+        commit.commit(0, write.prepareCommit(false, 0));
+
+        List<Split> splits = toSplits(table.newSnapshotReader().read().dataSplits());
+        TableRead read = table.newReadBuilder().newRead();
+        Function<InternalRow, String> toString =
+                r -> r.getInt(0) + "|" + r.getInt(1) + "|" + r.getLong(2);
+        List<String> result = getResult(read, splits, toString);
+        assertThat(result).containsExactly("1|10|999");
+
+        write.close();
+        commit.close();
+    }
+
+    @Test
+    public void testSnapshotSequenceOrderingCompactionPreservesInputSnapshotId() throws Exception {
+        FileStoreTable table =
+                createFileStoreTable(
+                        conf -> {
+                            conf.set(CoreOptions.SEQUENCE_SNAPSHOT_ORDERING, true);
+                            conf.set(CoreOptions.WRITE_ONLY, true);
+                        });
+        StreamTableWrite write = table.newWrite(commitUser);
+        StreamTableCommit commit = table.newCommit(commitUser);
+
+        // Snapshot 1: write pk=(1,10) with val=100
+        write.write(rowData(1, 10, 100L));
+        commit.commit(0, write.prepareCommit(false, 0));
+
+        // Snapshot 2: write pk=(1,10) with val=200 (this should win after compaction)
+        write.write(rowData(1, 10, 200L));
+        commit.commit(1, write.prepareCommit(false, 1));
+
+        // Snapshot 3: write a DIFFERENT key pk=(1,20)
+        write.write(rowData(1, 20, 300L));
+        commit.commit(2, write.prepareCommit(false, 2));
+
+        // Snapshot 4: compact using dedicated compact writer (simulates compact job)
+        write.close();
+        commit.close();
+        FileStoreTable compactTable =
+                table.copy(Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "false"));
+        StreamTableWrite compactWrite = compactTable.newWrite(commitUser);
+        StreamTableCommit compactCommit = compactTable.newCommit(commitUser);
+        compactWrite.compact(binaryRow(1), 0, true);
+        compactCommit.commit(3, compactWrite.prepareCommit(true, 3));
+        compactWrite.close();
+        compactCommit.close();
+
+        List<DataSplit> splits = table.newSnapshotReader().read().dataSplits();
+        for (DataSplit split : splits) {
+            for (DataFileMeta file : split.dataFiles()) {
+                // The compacted file's minSequenceNumber should reflect the min snapshot id
+                // of records inside (from per-record _SEQUENCE_NUMBER values written during
+                // compaction), NOT the compaction commit's snapshot id (4).
+                assertThat(file.minSequenceNumber())
+                        .as(
+                                "Compacted file %s should have minSequenceNumber from per-record "
+                                        + "snapshot ids, not the compaction commit's snapshot id",
+                                file.fileName())
+                        .isLessThanOrEqualTo(3);
+            }
+        }
+
+        // Also verify the read result is correct
+        TableRead read = table.newReadBuilder().newRead();
+        Function<InternalRow, String> toString =
+                r -> r.getInt(0) + "|" + r.getInt(1) + "|" + r.getLong(2);
+        List<String> result = getResult(read, toSplits(splits), toString);
+        assertThat(result).containsExactlyInAnyOrder("1|10|200", "1|20|300");
+    }
+
+    @Test
+    public void testSnapshotSequenceOrderingCompactionNoOrderingReversal() throws Exception {
+        // Reproduces the scenario from the PR review: compaction of files from
+        // snapshot 1 and 3 must NOT cause records from snapshot 1 to win over
+        // an uncompacted file from snapshot 2.
+        FileStoreTable table =
+                createFileStoreTable(
+                        conf -> {
+                            conf.set(CoreOptions.SEQUENCE_SNAPSHOT_ORDERING, true);
+                            conf.set(CoreOptions.WRITE_ONLY, true);
+                            conf.set(CoreOptions.BUCKET, 1);
+                        });
+        StreamTableWrite write = table.newWrite(commitUser);
+        StreamTableCommit commit = table.newCommit(commitUser);
+
+        // Snapshot 1: write pk=(1,10) with val=100
+        write.write(rowData(1, 10, 100L));
+        commit.commit(0, write.prepareCommit(false, 0));
+
+        // Snapshot 2: write SAME key pk=(1,10) with val=200 — this should win
+        write.write(rowData(1, 10, 200L));
+        commit.commit(1, write.prepareCommit(false, 1));
+
+        // Snapshot 3: write DIFFERENT key pk=(1,20) with val=300
+        write.write(rowData(1, 20, 300L));
+        commit.commit(2, write.prepareCommit(false, 2));
+
+        // Compact all files using dedicated compact writer
+        write.close();
+        commit.close();
+        FileStoreTable compactTable =
+                table.copy(Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "false"));
+        StreamTableWrite compactWrite = compactTable.newWrite(commitUser);
+        StreamTableCommit compactCommit = compactTable.newCommit(commitUser);
+        compactWrite.compact(binaryRow(1), 0, true);
+        compactCommit.commit(3, compactWrite.prepareCommit(true, 3));
+        compactWrite.close();
+        compactCommit.close();
+
+        // Write pk=(1,10) again with val=999 — snapshot 5 should definitely win
+        write = table.newWrite(commitUser);
+        commit = table.newCommit(commitUser);
+        write.write(rowData(1, 10, 999L));
+        commit.commit(4, write.prepareCommit(false, 4));
+
+        write.close();
+        commit.close();
+
+        List<Split> splits = toSplits(table.newSnapshotReader().read().dataSplits());
+        TableRead read = table.newReadBuilder().newRead();
+        Function<InternalRow, String> toString =
+                r -> r.getInt(0) + "|" + r.getInt(1) + "|" + r.getLong(2);
+        List<String> result = getResult(read, splits, toString);
+        // pk=(1,10): snapshot 5 (val=999) wins over snapshot 2 (val=200) and snapshot 1 (val=100)
+        // pk=(1,20): snapshot 3 (val=300) is the only version
+        assertThat(result).containsExactlyInAnyOrder("1|10|999", "1|20|300");
+    }
+
+    @Test
+    public void testSnapshotSequenceOrderingMultiRoundCompaction() throws Exception {
+        FileStoreTable table =
+                createFileStoreTable(
+                        conf -> {
+                            conf.set(CoreOptions.SEQUENCE_SNAPSHOT_ORDERING, true);
+                            conf.set(CoreOptions.WRITE_ONLY, true);
+                            conf.set(CoreOptions.BUCKET, 1);
+                        });
+        StreamTableWrite write = table.newWrite(commitUser);
+        StreamTableCommit commit = table.newCommit(commitUser);
+
+        // Snapshot 1: pk=(1,10) val=100
+        write.write(rowData(1, 10, 100L));
+        commit.commit(0, write.prepareCommit(false, 0));
+
+        // Snapshot 2: pk=(1,10) val=200 — should win over snapshot 1
+        write.write(rowData(1, 10, 200L));
+        commit.commit(1, write.prepareCommit(false, 1));
+
+        // Snapshot 3: pk=(1,20) val=300
+        write.write(rowData(1, 20, 300L));
+        commit.commit(2, write.prepareCommit(false, 2));
+
+        // First compaction (snapshot 4) using dedicated compact writer
+        write.close();
+        commit.close();
+        FileStoreTable compactTable =
+                table.copy(Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "false"));
+        StreamTableWrite compactWrite = compactTable.newWrite(commitUser);
+        StreamTableCommit compactCommit = compactTable.newCommit(commitUser);
+        compactWrite.compact(binaryRow(1), 0, true);
+        compactCommit.commit(3, compactWrite.prepareCommit(true, 3));
+        compactWrite.close();
+        compactCommit.close();
+
+        // Snapshot 5: pk=(1,10) val=500 — should win over everything
+        write = table.newWrite(commitUser);
+        commit = table.newCommit(commitUser);
+        write.write(rowData(1, 10, 500L));
+        commit.commit(4, write.prepareCommit(false, 4));
+
+        // Snapshot 6: pk=(1,30) val=600
+        write.write(rowData(1, 30, 600L));
+        commit.commit(5, write.prepareCommit(false, 5));
+
+        // Second compaction (snapshot 7) using dedicated compact writer
+        write.close();
+        commit.close();
+        compactWrite = compactTable.newWrite(commitUser);
+        compactCommit = compactTable.newCommit(commitUser);
+        compactWrite.compact(binaryRow(1), 0, true);
+        compactCommit.commit(6, compactWrite.prepareCommit(true, 6));
+        compactWrite.close();
+        compactCommit.close();
+
+        List<Split> splits = toSplits(table.newSnapshotReader().read().dataSplits());
+        TableRead read = table.newReadBuilder().newRead();
+        Function<InternalRow, String> toString =
+                r -> r.getInt(0) + "|" + r.getInt(1) + "|" + r.getLong(2);
+        List<String> result = getResult(read, splits, toString);
+        assertThat(result).containsExactlyInAnyOrder("1|10|500", "1|20|300", "1|30|600");
+    }
+
+    @Test
+    public void testSnapshotSequenceOrderingWithChangelogInput() throws Exception {
+        FileStoreTable table =
+                createFileStoreTable(
+                        conf -> {
+                            conf.set(CoreOptions.SEQUENCE_SNAPSHOT_ORDERING, true);
+                            conf.set(CoreOptions.WRITE_ONLY, true);
+                            conf.set(CHANGELOG_PRODUCER, ChangelogProducer.INPUT);
+                        });
+        StreamTableWrite write = table.newWrite(commitUser);
+        StreamTableCommit commit = table.newCommit(commitUser);
+
+        write.write(rowData(1, 10, 100L));
+        commit.commit(0, write.prepareCommit(false, 0));
+
+        write.write(rowData(1, 10, 1L));
+        commit.commit(1, write.prepareCommit(false, 1));
+
+        List<Split> splits = toSplits(table.newSnapshotReader().read().dataSplits());
+        TableRead read = table.newReadBuilder().newRead();
+        Function<InternalRow, String> toString =
+                r -> r.getInt(0) + "|" + r.getInt(1) + "|" + r.getLong(2);
+        List<String> result = getResult(read, splits, toString);
+        assertThat(result).containsExactly("1|10|1");
+
+        write.close();
+        commit.close();
+    }
+
+    @Test
+    public void testSnapshotSequenceOrderingWithChangelogLookup() throws Exception {
+        FileStoreTable table =
+                createFileStoreTable(
+                        conf -> {
+                            conf.set(CoreOptions.SEQUENCE_SNAPSHOT_ORDERING, true);
+                            conf.set(CoreOptions.WRITE_ONLY, true);
+                            conf.set(CHANGELOG_PRODUCER, LOOKUP);
+                        });
+        StreamTableWrite write =
+                table.newWrite(commitUser).withIOManager(new IOManagerImpl(tempDir.toString()));
+        StreamTableCommit commit = table.newCommit(commitUser);
+
+        write.write(rowData(1, 10, 100L));
+        commit.commit(0, write.prepareCommit(false, 0));
+
+        write.write(rowData(1, 10, 1L));
+        commit.commit(1, write.prepareCommit(false, 1));
+
+        List<Split> splits = toSplits(table.newSnapshotReader().read().dataSplits());
+        TableRead read = table.newReadBuilder().newRead();
+        Function<InternalRow, String> toString =
+                r -> r.getInt(0) + "|" + r.getInt(1) + "|" + r.getLong(2);
+        List<String> result = getResult(read, splits, toString);
+        assertThat(result).containsExactly("1|10|1");
+
+        write.close();
+        commit.close();
+    }
+
+    @Test
+    public void testSnapshotSequenceOrderingDeleteFromLaterSnapshot() throws Exception {
+        FileStoreTable table =
+                createFileStoreTable(
+                        conf -> {
+                            conf.set(CoreOptions.SEQUENCE_SNAPSHOT_ORDERING, true);
+                            conf.set(CoreOptions.WRITE_ONLY, true);
+                        });
+        StreamTableWrite write = table.newWrite(commitUser);
+        StreamTableCommit commit = table.newCommit(commitUser);
+
+        write.write(rowData(1, 10, 100L));
+        commit.commit(0, write.prepareCommit(false, 0));
+
+        write.write(rowDataWithKind(RowKind.DELETE, 1, 10, 100L));
+        commit.commit(1, write.prepareCommit(false, 1));
+
+        List<Split> splits = toSplits(table.newSnapshotReader().read().dataSplits());
+        TableRead read = table.newReadBuilder().newRead();
+        Function<InternalRow, String> toString =
+                r -> r.getInt(0) + "|" + r.getInt(1) + "|" + r.getLong(2);
+        List<String> result = getResult(read, splits, toString);
+        assertThat(result).isEmpty();
+
+        write.close();
+        commit.close();
+    }
+
+    /**
+     * Regression: with snapshot-ordering on, a partial-update merge function must keep its result's
+     * {@code sequenceNumber} equal to the snapshot id carried by its inputs. The compacted file's
+     * per-record {@code _SEQUENCE_NUMBER} (and therefore its file-level minSequenceNumber) must
+     * stay a real snapshot id (>= 0); a regression to -1 would break ordering against later
+     * snapshots.
+     */
+    @Test
+    public void testSnapshotSequenceOrderingPartialUpdateCompactionPreservesSnapshotId()
+            throws Exception {
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.INT()
+                        },
+                        new String[] {"pt", "a", "b", "c"});
+        FileStoreTable table =
+                createFileStoreTable(
+                        conf -> {
+                            conf.set(CoreOptions.SEQUENCE_SNAPSHOT_ORDERING, true);
+                            conf.set(CoreOptions.WRITE_ONLY, true);
+                            conf.set(MERGE_ENGINE, PARTIAL_UPDATE);
+                            conf.set(BUCKET, 1);
+                        },
+                        rowType);
+        StreamTableWrite write = table.newWrite(commitUser);
+        StreamTableCommit commit = table.newCommit(commitUser);
+
+        // Snapshot 1: partial write of column b
+        write.write(GenericRow.of(1, 1, 100, null));
+        commit.commit(0, write.prepareCommit(false, 0));
+
+        // Snapshot 2: partial write of column c — partial-update merges with snapshot 1's row
+        write.write(GenericRow.of(1, 1, null, 200));
+        commit.commit(1, write.prepareCommit(false, 1));
+
+        // Snapshot 3: compact files from snapshots 1+2 using a dedicated compact writer. The
+        // compaction reader merges the two partial rows through PartialUpdateMergeFunction; the
+        // merged record's sequenceNumber must stay a real snapshot id.
+        write.close();
+        commit.close();
+        FileStoreTable compactTable =
+                table.copy(Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "false"));
+        StreamTableWrite compactWrite = compactTable.newWrite(commitUser);
+        StreamTableCommit compactCommit = compactTable.newCommit(commitUser);
+        compactWrite.compact(binaryRow(1), 0, true);
+        compactCommit.commit(2, compactWrite.prepareCommit(true, 2));
+        compactWrite.close();
+        compactCommit.close();
+
+        List<DataSplit> splitsAfterCompact = table.newSnapshotReader().read().dataSplits();
+        for (DataSplit split : splitsAfterCompact) {
+            for (DataFileMeta file : split.dataFiles()) {
+                assertThat(file.minSequenceNumber())
+                        .as(
+                                "Compacted file %s must carry a real snapshot id in"
+                                        + " minSequenceNumber (>= 0). A value of -1 means the"
+                                        + " partial-update merge result lost its snapshot id"
+                                        + " during compaction.",
+                                file.fileName())
+                        .isGreaterThanOrEqualTo(0L);
+            }
+        }
+
+        // Snapshot 4: write a fresh value of b — this snapshot must win.
+        write = table.newWrite(commitUser);
+        commit = table.newCommit(commitUser);
+        write.write(GenericRow.of(1, 1, 999, null));
+        commit.commit(3, write.prepareCommit(false, 3));
+
+        // Snapshot 5: another compaction using dedicated compact writer
+        write.close();
+        commit.close();
+        compactWrite = compactTable.newWrite(commitUser);
+        compactCommit = compactTable.newCommit(commitUser);
+        compactWrite.compact(binaryRow(1), 0, true);
+        compactCommit.commit(4, compactWrite.prepareCommit(true, 4));
+        compactWrite.close();
+        compactCommit.close();
+        for (DataSplit split : table.newSnapshotReader().read().dataSplits()) {
+            for (DataFileMeta file : split.dataFiles()) {
+                assertThat(file.minSequenceNumber())
+                        .as("Final compacted file %s minSequenceNumber", file.fileName())
+                        .isGreaterThanOrEqualTo(0L);
+                assertThat(file.maxSequenceNumber())
+                        .as("Final compacted file %s maxSequenceNumber", file.fileName())
+                        .isGreaterThanOrEqualTo(0L);
+            }
+        }
+
+        List<Split> splits = toSplits(table.newSnapshotReader().read().dataSplits());
+        TableRead read = table.newReadBuilder().newRead();
+        Function<InternalRow, String> toString =
+                r ->
+                        r.getInt(0)
+                                + "|"
+                                + r.getInt(1)
+                                + "|"
+                                + (r.isNullAt(2) ? "null" : r.getInt(2))
+                                + "|"
+                                + (r.isNullAt(3) ? "null" : r.getInt(3));
+        List<String> result = getResult(read, splits, toString);
+        // b=999 (snapshot 4 wins over snapshot 1's 100), c=200 (only snapshot 2 wrote it)
+        assertThat(result).containsExactly("1|1|999|200");
+
+        write.close();
+        commit.close();
+    }
+
+    /**
+     * Regression: with snapshot-ordering on, an aggregate merge function must keep its result's
+     * {@code sequenceNumber} equal to the snapshot id carried by its inputs. Mirrors the
+     * partial-update regression — if the merged record loses the snapshot id, the compacted file's
+     * minSequenceNumber regresses to -1.
+     */
+    @Test
+    public void testSnapshotSequenceOrderingAggregateCompactionPreservesSnapshotId()
+            throws Exception {
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.INT()
+                        },
+                        new String[] {"pt", "a", "b", "c"});
+        FileStoreTable table =
+                createFileStoreTable(
+                        conf -> {
+                            conf.set(CoreOptions.SEQUENCE_SNAPSHOT_ORDERING, true);
+                            conf.set(CoreOptions.WRITE_ONLY, true);
+                            conf.set(MERGE_ENGINE, AGGREGATE);
+                            conf.set(BUCKET, 1);
+                            conf.set("fields.b.aggregate-function", "sum");
+                            conf.set("fields.c.aggregate-function", "max");
+                        },
+                        rowType);
+        StreamTableWrite write = table.newWrite(commitUser);
+        StreamTableCommit commit = table.newCommit(commitUser);
+
+        // Snapshot 1
+        write.write(GenericRow.of(1, 1, 10, 100));
+        commit.commit(0, write.prepareCommit(false, 0));
+
+        // Snapshot 2: aggregates with snapshot 1's row.
+        write.write(GenericRow.of(1, 1, 20, 50));
+        commit.commit(1, write.prepareCommit(false, 1));
+
+        // Snapshot 3: compact using dedicated compact writer
+        write.close();
+        commit.close();
+        FileStoreTable compactTable =
+                table.copy(Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "false"));
+        StreamTableWrite compactWrite = compactTable.newWrite(commitUser);
+        StreamTableCommit compactCommit = compactTable.newCommit(commitUser);
+        compactWrite.compact(binaryRow(1), 0, true);
+        compactCommit.commit(2, compactWrite.prepareCommit(true, 2));
+        compactWrite.close();
+        compactCommit.close();
+
+        for (DataSplit split : table.newSnapshotReader().read().dataSplits()) {
+            for (DataFileMeta file : split.dataFiles()) {
+                assertThat(file.minSequenceNumber())
+                        .as(
+                                "Aggregate-compacted file %s must carry a real snapshot id in"
+                                        + " minSequenceNumber (>= 0). A value of -1 means the"
+                                        + " aggregate merge result lost its snapshot id during"
+                                        + " compaction.",
+                                file.fileName())
+                        .isGreaterThanOrEqualTo(0L);
+            }
+        }
+
+        // Snapshot 4: another insert that must aggregate on top of the compacted result.
+        write = table.newWrite(commitUser);
+        commit = table.newCommit(commitUser);
+        write.write(GenericRow.of(1, 1, 5, 999));
+        commit.commit(3, write.prepareCommit(false, 3));
+
+        // Snapshot 5: final compaction using dedicated compact writer
+        write.close();
+        commit.close();
+        compactWrite = compactTable.newWrite(commitUser);
+        compactCommit = compactTable.newCommit(commitUser);
+        compactWrite.compact(binaryRow(1), 0, true);
+        compactCommit.commit(4, compactWrite.prepareCommit(true, 4));
+        compactWrite.close();
+        compactCommit.close();
+        for (DataSplit split : table.newSnapshotReader().read().dataSplits()) {
+            for (DataFileMeta file : split.dataFiles()) {
+                assertThat(file.minSequenceNumber())
+                        .as("Final compacted file %s minSequenceNumber", file.fileName())
+                        .isGreaterThanOrEqualTo(0L);
+                assertThat(file.maxSequenceNumber())
+                        .as("Final compacted file %s maxSequenceNumber", file.fileName())
+                        .isGreaterThanOrEqualTo(0L);
+            }
+        }
+
+        List<Split> splits = toSplits(table.newSnapshotReader().read().dataSplits());
+        TableRead read = table.newReadBuilder().newRead();
+        Function<InternalRow, String> toString =
+                r -> r.getInt(0) + "|" + r.getInt(1) + "|" + r.getInt(2) + "|" + r.getInt(3);
+        List<String> result = getResult(read, splits, toString);
+        // b = sum(10, 20, 5) = 35, c = max(100, 50, 999) = 999
+        assertThat(result).containsExactly("1|1|35|999");
+
+        write.close();
+        commit.close();
+    }
 }
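
The tests in this diff all exercise one rule: with snapshot ordering enabled, the record from the later snapshot wins for a given primary key, and the per-record sequence number only breaks ties inside a single snapshot. The following is a minimal standalone sketch of that rule, not Paimon's implementation. The class `SnapshotOrderingSketch`, the record type `Versioned`, and the method `deduplicate` are hypothetical names introduced only for illustration, and the sketch assumes a plain deduplicate merge (no partial-update or aggregate engine).

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SnapshotOrderingSketch {

    /** Hypothetical record shape; the fields are illustrative, not Paimon's KeyValue. */
    static final class Versioned {
        final String key;
        final long value;
        final long snapshotId;     // id of the snapshot that committed this record
        final long sequenceNumber; // per-writer counter, used only as a tiebreaker

        Versioned(String key, long value, long snapshotId, long sequenceNumber) {
            this.key = key;
            this.value = value;
            this.snapshotId = snapshotId;
            this.sequenceNumber = sequenceNumber;
        }
    }

    /** Order by snapshot id first, then by sequence number within the same snapshot. */
    static final Comparator<Versioned> ORDER =
            Comparator.comparingLong((Versioned v) -> v.snapshotId)
                    .thenComparingLong(v -> v.sequenceNumber);

    /** Keeps, for each key, the value of the record that is greatest under ORDER. */
    static Map<String, Long> deduplicate(List<Versioned> records) {
        Map<String, Versioned> latest = new LinkedHashMap<>();
        for (Versioned r : records) {
            // On a full tie the record seen later in the list is kept.
            latest.merge(r.key, r, (old, cur) -> ORDER.compare(cur, old) >= 0 ? cur : old);
        }
        Map<String, Long> result = new LinkedHashMap<>();
        latest.forEach((k, v) -> result.put(k, v.value));
        return result;
    }

    public static void main(String[] args) {
        List<Versioned> records = new ArrayList<>();
        // Snapshot 1: many writes, so the sequence number climbs to 99.
        for (long seq = 0; seq < 100; seq++) {
            records.add(new Versioned("1|10", 999L, 1L, seq));
        }
        // Snapshot 2: a single write with sequence number 0.
        records.add(new Versioned("1|10", 1L, 2L, 0L));
        System.out.println(deduplicate(records));
    }
}
```

The `main` method mirrors the shape of `testSnapshotSequenceOrdering`: the single record from snapshot 2 is kept even though snapshot 1 produced far higher sequence numbers.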
