leaves12138 commented on code in PR #7832:
URL: https://github.com/apache/paimon/pull/7832#discussion_r3238780635
##########
paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java:
##########
@@ -1193,4 +1197,57 @@ public void close() {
IOUtils.closeAllQuietly(commitCallbacks);
IOUtils.closeQuietly(snapshotCommit);
}
+
+ /**
+ * When {@code sequence.snapshot-ordering} is enabled, we repurpose {@link
+ * DataFileMeta#minSequenceNumber()} and {@link
DataFileMeta#maxSequenceNumber()} to carry the
+ * commit snapshot id at file level. This avoids adding a new field to
DataFileMeta and follows
+ * the same pattern used by row-tracking tables (see {@link
+ * RowTrackingCommitUtils#assignRowTracking}). At read time, {@code
KeyValueFileReaderFactory}
+ * extracts the snapshot id from {@code minSequenceNumber} and stamps it
onto each {@code
+ * KeyValue}, where the sort-merge readers use it as the primary
tiebreaker.
+ *
+ * <p>The per-record sequence numbers stored inside data files (the {@code
_SEQUENCE_NUMBER}
+ * column in the key-value format) are unaffected and still serve as a
secondary tiebreaker
+ * within the same snapshot.
+ *
+ * <p>For {@link CommitKind#COMPACT} commits, we must NOT stamp with the
new snapshot id.
+ * Compaction may run concurrently with data writes: it reads from an
older snapshot and may not
+ * include files committed after it started. If we stamped the compacted
output with the
+ * compaction's snapshot id (which is higher than any concurrent write's
snapshot id), the
+ * compacted result would incorrectly shadow newer data. Instead, we
propagate the maximum
+ * snapshot id from the compaction's input files (the DELETE entries), so
that newer concurrent
+ * writes retain their ordering advantage. If no DELETE entries exist, we
fall back to the
+ * current snapshot id as a safe default.
+ *
+ * <p>Note: the snapshot id is stamped at file level, not propagated
through merge functions.
+ * Even if a merge function (e.g. aggregation) creates a new KeyValue
without copying
+ * snapshotId, the compacted output file will be correctly stamped here at
commit time.
+ */
+ private static List<ManifestEntry> assignSnapshotSequenceOrdering(
+ long snapshotId, CommitKind commitKind, List<ManifestEntry> files)
{
+ long stamp = snapshotId;
+ if (commitKind == CommitKind.COMPACT) {
+ boolean found = false;
+ stamp = 0;
+ for (ManifestEntry entry : files) {
+ if (entry.kind() == FileKind.DELETE) {
+ stamp = Math.max(stamp, entry.file().minSequenceNumber());
Review Comment:
Can you double check here?
If there is a compact between file a (snapshot = 1) and file c (snapshot =
3). Then we get file d (snapshot = 3). But the file b (snapshot = 2) does not
participate in the compaction. So file b remains snapshot = 2. File b and File
a have key range overlapping. So the may merge.
Before compaction, file a sequence number(snapshot id) is smaller than file
b.
After compaction, file a (which now is file c) sequence number is larger
than file b?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]