JunRuiLee commented on code in PR #7832:
URL: https://github.com/apache/paimon/pull/7832#discussion_r3239716024
##########
paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java:
##########
@@ -36,12 +36,23 @@ public class KeyValueDataFileRecordReader implements FileRecordReader<KeyValue>
private final FileRecordReader<InternalRow> reader;
private final KeyValueSerializer serializer;
private final int level;
+ private final long snapshotId;
public KeyValueDataFileRecordReader(
FileRecordReader<InternalRow> reader, RowType keyType, RowType valueType, int level) {
+ this(reader, keyType, valueType, level, KeyValue.UNKNOWN_SNAPSHOT_ID);
Review Comment:
fixed
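For illustration only, here is a minimal Java sketch of the pattern this hunk introduces: a reader that carries one file-level snapshot id and stamps it onto every record it emits, while the legacy constructor delegates with an "unknown" sentinel. It also encodes the ordering described in the `FileStoreCommitImpl` Javadoc (snapshot id as primary tiebreaker, per-record sequence number as secondary). `RawRow`, `StampedRecord`, `StampingReader`, the sentinel value `-1`, and the choice that unknown ids sort as oldest are all hypothetical assumptions; this is not Paimon's implementation.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

/** Hypothetical raw row as read from a data file, before stamping. */
final class RawRow {
    final String key;
    final long sequenceNumber; // per-record _SEQUENCE_NUMBER column
    final String value;

    RawRow(String key, long sequenceNumber, String value) {
        this.key = key;
        this.sequenceNumber = sequenceNumber;
        this.value = value;
    }
}

/** Hypothetical stand-in for KeyValue with a file-level snapshot id attached. */
final class StampedRecord {
    // Assumed sentinel value; the real KeyValue.UNKNOWN_SNAPSHOT_ID is not shown in the diff.
    static final long UNKNOWN_SNAPSHOT_ID = -1L;

    // Snapshot id is the primary tiebreaker, sequence number the secondary one.
    // With the assumed -1 sentinel, records of unknown snapshot sort as oldest.
    static final Comparator<StampedRecord> MERGE_ORDER =
            Comparator.comparingLong((StampedRecord r) -> r.snapshotId)
                    .thenComparingLong(r -> r.row.sequenceNumber);

    final RawRow row;
    final long snapshotId;

    StampedRecord(RawRow row, long snapshotId) {
        this.row = row;
        this.snapshotId = snapshotId;
    }

    /** Picks the winning version among records that share the same key. */
    static StampedRecord latest(List<StampedRecord> sameKey) {
        return Collections.max(sameKey, MERGE_ORDER);
    }
}

/** Hypothetical reader that stamps one file-level snapshot id onto every record. */
final class StampingReader {
    private final List<RawRow> rows;
    private final long snapshotId;

    // Legacy constructor: callers that know no snapshot id get the sentinel.
    StampingReader(List<RawRow> rows) {
        this(rows, StampedRecord.UNKNOWN_SNAPSHOT_ID);
    }

    StampingReader(List<RawRow> rows, long snapshotId) {
        this.rows = rows;
        this.snapshotId = snapshotId;
    }

    List<StampedRecord> readAll() {
        List<StampedRecord> out = new ArrayList<>(rows.size());
        for (RawRow row : rows) {
            // Every record from this file gets the same file-level stamp.
            out.add(new StampedRecord(row, snapshotId));
        }
        return out;
    }
}
```

With this ordering, a record from snapshot 6 beats one from snapshot 5 even when the older record carries a larger per-record sequence number; within a single snapshot, the sequence number decides.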
##########
paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java:
##########
@@ -1193,4 +1197,57 @@ public void close() {
IOUtils.closeAllQuietly(commitCallbacks);
IOUtils.closeQuietly(snapshotCommit);
}
+
+ /**
+ * When {@code sequence.snapshot-ordering} is enabled, we repurpose {@link
+ * DataFileMeta#minSequenceNumber()} and {@link DataFileMeta#maxSequenceNumber()} to carry the
+ * commit snapshot id at file level. This avoids adding a new field to DataFileMeta and follows
+ * the same pattern used by row-tracking tables (see {@link
+ * RowTrackingCommitUtils#assignRowTracking}). At read time, {@code KeyValueFileReaderFactory}
+ * extracts the snapshot id from {@code minSequenceNumber} and stamps it onto each {@code
+ * KeyValue}, where the sort-merge readers use it as the primary tiebreaker.
+ *
+ * <p>The per-record sequence numbers stored inside data files (the {@code _SEQUENCE_NUMBER}
+ * column in the key-value format) are unaffected and still serve as a secondary tiebreaker
+ * within the same snapshot.
+ *
+ * <p>For {@link CommitKind#COMPACT} commits, we must NOT stamp with the new snapshot id.
+ * Compaction may run concurrently with data writes: it reads from an older snapshot and may not
+ * include files committed after it started. If we stamped the compacted output with the
+ * compaction's snapshot id (which is higher than any concurrent write's snapshot id), the
+ * compacted result would incorrectly shadow newer data. Instead, we propagate the maximum
+ * snapshot id from the compaction's input files (the DELETE entries), so that newer concurrent
+ * writes retain their ordering advantage. If no DELETE entries exist, we fall back to the
+ * current snapshot id as a safe default.
+ *
+ * <p>Note: the snapshot id is stamped at file level, not propagated through merge functions.
+ * Even if a merge function (e.g. aggregation) creates a new KeyValue without copying
+ * snapshotId, the compacted output file will be correctly stamped here at commit time.
+ */
+ private static List<ManifestEntry> assignSnapshotSequenceOrdering(
+ long snapshotId, CommitKind commitKind, List<ManifestEntry> files) {
+ long stamp = snapshotId;
+ if (commitKind == CommitKind.COMPACT) {
+ boolean found = false;
+ stamp = 0;
+ for (ManifestEntry entry : files) {
+ if (entry.kind() == FileKind.DELETE) {
+ stamp = Math.max(stamp, entry.file().minSequenceNumber());
Review Comment:
fixed
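The compaction rule in the Javadoc can be sketched as follows. The diff is cut off after the `Math.max` line, so how the stamp is written back is not shown; this sketch assumes (hypothetically) that only ADD entries are rewritten and that both the min and max sequence numbers are set to the stamp, and it applies the fallback to `snapshotId` that the Javadoc describes when no DELETE entry is present. `EntryKind`, `CommitType`, `EntrySketch`, and `SnapshotStamper` are simplified stand-ins, not Paimon classes.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified, hypothetical stand-ins for FileKind and CommitKind. */
enum EntryKind { ADD, DELETE }

enum CommitType { APPEND, COMPACT }

/** Hypothetical manifest entry carrying only what the stamping rule needs. */
final class EntrySketch {
    final EntryKind kind;
    final String fileName;
    final long minSequenceNumber; // repurposed to hold the snapshot id
    final long maxSequenceNumber;

    EntrySketch(EntryKind kind, String fileName, long minSequenceNumber, long maxSequenceNumber) {
        this.kind = kind;
        this.fileName = fileName;
        this.minSequenceNumber = minSequenceNumber;
        this.maxSequenceNumber = maxSequenceNumber;
    }
}

final class SnapshotStamper {
    static long computeStamp(long snapshotId, CommitType commitType, List<EntrySketch> files) {
        if (commitType != CommitType.COMPACT) {
            return snapshotId; // ordinary writes are stamped with the new snapshot id
        }
        boolean found = false;
        long stamp = 0;
        for (EntrySketch entry : files) {
            if (entry.kind == EntryKind.DELETE) {
                // Compaction inputs were stamped when committed, so minSequenceNumber
                // holds their snapshot id; keep the largest one.
                stamp = Math.max(stamp, entry.minSequenceNumber);
                found = true;
            }
        }
        // Fallback described in the Javadoc when there are no DELETE entries.
        return found ? stamp : snapshotId;
    }

    // Assumption: only ADD entries are rewritten, with min and max both set to the stamp.
    static List<EntrySketch> assign(long snapshotId, CommitType commitType, List<EntrySketch> files) {
        long stamp = computeStamp(snapshotId, commitType, files);
        List<EntrySketch> out = new ArrayList<>(files.size());
        for (EntrySketch entry : files) {
            out.add(entry.kind == EntryKind.ADD
                    ? new EntrySketch(EntryKind.ADD, entry.fileName, stamp, stamp)
                    : entry);
        }
        return out;
    }
}
```

For example, if a compaction commits as snapshot 9 but its inputs were stamped 3 and 5, its output is stamped 5, so a concurrent write committed as snapshot 7 (stamp 7) still takes precedence over the compacted rows.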
--
This is an automated message from the Apache Git Service.