This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 68cf3bca10 [core] Support snapshot-based sequence ordering for primary-key tables (#7832)
68cf3bca10 is described below
commit 68cf3bca10f2891cee2c1c9769669e0d1cd3765a
Author: Junrui Lee <[email protected]>
AuthorDate: Thu Jun 4 16:29:32 2026 +0800
[core] Support snapshot-based sequence ordering for primary-key tables (#7832)
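
For readers skimming the patch, the mechanism can be sketched in a few lines of plain Java. This is an illustration under stated assumptions, not Paimon code: `SnapshotOrderingSketch`, `Rec`, and `DataFile` are hypothetical stand-ins for `KeyValue` and `DataFileMeta`, and the merge is a simple keep-highest-sequence deduplication.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SnapshotOrderingSketch {

    /** Hypothetical stand-in for a key-value record (not Paimon's KeyValue). */
    static final class Rec {
        final int key;
        final long sequence;
        final long value;

        Rec(int key, long sequence, long value) {
            this.key = key;
            this.sequence = sequence;
            this.value = value;
        }
    }

    /** Hypothetical stand-in for a data file; APPEND files carry the commit snapshot id. */
    static final class DataFile {
        final boolean append;
        final long stampedSnapshotId;
        final List<Rec> records;

        DataFile(boolean append, long stampedSnapshotId, List<Rec> records) {
            this.append = append;
            this.stampedSnapshotId = stampedSnapshotId;
            this.records = records;
        }
    }

    /** Read path: for APPEND files, replace each record's sequence with the snapshot id. */
    static List<Rec> read(DataFile file) {
        List<Rec> out = new ArrayList<>();
        for (Rec r : file.records) {
            long seq = file.append ? file.stampedSnapshotId : r.sequence;
            out.add(new Rec(r.key, seq, r.value));
        }
        return out;
    }

    /** Deduplicating merge: per key, keep the value of the record with the highest sequence. */
    static Map<Integer, Long> merge(List<DataFile> files) {
        Map<Integer, Rec> latest = new LinkedHashMap<>();
        for (DataFile f : files) {
            for (Rec r : read(f)) {
                Rec cur = latest.get(r.key);
                // Equal sequences keep the first record seen; the real order is undefined.
                if (cur == null || r.sequence > cur.sequence) {
                    latest.put(r.key, r);
                }
            }
        }
        Map<Integer, Long> result = new LinkedHashMap<>();
        latest.forEach((k, r) -> result.put(k, r.value));
        return result;
    }

    public static void main(String[] args) {
        // Snapshot 1: key 1 written with a high per-record sequence number (99).
        DataFile s1 = new DataFile(true, 1L, Arrays.asList(new Rec(1, 99L, 999L)));
        // Snapshot 2: same key with a low per-record sequence number (0).
        DataFile s2 = new DataFile(true, 2L, Arrays.asList(new Rec(1, 0L, 1L)));
        System.out.println(merge(Arrays.asList(s1, s2))); // prints {1=1}
    }
}
```

Running `main` prints `{1=1}`: the record from snapshot 2 wins even though its per-record sequence number is lower, which is the behavior the new tests in this commit assert.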
---
docs/docs/primary-key-table/sequence-rowkind.mdx | 47 ++
docs/generated/core_configuration.html | 6 +
.../main/java/org/apache/paimon/CoreOptions.java | 22 +
.../src/main/java/org/apache/paimon/KeyValue.java | 5 +
.../java/org/apache/paimon/io/DataFileMeta.java | 7 +
.../paimon/io/KeyValueDataFileRecordReader.java | 32 +-
.../paimon/io/KeyValueFileReaderFactory.java | 25 +-
.../paimon/operation/FileStoreCommitImpl.java | 31 ++
.../org/apache/paimon/schema/SchemaValidation.java | 42 ++
.../paimon/table/AbstractFileStoreTable.java | 2 +-
.../compact/SortMergeSnapshotOrderingTest.java | 134 +++++
.../apache/paimon/schema/SchemaValidationTest.java | 49 ++
.../paimon/table/PrimaryKeySimpleTableTest.java | 569 +++++++++++++++++++++
13 files changed, 965 insertions(+), 6 deletions(-)
diff --git a/docs/docs/primary-key-table/sequence-rowkind.mdx b/docs/docs/primary-key-table/sequence-rowkind.mdx
index 96fcc19963..8a0f49fb73 100644
--- a/docs/docs/primary-key-table/sequence-rowkind.mdx
+++ b/docs/docs/primary-key-table/sequence-rowkind.mdx
@@ -71,3 +71,50 @@ By default, the primary key table determines the row kind according to the input
`'rowkind.field'` to use a field to extract row kind.
The valid row kind string should be `'+I'`, `'-U'`, `'+U'` or `'-D'`.
+
+## Snapshot Ordering
+
+In multi-writer scenarios where wall-clock sequence numbers cannot be globally ordered across writers,
+you can enable `'sequence.snapshot-ordering'` to use the commit snapshot id as the ordering key when
+merging records with the same primary key. Records from later snapshots are considered newer,
+regardless of their per-record sequence number.
+
+<Tabs>
+<TabItem value="flink" label="Flink">
+
+```sql
+CREATE TABLE my_table (
+ pk BIGINT PRIMARY KEY NOT ENFORCED,
+ v1 DOUBLE,
+ v2 BIGINT
+) WITH (
+ 'sequence.snapshot-ordering' = 'true',
+ 'write-only' = 'true'
+);
+```
+
+</TabItem>
+</Tabs>
+
+:::warning
+This option requires `'write-only' = 'true'`. Compaction must be performed by a separate dedicated
+compact job. This ensures that compaction correctly preserves the snapshot id of each record.
+:::
+
+:::info
+`'sequence.snapshot-ordering'` is mutually exclusive with `'sequence.field'`. You cannot enable both
+on the same table.
+:::
+
+:::info
+The ordering key is the commit snapshot id only; the order of records **within the same snapshot** is
+not guaranteed, and this is by design. Under the default configuration it is harmless: a writer
+buffers a commit's writes (`'write-buffer-spillable' = 'true'`) and runs them through the merge
+function before flushing, so at most one record per primary key is written per snapshot, which covers
+the common case. We therefore deliberately do not handle the case where the same key is spread
+across multiple files of one snapshot. That case only arises with `'write-buffer-spillable' = 'false'`,
+or when the spilled data exceeds `'write-buffer-spill-disk-size'`, where the buffer may be flushed
+mid-commit; the same key can then land in multiple files of the same snapshot with equal sequence
+numbers and their relative order becomes undefined. This affects only intra-snapshot order, never the
+cross-snapshot ordering this feature provides.
+:::
diff --git a/docs/generated/core_configuration.html b/docs/generated/core_configuration.html
index 1a66fb0ab0..9939b28b78 100644
--- a/docs/generated/core_configuration.html
+++ b/docs/generated/core_configuration.html
@@ -1361,6 +1361,12 @@ This config option does not affect the default filesystem metastore.</td>
<td><p>Enum</p></td>
<td>Specify the order of sequence.field.<br /><br />Possible values:<ul><li>"ascending": specifies sequence.field sort order is ascending.</li><li>"descending": specifies sequence.field sort order is descending.</li></ul></td>
</tr>
+ <tr>
+ <td><h5>sequence.snapshot-ordering</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>When enabled, merge uses the commit snapshot id as the ordering key for primary-key conflicts: records from later snapshots always win. Designed for multi-writer scenarios on the same primary-key table where wall-clock sequence numbers cannot be globally ordered. The order of records within the same snapshot is not guaranteed. Mutually exclusive with sequence.field. Requires a primary-key table with write-only=true. Inline compaction is not allowed because snapshot ids ar [...]
+ </tr>
<tr>
<td><h5>sink.process-time-zone</h5></td>
<td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index da734fa938..ea325667aa 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1020,6 +1020,24 @@ public class CoreOptions implements Serializable {
.defaultValue(SortOrder.ASCENDING)
.withDescription("Specify the order of sequence.field.");
+ @Immutable
+ public static final ConfigOption<Boolean> SEQUENCE_SNAPSHOT_ORDERING =
+ key("sequence.snapshot-ordering")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "When enabled, merge uses the commit snapshot id as the ordering key "
+ + "for primary-key conflicts: records from later snapshots "
+ + "always win. Designed for multi-writer scenarios on the same "
+ + "primary-key table where wall-clock sequence numbers cannot "
+ + "be globally ordered. The order of records within the same "
+ + "snapshot is not guaranteed. Mutually exclusive with "
+ + "sequence.field. Requires a primary-key table with "
+ + "write-only=true. Inline compaction is not allowed because "
+ + "snapshot ids are assigned only after commit. To compact such "
+ + "tables, run a dedicated compaction job/action with "
+ + "write-only=false.");
+
@Immutable
public static final ConfigOption<Boolean> AGGREGATION_REMOVE_RECORD_ON_DELETE =
key("aggregation.remove-record-on-delete")
@@ -3492,6 +3510,10 @@ public class CoreOptions implements Serializable {
return options.get(SEQUENCE_FIELD_SORT_ORDER) == SortOrder.ASCENDING;
}
+ public boolean snapshotSequenceOrdering() {
+ return options.get(SEQUENCE_SNAPSHOT_ORDERING);
+ }
+
public Optional<String> rowkindField() {
return options.getOptional(ROWKIND_FIELD);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValue.java b/paimon-core/src/main/java/org/apache/paimon/KeyValue.java
index c314f21107..4c4ff61c24 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValue.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValue.java
@@ -89,6 +89,11 @@ public class KeyValue {
return sequenceNumber;
}
+ public KeyValue setSequenceNumber(long sequenceNumber) {
+ this.sequenceNumber = sequenceNumber;
+ return this;
+ }
+
public RowKind valueKind() {
return valueKind;
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
index 23b34947bc..a8cdc031e1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
@@ -277,8 +277,15 @@ public interface DataFileMeta {
SimpleStats valueStats();
+ /**
+ * Minimum sequence number of records in this file. When {@code sequence.snapshot-ordering} is
+ * enabled for a primary-key table, this field is repurposed to carry the commit snapshot id
+ * instead of the per-record sequence number range (the snapshot id is stamped into it at commit
+ * time by {@code FileStoreCommitImpl}).
+ */
long minSequenceNumber();
+ /** @see #minSequenceNumber() */
long maxSequenceNumber();
long schemaId();
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java
index 6cf0876970..2538c08a21 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java
@@ -36,12 +36,26 @@ public class KeyValueDataFileRecordReader implements FileRecordReader<KeyValue>
private final FileRecordReader<InternalRow> reader;
private final KeyValueSerializer serializer;
private final int level;
+ private final boolean overrideSequenceWithSnapshotId;
+ private final long snapshotId;
public KeyValueDataFileRecordReader(
FileRecordReader<InternalRow> reader, RowType keyType, RowType valueType, int level) {
+ this(reader, keyType, valueType, level, false, KeyValue.UNKNOWN_SEQUENCE);
+ }
+
+ public KeyValueDataFileRecordReader(
+ FileRecordReader<InternalRow> reader,
+ RowType keyType,
+ RowType valueType,
+ int level,
+ boolean overrideSequenceWithSnapshotId,
+ long snapshotId) {
this.reader = reader;
this.serializer = new KeyValueSerializer(keyType, valueType);
this.level = level;
+ this.overrideSequenceWithSnapshotId = overrideSequenceWithSnapshotId;
+ this.snapshotId = snapshotId;
}
@Nullable
@@ -53,10 +67,20 @@ public class KeyValueDataFileRecordReader implements FileRecordReader<KeyValue>
}
return iterator.transform(
- internalRow ->
- internalRow == null
- ? null
- : serializer.fromRow(internalRow).setLevel(level));
+ internalRow -> {
+ if (internalRow == null) {
+ return null;
+ }
+ KeyValue kv = serializer.fromRow(internalRow).setLevel(level);
+ // In snapshot-ordering mode, an APPEND file's on-disk per-record sequence
+ // numbers are stale; we override them with the commit snapshot id so later
+ // snapshots win during merge. Any read path bypassing this override would
+ // order APPEND records incorrectly.
+ if (overrideSequenceWithSnapshotId) {
+ kv.setSequenceNumber(snapshotId);
+ }
+ return kv;
+ });
}
@Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
index fc505e19c2..5f7e374192 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
@@ -30,6 +30,7 @@ import org.apache.paimon.format.FormatReaderContext;
import org.apache.paimon.format.OrcFormatReaderContext;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.partition.PartitionUtils;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.FileRecordReader;
@@ -42,6 +43,7 @@ import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.AsyncRecordReader;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.FormatReaderMapping;
+import org.apache.paimon.utils.Preconditions;
import javax.annotation.Nullable;
@@ -68,6 +70,7 @@ public class KeyValueFileReaderFactory implements FileReaderFactory<KeyValue> {
private final long asyncThreshold;
private final boolean ignoreCorruptFiles;
private final boolean ignoreLostFiles;
+ private final boolean snapshotSequenceOrdering;
private final Map<FormatKey, FormatReaderMapping> formatReaderMappings;
private final BinaryRow partition;
private final DeletionVector.Factory dvFactory;
@@ -93,6 +96,7 @@ public class KeyValueFileReaderFactory implements FileReaderFactory<KeyValue> {
this.asyncThreshold = coreOptions.fileReaderAsyncThreshold().getBytes();
this.ignoreCorruptFiles = coreOptions.scanIgnoreCorruptFile();
this.ignoreLostFiles = coreOptions.scanIgnoreLostFile();
+ this.snapshotSequenceOrdering = coreOptions.snapshotSequenceOrdering();
this.partition = partition;
this.formatReaderMappings = new HashMap<>();
this.dvFactory = dvFactory;
@@ -168,7 +172,26 @@ public class KeyValueFileReaderFactory implements FileReaderFactory<KeyValue> {
new ApplyDeletionVectorReader(fileRecordReader, deletionVector.get());
}
- return new KeyValueDataFileRecordReader(fileRecordReader, keyType, valueType, file.level());
+ // In snapshot-ordering mode, APPEND files carry the commit snapshot id in
+ // minSequenceNumber (stamped at commit time); override per-record sequence with it so
+ // later snapshots win during merge. COMPACT files already carry the snapshot id in their
+ // per-record _SEQUENCE_NUMBER and are left untouched.
+ boolean overrideSequenceWithSnapshotId = false;
+ if (snapshotSequenceOrdering) {
+ Preconditions.checkState(
+ file.fileSource().isPresent(),
+ "sequence.snapshot-ordering requires data files with fileSource metadata. "
+ + "This option is only safe for newly-created tables or empty tables. "
+ + "Legacy files without fileSource cannot be ordered by commit snapshot id.");
+ overrideSequenceWithSnapshotId = file.fileSource().get() == FileSource.APPEND;
+ }
+ return new KeyValueDataFileRecordReader(
+ fileRecordReader,
+ keyType,
+ valueType,
+ file.level(),
+ overrideSequenceWithSnapshotId,
+ file.minSequenceNumber());
}
public static Builder builder(
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 10c9b20a04..ddab7a0419 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -985,6 +985,10 @@ public class FileStoreCommitImpl implements FileStoreCommit {
deltaFiles = assigned.assignedEntries;
}
+ if (options.snapshotSequenceOrdering()) {
+ deltaFiles = stampSequenceWithSnapshotId(newSnapshotId, commitKind, deltaFiles);
+ }
+
// the added records subtract the deleted records from
long deltaRecordCount = recordCountAdd(deltaFiles) - recordCountDelete(deltaFiles);
long totalRecordCount = previousTotalRecordCount + deltaRecordCount;
@@ -1261,4 +1265,31 @@ public class FileStoreCommitImpl implements FileStoreCommit {
IOUtils.closeAllQuietly(commitCallbacks);
IOUtils.closeQuietly(snapshotCommit);
}
+
+ /**
+ * Stamps the commit snapshot id into {@link DataFileMeta#minSequenceNumber()} / {@link
+ * DataFileMeta#maxSequenceNumber()} of APPEND files, reusing these fields instead of adding a
+ * new one (same pattern as {@link RowTrackingCommitUtils#assignRowTracking}). COMPACT files are
+ * returned unchanged: their input was read through the override path, so their per-record
+ * {@code _SEQUENCE_NUMBER} already carries the snapshot id.
+ *
+ * <p>All records of a snapshot share one id, so intra-snapshot order is not preserved. This is
+ * accepted: the default spillable writer collapses a commit's writes through the merge function
+ * to one record per key before flush, and the feature targets cross-snapshot ordering only.
+ */
+ private static List<ManifestEntry> stampSequenceWithSnapshotId(
+ long snapshotId, CommitKind commitKind, List<ManifestEntry> files) {
+ if (commitKind == CommitKind.COMPACT) {
+ return files;
+ }
+ List<ManifestEntry> result = new ArrayList<>(files.size());
+ for (ManifestEntry entry : files) {
+ if (entry.kind() == FileKind.ADD) {
+ result.add(entry.assignSequenceNumber(snapshotId, snapshotId));
+ } else {
+ result.add(entry);
+ }
+ }
+ return result;
+ }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index 50228385a9..c600927af0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -105,6 +105,18 @@ public class SchemaValidation {
* @param schema the schema to be validated
*/
public static void validateTableSchema(TableSchema schema) {
+ validateTableSchema(schema, Collections.emptySet());
+ }
+
+ /**
+ * Validate the {@link TableSchema} and {@link CoreOptions}.
+ *
+ * @param schema the schema to be validated
+ * @param dynamicOptionKeys option keys that are overridden dynamically at runtime (e.g. by
+ * dedicated compaction jobs) and should therefore be excluded from certain static
+ * validations such as the {@code write-only} requirement for snapshot ordering
+ */
+ public static void validateTableSchema(TableSchema schema, Set<String> dynamicOptionKeys) {
CoreOptions options = new CoreOptions(schema.options());
validateOnlyContainPrimitiveType(schema.fields(), schema.primaryKeys(), "primary key");
@@ -288,6 +300,10 @@ public class SchemaValidation {
"deletion-vectors.merge-on-read requires deletion-vectors.enabled to be true.");
}
+ if (options.snapshotSequenceOrdering()) {
+ validateSnapshotSequenceOrdering(schema, options, dynamicOptionKeys);
+ }
+
// vector field names must point to vector type
Set<String> fieldNamesSpecifiedAsVector = options.vectorField();
schema.fields()
@@ -614,6 +630,32 @@ public class SchemaValidation {
}
}
+ private static void validateSnapshotSequenceOrdering(
+ TableSchema schema, CoreOptions options, Set<String> dynamicOptionKeys) {
+ checkArgument(
+ !schema.primaryKeys().isEmpty(),
+ "%s = true requires a primary-key table; append-only tables cannot use "
+ + "snapshot-based sequence ordering.",
+ CoreOptions.SEQUENCE_SNAPSHOT_ORDERING.key());
+ checkArgument(
+ options.sequenceField().isEmpty(),
+ "%s = true is mutually exclusive with %s; the snapshot id is the sole tiebreaker.",
+ CoreOptions.SEQUENCE_SNAPSHOT_ORDERING.key(),
+ CoreOptions.SEQUENCE_FIELD.key());
+ // Skip writeOnly check when write-only is dynamically overridden (e.g. by dedicated
+ // compact jobs that override write-only=false at runtime).
+ if (!dynamicOptionKeys.contains(CoreOptions.WRITE_ONLY.key())) {
+ checkArgument(
+ options.writeOnly(),
+ "%s = true requires %s = true. Snapshot ordering relies on snapshot id to "
+ + "determine record order, but inline compaction happens before "
+ + "snapshot creation; files have not been stamped with the correct "
+ + "snapshot id yet. Use a dedicated compaction job instead.",
+ CoreOptions.SEQUENCE_SNAPSHOT_ORDERING.key(),
+ CoreOptions.WRITE_ONLY.key());
+ }
+ }
+
private static void validateForDeletionVectors(CoreOptions options) {
checkArgument(
options.changelogProducer() == ChangelogProducer.NONE
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 23214ff05f..0249a8e6b3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -374,7 +374,7 @@ abstract class AbstractFileStoreTable implements FileStoreTable {
}
// validate schema with new options
- SchemaValidation.validateTableSchema(newTableSchema);
+ SchemaValidation.validateTableSchema(newTableSchema, dynamicOptions.keySet());
return copy(newTableSchema);
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeSnapshotOrderingTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeSnapshotOrderingTest.java
new file mode 100644
index 0000000000..0e29efd171
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeSnapshotOrderingTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.mergetree.compact;
+
+import org.apache.paimon.CoreOptions.SortEngine;
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests for snapshot-ordering in sort-merge readers. With {@code sequence.snapshot-ordering}, the
+ * commit snapshot id is carried in each record's {@code sequenceNumber} (stamped at read time for
+ * APPEND files), so the sort-merge readers need no snapshot-specific branch: comparing by {@code
+ * sequenceNumber} already makes records from later snapshots win.
+ */
+public class SortMergeSnapshotOrderingTest {
+
+ private static final Comparator<org.apache.paimon.data.InternalRow> KEY_COMPARATOR =
+ (a, b) -> Integer.compare(a.getInt(0), b.getInt(0));
+
+ private static final RowType VALUE_TYPE = RowType.of(DataTypes.INT());
+
+ @ParameterizedTest
+ @EnumSource(SortEngine.class)
+ public void testLaterSnapshotWins(SortEngine sortEngine) throws IOException {
+ // seq carries the snapshot id: snapshot 6 wins over snapshot 5.
+ KeyValue winner = merge(sortEngine, kv(1, 5, 999), kv(1, 6, 1));
+ assertThat(winner.value().getInt(0)).isEqualTo(1);
+ assertThat(winner.sequenceNumber()).isEqualTo(6);
+ }
+
+ @ParameterizedTest
+ @EnumSource(SortEngine.class)
+ public void testHigherSequenceWins(SortEngine sortEngine) throws IOException {
+ KeyValue winner = merge(sortEngine, kv(1, 100, 100), kv(1, 50, 50));
+ assertThat(winner.value().getInt(0)).isEqualTo(100);
+ }
+
+ private static KeyValue kv(int key, long seq, int value) {
+ return new KeyValue()
+ .replace(GenericRow.of(key), seq, RowKind.INSERT, GenericRow.of(value));
+ }
+
+ private static KeyValue merge(SortEngine sortEngine, KeyValue... kvs) throws IOException {
+ List<RecordReader<KeyValue>> readers = new ArrayList<>();
+ for (KeyValue kv : kvs) {
+ readers.add(new SingleKvReader(kv));
+ }
+
+ MergeFunctionWrapper<KeyValue> wrapper =
+ new ReducerMergeFunctionWrapper(DeduplicateMergeFunction.factory().create());
+
+ RecordReader<KeyValue> reader =
+ SortMergeReader.createSortMergeReader(
+ readers, KEY_COMPARATOR, null, wrapper, sortEngine);
+
+ RecordReader.RecordIterator<KeyValue> batch = reader.readBatch();
+ assertThat(batch).isNotNull();
+ KeyValue result = batch.next();
+ assertThat(result).isNotNull();
+ assertThat(batch.next()).isNull();
+ batch.releaseBatch();
+ reader.close();
+ return result;
+ }
+
+ private static class SingleKvReader implements RecordReader<KeyValue> {
+ private KeyValue kv;
+
+ SingleKvReader(KeyValue kv) {
+ this.kv = kv;
+ }
+
+ @Nullable
+ @Override
+ public RecordIterator<KeyValue> readBatch() {
+ if (kv == null) {
+ return null;
+ }
+ KeyValue toReturn = kv;
+ kv = null;
+ return new RecordIterator<KeyValue>() {
+ private boolean returned = false;
+
+ @Nullable
+ @Override
+ public KeyValue next() {
+ if (returned) {
+ return null;
+ }
+ returned = true;
+ return toReturn;
+ }
+
+ @Override
+ public void releaseBatch() {}
+ };
+ }
+
+ @Override
+ public void close() {}
+ }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java
index beb4bfd376..6bda437692 100644
--- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java
@@ -470,6 +470,55 @@ class SchemaValidationTest {
new TableSchema(1, fields, 10, emptyList(), singletonList("f1"), options, ""));
}
+ @Test
+ public void testSnapshotSequenceOrderingHappyPath() {
+ Map<String, String> options = new HashMap<>();
+ options.put(CoreOptions.SEQUENCE_SNAPSHOT_ORDERING.key(), "true");
+ options.put(CoreOptions.WRITE_ONLY.key(), "true");
+ assertThatNoException().isThrownBy(() -> validateTableSchemaExec(options));
+ }
+
+ @Test
+ public void testSnapshotSequenceOrderingRejectsNonWriteOnly() {
+ Map<String, String> options = new HashMap<>();
+ options.put(CoreOptions.SEQUENCE_SNAPSHOT_ORDERING.key(), "true");
+ assertThatThrownBy(() -> validateTableSchemaExec(options))
+ .hasMessageContaining(CoreOptions.WRITE_ONLY.key());
+ }
+
+ @Test
+ public void testSnapshotSequenceOrderingRejectsSequenceField() {
+ Map<String, String> options = new HashMap<>();
+ options.put(CoreOptions.SEQUENCE_SNAPSHOT_ORDERING.key(), "true");
+ options.put(CoreOptions.WRITE_ONLY.key(), "true");
+ options.put(CoreOptions.SEQUENCE_FIELD.key(), "f2");
+ assertThatThrownBy(() -> validateTableSchemaExec(options))
+ .hasMessageContaining("sequence.field");
+ }
+
+ @Test
+ public void testSnapshotSequenceOrderingRejectsNonPkTable() {
+ List<DataField> fields =
+ Arrays.asList(
+ new DataField(0, "f0", DataTypes.INT()),
+ new DataField(1, "f1", DataTypes.INT()));
+ Map<String, String> options = new HashMap<>();
+ options.put(CoreOptions.SEQUENCE_SNAPSHOT_ORDERING.key(), "true");
+ options.put(BUCKET.key(), String.valueOf(-1));
+ assertThatThrownBy(
+ () ->
+ validateTableSchema(
+ new TableSchema(
+ 1,
+ fields,
+ 10,
+ emptyList(),
+ emptyList(),
+ options,
+ "")))
+ .hasMessageContaining("primary-key");
+ }
+
@Test
public void testFileFormatPerLevelRejectsIncompatibleSchema() {
List<DataField> fields =
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
index ebc88e7a8e..4628710b9e 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
@@ -2688,4 +2688,573 @@ public class PrimaryKeySimpleTableTest extends SimpleTableTestBase {
assertThatThrownBy(() -> table.mergeBranch(BRANCH_NAME, "main"))
.satisfies(anyCauseMatches(IllegalArgumentException.class, "append-only tables"));
}
+
+ @Test
+ public void testSnapshotSequenceOrdering() throws Exception {
+ FileStoreTable table =
+ createFileStoreTable(
+ conf -> {
+ conf.set(CoreOptions.SEQUENCE_SNAPSHOT_ORDERING, true);
+ conf.set(CoreOptions.WRITE_ONLY, true);
+ });
+ StreamTableWrite write = table.newWrite(commitUser);
+ StreamTableCommit commit = table.newCommit(commitUser);
+
+ // Snapshot 1: write pk=(1,10) many times so that the per-record sequence number is high.
+ for (int i = 0; i < 100; i++) {
+ write.write(rowData(1, 10, 999L));
+ }
+ commit.commit(0, write.prepareCommit(false, 0));
+
+ // Snapshot 2: write pk=(1,10) once with a lower value. Because the snapshot id (2)
+ // is larger than snapshot 1, this record should win even though its per-record sequence
+ // number is much lower.
+ write.write(rowData(1, 10, 1L));
+ commit.commit(1, write.prepareCommit(false, 1));
+
+ List<Split> splits = toSplits(table.newSnapshotReader().read().dataSplits());
+ TableRead read = table.newReadBuilder().newRead();
+ Function<InternalRow, String> toString =
+ r -> r.getInt(0) + "|" + r.getInt(1) + "|" + r.getLong(2);
+ List<String> result = getResult(read, splits, toString);
+ assertThat(result).containsExactly("1|10|1");
+
+ write.close();
+ commit.close();
+ }
+
+ @Test
+ public void testSnapshotSequenceOrderingWithMinHeap() throws Exception {
+ FileStoreTable table =
+ createFileStoreTable(
+ conf -> {
+ conf.set(CoreOptions.SEQUENCE_SNAPSHOT_ORDERING, true);
+ conf.set(CoreOptions.WRITE_ONLY, true);
+ conf.set(CoreOptions.SORT_ENGINE, CoreOptions.SortEngine.MIN_HEAP);
+ });
+ StreamTableWrite write = table.newWrite(commitUser);
+ StreamTableCommit commit = table.newCommit(commitUser);
+
+ for (int i = 0; i < 100; i++) {
+ write.write(rowData(1, 10, 999L));
+ }
+ commit.commit(0, write.prepareCommit(false, 0));
+
+ write.write(rowData(1, 10, 1L));
+ commit.commit(1, write.prepareCommit(false, 1));
+
+ List<Split> splits = toSplits(table.newSnapshotReader().read().dataSplits());
+ TableRead read = table.newReadBuilder().newRead();
+ Function<InternalRow, String> toString =
+ r -> r.getInt(0) + "|" + r.getInt(1) + "|" + r.getLong(2);
+ List<String> result = getResult(read, splits, toString);
+ assertThat(result).containsExactly("1|10|1");
+
+ write.close();
+ commit.close();
+ }
+
+ @Test
+ public void testSnapshotSequenceOrderingFallsBackToSequenceWithinSnapshot() throws Exception {
+ FileStoreTable table =
+ createFileStoreTable(
+ conf -> {
+ conf.set(CoreOptions.SEQUENCE_SNAPSHOT_ORDERING, true);
+ conf.set(CoreOptions.WRITE_ONLY, true);
+ });
+ StreamTableWrite write = table.newWrite(commitUser);
+ StreamTableCommit commit = table.newCommit(commitUser);
+
+ // Within a single snapshot, sequence number is the tiebreaker. The later write (999)
+ // gets a higher sequence number and should win.
+ write.write(rowData(1, 10, 1L));
+ write.write(rowData(1, 10, 999L));
+ commit.commit(0, write.prepareCommit(false, 0));
+
+ List<Split> splits = toSplits(table.newSnapshotReader().read().dataSplits());
+ TableRead read = table.newReadBuilder().newRead();
+ Function<InternalRow, String> toString =
+ r -> r.getInt(0) + "|" + r.getInt(1) + "|" + r.getLong(2);
+ List<String> result = getResult(read, splits, toString);
+ assertThat(result).containsExactly("1|10|999");
+
+ write.close();
+ commit.close();
+ }
+
+ @Test
+ public void testSnapshotSequenceOrderingCompactionPreservesInputSnapshotId() throws Exception {
+ FileStoreTable table =
+ createFileStoreTable(
+ conf -> {
+ conf.set(CoreOptions.SEQUENCE_SNAPSHOT_ORDERING, true);
+ conf.set(CoreOptions.WRITE_ONLY, true);
+ });
+ StreamTableWrite write = table.newWrite(commitUser);
+ StreamTableCommit commit = table.newCommit(commitUser);
+
+ // Snapshot 1: write pk=(1,10) with val=100
+ write.write(rowData(1, 10, 100L));
+ commit.commit(0, write.prepareCommit(false, 0));
+
+ // Snapshot 2: write pk=(1,10) with val=200 (this should win after compaction)
+ write.write(rowData(1, 10, 200L));
+ commit.commit(1, write.prepareCommit(false, 1));
+
+ // Snapshot 3: write a DIFFERENT key pk=(1,20)
+ write.write(rowData(1, 20, 300L));
+ commit.commit(2, write.prepareCommit(false, 2));
+
+ // Snapshot 4: compact using dedicated compact writer (simulates compact job)
+ write.close();
+ commit.close();
+ FileStoreTable compactTable =
+ table.copy(Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "false"));
+ StreamTableWrite compactWrite = compactTable.newWrite(commitUser);
+ StreamTableCommit compactCommit = compactTable.newCommit(commitUser);
+ compactWrite.compact(binaryRow(1), 0, true);
+ compactCommit.commit(3, compactWrite.prepareCommit(true, 3));
+ compactWrite.close();
+ compactCommit.close();
+
+ List<DataSplit> splits = table.newSnapshotReader().read().dataSplits();
+ for (DataSplit split : splits) {
+ for (DataFileMeta file : split.dataFiles()) {
+ // The compacted file's minSequenceNumber should reflect the
min snapshot id
+ // of records inside (from per-record _SEQUENCE_NUMBER values
written during
+ // compaction), NOT the compaction commit's snapshot id (4).
+ assertThat(file.minSequenceNumber())
+ .as(
+ "Compacted file %s should have minSequenceNumber from per-record "
+ + "snapshot ids, not the compaction commit's snapshot id",
+ file.fileName())
+ .isLessThanOrEqualTo(3);
+ }
+ }
+
+ // Also verify the read result is correct
+ TableRead read = table.newReadBuilder().newRead();
+ Function<InternalRow, String> toString =
+ r -> r.getInt(0) + "|" + r.getInt(1) + "|" + r.getLong(2);
+ List<String> result = getResult(read, toSplits(splits), toString);
+ assertThat(result).containsExactlyInAnyOrder("1|10|200", "1|20|300");
+ }
+
+ @Test
+ public void testSnapshotSequenceOrderingCompactionNoOrderingReversal() throws Exception {
+ // Reproduces the scenario from the PR review: compaction of files from
+ // snapshot 1 and 3 must NOT cause records from snapshot 1 to win over
+ // an uncompacted file from snapshot 2.
+ FileStoreTable table =
+ createFileStoreTable(
+ conf -> {
+ conf.set(CoreOptions.SEQUENCE_SNAPSHOT_ORDERING, true);
+ conf.set(CoreOptions.WRITE_ONLY, true);
+ conf.set(CoreOptions.BUCKET, 1);
+ });
+ StreamTableWrite write = table.newWrite(commitUser);
+ StreamTableCommit commit = table.newCommit(commitUser);
+
+ // Snapshot 1: write pk=(1,10) with val=100
+ write.write(rowData(1, 10, 100L));
+ commit.commit(0, write.prepareCommit(false, 0));
+
+ // Snapshot 2: write SAME key pk=(1,10) with val=200 — this should win
+ write.write(rowData(1, 10, 200L));
+ commit.commit(1, write.prepareCommit(false, 1));
+
+ // Snapshot 3: write DIFFERENT key pk=(1,20) with val=300
+ write.write(rowData(1, 20, 300L));
+ commit.commit(2, write.prepareCommit(false, 2));
+
+ // Compact all files using dedicated compact writer
+ write.close();
+ commit.close();
+ FileStoreTable compactTable =
+ table.copy(Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "false"));
+ StreamTableWrite compactWrite = compactTable.newWrite(commitUser);
+ StreamTableCommit compactCommit = compactTable.newCommit(commitUser);
+ compactWrite.compact(binaryRow(1), 0, true);
+ compactCommit.commit(3, compactWrite.prepareCommit(true, 3));
+ compactWrite.close();
+ compactCommit.close();
+
+ // Write pk=(1,10) again with val=999 — snapshot 5 should definitely win
+ write = table.newWrite(commitUser);
+ commit = table.newCommit(commitUser);
+ write.write(rowData(1, 10, 999L));
+ commit.commit(4, write.prepareCommit(false, 4));
+
+ write.close();
+ commit.close();
+
+ List<Split> splits = toSplits(table.newSnapshotReader().read().dataSplits());
+ TableRead read = table.newReadBuilder().newRead();
+ Function<InternalRow, String> toString =
+ r -> r.getInt(0) + "|" + r.getInt(1) + "|" + r.getLong(2);
+ List<String> result = getResult(read, splits, toString);
+ // pk=(1,10): snapshot 5 (val=999) wins over snapshot 2 (val=200) and snapshot 1 (val=100)
+ // pk=(1,20): snapshot 3 (val=300) is the only version
+ assertThat(result).containsExactlyInAnyOrder("1|10|999", "1|20|300");
+ }
+
+ @Test
+ public void testSnapshotSequenceOrderingMultiRoundCompaction() throws Exception {
+ FileStoreTable table =
+ createFileStoreTable(
+ conf -> {
+ conf.set(CoreOptions.SEQUENCE_SNAPSHOT_ORDERING, true);
+ conf.set(CoreOptions.WRITE_ONLY, true);
+ conf.set(CoreOptions.BUCKET, 1);
+ });
+ StreamTableWrite write = table.newWrite(commitUser);
+ StreamTableCommit commit = table.newCommit(commitUser);
+
+ // Snapshot 1: pk=(1,10) val=100
+ write.write(rowData(1, 10, 100L));
+ commit.commit(0, write.prepareCommit(false, 0));
+
+ // Snapshot 2: pk=(1,10) val=200 — should win over snapshot 1
+ write.write(rowData(1, 10, 200L));
+ commit.commit(1, write.prepareCommit(false, 1));
+
+ // Snapshot 3: pk=(1,20) val=300
+ write.write(rowData(1, 20, 300L));
+ commit.commit(2, write.prepareCommit(false, 2));
+
+ // First compaction (snapshot 4) using dedicated compact writer
+ write.close();
+ commit.close();
+ FileStoreTable compactTable =
+ table.copy(Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "false"));
+ StreamTableWrite compactWrite = compactTable.newWrite(commitUser);
+ StreamTableCommit compactCommit = compactTable.newCommit(commitUser);
+ compactWrite.compact(binaryRow(1), 0, true);
+ compactCommit.commit(3, compactWrite.prepareCommit(true, 3));
+ compactWrite.close();
+ compactCommit.close();
+
+ // Snapshot 5: pk=(1,10) val=500 — should win over everything
+ write = table.newWrite(commitUser);
+ commit = table.newCommit(commitUser);
+ write.write(rowData(1, 10, 500L));
+ commit.commit(4, write.prepareCommit(false, 4));
+
+ // Snapshot 6: pk=(1,30) val=600
+ write.write(rowData(1, 30, 600L));
+ commit.commit(5, write.prepareCommit(false, 5));
+
+ // Second compaction (snapshot 7) using dedicated compact writer
+ write.close();
+ commit.close();
+ compactWrite = compactTable.newWrite(commitUser);
+ compactCommit = compactTable.newCommit(commitUser);
+ compactWrite.compact(binaryRow(1), 0, true);
+ compactCommit.commit(6, compactWrite.prepareCommit(true, 6));
+ compactWrite.close();
+ compactCommit.close();
+
+ List<Split> splits = toSplits(table.newSnapshotReader().read().dataSplits());
+ TableRead read = table.newReadBuilder().newRead();
+ Function<InternalRow, String> toString =
+ r -> r.getInt(0) + "|" + r.getInt(1) + "|" + r.getLong(2);
+ List<String> result = getResult(read, splits, toString);
+ assertThat(result).containsExactlyInAnyOrder("1|10|500", "1|20|300", "1|30|600");
+ }
+
+ @Test
+ public void testSnapshotSequenceOrderingWithChangelogInput() throws Exception {
+ FileStoreTable table =
+ createFileStoreTable(
+ conf -> {
+ conf.set(CoreOptions.SEQUENCE_SNAPSHOT_ORDERING, true);
+ conf.set(CoreOptions.WRITE_ONLY, true);
+ conf.set(CHANGELOG_PRODUCER, ChangelogProducer.INPUT);
+ });
+ StreamTableWrite write = table.newWrite(commitUser);
+ StreamTableCommit commit = table.newCommit(commitUser);
+
+ write.write(rowData(1, 10, 100L));
+ commit.commit(0, write.prepareCommit(false, 0));
+
+ write.write(rowData(1, 10, 1L));
+ commit.commit(1, write.prepareCommit(false, 1));
+
+ List<Split> splits = toSplits(table.newSnapshotReader().read().dataSplits());
+ TableRead read = table.newReadBuilder().newRead();
+ Function<InternalRow, String> toString =
+ r -> r.getInt(0) + "|" + r.getInt(1) + "|" + r.getLong(2);
+ List<String> result = getResult(read, splits, toString);
+ assertThat(result).containsExactly("1|10|1");
+
+ write.close();
+ commit.close();
+ }
+
+ @Test
+ public void testSnapshotSequenceOrderingWithChangelogLookup() throws Exception {
+ FileStoreTable table =
+ createFileStoreTable(
+ conf -> {
+ conf.set(CoreOptions.SEQUENCE_SNAPSHOT_ORDERING, true);
+ conf.set(CoreOptions.WRITE_ONLY, true);
+ conf.set(CHANGELOG_PRODUCER, LOOKUP);
+ });
+ StreamTableWrite write =
+ table.newWrite(commitUser).withIOManager(new IOManagerImpl(tempDir.toString()));
+ StreamTableCommit commit = table.newCommit(commitUser);
+
+ write.write(rowData(1, 10, 100L));
+ commit.commit(0, write.prepareCommit(false, 0));
+
+ write.write(rowData(1, 10, 1L));
+ commit.commit(1, write.prepareCommit(false, 1));
+
+ List<Split> splits = toSplits(table.newSnapshotReader().read().dataSplits());
+ TableRead read = table.newReadBuilder().newRead();
+ Function<InternalRow, String> toString =
+ r -> r.getInt(0) + "|" + r.getInt(1) + "|" + r.getLong(2);
+ List<String> result = getResult(read, splits, toString);
+ assertThat(result).containsExactly("1|10|1");
+
+ write.close();
+ commit.close();
+ }
+
+ @Test
+ public void testSnapshotSequenceOrderingDeleteFromLaterSnapshot() throws Exception {
+ FileStoreTable table =
+ createFileStoreTable(
+ conf -> {
+ conf.set(CoreOptions.SEQUENCE_SNAPSHOT_ORDERING, true);
+ conf.set(CoreOptions.WRITE_ONLY, true);
+ });
+ StreamTableWrite write = table.newWrite(commitUser);
+ StreamTableCommit commit = table.newCommit(commitUser);
+
+ write.write(rowData(1, 10, 100L));
+ commit.commit(0, write.prepareCommit(false, 0));
+
+ write.write(rowDataWithKind(RowKind.DELETE, 1, 10, 100L));
+ commit.commit(1, write.prepareCommit(false, 1));
+
+ List<Split> splits = toSplits(table.newSnapshotReader().read().dataSplits());
+ TableRead read = table.newReadBuilder().newRead();
+ Function<InternalRow, String> toString =
+ r -> r.getInt(0) + "|" + r.getInt(1) + "|" + r.getLong(2);
+ List<String> result = getResult(read, splits, toString);
+ assertThat(result).isEmpty();
+
+ write.close();
+ commit.close();
+ }
+
+ /**
+ * Regression: with snapshot-ordering on, a partial-update merge function must keep its result's
+ * {@code sequenceNumber} equal to the snapshot id carried by its inputs. The compacted file's
+ * per-record {@code _SEQUENCE_NUMBER} (and therefore its file-level minSequenceNumber) must
+ * stay a real snapshot id (>= 0); a regression to -1 would break ordering against later
+ * snapshots.
+ */
+ @Test
+ public void testSnapshotSequenceOrderingPartialUpdateCompactionPreservesSnapshotId()
+ throws Exception {
+ RowType rowType =
+ RowType.of(
+ new DataType[] {
+ DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.INT()
+ },
+ new String[] {"pt", "a", "b", "c"});
+ FileStoreTable table =
+ createFileStoreTable(
+ conf -> {
+ conf.set(CoreOptions.SEQUENCE_SNAPSHOT_ORDERING, true);
+ conf.set(CoreOptions.WRITE_ONLY, true);
+ conf.set(MERGE_ENGINE, PARTIAL_UPDATE);
+ conf.set(BUCKET, 1);
+ },
+ rowType);
+ StreamTableWrite write = table.newWrite(commitUser);
+ StreamTableCommit commit = table.newCommit(commitUser);
+
+ // Snapshot 1: partial write of column b
+ write.write(GenericRow.of(1, 1, 100, null));
+ commit.commit(0, write.prepareCommit(false, 0));
+
+ // Snapshot 2: partial write of column c — partial-update merges with snapshot 1's row
+ write.write(GenericRow.of(1, 1, null, 200));
+ commit.commit(1, write.prepareCommit(false, 1));
+
+ // Snapshot 3: compact files from snapshots 1+2 using a dedicated compact writer. The
+ // compaction reader merges the two partial rows through PartialUpdateMergeFunction; the
+ // merged record's sequenceNumber must stay a real snapshot id.
+ write.close();
+ commit.close();
+ FileStoreTable compactTable =
+ table.copy(Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "false"));
+ StreamTableWrite compactWrite = compactTable.newWrite(commitUser);
+ StreamTableCommit compactCommit = compactTable.newCommit(commitUser);
+ compactWrite.compact(binaryRow(1), 0, true);
+ compactCommit.commit(2, compactWrite.prepareCommit(true, 2));
+ compactWrite.close();
+ compactCommit.close();
+
+ List<DataSplit> splitsAfterCompact = table.newSnapshotReader().read().dataSplits();
+ for (DataSplit split : splitsAfterCompact) {
+ for (DataFileMeta file : split.dataFiles()) {
+ assertThat(file.minSequenceNumber())
+ .as(
+ "Compacted file %s must carry a real snapshot id in"
+ + " minSequenceNumber (>= 0). A value of -1 means the"
+ + " partial-update merge result lost its snapshot id"
+ + " during compaction.",
+ file.fileName())
+ .isGreaterThanOrEqualTo(0L);
+ }
+ }
+
+ // Snapshot 4: write a fresh value of b — this snapshot must win.
+ write = table.newWrite(commitUser);
+ commit = table.newCommit(commitUser);
+ write.write(GenericRow.of(1, 1, 999, null));
+ commit.commit(3, write.prepareCommit(false, 3));
+
+ // Snapshot 5: another compaction using dedicated compact writer
+ write.close();
+ commit.close();
+ compactWrite = compactTable.newWrite(commitUser);
+ compactCommit = compactTable.newCommit(commitUser);
+ compactWrite.compact(binaryRow(1), 0, true);
+ compactCommit.commit(4, compactWrite.prepareCommit(true, 4));
+ compactWrite.close();
+ compactCommit.close();
+ for (DataSplit split : table.newSnapshotReader().read().dataSplits()) {
+ for (DataFileMeta file : split.dataFiles()) {
+ assertThat(file.minSequenceNumber())
+ .as("Final compacted file %s minSequenceNumber", file.fileName())
+ .isGreaterThanOrEqualTo(0L);
+ assertThat(file.maxSequenceNumber())
+ .as("Final compacted file %s maxSequenceNumber", file.fileName())
+ .isGreaterThanOrEqualTo(0L);
+ }
+ }
+
+ List<Split> splits = toSplits(table.newSnapshotReader().read().dataSplits());
+ TableRead read = table.newReadBuilder().newRead();
+ Function<InternalRow, String> toString =
+ r ->
+ r.getInt(0)
+ + "|"
+ + r.getInt(1)
+ + "|"
+ + (r.isNullAt(2) ? "null" : r.getInt(2))
+ + "|"
+ + (r.isNullAt(3) ? "null" : r.getInt(3));
+ List<String> result = getResult(read, splits, toString);
+ // b=999 (snapshot 4 wins over snapshot 1's 100), c=200 (only snapshot 2 wrote it)
+ assertThat(result).containsExactly("1|1|999|200");
+
+ write.close();
+ commit.close();
+ }
+
+ /**
+ * Regression: with snapshot-ordering on, an aggregate merge function must keep its result's
+ * {@code sequenceNumber} equal to the snapshot id carried by its inputs. Mirrors the
+ * partial-update regression — if the merged record loses the snapshot id, the compacted file's
+ * minSequenceNumber regresses to -1.
+ */
+ @Test
+ public void testSnapshotSequenceOrderingAggregateCompactionPreservesSnapshotId()
+ throws Exception {
+ RowType rowType =
+ RowType.of(
+ new DataType[] {
+ DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.INT()
+ },
+ new String[] {"pt", "a", "b", "c"});
+ FileStoreTable table =
+ createFileStoreTable(
+ conf -> {
+ conf.set(CoreOptions.SEQUENCE_SNAPSHOT_ORDERING, true);
+ conf.set(CoreOptions.WRITE_ONLY, true);
+ conf.set(MERGE_ENGINE, AGGREGATE);
+ conf.set(BUCKET, 1);
+ conf.set("fields.b.aggregate-function", "sum");
+ conf.set("fields.c.aggregate-function", "max");
+ },
+ rowType);
+ StreamTableWrite write = table.newWrite(commitUser);
+ StreamTableCommit commit = table.newCommit(commitUser);
+
+ // Snapshot 1
+ write.write(GenericRow.of(1, 1, 10, 100));
+ commit.commit(0, write.prepareCommit(false, 0));
+
+ // Snapshot 2: aggregates with snapshot 1's row.
+ write.write(GenericRow.of(1, 1, 20, 50));
+ commit.commit(1, write.prepareCommit(false, 1));
+
+ // Snapshot 3: compact using dedicated compact writer
+ write.close();
+ commit.close();
+ FileStoreTable compactTable =
+ table.copy(Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "false"));
+ StreamTableWrite compactWrite = compactTable.newWrite(commitUser);
+ StreamTableCommit compactCommit = compactTable.newCommit(commitUser);
+ compactWrite.compact(binaryRow(1), 0, true);
+ compactCommit.commit(2, compactWrite.prepareCommit(true, 2));
+ compactWrite.close();
+ compactCommit.close();
+
+ for (DataSplit split : table.newSnapshotReader().read().dataSplits()) {
+ for (DataFileMeta file : split.dataFiles()) {
+ assertThat(file.minSequenceNumber())
+ .as(
+ "Aggregate-compacted file %s must carry a real snapshot id in"
+ + " minSequenceNumber (>= 0). A value of -1 means the"
+ + " aggregate merge result lost its snapshot id during"
+ + " compaction.",
+ file.fileName())
+ .isGreaterThanOrEqualTo(0L);
+ }
+ }
+
+ // Snapshot 4: another insert that must aggregate on top of the compacted result.
+ write = table.newWrite(commitUser);
+ commit = table.newCommit(commitUser);
+ write.write(GenericRow.of(1, 1, 5, 999));
+ commit.commit(3, write.prepareCommit(false, 3));
+
+ // Snapshot 5: final compaction using dedicated compact writer
+ write.close();
+ commit.close();
+ compactWrite = compactTable.newWrite(commitUser);
+ compactCommit = compactTable.newCommit(commitUser);
+ compactWrite.compact(binaryRow(1), 0, true);
+ compactCommit.commit(4, compactWrite.prepareCommit(true, 4));
+ compactWrite.close();
+ compactCommit.close();
+ for (DataSplit split : table.newSnapshotReader().read().dataSplits()) {
+ for (DataFileMeta file : split.dataFiles()) {
+ assertThat(file.minSequenceNumber())
+ .as("Final compacted file %s minSequenceNumber", file.fileName())
+ .isGreaterThanOrEqualTo(0L);
+ assertThat(file.maxSequenceNumber())
+ .as("Final compacted file %s maxSequenceNumber", file.fileName())
+ .isGreaterThanOrEqualTo(0L);
+ }
+ }
+
+ List<Split> splits = toSplits(table.newSnapshotReader().read().dataSplits());
+ TableRead read = table.newReadBuilder().newRead();
+ Function<InternalRow, String> toString =
+ r -> r.getInt(0) + "|" + r.getInt(1) + "|" + r.getInt(2) + "|" + r.getInt(3);
+ List<String> result = getResult(read, splits, toString);
+ // b = sum(10, 20, 5) = 35, c = max(100, 50, 999) = 999
+ assertThat(result).containsExactly("1|1|35|999");
+
+ write.close();
+ commit.close();
+ }
}