This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 455d6f395 [lake/paimon] use paimon PK row type as comparator  (#2980)
455d6f395 is described below

commit 455d6f3954b85776b2fbaf6e2fc84c3c3e308123
Author: yuxia Luo <[email protected]>
AuthorDate: Thu Apr 2 13:56:53 2026 +0800

    [lake/paimon] use paimon PK row type as comparator  (#2980)
---
 .../paimon/source/PaimonSortedRecordReader.java    |  2 +-
 .../source/PaimonSortedRecordReaderTest.java       | 50 ++++++++++++++++++++++
 2 files changed, 51 insertions(+), 1 deletion(-)

diff --git 
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSortedRecordReader.java
 
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSortedRecordReader.java
index 1940b2055..17220c854 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSortedRecordReader.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSortedRecordReader.java
@@ -49,7 +49,7 @@ public class PaimonSortedRecordReader extends 
PaimonRecordReader implements Sort
                         PrimaryKeyTableUtils.addKeyNamePrefix(
                                 fileStoreTable.schema().primaryKeysFields()));
         this.comparator =
-                toFlussRowComparator(paimonRowType, new 
KeyComparatorSupplier(pkKeyType).get());
+                toFlussRowComparator(pkKeyType, new 
KeyComparatorSupplier(pkKeyType).get());
     }
 
     @Override
diff --git 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/PaimonSortedRecordReaderTest.java
 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/PaimonSortedRecordReaderTest.java
index 21a6333fd..af16eac21 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/PaimonSortedRecordReaderTest.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/PaimonSortedRecordReaderTest.java
@@ -26,6 +26,7 @@ import org.apache.fluss.lake.source.SortedRecordReader;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.record.LogRecord;
 import org.apache.fluss.row.Decimal;
+import org.apache.fluss.row.GenericRow;
 import org.apache.fluss.row.InternalRow;
 import org.apache.fluss.row.ProjectedRow;
 import org.apache.fluss.row.TimestampLtz;
@@ -38,6 +39,7 @@ import org.apache.paimon.schema.Schema;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.types.DataTypes;
 import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
@@ -96,6 +98,54 @@ class PaimonSortedRecordReaderTest extends 
PaimonSourceTestBase {
         }
     }
 
+    @Test
+    void testComparatorUsesPrimaryKeyRowType() throws Exception {
+        TablePath tablePath = TablePath.of(DEFAULT_DB, 
"pkTable_composite_timestamp_key");
+
+        createTable(
+                tablePath,
+                Schema.newBuilder()
+                        .column("member_id", DataTypes.BIGINT())
+                        .column("product_id", DataTypes.STRING())
+                        .column("channel_key", DataTypes.STRING())
+                        .column("product_name", DataTypes.STRING())
+                        .column("seq_time", DataTypes.TIMESTAMP(0))
+                        .column("order_id", DataTypes.STRING())
+                        .primaryKey("member_id", "channel_key", "seq_time", 
"order_id")
+                        .option(CoreOptions.BUCKET.key(), "1")
+                        .option(CoreOptions.BUCKET_KEY.key(), 
"member_id,channel_key,seq_time")
+                        .build());
+
+        LakeSource<PaimonSplit> lakeSource = 
lakeStorage.createLakeSource(tablePath);
+        RecordReader recordReader = lakeSource.createRecordReader(() -> null);
+        assertThat(recordReader).isInstanceOf(PaimonSortedRecordReader.class);
+
+        Comparator<InternalRow> comparator = ((SortedRecordReader) 
recordReader).order();
+        Table table = getTable(tablePath);
+        int[] pkIndex = table.rowType().getFieldIndices(table.primaryKeys());
+
+        GenericRow row1 = new GenericRow(6);
+        row1.setField(0, 1L);
+        row1.setField(1, 
org.apache.fluss.row.BinaryString.fromString("product-1"));
+        row1.setField(2, 
org.apache.fluss.row.BinaryString.fromString("channel-a"));
+        row1.setField(3, 
org.apache.fluss.row.BinaryString.fromString("name-1"));
+        row1.setField(4, TimestampNtz.fromMillis(1_700_000_000_000L));
+        row1.setField(5, 
org.apache.fluss.row.BinaryString.fromString("order-1"));
+
+        GenericRow row2 = new GenericRow(6);
+        row2.setField(0, 1L);
+        row2.setField(1, 
org.apache.fluss.row.BinaryString.fromString("product-2"));
+        row2.setField(2, 
org.apache.fluss.row.BinaryString.fromString("channel-a"));
+        row2.setField(3, 
org.apache.fluss.row.BinaryString.fromString("name-2"));
+        row2.setField(4, TimestampNtz.fromMillis(1_700_000_000_001L));
+        row2.setField(5, 
org.apache.fluss.row.BinaryString.fromString("order-2"));
+
+        InternalRow pkRow1 = ProjectedRow.from(pkIndex).replaceRow(row1);
+        InternalRow pkRow2 = ProjectedRow.from(pkIndex).replaceRow(row2);
+
+        assertThat(comparator.compare(pkRow1, pkRow2)).isLessThan(0);
+    }
+
     private static <T> boolean isSorted(Iterator<T> iterator, Comparator<? 
super T> comparator) {
         if (!iterator.hasNext()) {
             return true;

Reply via email to