This is an automated email from the ASF dual-hosted git repository.
leaves12138 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 06e9e05bbe [core] Fix bug of bootstrap in ClusteringKeyIndex (#7795)
06e9e05bbe is described below
commit 06e9e05bbee39d36b1c9e6dcebd67b234ae84bad
Author: Jingsong Lee <[email protected]>
AuthorDate: Sat May 9 17:48:30 2026 +0800
[core] Fix bug of bootstrap in ClusteringKeyIndex (#7795)
---
.../paimon/lookup/sort/db/SimpleLsmKvDb.java | 63 +++++++---
.../paimon/lookup/sort/db/SimpleLsmKvDbTest.java | 75 +++++++++++
.../org/apache/paimon/append/cluster/Sorter.java | 4 +-
.../org/apache/paimon/codegen/CodeGenUtils.java | 23 ++--
.../paimon/crosspartition/GlobalIndexAssigner.java | 3 +-
.../globalindex/btree/BTreeGlobalIndexBuilder.java | 3 +-
.../apache/paimon/lookup/rocksdb/RocksDBState.java | 3 +-
.../clustering/ClusteringCompactManager.java | 5 +-
.../compact/clustering/ClusteringFileRewriter.java | 3 +-
.../compact/clustering/ClusteringKeyIndex.java | 3 +-
.../paimon/sort/BinaryExternalSortBuffer.java | 28 +----
.../apache/paimon/utils/KeyComparatorSupplier.java | 2 +-
.../paimon/utils/UserDefinedSeqComparator.java | 6 +-
.../apache/paimon/codegen/CodeGenUtilsTest.java | 18 +--
.../paimon/separated/ClusteringTableTest.java | 140 +++++++++++++++++++++
.../apache/paimon/flink/SchemaChangeITCase.java | 1 +
.../flink/action/DataEvolutionMergeIntoAction.java | 3 +-
.../flink/compact/IncrementalClusterCompact.java | 3 +-
.../apache/paimon/flink/sorter/SortOperator.java | 8 +-
.../org/apache/paimon/flink/sorter/SortUtils.java | 3 +-
.../paimon/flink/sorter/SortOperatorTest.java | 6 +-
21 files changed, 307 insertions(+), 96 deletions(-)
diff --git a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/db/SimpleLsmKvDb.java b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/db/SimpleLsmKvDb.java
index c5b8c53fd9..20d606a516 100644
--- a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/db/SimpleLsmKvDb.java
+++ b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/db/SimpleLsmKvDb.java
@@ -247,11 +247,13 @@ public class SimpleLsmKvDb implements Closeable {
int targetLevel = MAX_LEVELS - 1;
List<SstFileMetadata> targetLevelFiles = levels.get(targetLevel);
+ List<SstFileMetadata> bulkLoadFiles = new ArrayList<>();
SortLookupStoreWriter currentWriter = null;
File currentSstFile = null;
MemorySlice currentFileMinKey = null;
MemorySlice currentFileMaxKey = null;
+ MemorySlice previousFileMaxKey = null;
long currentBatchSize = 0;
try {
@@ -259,28 +261,30 @@ public class SimpleLsmKvDb implements Closeable {
Map.Entry<byte[], byte[]> entry = sortedEntries.next();
byte[] key = entry.getKey();
byte[] value = entry.getValue();
+ MemorySlice currentKey = MemorySlice.wrap(key);
if (currentWriter == null) {
currentSstFile = newSstFile();
currentWriter = storeFactory.createWriter(currentSstFile,
null);
- currentFileMinKey = MemorySlice.wrap(key);
+ currentFileMinKey = currentKey;
currentBatchSize = 0;
}
currentWriter.put(key, value);
- currentFileMaxKey = MemorySlice.wrap(key);
+ currentFileMaxKey = currentKey;
currentBatchSize += key.length + value.length;
if (currentBatchSize >= maxSstFileSize) {
currentWriter.close();
- targetLevelFiles.add(
- new SstFileMetadata(
+ currentWriter = null;
+ previousFileMaxKey =
+ addBulkLoadSstFile(
+ bulkLoadFiles,
currentSstFile,
currentFileMinKey,
currentFileMaxKey,
- 0,
- targetLevel));
- currentWriter = null;
+ previousFileMaxKey,
+ targetLevel);
currentSstFile = null;
currentFileMinKey = null;
currentFileMaxKey = null;
@@ -289,13 +293,14 @@ public class SimpleLsmKvDb implements Closeable {
if (currentWriter != null) {
currentWriter.close();
- targetLevelFiles.add(
- new SstFileMetadata(
- currentSstFile,
- currentFileMinKey,
- currentFileMaxKey,
- 0,
- targetLevel));
+ currentWriter = null;
+ addBulkLoadSstFile(
+ bulkLoadFiles,
+ currentSstFile,
+ currentFileMinKey,
+ currentFileMaxKey,
+ previousFileMaxKey,
+ targetLevel);
}
} catch (IOException | RuntimeException e) {
if (currentWriter != null) {
@@ -308,10 +313,34 @@ public class SimpleLsmKvDb implements Closeable {
throw e;
}
+ targetLevelFiles.addAll(bulkLoadFiles);
+
LOG.info(
- "Bulk-loaded {} SST files directly to level {}",
- targetLevelFiles.size(),
- targetLevel);
+ "Bulk-loaded {} SST files directly to level {}",
bulkLoadFiles.size(), targetLevel);
+ }
+
+ private MemorySlice addBulkLoadSstFile(
+ List<SstFileMetadata> targetLevelFiles,
+ File currentSstFile,
+ MemorySlice currentFileMinKey,
+ MemorySlice currentFileMaxKey,
+ @Nullable MemorySlice previousFileMaxKey,
+ int targetLevel) {
+ if (keyComparator.compare(currentFileMinKey, currentFileMaxKey) > 0) {
+ throw new IllegalArgumentException(
+ "bulkLoad requires entries sorted by the configured key comparator; "
+ + "generated SST min key is greater than max key.");
+ }
+ if (previousFileMaxKey != null
+ && keyComparator.compare(previousFileMaxKey,
currentFileMinKey) > 0) {
+ throw new IllegalArgumentException(
+ "bulkLoad requires entries sorted by the configured key comparator; "
+ + "generated SST key ranges are not ordered.");
+ }
+ targetLevelFiles.add(
+ new SstFileMetadata(
+ currentSstFile, currentFileMinKey, currentFileMaxKey,
0, targetLevel));
+ return currentFileMaxKey;
}
// -------------------------------------------------------------------------
diff --git a/paimon-common/src/test/java/org/apache/paimon/lookup/sort/db/SimpleLsmKvDbTest.java b/paimon-common/src/test/java/org/apache/paimon/lookup/sort/db/SimpleLsmKvDbTest.java
index 92a5488591..9ac869a318 100644
--- a/paimon-common/src/test/java/org/apache/paimon/lookup/sort/db/SimpleLsmKvDbTest.java
+++ b/paimon-common/src/test/java/org/apache/paimon/lookup/sort/db/SimpleLsmKvDbTest.java
@@ -1406,6 +1406,77 @@ public class SimpleLsmKvDbTest {
}
}
+ @Test
+ public void testBulkLoadFailsOnUnsortedEntries() throws IOException {
+ try (SimpleLsmKvDb db = createDb()) {
+ List<Map.Entry<byte[], byte[]>> entries = new ArrayList<>();
+ entries.add(entry("key-00002", "value-2"));
+ entries.add(entry("key-00001", "value-1"));
+
+ IllegalArgumentException exception =
+ Assertions.assertThrows(
+ IllegalArgumentException.class, () ->
db.bulkLoad(entries.iterator()));
+ Assertions.assertTrue(exception.getMessage().contains("sorted"));
+ Assertions.assertEquals(0, db.getSstFileCount());
+ }
+ }
+
+ @Test
+ public void testBulkLoadFailsOnUnorderedSstRanges() throws IOException {
+ SimpleLsmKvDb db =
+ SimpleLsmKvDb.builder(new File(tempDir.toFile(),
"bulk-unordered-ranges-db"))
+ .memTableFlushThreshold(1024)
+ .maxSstFileSize(1)
+ .blockSize(256)
+ .level0FileNumCompactTrigger(4)
+ .compressOptions(new CompressOptions("none", 1))
+ .build();
+
+ try {
+ List<Map.Entry<byte[], byte[]>> entries = new ArrayList<>();
+ entries.add(entry("key-00001", "value-1"));
+ entries.add(entry("key-00003", "value-3"));
+ entries.add(entry("key-00002", "value-2"));
+
+ IllegalArgumentException exception =
+ Assertions.assertThrows(
+ IllegalArgumentException.class, () ->
db.bulkLoad(entries.iterator()));
+ Assertions.assertTrue(exception.getMessage().contains("ranges"));
+ Assertions.assertEquals(0, db.getSstFileCount());
+ } finally {
+ db.close();
+ }
+ }
+
+ @Test
+ public void testBulkLoadOrderCheckUsesConfiguredComparator() throws
IOException {
+ Comparator<MemorySlice> reverseComparator =
+ new Comparator<MemorySlice>() {
+ @Override
+ public int compare(MemorySlice a, MemorySlice b) {
+ return b.compareTo(a);
+ }
+ };
+
+ try (SimpleLsmKvDb db =
+ SimpleLsmKvDb.builder(new File(tempDir.toFile(),
"bulk-reverse-db"))
+ .memTableFlushThreshold(1024)
+ .blockSize(256)
+ .level0FileNumCompactTrigger(4)
+ .compressOptions(new CompressOptions("none", 1))
+ .keyComparator(reverseComparator)
+ .build()) {
+ List<Map.Entry<byte[], byte[]>> entries = new ArrayList<>();
+ entries.add(entry("key-a", "value-a"));
+ entries.add(entry("key-b", "value-b"));
+
+ IllegalArgumentException exception =
+ Assertions.assertThrows(
+ IllegalArgumentException.class, () ->
db.bulkLoad(entries.iterator()));
+ Assertions.assertTrue(exception.getMessage().contains("comparator"));
+ }
+ }
+
@Test
public void testBulkLoadThenPutAndGet() throws IOException {
try (SimpleLsmKvDb db = createDb()) {
@@ -1500,6 +1571,10 @@ public class SimpleLsmKvDbTest {
db.put(key.getBytes(UTF_8), value.getBytes(UTF_8));
}
+ private static Map.Entry<byte[], byte[]> entry(String key, String value) {
+ return new AbstractMap.SimpleImmutableEntry<>(key.getBytes(UTF_8),
value.getBytes(UTF_8));
+ }
+
private static String getString(SimpleLsmKvDb db, String key) throws
IOException {
byte[] bytes = db.get(key.getBytes(UTF_8));
if (bytes == null) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/cluster/Sorter.java b/paimon-core/src/main/java/org/apache/paimon/append/cluster/Sorter.java
index 60dbecdde0..41786992de 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/cluster/Sorter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/cluster/Sorter.java
@@ -76,7 +76,6 @@ public abstract class Sorter {
int spillSortMaxNumFiles = options.localSortMaxNumFileHandles();
CompressOptions spillCompression = options.spillCompressOptions();
MemorySize maxDiskSize = options.writeBufferSpillDiskSize();
- boolean sequenceOrder = options.sequenceFieldSortOrderIsAscending();
this.ioManager = ioManager;
this.buffer =
@@ -88,8 +87,7 @@ public abstract class Sorter {
pageSize,
spillSortMaxNumFiles,
spillCompression,
- maxDiskSize,
- sequenceOrder);
+ maxDiskSize);
}
public abstract InternalRow assignSortKey(InternalRow row);
diff --git a/paimon-core/src/main/java/org/apache/paimon/codegen/CodeGenUtils.java b/paimon-core/src/main/java/org/apache/paimon/codegen/CodeGenUtils.java
index ec6887960c..a88a9e6be1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/codegen/CodeGenUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/codegen/CodeGenUtils.java
@@ -83,21 +83,20 @@ public class CodeGenUtils {
}
public static RecordComparator newRecordComparator(List<DataType>
inputTypes) {
- return newRecordComparator(
- inputTypes, IntStream.range(0, inputTypes.size()).toArray(),
true);
+ int[] sortFields = new int[inputTypes.size()];
+ for (int i = 0; i < sortFields.length; i++) {
+ sortFields[i] = i;
+ }
+ return newRecordComparator(inputTypes, sortFields);
}
public static RecordComparator newRecordComparator(
- List<DataType> inputTypes, int[] sortFields, boolean
isAscendingOrder) {
- return generate(
- RecordComparator.class,
- inputTypes,
- sortFields,
- isAscendingOrder,
- () ->
- getCodeGenerator()
- .generateRecordComparator(
- inputTypes, sortFields,
isAscendingOrder));
+ List<DataType> inputTypes, int[] sortFields) {
+ boolean[] ascendingOrders = new boolean[sortFields.length];
+ for (int i = 0; i < sortFields.length; i++) {
+ ascendingOrders[i] = true;
+ }
+ return newRecordComparator(inputTypes, sortFields, ascendingOrders);
}
public static RecordComparator newRecordComparator(
diff --git a/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java b/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
index 9097c5c482..12f90cd447 100644
--- a/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
@@ -307,8 +307,7 @@ public class GlobalIndexAssigner implements Serializable,
Closeable {
coreOptions.pageSize(),
coreOptions.localSortMaxNumFileHandles(),
coreOptions.spillCompressOptions(),
- coreOptions.writeBufferSpillDiskSize(),
- coreOptions.sequenceFieldSortOrderIsAscending());
+ coreOptions.writeBufferSpillDiskSize());
Function<SortOrder, RowIterator> iteratorFunction =
sortOrder -> {
diff --git a/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilder.java b/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilder.java
index cfeaa99918..577a04642f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilder.java
+++ b/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilder.java
@@ -218,8 +218,7 @@ public class BTreeGlobalIndexBuilder implements
Serializable {
options.pageSize(),
options.localSortMaxNumFileHandles(),
options.spillCompressOptions(),
- options.writeBufferSpillDiskSize(),
- options.sequenceFieldSortOrderIsAscending());
+ options.writeBufferSpillDiskSize());
List<Split> splitList = Collections.singletonList(split);
RecordReader<InternalRow> reader =
diff --git a/paimon-core/src/main/java/org/apache/paimon/lookup/rocksdb/RocksDBState.java b/paimon-core/src/main/java/org/apache/paimon/lookup/rocksdb/RocksDBState.java
index 62257cfd34..f564fba0d3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/lookup/rocksdb/RocksDBState.java
+++ b/paimon-core/src/main/java/org/apache/paimon/lookup/rocksdb/RocksDBState.java
@@ -128,8 +128,7 @@ public abstract class RocksDBState<K, V, CacheV> implements
State<K, V> {
options.pageSize(),
options.localSortMaxNumFileHandles(),
options.spillCompressOptions(),
- options.writeBufferSpillDiskSize(),
- options.sequenceFieldSortOrderIsAscending());
+ options.writeBufferSpillDiskSize());
}
/** A class wraps byte[] to indicate contain or not contain. */
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManager.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManager.java
index 61813513d1..5f905d9b25 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManager.java
@@ -112,11 +112,10 @@ public class ClusteringCompactManager extends
CompactFutureManager {
RecordComparator clusteringComparatorAlone =
CodeGenUtils.newRecordComparator(
valueType.project(clusteringColumns).getFieldTypes(),
- IntStream.range(0, clusteringColumns.size()).toArray(),
- true);
+ IntStream.range(0,
clusteringColumns.size()).toArray());
RecordComparator clusteringComparatorInValue =
CodeGenUtils.newRecordComparator(
- valueType.getFieldTypes(), clusteringColumnIndexes,
true);
+ valueType.getFieldTypes(), clusteringColumnIndexes);
SimpleLsmKvDb kvDb =
SimpleLsmKvDb.builder(new File(ioManager.pickTempDir()))
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringFileRewriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringFileRewriter.java
index 7788d4ced9..451d7c7500 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringFileRewriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringFileRewriter.java
@@ -136,8 +136,7 @@ public class ClusteringFileRewriter {
pageSize,
maxNumFileHandles,
compression,
- MemorySize.MAX_VALUE,
- false);
+ MemorySize.MAX_VALUE);
try (RecordReader<KeyValue> reader =
valueReaderFactory.createRecordReader(inputFile)) {
try (CloseableIterator<KeyValue> iterator =
reader.toCloseableIterator()) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringKeyIndex.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringKeyIndex.java
index fee1abae2b..7855bfacbb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringKeyIndex.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringKeyIndex.java
@@ -123,8 +123,7 @@ public class ClusteringKeyIndex implements Closeable {
pageSize,
maxNumFileHandles,
compression,
- MemorySize.MAX_VALUE,
- false);
+ MemorySize.MAX_VALUE);
RowCompactedSerializer keySerializer = new
RowCompactedSerializer(keyType);
InternalRow.FieldGetter[] keyFieldGetters =
diff --git a/paimon-core/src/main/java/org/apache/paimon/sort/BinaryExternalSortBuffer.java b/paimon-core/src/main/java/org/apache/paimon/sort/BinaryExternalSortBuffer.java
index c6495811de..31963420e7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/sort/BinaryExternalSortBuffer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/sort/BinaryExternalSortBuffer.java
@@ -32,7 +32,6 @@ import org.apache.paimon.disk.FileChannelUtil;
import org.apache.paimon.disk.FileIOChannel;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.memory.HeapMemorySegmentPool;
-import org.apache.paimon.memory.MemorySegmentPool;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.MutableObjectIterator;
@@ -101,30 +100,9 @@ public class BinaryExternalSortBuffer implements
SortBuffer {
int pageSize,
int maxNumFileHandles,
CompressOptions compression,
- MemorySize maxDiskSize,
- boolean sequenceOrder) {
- return create(
- ioManager,
- rowType,
- keyFields,
- new HeapMemorySegmentPool(bufferSize, pageSize),
- maxNumFileHandles,
- compression,
- maxDiskSize,
- sequenceOrder);
- }
-
- public static BinaryExternalSortBuffer create(
- IOManager ioManager,
- RowType rowType,
- int[] keyFields,
- MemorySegmentPool pool,
- int maxNumFileHandles,
- CompressOptions compression,
- MemorySize maxDiskSize,
- boolean sequenceOrder) {
- RecordComparator comparator =
- newRecordComparator(rowType.getFieldTypes(), keyFields,
sequenceOrder);
+ MemorySize maxDiskSize) {
+ RecordComparator comparator =
newRecordComparator(rowType.getFieldTypes(), keyFields);
+ HeapMemorySegmentPool pool = new HeapMemorySegmentPool(bufferSize,
pageSize);
BinaryInMemorySortBuffer sortBuffer =
BinaryInMemorySortBuffer.createBuffer(
newNormalizedKeyComputer(rowType.getFieldTypes(),
keyFields),
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/KeyComparatorSupplier.java b/paimon-core/src/main/java/org/apache/paimon/utils/KeyComparatorSupplier.java
index 25fd07b7ca..6f90cef01a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/KeyComparatorSupplier.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/KeyComparatorSupplier.java
@@ -45,6 +45,6 @@ public class KeyComparatorSupplier implements
SerializableSupplier<Comparator<In
@Override
public RecordComparator get() {
- return newRecordComparator(inputTypes, sortFields, true);
+ return newRecordComparator(inputTypes, sortFields);
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/UserDefinedSeqComparator.java b/paimon-core/src/main/java/org/apache/paimon/utils/UserDefinedSeqComparator.java
index 7326a13509..144c98809e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/UserDefinedSeqComparator.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/UserDefinedSeqComparator.java
@@ -83,9 +83,13 @@ public class UserDefinedSeqComparator implements
FieldsComparator {
return null;
}
+ boolean[] ascendingOrders = new boolean[sequenceFields.length];
+ for (int i = 0; i < sequenceFields.length; i++) {
+ ascendingOrders[i] = isAscendingOrder;
+ }
RecordComparator comparator =
CodeGenUtils.newRecordComparator(
- rowType.getFieldTypes(), sequenceFields,
isAscendingOrder);
+ rowType.getFieldTypes(), sequenceFields,
ascendingOrders);
return new UserDefinedSeqComparator(sequenceFields, comparator,
isAscendingOrder);
}
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/codegen/CodeGenUtilsTest.java b/paimon-core/src/test/java/org/apache/paimon/codegen/CodeGenUtilsTest.java
index 2d98596a28..b6cab84413 100644
--- a/paimon-core/src/test/java/org/apache/paimon/codegen/CodeGenUtilsTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/codegen/CodeGenUtilsTest.java
@@ -85,7 +85,7 @@ class CodeGenUtilsTest {
@Test
public void testRecordComparatorCodegenCache() {
assertClassEquals(
- () -> newRecordComparator(Arrays.asList(STRING(), INT()), new
int[] {0, 1}, true));
+ () -> newRecordComparator(Arrays.asList(STRING(), INT()), new
int[] {0, 1}));
}
@Test
@@ -93,23 +93,25 @@ class CodeGenUtilsTest {
assertClassEquals(
() ->
newRecordComparator(
- Arrays.asList(STRING(), VECTOR(3, INT())), new
int[] {0, 1}, true));
+ Arrays.asList(STRING(), VECTOR(3, INT())), new
int[] {0, 1}));
}
@Test
public void testRecordComparatorCodegenCacheMiss() {
assertClassNotEquals(
- newRecordComparator(Arrays.asList(STRING(), INT()), new int[]
{0, 1}, true),
- newRecordComparator(
- Arrays.asList(STRING(), INT(), DOUBLE()), new int[]
{0, 1, 2}, true));
+ newRecordComparator(Arrays.asList(STRING(), INT()), new int[]
{0, 1}),
+ newRecordComparator(Arrays.asList(STRING(), INT(), DOUBLE()),
new int[] {0, 1, 2}));
}
@Test
public void testRecordComparatorOrderCacheMiss() {
RecordComparator ascending =
- newRecordComparator(Arrays.asList(STRING(), INT()), new int[]
{0, 1}, true);
+ newRecordComparator(Arrays.asList(STRING(), INT()), new int[]
{0, 1});
RecordComparator descending =
- newRecordComparator(Arrays.asList(STRING(), INT()), new int[]
{0, 1}, false);
+ newRecordComparator(
+ Arrays.asList(STRING(), INT()),
+ new int[] {0, 1},
+ new boolean[] {false, false});
InternalRow row1 = GenericRow.of(BinaryString.fromString("a"), 1);
InternalRow row2 = GenericRow.of(BinaryString.fromString("b"), 1);
@@ -134,7 +136,7 @@ class CodeGenUtilsTest {
@Test
public void testHybridNotEqual() {
assertClassNotEquals(
- newRecordComparator(Arrays.asList(STRING(), INT()), new int[]
{0, 1}, true),
+ newRecordComparator(Arrays.asList(STRING(), INT()), new int[]
{0, 1}),
newNormalizedKeyComputer(Arrays.asList(STRING(), INT()), new
int[] {0, 1}));
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/separated/ClusteringTableTest.java b/paimon-core/src/test/java/org/apache/paimon/separated/ClusteringTableTest.java
index d1a7f070cd..c988ad23b4 100644
--- a/paimon-core/src/test/java/org/apache/paimon/separated/ClusteringTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/separated/ClusteringTableTest.java
@@ -24,6 +24,7 @@ import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
@@ -712,6 +713,84 @@ class ClusteringTableTest {
assertThat(readRows(firstRowTable)).containsExactlyInAnyOrderElementsOf(firstBatch);
}
+ /**
+ * Test first-row mode with composite primary key (STRING, INT) and same values inserted twice.
+ * Reproduces an issue where duplicate detection fails or produces wrong results.
+ */
+ @Test
+ public void testFirstRowCompositePkSameValues() throws Exception {
+ Table firstRowTable = createFirstRowTableCompositePk();
+
+ // Commit 1: initial records
+ writeRows(
+ firstRowTable,
+ Arrays.asList(
+ GenericRow.of(BinaryString.fromString("hi"), 1, 10),
+ GenericRow.of(BinaryString.fromString("hi"), 2, 20)));
+
+ // Verify first commit
+ assertThat(readRowsCompositePk(firstRowTable))
+ .containsExactlyInAnyOrder(
+ GenericRow.of(BinaryString.fromString("hi"), 1, 10),
+ GenericRow.of(BinaryString.fromString("hi"), 2, 20));
+
+ // Commit 2: same keys with same values again — all should be ignored in first-row mode
+ writeRows(
+ firstRowTable,
+ Arrays.asList(
+ GenericRow.of(BinaryString.fromString("hi"), 1, 10),
+ GenericRow.of(BinaryString.fromString("hi"), 2, 20)));
+
+ // Should still see the first values
+ assertThat(readRowsCompositePk(firstRowTable))
+ .containsExactlyInAnyOrder(
+ GenericRow.of(BinaryString.fromString("hi"), 1, 10),
+ GenericRow.of(BinaryString.fromString("hi"), 2, 20));
+ }
+
+ /** Test first-row mode keeps original values after bootstrapping composite key index. */
+ @Test
+ public void testFirstRowCompositePkKeepsOriginalValuesAcrossBootstrap()
throws Exception {
+ Table firstRowTable = createFirstRowTableCompositePk();
+
+ List<GenericRow> originalRows =
+ Arrays.asList(
+ GenericRow.of(BinaryString.fromString("same"), 1, 10),
+ GenericRow.of(BinaryString.fromString("same"), 2, 20),
+ GenericRow.of(BinaryString.fromString("same"), 3, 30));
+ writeRows(firstRowTable, originalRows);
+
+ writeRows(
+ firstRowTable,
+ Arrays.asList(
+ GenericRow.of(BinaryString.fromString("same"), 1, 100),
+ GenericRow.of(BinaryString.fromString("same"), 2, 200),
+ GenericRow.of(BinaryString.fromString("same"), 3,
300)));
+
+ assertThat(readRowsCompositePk(firstRowTable))
+ .containsExactlyInAnyOrderElementsOf(originalRows);
+ assertThat(dataFiles(firstRowTable).stream().mapToLong(DataFileMeta::rowCount).sum())
+ .isEqualTo(originalRows.size());
+ }
+
+ /** Test sort-and-rewrite writes clustering ranges in ascending order. */
+ @Test
+ public void testFirstRowSortAndRewriteFileKeepsAscendingClusteringRange()
throws Exception {
+ Table firstRowTable =
createFirstRowTableWithCompositeClusteringColumns();
+
+ writeRows(
+ firstRowTable,
+ Arrays.asList(
+ GenericRow.of(1, BinaryString.fromString("same"), 2),
+ GenericRow.of(2, BinaryString.fromString("same"), 1)));
+
+ List<DataFileMeta> dataFiles = dataFiles(firstRowTable);
+ assertThat(dataFiles).hasSize(1);
+ DataFileMeta dataFile = dataFiles.get(0);
+ assertThat(dataFile.minKey().getInt(1)).isEqualTo(1);
+ assertThat(dataFile.maxKey().getInt(1)).isEqualTo(2);
+ }
+
// ==================== Spill Tests ====================
/** Test first-row mode with spill: keeps first values despite many duplicate commits. */
@@ -1044,6 +1123,59 @@ class ClusteringTableTest {
return catalog.getTable(identifier);
}
+ private Table createFirstRowTableCompositePk() throws Exception {
+ Identifier identifier = Identifier.create("default",
"first_row_composite_pk_table");
+ Schema schema =
+ Schema.newBuilder()
+ .column("k1", DataTypes.STRING())
+ .column("k2", DataTypes.INT())
+ .column("v", DataTypes.INT())
+ .primaryKey("k1", "k2")
+ .option(DELETION_VECTORS_ENABLED.key(), "true")
+ .option(BUCKET.key(), "1")
+ .option(CLUSTERING_COLUMNS.key(), "v")
+ .option(PK_CLUSTERING_OVERRIDE.key(), "true")
+ .option(MERGE_ENGINE.key(), "first-row")
+ .build();
+ catalog.createTable(identifier, schema, false);
+ return catalog.getTable(identifier);
+ }
+
+ private Table createFirstRowTableWithCompositeClusteringColumns() throws
Exception {
+ Identifier identifier =
+ Identifier.create("default",
"first_row_composite_clustering_table");
+ Schema schema =
+ Schema.newBuilder()
+ .column("a", DataTypes.INT())
+ .column("c", DataTypes.STRING())
+ .column("b", DataTypes.INT())
+ .primaryKey("a")
+ .option(DELETION_VECTORS_ENABLED.key(), "true")
+ .option(BUCKET.key(), "1")
+ .option(CLUSTERING_COLUMNS.key(), "c,b")
+ .option(PK_CLUSTERING_OVERRIDE.key(), "true")
+ .option(MERGE_ENGINE.key(), "first-row")
+ .build();
+ catalog.createTable(identifier, schema, false);
+ return catalog.getTable(identifier);
+ }
+
+ private List<GenericRow> readRowsCompositePk(Table targetTable) throws
Exception {
+ ReadBuilder readBuilder = targetTable.newReadBuilder();
+ @SuppressWarnings("resource")
+ CloseableIterator<InternalRow> iterator =
+ readBuilder
+ .newRead()
+ .createReader(readBuilder.newScan().plan())
+ .toCloseableIterator();
+ List<GenericRow> result = new ArrayList<>();
+ while (iterator.hasNext()) {
+ InternalRow row = iterator.next();
+ result.add(GenericRow.of(row.getString(0), row.getInt(1),
row.getInt(2)));
+ }
+ return result;
+ }
+
private void writeRows(List<GenericRow> rows) throws Exception {
writeRows(table, rows);
}
@@ -1079,6 +1211,14 @@ class ClusteringTableTest {
return result;
}
+ private List<DataFileMeta> dataFiles(Table targetTable) {
+ List<DataFileMeta> result = new ArrayList<>();
+ for (Split split :
targetTable.newReadBuilder().newScan().plan().splits()) {
+ result.addAll(((DataSplit) split).dataFiles());
+ }
+ return result;
+ }
+
private int countFiles(Table targetTable, Predicate filter) {
ReadBuilder readBuilder = targetTable.newReadBuilder();
if (filter != null) {
diff --git a/paimon-flink/paimon-flink-1.16/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java b/paimon-flink/paimon-flink-1.16/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
index d5599410ad..8bb557e127 100644
--- a/paimon-flink/paimon-flink-1.16/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
+++ b/paimon-flink/paimon-flink-1.16/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
@@ -28,6 +28,7 @@ import static
org.assertj.core.api.Assertions.assertThatThrownBy;
/** ITCase for schema changes. */
public class SchemaChangeITCase extends CatalogITCaseBase {
+
@Test
public void testSetAndRemoveOption() throws Exception {
sql("CREATE TABLE T (a STRING, b STRING, c STRING)");
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoAction.java
index 7e7c4528b4..019bf6484b 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoAction.java
@@ -377,8 +377,7 @@ public class DataEvolutionMergeIntoAction extends
TableActionBase {
coreOptions.localSortMaxNumFileHandles(),
coreOptions.spillCompressOptions(),
sinkParallelism,
- coreOptions.writeBufferSpillDiskSize(),
- coreOptions.sequenceFieldSortOrderIsAscending()))
+ coreOptions.writeBufferSpillDiskSize()))
.setParallelism(sinkParallelism);
// 2. write partial columns
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/IncrementalClusterCompact.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/IncrementalClusterCompact.java
index db2230eda6..25d43ddbc6 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/IncrementalClusterCompact.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/IncrementalClusterCompact.java
@@ -286,8 +286,7 @@ public class IncrementalClusterCompact {
options.localSortMaxNumFileHandles(),
options.spillCompressOptions(),
sinkParallelism,
- options.writeBufferSpillDiskSize(),
- options.sequenceFieldSortOrderIsAscending()))
+ options.writeBufferSpillDiskSize()))
.setParallelism(sinkParallelism);
// Step 3: remove the prepended key columns and convert back to RowData
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortOperator.java
index b6847125fb..dfa1e432a1 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortOperator.java
@@ -49,7 +49,6 @@ public class SortOperator extends
TableStreamOperator<InternalRow>
private final CompressOptions spillCompression;
private final int sinkParallelism;
private final MemorySize maxDiskSize;
- private final boolean sequenceOrder;
private transient BinaryExternalSortBuffer buffer;
private transient IOManager ioManager;
@@ -62,8 +61,7 @@ public class SortOperator extends
TableStreamOperator<InternalRow>
int spillSortMaxNumFiles,
CompressOptions spillCompression,
int sinkParallelism,
- MemorySize maxDiskSize,
- boolean sequenceOrder) {
+ MemorySize maxDiskSize) {
this.keyType = keyType;
this.rowType = rowType;
this.maxMemory = maxMemory;
@@ -73,7 +71,6 @@ public class SortOperator extends
TableStreamOperator<InternalRow>
this.spillCompression = spillCompression;
this.sinkParallelism = sinkParallelism;
this.maxDiskSize = maxDiskSize;
- this.sequenceOrder = sequenceOrder;
}
@Override
@@ -105,8 +102,7 @@ public class SortOperator extends
TableStreamOperator<InternalRow>
pageSize,
spillSortMaxNumFiles,
spillCompression,
- maxDiskSize,
- sequenceOrder);
+ maxDiskSize);
}
@Override
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java
index 43777566f6..c841afa3f5 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java
@@ -170,8 +170,7 @@ public class SortUtils {
options.localSortMaxNumFileHandles(),
options.spillCompressOptions(),
sinkParallelism,
- options.writeBufferSpillDiskSize(),
- options.sequenceFieldSortOrderIsAscending()))
+ options.writeBufferSpillDiskSize()))
.setParallelism(sinkParallelism)
// remove the key column from every row
.map(
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sorter/SortOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sorter/SortOperatorTest.java
index c74c1c2c17..155e259e02 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sorter/SortOperatorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sorter/SortOperatorTest.java
@@ -65,8 +65,7 @@ public class SortOperatorTest {
128,
CompressOptions.defaultOptions(),
1,
- MemorySize.MAX_VALUE,
- true) {};
+ MemorySize.MAX_VALUE) {};
OneInputStreamOperatorTestHarness harness =
createTestHarness(sortOperator);
harness.open();
@@ -115,8 +114,7 @@ public class SortOperatorTest {
128,
CompressOptions.defaultOptions(),
1,
- MemorySize.MAX_VALUE,
- true) {};
+ MemorySize.MAX_VALUE) {};
OneInputStreamOperatorTestHarness harness =
createTestHarness(sortOperator);
harness.open();
File[] files =
harness.getEnvironment().getIOManager().getSpillingDirectories();