JunRuiLee commented on code in PR #7832:
URL: https://github.com/apache/paimon/pull/7832#discussion_r3239716868


##########
paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java:
##########
@@ -1193,4 +1197,57 @@ public void close() {
         IOUtils.closeAllQuietly(commitCallbacks);
         IOUtils.closeQuietly(snapshotCommit);
     }
+
+    /**
+     * When {@code sequence.snapshot-ordering} is enabled, we repurpose {@link
+     * DataFileMeta#minSequenceNumber()} and {@link DataFileMeta#maxSequenceNumber()} to carry the
+     * commit snapshot id at file level. This avoids adding a new field to DataFileMeta and follows
+     * the same pattern used by row-tracking tables (see {@link
+     * RowTrackingCommitUtils#assignRowTracking}). At read time, {@code KeyValueFileReaderFactory}
+     * extracts the snapshot id from {@code minSequenceNumber} and stamps it onto each {@code
+     * KeyValue}, where the sort-merge readers use it as the primary tiebreaker.
+     *
+     * <p>The per-record sequence numbers stored inside data files (the {@code _SEQUENCE_NUMBER}
+     * column in the key-value format) are unaffected and still serve as a secondary tiebreaker
+     * within the same snapshot.
+     *
+     * <p>For {@link CommitKind#COMPACT} commits, we must NOT stamp with the new snapshot id.
+     * Compaction may run concurrently with data writes: it reads from an older snapshot and may not
+     * include files committed after it started. If we stamped the compacted output with the
+     * compaction's snapshot id (which is higher than any concurrent write's snapshot id), the
+     * compacted result would incorrectly shadow newer data. Instead, we propagate the maximum
+     * snapshot id from the compaction's input files (the DELETE entries), so that newer concurrent
+     * writes retain their ordering advantage. If no DELETE entries exist, we fall back to the
+     * current snapshot id as a safe default.
+     *
+     * <p>Note: the snapshot id is stamped at file level, not propagated through merge functions.
+     * Even if a merge function (e.g. aggregation) creates a new KeyValue without copying
+     * snapshotId, the compacted output file will be correctly stamped here at commit time.
+     */
+    private static List<ManifestEntry> assignSnapshotSequenceOrdering(
+            long snapshotId, CommitKind commitKind, List<ManifestEntry> files) {
+        long stamp = snapshotId;
+        if (commitKind == CommitKind.COMPACT) {
+            boolean found = false;
+            stamp = 0;
+            for (ManifestEntry entry : files) {
+                if (entry.kind() == FileKind.DELETE) {
+                    stamp = Math.max(stamp, entry.file().minSequenceNumber());

Review Comment:
   fixed
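
For readers following the thread, the stamp-selection rule described in the Javadoc of the quoted hunk can be sketched in isolation. This is only an illustration under stated assumptions, not the code in the PR: `SnapshotStampSketch`, `Entry`, `Kind`, and `chooseStamp` are hypothetical stand-ins for the Paimon types, and the fallback branch follows the Javadoc wording because the quoted hunk is cut off before that point.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified model of the rule in the Javadoc; not Paimon's actual classes.
public class SnapshotStampSketch {

    enum Kind { ADD, DELETE }

    /** Stand-in for a manifest entry, reduced to the two fields the rule needs. */
    static final class Entry {
        final Kind kind;
        final long minSequenceNumber; // assumed to already carry the file's snapshot id

        Entry(Kind kind, long minSequenceNumber) {
            this.kind = kind;
            this.minSequenceNumber = minSequenceNumber;
        }
    }

    /**
     * Returns the id to stamp onto newly written files.
     * Non-compact commits use the commit's own snapshot id. Compact commits use the
     * maximum id found on the compaction inputs (DELETE entries) and fall back to the
     * commit's snapshot id when there are no inputs.
     */
    static long chooseStamp(long snapshotId, boolean isCompact, List<Entry> entries) {
        if (!isCompact) {
            return snapshotId;
        }
        boolean found = false;
        long stamp = 0;
        for (Entry entry : entries) {
            if (entry.kind == Kind.DELETE) {
                found = true;
                stamp = Math.max(stamp, entry.minSequenceNumber);
            }
        }
        return found ? stamp : snapshotId;
    }

    public static void main(String[] args) {
        List<Entry> compaction = Arrays.asList(
                new Entry(Kind.DELETE, 3L),
                new Entry(Kind.DELETE, 5L),
                new Entry(Kind.ADD, 0L));
        // Compaction committed as snapshot 9 with inputs from snapshots 3 and 5.
        System.out.println(chooseStamp(9L, true, compaction));  // prints 5
        // The same entries in a non-compact commit take the commit's own id.
        System.out.println(chooseStamp(9L, false, compaction)); // prints 9
    }
}
```

With this rule, a concurrent write committed as, say, snapshot 7 still outranks the compacted output stamped 5, which is the ordering property the Javadoc argues for.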


