This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new d96796d37f [core] Add clustering table behavior tests
d96796d37f is described below

commit d96796d37ffca4fa82bff567bb39e32dcbb0dc78
Author: JingsongLi <[email protected]>
AuthorDate: Sat May 9 19:05:45 2026 +0800

    [core] Add clustering table behavior tests
---
 .../paimon/separated/ClusteringTableTest.java      | 273 +++++++++++++++++++++
 1 file changed, 273 insertions(+)

diff --git a/paimon-core/src/test/java/org/apache/paimon/separated/ClusteringTableTest.java b/paimon-core/src/test/java/org/apache/paimon/separated/ClusteringTableTest.java
index c988ad23b4..eaf48f79c8 100644
--- a/paimon-core/src/test/java/org/apache/paimon/separated/ClusteringTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/separated/ClusteringTableTest.java
@@ -46,6 +46,7 @@ import org.apache.paimon.table.source.RawFile;
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowKind;
 import org.apache.paimon.utils.CloseableIterator;
 
 import org.junit.jupiter.api.BeforeEach;
@@ -114,6 +115,22 @@ class ClusteringTableTest {
                         GenericRow.of(4, 44));
     }
 
+    /**
+     * Writes keys 1 and 2 twice inside one commit; for every key the value written last must be
+     * the one that is read back.
+     */
+    @Test
+    public void testSameKeysMultipleTimesInSingleCommit() throws Exception {
+        List<GenericRow> expected =
+                Arrays.asList(GenericRow.of(1, 30), GenericRow.of(2, 25), GenericRow.of(3, 15));
+
+        writeRows(
+                Arrays.asList(
+                        GenericRow.of(1, 10),
+                        GenericRow.of(2, 20),
+                        GenericRow.of(1, 30),
+                        GenericRow.of(3, 15),
+                        GenericRow.of(2, 25)));
+
+        assertThat(readRows()).containsExactlyInAnyOrderElementsOf(expected);
+    }
+
     @Test
     public void testNormal() throws Exception {
         List<GenericRow> input;
@@ -503,6 +520,76 @@ class ClusteringTableTest {
         assertThat(countFiles(table, pb.greaterThan(1, 
5))).isEqualTo(totalFiles);
     }
 
+    /**
+     * Data files are ranged by the clustering column rather than by the primary key, so a
+     * primary-key filter must not prune away the file that holds the requested key.
+     */
+    @Test
+    public void testPrimaryKeyFilterDoesNotSkipClusteringRanges() throws Exception {
+        writeRows(Arrays.asList(GenericRow.of(1, 1000), GenericRow.of(2, 10)));
+        writeRows(Arrays.asList(GenericRow.of(3, 500), GenericRow.of(4, 20)));
+
+        PredicateBuilder builder = new PredicateBuilder(table.rowType());
+        Predicate keyTwo = builder.equal(0, 2);
+        Predicate keyFour = builder.equal(0, 4);
+
+        assertThat(readRows(table, keyTwo)).contains(GenericRow.of(2, 10));
+        assertThat(readRows(table, keyFour)).contains(GenericRow.of(4, 20));
+    }
+
+    /**
+     * Moves key 1 from value 10 to 1000 (a different clustering range) and checks that neither
+     * unfiltered nor filtered reads can surface the superseded row.
+     */
+    @Test
+    public void testUpdateAcrossClusteringRangesWithFilter() throws Exception {
+        writeRows(Arrays.asList(GenericRow.of(1, 10), GenericRow.of(2, 20)));
+        writeRows(Arrays.asList(GenericRow.of(1, 1000)));
+
+        PredicateBuilder builder = new PredicateBuilder(table.rowType());
+        Predicate oldValue = builder.equal(1, 10);
+        Predicate lowRange = builder.lessThan(1, 50);
+        Predicate highRange = builder.greaterThan(1, 500);
+
+        assertThat(readRows())
+                .containsExactlyInAnyOrder(GenericRow.of(1, 1000), GenericRow.of(2, 20));
+        assertThat(readRows(table, oldValue)).doesNotContain(GenericRow.of(1, 10));
+        assertThat(readRows(table, lowRange)).contains(GenericRow.of(2, 20));
+        assertThat(readRows(table, highRange)).containsExactly(GenericRow.of(1, 1000));
+    }
+
+    /**
+     * Deletes key 1 and later re-inserts it with a value that falls into another clustering range;
+     * afterwards only the re-inserted row may be visible for that key.
+     */
+    @Test
+    public void testDeleteAndReinsertAcrossClusteringRanges() throws Exception {
+        PredicateBuilder builder = new PredicateBuilder(table.rowType());
+
+        // Phase 1: seed two keys, then retract key 1.
+        writeRows(Arrays.asList(GenericRow.of(1, 10), GenericRow.of(2, 20)));
+        writeRows(Arrays.asList(GenericRow.ofKind(RowKind.DELETE, 1, 10)));
+        assertThat(readRows()).containsExactly(GenericRow.of(2, 20));
+
+        // Phase 2: bring key 1 back with a much larger value.
+        writeRows(Arrays.asList(GenericRow.of(1, 1000)));
+        assertThat(readRows())
+                .containsExactlyInAnyOrder(GenericRow.of(1, 1000), GenericRow.of(2, 20));
+        assertThat(readRows(table, builder.equal(1, 10))).isEmpty();
+        assertThat(readRows(table, builder.equal(1, 1000)))
+                .containsExactly(GenericRow.of(1, 1000));
+    }
+
+    /**
+     * Overwrites every (k1, k2) key of a composite-primary-key table and checks that only the
+     * second, most recent value of each key remains visible.
+     */
+    @Test
+    public void testCompositePkDeduplicateKeepsLatestValues() throws Exception {
+        Table compositePkTable = createTableCompositePk();
+        BinaryString same = BinaryString.fromString("same");
+        BinaryString other = BinaryString.fromString("other");
+
+        writeRows(
+                compositePkTable,
+                Arrays.asList(
+                        GenericRow.of(same, 1, 10),
+                        GenericRow.of(same, 2, 20),
+                        GenericRow.of(other, 1, 30)));
+        writeRows(
+                compositePkTable,
+                Arrays.asList(
+                        GenericRow.of(same, 1, 100),
+                        GenericRow.of(same, 2, 200),
+                        GenericRow.of(other, 1, 300)));
+
+        assertThat(readRowsCompositePk(compositePkTable))
+                .containsExactlyInAnyOrder(
+                        GenericRow.of(same, 1, 100),
+                        GenericRow.of(same, 2, 200),
+                        GenericRow.of(other, 1, 300));
+    }
+
     // ==================== First-Row Mode Tests ====================
 
     /** Test first-row mode keeps the first record when same key is written 
multiple times. */
@@ -537,6 +624,25 @@ class ClusteringTableTest {
                 .containsExactlyInAnyOrder(GenericRow.of(1, 100), 
GenericRow.of(2, 200));
     }
 
+    /**
+     * In first-row mode, repeated occurrences of a key inside the same commit are discarded: the
+     * earliest occurrence of each key is the one that must be read back.
+     */
+    @Test
+    public void testFirstRowSameKeysMultipleTimesInSingleCommit() throws Exception {
+        Table firstRowTable = createFirstRowTable();
+        List<GenericRow> expected =
+                Arrays.asList(GenericRow.of(1, 10), GenericRow.of(2, 20), GenericRow.of(3, 15));
+
+        writeRows(
+                firstRowTable,
+                Arrays.asList(
+                        GenericRow.of(1, 10),
+                        GenericRow.of(2, 20),
+                        GenericRow.of(1, 30),
+                        GenericRow.of(3, 15),
+                        GenericRow.of(2, 25)));
+
+        assertThat(readRows(firstRowTable)).containsExactlyInAnyOrderElementsOf(expected);
+    }
+
     /** Test first-row mode with multiple commits. */
     @Test
     public void testFirstRowMultipleCommits() throws Exception {
@@ -748,6 +854,36 @@ class ClusteringTableTest {
                         GenericRow.of(BinaryString.fromString("hi"), 2, 20));
     }
 
+    /** Test bootstrap sorting with out-of-order composite primary keys in 
first-row mode. */
+    @Test
+    public void testFirstRowCompositePkOutOfOrderKeysAcrossBootstrap() throws 
Exception {
+        Table firstRowTable = 
createFirstRowTableCompositePkWithLowSpillThreshold();
+
+        List<GenericRow> originalRows =
+                Arrays.asList(
+                        GenericRow.of(BinaryString.fromString("z"), 2, 20),
+                        GenericRow.of(BinaryString.fromString("a"), 3, 30),
+                        GenericRow.of(BinaryString.fromString("a"), 1, 10),
+                        GenericRow.of(BinaryString.fromString("z"), 1, 15));
+        writeRows(firstRowTable, originalRows);
+
+        writeRows(
+                firstRowTable,
+                Arrays.asList(
+                        GenericRow.of(BinaryString.fromString("z"), 1, 150),
+                        GenericRow.of(BinaryString.fromString("a"), 1, 100),
+                        GenericRow.of(BinaryString.fromString("a"), 3, 300),
+                        GenericRow.of(BinaryString.fromString("b"), 1, 40),
+                        GenericRow.of(BinaryString.fromString("z"), 2, 200)));
+
+        List<GenericRow> expected = new ArrayList<>(originalRows);
+        expected.add(GenericRow.of(BinaryString.fromString("b"), 1, 40));
+        assertThat(readRowsCompositePk(firstRowTable))
+                .containsExactlyInAnyOrderElementsOf(expected);
+        
assertThat(dataFiles(firstRowTable).stream().mapToLong(DataFileMeta::rowCount).sum())
+                .isEqualTo(expected.size());
+    }
+
     /** Test first-row mode keeps original values after bootstrapping 
composite key index. */
     @Test
     public void testFirstRowCompositePkKeepsOriginalValuesAcrossBootstrap() 
throws Exception {
@@ -791,6 +927,59 @@ class ClusteringTableTest {
         assertThat(dataFile.maxKey().getInt(1)).isEqualTo(2);
     }
 
+    /**
+     * Writes rows whose (c, b) clustering values arrive out of order and checks that the single
+     * resulting data file spans from ("a", 1) to ("z", 1), i.e. rows were ordered by the leading
+     * string column first.
+     */
+    @Test
+    public void testCompositeClusteringLeadingStringRangeOrder() throws Exception {
+        Table clustered = createTableWithCompositeClusteringColumns();
+
+        writeRows(
+                clustered,
+                Arrays.asList(
+                        GenericRow.of(1, BinaryString.fromString("z"), 1),
+                        GenericRow.of(2, BinaryString.fromString("a"), 2),
+                        GenericRow.of(3, BinaryString.fromString("a"), 1),
+                        GenericRow.of(4, BinaryString.fromString("m"), 9)));
+
+        List<DataFileMeta> files = dataFiles(clustered);
+        assertThat(files).hasSize(1);
+
+        InternalRow lowest = files.get(0).minKey();
+        InternalRow highest = files.get(0).maxKey();
+        assertThat(lowest.getString(0)).isEqualTo(BinaryString.fromString("a"));
+        assertThat(lowest.getInt(1)).isEqualTo(1);
+        assertThat(highest.getString(0)).isEqualTo(BinaryString.fromString("z"));
+        assertThat(highest.getInt(1)).isEqualTo(1);
+    }
+
+    /**
+     * Rewrites both existing keys so that their (c, b) clustering values swap ranges and adds a
+     * new key; reads must return only the latest row per key, with or without a filter on b.
+     */
+    @Test
+    public void testCompositeClusteringColumnsDeduplicateMode() throws Exception {
+        Table clustered = createTableWithCompositeClusteringColumns();
+
+        writeRows(
+                clustered,
+                Arrays.asList(
+                        GenericRow.of(1, BinaryString.fromString("z"), 20),
+                        GenericRow.of(2, BinaryString.fromString("a"), 10)));
+        writeRows(
+                clustered,
+                Arrays.asList(
+                        GenericRow.of(1, BinaryString.fromString("a"), 5),
+                        GenericRow.of(2, BinaryString.fromString("z"), 30),
+                        GenericRow.of(3, BinaryString.fromString("m"), 15)));
+
+        List<GenericRow> latest = readRowsWithStringClustering(clustered);
+        assertThat(latest)
+                .containsExactlyInAnyOrder(
+                        GenericRow.of(1, BinaryString.fromString("a"), 5),
+                        GenericRow.of(2, BinaryString.fromString("z"), 30),
+                        GenericRow.of(3, BinaryString.fromString("m"), 15));
+
+        PredicateBuilder builder = new PredicateBuilder(clustered.rowType());
+        Predicate smallB = builder.lessOrEqual(2, 15);
+        Predicate largeB = builder.greaterThan(2, 20);
+        assertThat(readRowsWithStringClustering(clustered, smallB))
+                .contains(GenericRow.of(1, BinaryString.fromString("a"), 5));
+        assertThat(readRowsWithStringClustering(clustered, largeB))
+                .contains(GenericRow.of(2, BinaryString.fromString("z"), 30));
+    }
+
     // ==================== Spill Tests ====================
 
     /** Test first-row mode with spill: keeps first values despite many 
duplicate commits. */
@@ -1123,6 +1312,42 @@ class ClusteringTableTest {
         return catalog.getTable(identifier);
     }
 
+    /**
+     * Creates {@code default.composite_pk_table} with primary key (k1, k2), one bucket, deletion
+     * vectors enabled and {@code v} as clustering column. No merge engine option is set.
+     */
+    private Table createTableCompositePk() throws Exception {
+        Schema.Builder builder =
+                Schema.newBuilder()
+                        .column("k1", DataTypes.STRING())
+                        .column("k2", DataTypes.INT())
+                        .column("v", DataTypes.INT())
+                        .primaryKey("k1", "k2");
+        builder.option(DELETION_VECTORS_ENABLED.key(), "true");
+        builder.option(BUCKET.key(), "1");
+        builder.option(CLUSTERING_COLUMNS.key(), "v");
+        builder.option(PK_CLUSTERING_OVERRIDE.key(), "true");
+
+        Identifier identifier = Identifier.create("default", "composite_pk_table");
+        catalog.createTable(identifier, builder.build(), false);
+        return catalog.getTable(identifier);
+    }
+
+    /**
+     * Creates {@code default.first_row_composite_pk_spill_table}: a first-row table with primary key
+     * (k1, k2), clustered by {@code v}, whose sort spill threshold is lowered to 2 (presumably so
+     * the spill path is exercised with tiny inputs; TODO confirm the exact trigger).
+     */
+    private Table createFirstRowTableCompositePkWithLowSpillThreshold() throws Exception {
+        Schema.Builder builder =
+                Schema.newBuilder()
+                        .column("k1", DataTypes.STRING())
+                        .column("k2", DataTypes.INT())
+                        .column("v", DataTypes.INT())
+                        .primaryKey("k1", "k2");
+        builder.option(DELETION_VECTORS_ENABLED.key(), "true");
+        builder.option(BUCKET.key(), "1");
+        builder.option(CLUSTERING_COLUMNS.key(), "v");
+        builder.option(PK_CLUSTERING_OVERRIDE.key(), "true");
+        builder.option(MERGE_ENGINE.key(), "first-row");
+        builder.option(SORT_SPILL_THRESHOLD.key(), "2");
+
+        Identifier identifier = Identifier.create("default", "first_row_composite_pk_spill_table");
+        catalog.createTable(identifier, builder.build(), false);
+        return catalog.getTable(identifier);
+    }
+
     private Table createFirstRowTableCompositePk() throws Exception {
         Identifier identifier = Identifier.create("default", 
"first_row_composite_pk_table");
         Schema schema =
@@ -1160,6 +1385,23 @@ class ClusteringTableTest {
         return catalog.getTable(identifier);
     }
 
+    /**
+     * Creates {@code default.composite_clustering_table} with columns (a INT, c STRING, b INT),
+     * primary key {@code a}, one bucket, deletion vectors enabled and clustering columns "c,b".
+     */
+    private Table createTableWithCompositeClusteringColumns() throws Exception {
+        Schema.Builder builder =
+                Schema.newBuilder()
+                        .column("a", DataTypes.INT())
+                        .column("c", DataTypes.STRING())
+                        .column("b", DataTypes.INT())
+                        .primaryKey("a");
+        builder.option(DELETION_VECTORS_ENABLED.key(), "true");
+        builder.option(BUCKET.key(), "1");
+        builder.option(CLUSTERING_COLUMNS.key(), "c,b");
+        builder.option(PK_CLUSTERING_OVERRIDE.key(), "true");
+
+        Identifier identifier = Identifier.create("default", "composite_clustering_table");
+        catalog.createTable(identifier, builder.build(), false);
+        return catalog.getTable(identifier);
+    }
+
     private List<GenericRow> readRowsCompositePk(Table targetTable) throws 
Exception {
         ReadBuilder readBuilder = targetTable.newReadBuilder();
         @SuppressWarnings("resource")
@@ -1196,7 +1438,14 @@ class ClusteringTableTest {
     }
 
     private List<GenericRow> readRows(Table targetTable) throws Exception {
+        return readRows(targetTable, null);
+    }
+
+    private List<GenericRow> readRows(Table targetTable, Predicate filter) 
throws Exception {
         ReadBuilder readBuilder = targetTable.newReadBuilder();
+        if (filter != null) {
+            readBuilder.withFilter(filter);
+        }
         @SuppressWarnings("resource")
         CloseableIterator<InternalRow> iterator =
                 readBuilder
@@ -1211,6 +1460,30 @@ class ClusteringTableTest {
         return result;
     }
 
+    /** Reads all rows of {@code targetTable} as (int, string, int) rows without any filter. */
+    private List<GenericRow> readRowsWithStringClustering(Table targetTable) throws Exception {
+        Predicate noFilter = null;
+        return readRowsWithStringClustering(targetTable, noFilter);
+    }
+
+    /**
+     * Reads rows from a table laid out as (INT, STRING, INT) and copies them into {@link
+     * GenericRow}s.
+     *
+     * @param targetTable table to scan
+     * @param filter optional predicate pushed into the read builder; {@code null} means no filter.
+     *     The pushed filter is a best-effort hint, so callers should not expect it to remove every
+     *     non-matching row.
+     * @return all rows produced by the reader
+     */
+    private List<GenericRow> readRowsWithStringClustering(Table targetTable, Predicate filter)
+            throws Exception {
+        ReadBuilder readBuilder = targetTable.newReadBuilder();
+        if (filter != null) {
+            readBuilder.withFilter(filter);
+        }
+        List<GenericRow> result = new ArrayList<>();
+        // Close the iterator so the underlying readers are released after every call instead of
+        // being leaked for the remainder of the test run.
+        try (CloseableIterator<InternalRow> iterator =
+                readBuilder
+                        .newRead()
+                        .createReader(readBuilder.newScan().plan())
+                        .toCloseableIterator()) {
+            while (iterator.hasNext()) {
+                InternalRow row = iterator.next();
+                result.add(GenericRow.of(row.getInt(0), row.getString(1), row.getInt(2)));
+            }
+        }
+        return result;
+    }
+
     private List<DataFileMeta> dataFiles(Table targetTable) {
         List<DataFileMeta> result = new ArrayList<>();
         for (Split split : 
targetTable.newReadBuilder().newScan().plan().splits()) {

Reply via email to