This is an automated email from the ASF dual-hosted git repository.

leaves12138 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 06e9e05bbe [core] Fix bug of bootstrap in ClusteringKeyIndex (#7795)
06e9e05bbe is described below

commit 06e9e05bbee39d36b1c9e6dcebd67b234ae84bad
Author: Jingsong Lee <[email protected]>
AuthorDate: Sat May 9 17:48:30 2026 +0800

    [core] Fix bug of bootstrap in ClusteringKeyIndex (#7795)
---
 .../paimon/lookup/sort/db/SimpleLsmKvDb.java       |  63 +++++++---
 .../paimon/lookup/sort/db/SimpleLsmKvDbTest.java   |  75 +++++++++++
 .../org/apache/paimon/append/cluster/Sorter.java   |   4 +-
 .../org/apache/paimon/codegen/CodeGenUtils.java    |  23 ++--
 .../paimon/crosspartition/GlobalIndexAssigner.java |   3 +-
 .../globalindex/btree/BTreeGlobalIndexBuilder.java |   3 +-
 .../apache/paimon/lookup/rocksdb/RocksDBState.java |   3 +-
 .../clustering/ClusteringCompactManager.java       |   5 +-
 .../compact/clustering/ClusteringFileRewriter.java |   3 +-
 .../compact/clustering/ClusteringKeyIndex.java     |   3 +-
 .../paimon/sort/BinaryExternalSortBuffer.java      |  28 +----
 .../apache/paimon/utils/KeyComparatorSupplier.java |   2 +-
 .../paimon/utils/UserDefinedSeqComparator.java     |   6 +-
 .../apache/paimon/codegen/CodeGenUtilsTest.java    |  18 +--
 .../paimon/separated/ClusteringTableTest.java      | 140 +++++++++++++++++++++
 .../apache/paimon/flink/SchemaChangeITCase.java    |   1 +
 .../flink/action/DataEvolutionMergeIntoAction.java |   3 +-
 .../flink/compact/IncrementalClusterCompact.java   |   3 +-
 .../apache/paimon/flink/sorter/SortOperator.java   |   8 +-
 .../org/apache/paimon/flink/sorter/SortUtils.java  |   3 +-
 .../paimon/flink/sorter/SortOperatorTest.java      |   6 +-
 21 files changed, 307 insertions(+), 96 deletions(-)

diff --git 
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/db/SimpleLsmKvDb.java
 
b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/db/SimpleLsmKvDb.java
index c5b8c53fd9..20d606a516 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/db/SimpleLsmKvDb.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/db/SimpleLsmKvDb.java
@@ -247,11 +247,13 @@ public class SimpleLsmKvDb implements Closeable {
 
         int targetLevel = MAX_LEVELS - 1;
         List<SstFileMetadata> targetLevelFiles = levels.get(targetLevel);
+        List<SstFileMetadata> bulkLoadFiles = new ArrayList<>();
 
         SortLookupStoreWriter currentWriter = null;
         File currentSstFile = null;
         MemorySlice currentFileMinKey = null;
         MemorySlice currentFileMaxKey = null;
+        MemorySlice previousFileMaxKey = null;
         long currentBatchSize = 0;
 
         try {
@@ -259,28 +261,30 @@ public class SimpleLsmKvDb implements Closeable {
                 Map.Entry<byte[], byte[]> entry = sortedEntries.next();
                 byte[] key = entry.getKey();
                 byte[] value = entry.getValue();
+                MemorySlice currentKey = MemorySlice.wrap(key);
 
                 if (currentWriter == null) {
                     currentSstFile = newSstFile();
                     currentWriter = storeFactory.createWriter(currentSstFile, 
null);
-                    currentFileMinKey = MemorySlice.wrap(key);
+                    currentFileMinKey = currentKey;
                     currentBatchSize = 0;
                 }
 
                 currentWriter.put(key, value);
-                currentFileMaxKey = MemorySlice.wrap(key);
+                currentFileMaxKey = currentKey;
                 currentBatchSize += key.length + value.length;
 
                 if (currentBatchSize >= maxSstFileSize) {
                     currentWriter.close();
-                    targetLevelFiles.add(
-                            new SstFileMetadata(
+                    currentWriter = null;
+                    previousFileMaxKey =
+                            addBulkLoadSstFile(
+                                    bulkLoadFiles,
                                     currentSstFile,
                                     currentFileMinKey,
                                     currentFileMaxKey,
-                                    0,
-                                    targetLevel));
-                    currentWriter = null;
+                                    previousFileMaxKey,
+                                    targetLevel);
                     currentSstFile = null;
                     currentFileMinKey = null;
                     currentFileMaxKey = null;
@@ -289,13 +293,14 @@ public class SimpleLsmKvDb implements Closeable {
 
             if (currentWriter != null) {
                 currentWriter.close();
-                targetLevelFiles.add(
-                        new SstFileMetadata(
-                                currentSstFile,
-                                currentFileMinKey,
-                                currentFileMaxKey,
-                                0,
-                                targetLevel));
+                currentWriter = null;
+                addBulkLoadSstFile(
+                        bulkLoadFiles,
+                        currentSstFile,
+                        currentFileMinKey,
+                        currentFileMaxKey,
+                        previousFileMaxKey,
+                        targetLevel);
             }
         } catch (IOException | RuntimeException e) {
             if (currentWriter != null) {
@@ -308,10 +313,34 @@ public class SimpleLsmKvDb implements Closeable {
             throw e;
         }
 
+        targetLevelFiles.addAll(bulkLoadFiles);
+
         LOG.info(
-                "Bulk-loaded {} SST files directly to level {}",
-                targetLevelFiles.size(),
-                targetLevel);
+                "Bulk-loaded {} SST files directly to level {}", 
bulkLoadFiles.size(), targetLevel);
+    }
+
+    private MemorySlice addBulkLoadSstFile(
+            List<SstFileMetadata> targetLevelFiles,
+            File currentSstFile,
+            MemorySlice currentFileMinKey,
+            MemorySlice currentFileMaxKey,
+            @Nullable MemorySlice previousFileMaxKey,
+            int targetLevel) {
+        if (keyComparator.compare(currentFileMinKey, currentFileMaxKey) > 0) {
+            throw new IllegalArgumentException(
+                    "bulkLoad requires entries sorted by the configured key 
comparator; "
+                            + "generated SST min key is greater than max 
key.");
+        }
+        if (previousFileMaxKey != null
+                && keyComparator.compare(previousFileMaxKey, 
currentFileMinKey) > 0) {
+            throw new IllegalArgumentException(
+                    "bulkLoad requires entries sorted by the configured key 
comparator; "
+                            + "generated SST key ranges are not ordered.");
+        }
+        targetLevelFiles.add(
+                new SstFileMetadata(
+                        currentSstFile, currentFileMinKey, currentFileMaxKey, 
0, targetLevel));
+        return currentFileMaxKey;
     }
 
     // 
-------------------------------------------------------------------------
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/lookup/sort/db/SimpleLsmKvDbTest.java
 
b/paimon-common/src/test/java/org/apache/paimon/lookup/sort/db/SimpleLsmKvDbTest.java
index 92a5488591..9ac869a318 100644
--- 
a/paimon-common/src/test/java/org/apache/paimon/lookup/sort/db/SimpleLsmKvDbTest.java
+++ 
b/paimon-common/src/test/java/org/apache/paimon/lookup/sort/db/SimpleLsmKvDbTest.java
@@ -1406,6 +1406,77 @@ public class SimpleLsmKvDbTest {
         }
     }
 
+    @Test
+    public void testBulkLoadFailsOnUnsortedEntries() throws IOException {
+        try (SimpleLsmKvDb db = createDb()) {
+            List<Map.Entry<byte[], byte[]>> entries = new ArrayList<>();
+            entries.add(entry("key-00002", "value-2"));
+            entries.add(entry("key-00001", "value-1"));
+
+            IllegalArgumentException exception =
+                    Assertions.assertThrows(
+                            IllegalArgumentException.class, () -> 
db.bulkLoad(entries.iterator()));
+            Assertions.assertTrue(exception.getMessage().contains("sorted"));
+            Assertions.assertEquals(0, db.getSstFileCount());
+        }
+    }
+
+    @Test
+    public void testBulkLoadFailsOnUnorderedSstRanges() throws IOException {
+        SimpleLsmKvDb db =
+                SimpleLsmKvDb.builder(new File(tempDir.toFile(), 
"bulk-unordered-ranges-db"))
+                        .memTableFlushThreshold(1024)
+                        .maxSstFileSize(1)
+                        .blockSize(256)
+                        .level0FileNumCompactTrigger(4)
+                        .compressOptions(new CompressOptions("none", 1))
+                        .build();
+
+        try {
+            List<Map.Entry<byte[], byte[]>> entries = new ArrayList<>();
+            entries.add(entry("key-00001", "value-1"));
+            entries.add(entry("key-00003", "value-3"));
+            entries.add(entry("key-00002", "value-2"));
+
+            IllegalArgumentException exception =
+                    Assertions.assertThrows(
+                            IllegalArgumentException.class, () -> 
db.bulkLoad(entries.iterator()));
+            Assertions.assertTrue(exception.getMessage().contains("ranges"));
+            Assertions.assertEquals(0, db.getSstFileCount());
+        } finally {
+            db.close();
+        }
+    }
+
+    @Test
+    public void testBulkLoadOrderCheckUsesConfiguredComparator() throws 
IOException {
+        Comparator<MemorySlice> reverseComparator =
+                new Comparator<MemorySlice>() {
+                    @Override
+                    public int compare(MemorySlice a, MemorySlice b) {
+                        return b.compareTo(a);
+                    }
+                };
+
+        try (SimpleLsmKvDb db =
+                SimpleLsmKvDb.builder(new File(tempDir.toFile(), 
"bulk-reverse-db"))
+                        .memTableFlushThreshold(1024)
+                        .blockSize(256)
+                        .level0FileNumCompactTrigger(4)
+                        .compressOptions(new CompressOptions("none", 1))
+                        .keyComparator(reverseComparator)
+                        .build()) {
+            List<Map.Entry<byte[], byte[]>> entries = new ArrayList<>();
+            entries.add(entry("key-a", "value-a"));
+            entries.add(entry("key-b", "value-b"));
+
+            IllegalArgumentException exception =
+                    Assertions.assertThrows(
+                            IllegalArgumentException.class, () -> 
db.bulkLoad(entries.iterator()));
+            
Assertions.assertTrue(exception.getMessage().contains("comparator"));
+        }
+    }
+
     @Test
     public void testBulkLoadThenPutAndGet() throws IOException {
         try (SimpleLsmKvDb db = createDb()) {
@@ -1500,6 +1571,10 @@ public class SimpleLsmKvDbTest {
         db.put(key.getBytes(UTF_8), value.getBytes(UTF_8));
     }
 
+    private static Map.Entry<byte[], byte[]> entry(String key, String value) {
+        return new AbstractMap.SimpleImmutableEntry<>(key.getBytes(UTF_8), 
value.getBytes(UTF_8));
+    }
+
     private static String getString(SimpleLsmKvDb db, String key) throws 
IOException {
         byte[] bytes = db.get(key.getBytes(UTF_8));
         if (bytes == null) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/append/cluster/Sorter.java 
b/paimon-core/src/main/java/org/apache/paimon/append/cluster/Sorter.java
index 60dbecdde0..41786992de 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/cluster/Sorter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/cluster/Sorter.java
@@ -76,7 +76,6 @@ public abstract class Sorter {
         int spillSortMaxNumFiles = options.localSortMaxNumFileHandles();
         CompressOptions spillCompression = options.spillCompressOptions();
         MemorySize maxDiskSize = options.writeBufferSpillDiskSize();
-        boolean sequenceOrder = options.sequenceFieldSortOrderIsAscending();
 
         this.ioManager = ioManager;
         this.buffer =
@@ -88,8 +87,7 @@ public abstract class Sorter {
                         pageSize,
                         spillSortMaxNumFiles,
                         spillCompression,
-                        maxDiskSize,
-                        sequenceOrder);
+                        maxDiskSize);
     }
 
     public abstract InternalRow assignSortKey(InternalRow row);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/codegen/CodeGenUtils.java 
b/paimon-core/src/main/java/org/apache/paimon/codegen/CodeGenUtils.java
index ec6887960c..a88a9e6be1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/codegen/CodeGenUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/codegen/CodeGenUtils.java
@@ -83,21 +83,20 @@ public class CodeGenUtils {
     }
 
     public static RecordComparator newRecordComparator(List<DataType> 
inputTypes) {
-        return newRecordComparator(
-                inputTypes, IntStream.range(0, inputTypes.size()).toArray(), 
true);
+        int[] sortFields = new int[inputTypes.size()];
+        for (int i = 0; i < sortFields.length; i++) {
+            sortFields[i] = i;
+        }
+        return newRecordComparator(inputTypes, sortFields);
     }
 
     public static RecordComparator newRecordComparator(
-            List<DataType> inputTypes, int[] sortFields, boolean 
isAscendingOrder) {
-        return generate(
-                RecordComparator.class,
-                inputTypes,
-                sortFields,
-                isAscendingOrder,
-                () ->
-                        getCodeGenerator()
-                                .generateRecordComparator(
-                                        inputTypes, sortFields, 
isAscendingOrder));
+            List<DataType> inputTypes, int[] sortFields) {
+        boolean[] ascendingOrders = new boolean[sortFields.length];
+        for (int i = 0; i < sortFields.length; i++) {
+            ascendingOrders[i] = true;
+        }
+        return newRecordComparator(inputTypes, sortFields, ascendingOrders);
     }
 
     public static RecordComparator newRecordComparator(
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
 
b/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
index 9097c5c482..12f90cd447 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
@@ -307,8 +307,7 @@ public class GlobalIndexAssigner implements Serializable, 
Closeable {
                         coreOptions.pageSize(),
                         coreOptions.localSortMaxNumFileHandles(),
                         coreOptions.spillCompressOptions(),
-                        coreOptions.writeBufferSpillDiskSize(),
-                        coreOptions.sequenceFieldSortOrderIsAscending());
+                        coreOptions.writeBufferSpillDiskSize());
 
         Function<SortOrder, RowIterator> iteratorFunction =
                 sortOrder -> {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilder.java
 
b/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilder.java
index cfeaa99918..577a04642f 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilder.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilder.java
@@ -218,8 +218,7 @@ public class BTreeGlobalIndexBuilder implements 
Serializable {
                         options.pageSize(),
                         options.localSortMaxNumFileHandles(),
                         options.spillCompressOptions(),
-                        options.writeBufferSpillDiskSize(),
-                        options.sequenceFieldSortOrderIsAscending());
+                        options.writeBufferSpillDiskSize());
 
         List<Split> splitList = Collections.singletonList(split);
         RecordReader<InternalRow> reader =
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/lookup/rocksdb/RocksDBState.java 
b/paimon-core/src/main/java/org/apache/paimon/lookup/rocksdb/RocksDBState.java
index 62257cfd34..f564fba0d3 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/lookup/rocksdb/RocksDBState.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/lookup/rocksdb/RocksDBState.java
@@ -128,8 +128,7 @@ public abstract class RocksDBState<K, V, CacheV> implements 
State<K, V> {
                 options.pageSize(),
                 options.localSortMaxNumFileHandles(),
                 options.spillCompressOptions(),
-                options.writeBufferSpillDiskSize(),
-                options.sequenceFieldSortOrderIsAscending());
+                options.writeBufferSpillDiskSize());
     }
 
     /** A class wraps byte[] to indicate contain or not contain. */
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManager.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManager.java
index 61813513d1..5f905d9b25 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManager.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManager.java
@@ -112,11 +112,10 @@ public class ClusteringCompactManager extends 
CompactFutureManager {
         RecordComparator clusteringComparatorAlone =
                 CodeGenUtils.newRecordComparator(
                         valueType.project(clusteringColumns).getFieldTypes(),
-                        IntStream.range(0, clusteringColumns.size()).toArray(),
-                        true);
+                        IntStream.range(0, 
clusteringColumns.size()).toArray());
         RecordComparator clusteringComparatorInValue =
                 CodeGenUtils.newRecordComparator(
-                        valueType.getFieldTypes(), clusteringColumnIndexes, 
true);
+                        valueType.getFieldTypes(), clusteringColumnIndexes);
 
         SimpleLsmKvDb kvDb =
                 SimpleLsmKvDb.builder(new File(ioManager.pickTempDir()))
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringFileRewriter.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringFileRewriter.java
index 7788d4ced9..451d7c7500 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringFileRewriter.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringFileRewriter.java
@@ -136,8 +136,7 @@ public class ClusteringFileRewriter {
                         pageSize,
                         maxNumFileHandles,
                         compression,
-                        MemorySize.MAX_VALUE,
-                        false);
+                        MemorySize.MAX_VALUE);
 
         try (RecordReader<KeyValue> reader = 
valueReaderFactory.createRecordReader(inputFile)) {
             try (CloseableIterator<KeyValue> iterator = 
reader.toCloseableIterator()) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringKeyIndex.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringKeyIndex.java
index fee1abae2b..7855bfacbb 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringKeyIndex.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringKeyIndex.java
@@ -123,8 +123,7 @@ public class ClusteringKeyIndex implements Closeable {
                         pageSize,
                         maxNumFileHandles,
                         compression,
-                        MemorySize.MAX_VALUE,
-                        false);
+                        MemorySize.MAX_VALUE);
 
         RowCompactedSerializer keySerializer = new 
RowCompactedSerializer(keyType);
         InternalRow.FieldGetter[] keyFieldGetters =
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/sort/BinaryExternalSortBuffer.java
 
b/paimon-core/src/main/java/org/apache/paimon/sort/BinaryExternalSortBuffer.java
index c6495811de..31963420e7 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/sort/BinaryExternalSortBuffer.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/sort/BinaryExternalSortBuffer.java
@@ -32,7 +32,6 @@ import org.apache.paimon.disk.FileChannelUtil;
 import org.apache.paimon.disk.FileIOChannel;
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.memory.HeapMemorySegmentPool;
-import org.apache.paimon.memory.MemorySegmentPool;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.MutableObjectIterator;
@@ -101,30 +100,9 @@ public class BinaryExternalSortBuffer implements 
SortBuffer {
             int pageSize,
             int maxNumFileHandles,
             CompressOptions compression,
-            MemorySize maxDiskSize,
-            boolean sequenceOrder) {
-        return create(
-                ioManager,
-                rowType,
-                keyFields,
-                new HeapMemorySegmentPool(bufferSize, pageSize),
-                maxNumFileHandles,
-                compression,
-                maxDiskSize,
-                sequenceOrder);
-    }
-
-    public static BinaryExternalSortBuffer create(
-            IOManager ioManager,
-            RowType rowType,
-            int[] keyFields,
-            MemorySegmentPool pool,
-            int maxNumFileHandles,
-            CompressOptions compression,
-            MemorySize maxDiskSize,
-            boolean sequenceOrder) {
-        RecordComparator comparator =
-                newRecordComparator(rowType.getFieldTypes(), keyFields, 
sequenceOrder);
+            MemorySize maxDiskSize) {
+        RecordComparator comparator = 
newRecordComparator(rowType.getFieldTypes(), keyFields);
+        HeapMemorySegmentPool pool = new HeapMemorySegmentPool(bufferSize, 
pageSize);
         BinaryInMemorySortBuffer sortBuffer =
                 BinaryInMemorySortBuffer.createBuffer(
                         newNormalizedKeyComputer(rowType.getFieldTypes(), 
keyFields),
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/KeyComparatorSupplier.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/KeyComparatorSupplier.java
index 25fd07b7ca..6f90cef01a 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/utils/KeyComparatorSupplier.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/utils/KeyComparatorSupplier.java
@@ -45,6 +45,6 @@ public class KeyComparatorSupplier implements 
SerializableSupplier<Comparator<In
 
     @Override
     public RecordComparator get() {
-        return newRecordComparator(inputTypes, sortFields, true);
+        return newRecordComparator(inputTypes, sortFields);
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/UserDefinedSeqComparator.java
 
b/paimon-core/src/main/java/org/apache/paimon/utils/UserDefinedSeqComparator.java
index 7326a13509..144c98809e 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/utils/UserDefinedSeqComparator.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/utils/UserDefinedSeqComparator.java
@@ -83,9 +83,13 @@ public class UserDefinedSeqComparator implements 
FieldsComparator {
             return null;
         }
 
+        boolean[] ascendingOrders = new boolean[sequenceFields.length];
+        for (int i = 0; i < sequenceFields.length; i++) {
+            ascendingOrders[i] = isAscendingOrder;
+        }
         RecordComparator comparator =
                 CodeGenUtils.newRecordComparator(
-                        rowType.getFieldTypes(), sequenceFields, 
isAscendingOrder);
+                        rowType.getFieldTypes(), sequenceFields, 
ascendingOrders);
         return new UserDefinedSeqComparator(sequenceFields, comparator, 
isAscendingOrder);
     }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/codegen/CodeGenUtilsTest.java 
b/paimon-core/src/test/java/org/apache/paimon/codegen/CodeGenUtilsTest.java
index 2d98596a28..b6cab84413 100644
--- a/paimon-core/src/test/java/org/apache/paimon/codegen/CodeGenUtilsTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/codegen/CodeGenUtilsTest.java
@@ -85,7 +85,7 @@ class CodeGenUtilsTest {
     @Test
     public void testRecordComparatorCodegenCache() {
         assertClassEquals(
-                () -> newRecordComparator(Arrays.asList(STRING(), INT()), new 
int[] {0, 1}, true));
+                () -> newRecordComparator(Arrays.asList(STRING(), INT()), new 
int[] {0, 1}));
     }
 
     @Test
@@ -93,23 +93,25 @@ class CodeGenUtilsTest {
         assertClassEquals(
                 () ->
                         newRecordComparator(
-                                Arrays.asList(STRING(), VECTOR(3, INT())), new 
int[] {0, 1}, true));
+                                Arrays.asList(STRING(), VECTOR(3, INT())), new 
int[] {0, 1}));
     }
 
     @Test
     public void testRecordComparatorCodegenCacheMiss() {
         assertClassNotEquals(
-                newRecordComparator(Arrays.asList(STRING(), INT()), new int[] 
{0, 1}, true),
-                newRecordComparator(
-                        Arrays.asList(STRING(), INT(), DOUBLE()), new int[] 
{0, 1, 2}, true));
+                newRecordComparator(Arrays.asList(STRING(), INT()), new int[] 
{0, 1}),
+                newRecordComparator(Arrays.asList(STRING(), INT(), DOUBLE()), 
new int[] {0, 1, 2}));
     }
 
     @Test
     public void testRecordComparatorOrderCacheMiss() {
         RecordComparator ascending =
-                newRecordComparator(Arrays.asList(STRING(), INT()), new int[] 
{0, 1}, true);
+                newRecordComparator(Arrays.asList(STRING(), INT()), new int[] 
{0, 1});
         RecordComparator descending =
-                newRecordComparator(Arrays.asList(STRING(), INT()), new int[] 
{0, 1}, false);
+                newRecordComparator(
+                        Arrays.asList(STRING(), INT()),
+                        new int[] {0, 1},
+                        new boolean[] {false, false});
 
         InternalRow row1 = GenericRow.of(BinaryString.fromString("a"), 1);
         InternalRow row2 = GenericRow.of(BinaryString.fromString("b"), 1);
@@ -134,7 +136,7 @@ class CodeGenUtilsTest {
     @Test
     public void testHybridNotEqual() {
         assertClassNotEquals(
-                newRecordComparator(Arrays.asList(STRING(), INT()), new int[] 
{0, 1}, true),
+                newRecordComparator(Arrays.asList(STRING(), INT()), new int[] 
{0, 1}),
                 newNormalizedKeyComputer(Arrays.asList(STRING(), INT()), new 
int[] {0, 1}));
     }
 
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/separated/ClusteringTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/separated/ClusteringTableTest.java
index d1a7f070cd..c988ad23b4 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/separated/ClusteringTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/separated/ClusteringTableTest.java
@@ -24,6 +24,7 @@ import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.CatalogFactory;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.disk.IOManager;
@@ -712,6 +713,84 @@ class ClusteringTableTest {
         
assertThat(readRows(firstRowTable)).containsExactlyInAnyOrderElementsOf(firstBatch);
     }
 
+    /**
+     * Test first-row mode with composite primary key (STRING, INT) and same 
values inserted twice.
+     * Reproduces an issue where duplicate detection fails or produces wrong 
results.
+     */
+    @Test
+    public void testFirstRowCompositePkSameValues() throws Exception {
+        Table firstRowTable = createFirstRowTableCompositePk();
+
+        // Commit 1: initial records
+        writeRows(
+                firstRowTable,
+                Arrays.asList(
+                        GenericRow.of(BinaryString.fromString("hi"), 1, 10),
+                        GenericRow.of(BinaryString.fromString("hi"), 2, 20)));
+
+        // Verify first commit
+        assertThat(readRowsCompositePk(firstRowTable))
+                .containsExactlyInAnyOrder(
+                        GenericRow.of(BinaryString.fromString("hi"), 1, 10),
+                        GenericRow.of(BinaryString.fromString("hi"), 2, 20));
+
+        // Commit 2: same keys with same values again — all should be ignored 
in first-row mode
+        writeRows(
+                firstRowTable,
+                Arrays.asList(
+                        GenericRow.of(BinaryString.fromString("hi"), 1, 10),
+                        GenericRow.of(BinaryString.fromString("hi"), 2, 20)));
+
+        // Should still see the first values
+        assertThat(readRowsCompositePk(firstRowTable))
+                .containsExactlyInAnyOrder(
+                        GenericRow.of(BinaryString.fromString("hi"), 1, 10),
+                        GenericRow.of(BinaryString.fromString("hi"), 2, 20));
+    }
+
+    /** Test first-row mode keeps original values after bootstrapping 
composite key index. */
+    @Test
+    public void testFirstRowCompositePkKeepsOriginalValuesAcrossBootstrap() 
throws Exception {
+        Table firstRowTable = createFirstRowTableCompositePk();
+
+        List<GenericRow> originalRows =
+                Arrays.asList(
+                        GenericRow.of(BinaryString.fromString("same"), 1, 10),
+                        GenericRow.of(BinaryString.fromString("same"), 2, 20),
+                        GenericRow.of(BinaryString.fromString("same"), 3, 30));
+        writeRows(firstRowTable, originalRows);
+
+        writeRows(
+                firstRowTable,
+                Arrays.asList(
+                        GenericRow.of(BinaryString.fromString("same"), 1, 100),
+                        GenericRow.of(BinaryString.fromString("same"), 2, 200),
+                        GenericRow.of(BinaryString.fromString("same"), 3, 
300)));
+
+        assertThat(readRowsCompositePk(firstRowTable))
+                .containsExactlyInAnyOrderElementsOf(originalRows);
+        
assertThat(dataFiles(firstRowTable).stream().mapToLong(DataFileMeta::rowCount).sum())
+                .isEqualTo(originalRows.size());
+    }
+
+    /** Test sort-and-rewrite writes clustering ranges in ascending order. */
+    @Test
+    public void testFirstRowSortAndRewriteFileKeepsAscendingClusteringRange() 
throws Exception {
+        Table firstRowTable = 
createFirstRowTableWithCompositeClusteringColumns();
+
+        writeRows(
+                firstRowTable,
+                Arrays.asList(
+                        GenericRow.of(1, BinaryString.fromString("same"), 2),
+                        GenericRow.of(2, BinaryString.fromString("same"), 1)));
+
+        List<DataFileMeta> dataFiles = dataFiles(firstRowTable);
+        assertThat(dataFiles).hasSize(1);
+        DataFileMeta dataFile = dataFiles.get(0);
+        assertThat(dataFile.minKey().getInt(1)).isEqualTo(1);
+        assertThat(dataFile.maxKey().getInt(1)).isEqualTo(2);
+    }
+
     // ==================== Spill Tests ====================
 
     /** Test first-row mode with spill: keeps first values despite many 
duplicate commits. */
@@ -1044,6 +1123,59 @@ class ClusteringTableTest {
         return catalog.getTable(identifier);
     }
 
+    private Table createFirstRowTableCompositePk() throws Exception {
+        Identifier identifier = Identifier.create("default", "first_row_composite_pk_table");
+        Schema schema =
+                Schema.newBuilder()
+                        .column("k1", DataTypes.STRING())
+                        .column("k2", DataTypes.INT())
+                        .column("v", DataTypes.INT())
+                        .primaryKey("k1", "k2")
+                        .option(DELETION_VECTORS_ENABLED.key(), "true")
+                        .option(BUCKET.key(), "1")
+                        .option(CLUSTERING_COLUMNS.key(), "v")
+                        .option(PK_CLUSTERING_OVERRIDE.key(), "true")
+                        .option(MERGE_ENGINE.key(), "first-row")
+                        .build();
+        catalog.createTable(identifier, schema, false);
+        return catalog.getTable(identifier);
+    }
+
+    private Table createFirstRowTableWithCompositeClusteringColumns() throws Exception {
+        Identifier identifier =
+                Identifier.create("default", "first_row_composite_clustering_table");
+        Schema schema =
+                Schema.newBuilder()
+                        .column("a", DataTypes.INT())
+                        .column("c", DataTypes.STRING())
+                        .column("b", DataTypes.INT())
+                        .primaryKey("a")
+                        .option(DELETION_VECTORS_ENABLED.key(), "true")
+                        .option(BUCKET.key(), "1")
+                        .option(CLUSTERING_COLUMNS.key(), "c,b")
+                        .option(PK_CLUSTERING_OVERRIDE.key(), "true")
+                        .option(MERGE_ENGINE.key(), "first-row")
+                        .build();
+        catalog.createTable(identifier, schema, false);
+        return catalog.getTable(identifier);
+    }
+
+    private List<GenericRow> readRowsCompositePk(Table targetTable) throws Exception {
+        ReadBuilder readBuilder = targetTable.newReadBuilder();
+        @SuppressWarnings("resource")
+        CloseableIterator<InternalRow> iterator =
+                readBuilder
+                        .newRead()
+                        .createReader(readBuilder.newScan().plan())
+                        .toCloseableIterator();
+        List<GenericRow> result = new ArrayList<>();
+        while (iterator.hasNext()) {
+            InternalRow row = iterator.next();
+            result.add(GenericRow.of(row.getString(0), row.getInt(1), row.getInt(2)));
+        }
+        return result;
+    }
+
     private void writeRows(List<GenericRow> rows) throws Exception {
         writeRows(table, rows);
     }
@@ -1079,6 +1211,14 @@ class ClusteringTableTest {
         return result;
     }
 
+    private List<DataFileMeta> dataFiles(Table targetTable) {
+        List<DataFileMeta> result = new ArrayList<>();
+        for (Split split : targetTable.newReadBuilder().newScan().plan().splits()) {
+            result.addAll(((DataSplit) split).dataFiles());
+        }
+        return result;
+    }
+
     private int countFiles(Table targetTable, Predicate filter) {
         ReadBuilder readBuilder = targetTable.newReadBuilder();
         if (filter != null) {
diff --git a/paimon-flink/paimon-flink-1.16/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java b/paimon-flink/paimon-flink-1.16/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
index d5599410ad..8bb557e127 100644
--- a/paimon-flink/paimon-flink-1.16/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
+++ b/paimon-flink/paimon-flink-1.16/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
@@ -28,6 +28,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** ITCase for schema changes. */
 public class SchemaChangeITCase extends CatalogITCaseBase {
+
     @Test
     public void testSetAndRemoveOption() throws Exception {
         sql("CREATE TABLE T (a STRING, b STRING, c STRING)");
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoAction.java
index 7e7c4528b4..019bf6484b 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoAction.java
@@ -377,8 +377,7 @@ public class DataEvolutionMergeIntoAction extends TableActionBase {
                                         coreOptions.localSortMaxNumFileHandles(),
                                         coreOptions.spillCompressOptions(),
                                         sinkParallelism,
-                                        coreOptions.writeBufferSpillDiskSize(),
-                                        coreOptions.sequenceFieldSortOrderIsAscending()))
+                                        coreOptions.writeBufferSpillDiskSize()))
                         .setParallelism(sinkParallelism);
 
         // 2. write partial columns
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/IncrementalClusterCompact.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/IncrementalClusterCompact.java
index db2230eda6..25d43ddbc6 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/IncrementalClusterCompact.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/IncrementalClusterCompact.java
@@ -286,8 +286,7 @@ public class IncrementalClusterCompact {
                                         options.localSortMaxNumFileHandles(),
                                         options.spillCompressOptions(),
                                         sinkParallelism,
-                                        options.writeBufferSpillDiskSize(),
-                                        options.sequenceFieldSortOrderIsAscending()))
+                                        options.writeBufferSpillDiskSize()))
                         .setParallelism(sinkParallelism);
 
         // Step 3: remove the prepended key columns and convert back to RowData
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortOperator.java
index b6847125fb..dfa1e432a1 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortOperator.java
@@ -49,7 +49,6 @@ public class SortOperator extends TableStreamOperator<InternalRow>
     private final CompressOptions spillCompression;
     private final int sinkParallelism;
     private final MemorySize maxDiskSize;
-    private final boolean sequenceOrder;
 
     private transient BinaryExternalSortBuffer buffer;
     private transient IOManager ioManager;
@@ -62,8 +61,7 @@ public class SortOperator extends TableStreamOperator<InternalRow>
             int spillSortMaxNumFiles,
             CompressOptions spillCompression,
             int sinkParallelism,
-            MemorySize maxDiskSize,
-            boolean sequenceOrder) {
+            MemorySize maxDiskSize) {
         this.keyType = keyType;
         this.rowType = rowType;
         this.maxMemory = maxMemory;
@@ -73,7 +71,6 @@ public class SortOperator extends TableStreamOperator<InternalRow>
         this.spillCompression = spillCompression;
         this.sinkParallelism = sinkParallelism;
         this.maxDiskSize = maxDiskSize;
-        this.sequenceOrder = sequenceOrder;
     }
 
     @Override
@@ -105,8 +102,7 @@ public class SortOperator extends TableStreamOperator<InternalRow>
                         pageSize,
                         spillSortMaxNumFiles,
                         spillCompression,
-                        maxDiskSize,
-                        sequenceOrder);
+                        maxDiskSize);
     }
 
     @Override
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java
index 43777566f6..c841afa3f5 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java
@@ -170,8 +170,7 @@ public class SortUtils {
                                     options.localSortMaxNumFileHandles(),
                                     options.spillCompressOptions(),
                                     sinkParallelism,
-                                    options.writeBufferSpillDiskSize(),
-                                    options.sequenceFieldSortOrderIsAscending()))
+                                    options.writeBufferSpillDiskSize()))
                     .setParallelism(sinkParallelism)
                     // remove the key column from every row
                     .map(
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sorter/SortOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sorter/SortOperatorTest.java
index c74c1c2c17..155e259e02 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sorter/SortOperatorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sorter/SortOperatorTest.java
@@ -65,8 +65,7 @@ public class SortOperatorTest {
                         128,
                         CompressOptions.defaultOptions(),
                         1,
-                        MemorySize.MAX_VALUE,
-                        true) {};
+                        MemorySize.MAX_VALUE) {};
 
         OneInputStreamOperatorTestHarness harness = createTestHarness(sortOperator);
         harness.open();
@@ -115,8 +114,7 @@ public class SortOperatorTest {
                         128,
                         CompressOptions.defaultOptions(),
                         1,
-                        MemorySize.MAX_VALUE,
-                        true) {};
+                        MemorySize.MAX_VALUE) {};
         OneInputStreamOperatorTestHarness harness = createTestHarness(sortOperator);
         harness.open();
         File[] files = harness.getEnvironment().getIOManager().getSpillingDirectories();


Reply via email to