This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new d96796d37f [core] Add clustering table behavior tests
d96796d37f is described below
commit d96796d37ffca4fa82bff567bb39e32dcbb0dc78
Author: JingsongLi <[email protected]>
AuthorDate: Sat May 9 19:05:45 2026 +0800
[core] Add clustering table behavior tests
---
.../paimon/separated/ClusteringTableTest.java | 273 +++++++++++++++++++++
1 file changed, 273 insertions(+)
diff --git a/paimon-core/src/test/java/org/apache/paimon/separated/ClusteringTableTest.java b/paimon-core/src/test/java/org/apache/paimon/separated/ClusteringTableTest.java
index c988ad23b4..eaf48f79c8 100644
--- a/paimon-core/src/test/java/org/apache/paimon/separated/ClusteringTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/separated/ClusteringTableTest.java
@@ -46,6 +46,7 @@ import org.apache.paimon.table.source.RawFile;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowKind;
import org.apache.paimon.utils.CloseableIterator;
import org.junit.jupiter.api.BeforeEach;
@@ -114,6 +115,22 @@ class ClusteringTableTest {
GenericRow.of(4, 44));
}
+    /**
+     * Writes several rows that share primary keys within a single commit and checks that, for
+     * each key, the row written last is the one that survives.
+     */
+    @Test
+    public void testSameKeysMultipleTimesInSingleCommit() throws Exception {
+        // Keys 1 and 2 each appear twice; their later values (30 and 25) must win.
+        writeRows(
+                Arrays.asList(
+                        GenericRow.of(1, 10),
+                        GenericRow.of(2, 20),
+                        GenericRow.of(1, 30),
+                        GenericRow.of(3, 15),
+                        GenericRow.of(2, 25)));
+
+        GenericRow[] latestPerKey = {
+            GenericRow.of(1, 30), GenericRow.of(2, 25), GenericRow.of(3, 15)
+        };
+        assertThat(readRows()).containsExactlyInAnyOrder(latestPerKey);
+    }
+
@Test
public void testNormal() throws Exception {
List<GenericRow> input;
@@ -503,6 +520,76 @@ class ClusteringTableTest {
assertThat(countFiles(table, pb.greaterThan(1,
5))).isEqualTo(totalFiles);
}
+    /**
+     * Verifies that equality filters on the primary key still return the matching rows, even
+     * though the data files are ranged by a clustering column whose values are unrelated to the
+     * key values.
+     */
+    @Test
+    public void testPrimaryKeyFilterDoesNotSkipClusteringRanges() throws Exception {
+        // Clustering values (column 1) span ranges that do not resemble the keys (column 0).
+        writeRows(Arrays.asList(GenericRow.of(1, 1000), GenericRow.of(2, 10)));
+        writeRows(Arrays.asList(GenericRow.of(3, 500), GenericRow.of(4, 20)));
+
+        PredicateBuilder builder = new PredicateBuilder(table.rowType());
+        Predicate keyIsTwo = builder.equal(0, 2);
+        Predicate keyIsFour = builder.equal(0, 4);
+
+        assertThat(readRows(table, keyIsTwo)).contains(GenericRow.of(2, 10));
+        assertThat(readRows(table, keyIsFour)).contains(GenericRow.of(4, 20));
+    }
+
+    /**
+     * Moves key 1 from clustering value 10 to 1000 and checks that neither unfiltered nor
+     * filtered reads return the superseded value.
+     */
+    @Test
+    public void testUpdateAcrossClusteringRangesWithFilter() throws Exception {
+        writeRows(Arrays.asList(GenericRow.of(1, 10), GenericRow.of(2, 20)));
+        // Update key 1 to a value far outside the range written by the first commit.
+        writeRows(Arrays.asList(GenericRow.of(1, 1000)));
+
+        GenericRow updated = GenericRow.of(1, 1000);
+        GenericRow stale = GenericRow.of(1, 10);
+        GenericRow untouched = GenericRow.of(2, 20);
+
+        assertThat(readRows()).containsExactlyInAnyOrder(updated, untouched);
+
+        PredicateBuilder builder = new PredicateBuilder(table.rowType());
+        assertThat(readRows(table, builder.equal(1, 10))).doesNotContain(stale);
+        assertThat(readRows(table, builder.lessThan(1, 50))).contains(untouched);
+        assertThat(readRows(table, builder.greaterThan(1, 500))).containsExactly(updated);
+    }
+
+    /**
+     * Deletes key 1, re-inserts it with a clustering value in a different range, and checks that
+     * filtered reads neither resurrect the deleted value nor miss the new one.
+     */
+    @Test
+    public void testDeleteAndReinsertAcrossClusteringRanges() throws Exception {
+        writeRows(Arrays.asList(GenericRow.of(1, 10), GenericRow.of(2, 20)));
+
+        // Retract key 1 and confirm that only key 2 remains visible.
+        writeRows(Arrays.asList(GenericRow.ofKind(RowKind.DELETE, 1, 10)));
+        assertThat(readRows()).containsExactly(GenericRow.of(2, 20));
+
+        // Re-insert key 1 with a clustering value far away from its old one.
+        writeRows(Arrays.asList(GenericRow.of(1, 1000)));
+
+        PredicateBuilder builder = new PredicateBuilder(table.rowType());
+        Predicate oldValue = builder.equal(1, 10);
+        Predicate newValue = builder.equal(1, 1000);
+
+        assertThat(readRows())
+                .containsExactlyInAnyOrder(GenericRow.of(1, 1000), GenericRow.of(2, 20));
+        assertThat(readRows(table, oldValue)).isEmpty();
+        assertThat(readRows(table, newValue)).containsExactly(GenericRow.of(1, 1000));
+    }
+
+    /**
+     * Overwrites every row of a composite-primary-key table in a second commit and checks that
+     * only the newer values are read back.
+     */
+    @Test
+    public void testCompositePkDeduplicateKeepsLatestValues() throws Exception {
+        Table compositePkTable = createTableCompositePk();
+        BinaryString same = BinaryString.fromString("same");
+        BinaryString other = BinaryString.fromString("other");
+
+        List<GenericRow> initial =
+                Arrays.asList(
+                        GenericRow.of(same, 1, 10),
+                        GenericRow.of(same, 2, 20),
+                        GenericRow.of(other, 1, 30));
+        List<GenericRow> overwrite =
+                Arrays.asList(
+                        GenericRow.of(same, 1, 100),
+                        GenericRow.of(same, 2, 200),
+                        GenericRow.of(other, 1, 300));
+
+        writeRows(compositePkTable, initial);
+        writeRows(compositePkTable, overwrite);
+
+        assertThat(readRowsCompositePk(compositePkTable))
+                .containsExactlyInAnyOrder(
+                        GenericRow.of(same, 1, 100),
+                        GenericRow.of(same, 2, 200),
+                        GenericRow.of(other, 1, 300));
+    }
+
// ==================== First-Row Mode Tests ====================
/** Test first-row mode keeps the first record when same key is written
multiple times. */
@@ -537,6 +624,25 @@ class ClusteringTableTest {
.containsExactlyInAnyOrder(GenericRow.of(1, 100),
GenericRow.of(2, 200));
}
+    /**
+     * Writes repeated primary keys within a single commit to a first-row table and checks that
+     * the earliest row of each key is retained.
+     */
+    @Test
+    public void testFirstRowSameKeysMultipleTimesInSingleCommit() throws Exception {
+        Table firstRowTable = createFirstRowTable();
+
+        // The later duplicates (1 -> 30, 2 -> 25) must be ignored in first-row mode.
+        List<GenericRow> batch =
+                Arrays.asList(
+                        GenericRow.of(1, 10),
+                        GenericRow.of(2, 20),
+                        GenericRow.of(1, 30),
+                        GenericRow.of(3, 15),
+                        GenericRow.of(2, 25));
+        writeRows(firstRowTable, batch);
+
+        List<GenericRow> actual = readRows(firstRowTable);
+        assertThat(actual)
+                .containsExactlyInAnyOrder(
+                        GenericRow.of(1, 10), GenericRow.of(2, 20), GenericRow.of(3, 15));
+    }
+
/** Test first-row mode with multiple commits. */
@Test
public void testFirstRowMultipleCommits() throws Exception {
@@ -748,6 +854,36 @@ class ClusteringTableTest {
GenericRow.of(BinaryString.fromString("hi"), 2, 20));
}
+    /**
+     * Writes composite keys out of sort order to a first-row table with a low spill threshold,
+     * then writes the same keys again plus one new key. Checks that only the first-seen rows are
+     * read back and that the data files hold no more rows than the expected result.
+     */
+    @Test
+    public void testFirstRowCompositePkOutOfOrderKeysAcrossBootstrap() throws Exception {
+        Table firstRowTable = createFirstRowTableCompositePkWithLowSpillThreshold();
+        BinaryString a = BinaryString.fromString("a");
+        BinaryString b = BinaryString.fromString("b");
+        BinaryString z = BinaryString.fromString("z");
+
+        List<GenericRow> originalRows =
+                Arrays.asList(
+                        GenericRow.of(z, 2, 20),
+                        GenericRow.of(a, 3, 30),
+                        GenericRow.of(a, 1, 10),
+                        GenericRow.of(z, 1, 15));
+        writeRows(firstRowTable, originalRows);
+
+        // Existing keys carry new values that first-row mode must discard; (b, 1) is brand new.
+        writeRows(
+                firstRowTable,
+                Arrays.asList(
+                        GenericRow.of(z, 1, 150),
+                        GenericRow.of(a, 1, 100),
+                        GenericRow.of(a, 3, 300),
+                        GenericRow.of(b, 1, 40),
+                        GenericRow.of(z, 2, 200)));
+
+        List<GenericRow> expected = new ArrayList<>(originalRows.size() + 1);
+        expected.addAll(originalRows);
+        expected.add(GenericRow.of(b, 1, 40));
+
+        assertThat(readRowsCompositePk(firstRowTable))
+                .containsExactlyInAnyOrderElementsOf(expected);
+
+        long totalRowCount = 0;
+        for (DataFileMeta file : dataFiles(firstRowTable)) {
+            totalRowCount += file.rowCount();
+        }
+        assertThat(totalRowCount).isEqualTo(expected.size());
+    }
+
/** Test first-row mode keeps original values after bootstrapping
composite key index. */
@Test
public void testFirstRowCompositePkKeepsOriginalValuesAcrossBootstrap()
throws Exception {
@@ -791,6 +927,59 @@ class ClusteringTableTest {
assertThat(dataFile.maxKey().getInt(1)).isEqualTo(2);
}
+    /**
+     * Writes unsorted rows to a table clustered by (c, b). Checks that the single resulting file
+     * has min/max keys consistent with ordering by the leading string column c first, then by b.
+     */
+    @Test
+    public void testCompositeClusteringLeadingStringRangeOrder() throws Exception {
+        Table compositeClusteringTable = createTableWithCompositeClusteringColumns();
+
+        writeRows(
+                compositeClusteringTable,
+                Arrays.asList(
+                        GenericRow.of(1, BinaryString.fromString("z"), 1),
+                        GenericRow.of(2, BinaryString.fromString("a"), 2),
+                        GenericRow.of(3, BinaryString.fromString("a"), 1),
+                        GenericRow.of(4, BinaryString.fromString("m"), 9)));
+
+        List<DataFileMeta> files = dataFiles(compositeClusteringTable);
+        assertThat(files).hasSize(1);
+
+        // Ordered by (c, b) the rows are (a, 1), (a, 2), (m, 9), (z, 1).
+        InternalRow minKey = files.get(0).minKey();
+        InternalRow maxKey = files.get(0).maxKey();
+        assertThat(minKey.getString(0)).isEqualTo(BinaryString.fromString("a"));
+        assertThat(minKey.getInt(1)).isEqualTo(1);
+        assertThat(maxKey.getString(0)).isEqualTo(BinaryString.fromString("z"));
+        assertThat(maxKey.getInt(1)).isEqualTo(1);
+    }
+
+    /**
+     * Updates keys so that their rows move between clustering ranges of a table clustered by
+     * (c, b), then checks that unfiltered and filtered reads see the latest values.
+     */
+    @Test
+    public void testCompositeClusteringColumnsDeduplicateMode() throws Exception {
+        Table compositeClusteringTable = createTableWithCompositeClusteringColumns();
+        BinaryString a = BinaryString.fromString("a");
+        BinaryString m = BinaryString.fromString("m");
+        BinaryString z = BinaryString.fromString("z");
+
+        writeRows(
+                compositeClusteringTable,
+                Arrays.asList(GenericRow.of(1, z, 20), GenericRow.of(2, a, 10)));
+        // Key 1 moves from "z" to "a", key 2 moves from "a" to "z", and key 3 is new.
+        writeRows(
+                compositeClusteringTable,
+                Arrays.asList(
+                        GenericRow.of(1, a, 5), GenericRow.of(2, z, 30), GenericRow.of(3, m, 15)));
+
+        GenericRow key1 = GenericRow.of(1, a, 5);
+        GenericRow key2 = GenericRow.of(2, z, 30);
+        GenericRow key3 = GenericRow.of(3, m, 15);
+        assertThat(readRowsWithStringClustering(compositeClusteringTable))
+                .containsExactlyInAnyOrder(key1, key2, key3);
+
+        PredicateBuilder builder = new PredicateBuilder(compositeClusteringTable.rowType());
+        Predicate lowB = builder.lessOrEqual(2, 15);
+        Predicate highB = builder.greaterThan(2, 20);
+        assertThat(readRowsWithStringClustering(compositeClusteringTable, lowB)).contains(key1);
+        assertThat(readRowsWithStringClustering(compositeClusteringTable, highB)).contains(key2);
+    }
+
// ==================== Spill Tests ====================
/** Test first-row mode with spill: keeps first values despite many
duplicate commits. */
@@ -1123,6 +1312,42 @@ class ClusteringTableTest {
return catalog.getTable(identifier);
}
+    /**
+     * Creates {@code default.composite_pk_table}: primary key (k1, k2), clustering column v,
+     * deletion vectors on, a single bucket, and no explicit merge engine.
+     */
+    private Table createTableCompositePk() throws Exception {
+        Schema.Builder schemaBuilder =
+                Schema.newBuilder()
+                        .column("k1", DataTypes.STRING())
+                        .column("k2", DataTypes.INT())
+                        .column("v", DataTypes.INT())
+                        .primaryKey("k1", "k2");
+        schemaBuilder.option(DELETION_VECTORS_ENABLED.key(), "true");
+        schemaBuilder.option(BUCKET.key(), "1");
+        schemaBuilder.option(CLUSTERING_COLUMNS.key(), "v");
+        schemaBuilder.option(PK_CLUSTERING_OVERRIDE.key(), "true");
+
+        Identifier identifier = Identifier.create("default", "composite_pk_table");
+        catalog.createTable(identifier, schemaBuilder.build(), false);
+        return catalog.getTable(identifier);
+    }
+
+    /**
+     * Creates {@code default.first_row_composite_pk_spill_table}: primary key (k1, k2),
+     * clustering column v, merge engine first-row, and sort spill threshold set to 2.
+     */
+    private Table createFirstRowTableCompositePkWithLowSpillThreshold() throws Exception {
+        Schema.Builder schemaBuilder =
+                Schema.newBuilder()
+                        .column("k1", DataTypes.STRING())
+                        .column("k2", DataTypes.INT())
+                        .column("v", DataTypes.INT())
+                        .primaryKey("k1", "k2");
+        schemaBuilder.option(DELETION_VECTORS_ENABLED.key(), "true");
+        schemaBuilder.option(BUCKET.key(), "1");
+        schemaBuilder.option(CLUSTERING_COLUMNS.key(), "v");
+        schemaBuilder.option(PK_CLUSTERING_OVERRIDE.key(), "true");
+        schemaBuilder.option(MERGE_ENGINE.key(), "first-row");
+        // NOTE(review): presumably tiny so that spilling is exercised with only a few rows.
+        schemaBuilder.option(SORT_SPILL_THRESHOLD.key(), "2");
+
+        Identifier identifier =
+                Identifier.create("default", "first_row_composite_pk_spill_table");
+        catalog.createTable(identifier, schemaBuilder.build(), false);
+        return catalog.getTable(identifier);
+    }
+
private Table createFirstRowTableCompositePk() throws Exception {
Identifier identifier = Identifier.create("default",
"first_row_composite_pk_table");
Schema schema =
@@ -1160,6 +1385,23 @@ class ClusteringTableTest {
return catalog.getTable(identifier);
}
+    /**
+     * Creates {@code default.composite_clustering_table} with columns (a INT, c STRING, b INT),
+     * primary key a, and the composite clustering columns "c,b".
+     */
+    private Table createTableWithCompositeClusteringColumns() throws Exception {
+        Schema.Builder schemaBuilder =
+                Schema.newBuilder()
+                        .column("a", DataTypes.INT())
+                        .column("c", DataTypes.STRING())
+                        .column("b", DataTypes.INT())
+                        .primaryKey("a");
+        schemaBuilder.option(DELETION_VECTORS_ENABLED.key(), "true");
+        schemaBuilder.option(BUCKET.key(), "1");
+        // Leading clustering column is the string c, followed by the int b.
+        schemaBuilder.option(CLUSTERING_COLUMNS.key(), "c,b");
+        schemaBuilder.option(PK_CLUSTERING_OVERRIDE.key(), "true");
+
+        Identifier identifier = Identifier.create("default", "composite_clustering_table");
+        catalog.createTable(identifier, schemaBuilder.build(), false);
+        return catalog.getTable(identifier);
+    }
+
private List<GenericRow> readRowsCompositePk(Table targetTable) throws
Exception {
ReadBuilder readBuilder = targetTable.newReadBuilder();
@SuppressWarnings("resource")
@@ -1196,7 +1438,14 @@ class ClusteringTableTest {
}
private List<GenericRow> readRows(Table targetTable) throws Exception {
+        // No predicate: read every row through the filter-accepting overload.
+        Predicate noFilter = null;
+        return readRows(targetTable, noFilter);
+    }
+
+ private List<GenericRow> readRows(Table targetTable, Predicate filter)
throws Exception {
ReadBuilder readBuilder = targetTable.newReadBuilder();
+ if (filter != null) {
+ readBuilder.withFilter(filter);
+ }
@SuppressWarnings("resource")
CloseableIterator<InternalRow> iterator =
readBuilder
@@ -1211,6 +1460,30 @@ class ClusteringTableTest {
return result;
}
+    /** Reads every row of {@code targetTable} as (INT, STRING, INT) without any filter. */
+    private List<GenericRow> readRowsWithStringClustering(Table targetTable) throws Exception {
+        Predicate noFilter = null;
+        return readRowsWithStringClustering(targetTable, noFilter);
+    }
+
+    /**
+     * Reads all rows of a table whose schema is (INT, STRING, INT), optionally applying a filter.
+     *
+     * @param targetTable table to scan
+     * @param filter predicate passed to {@link ReadBuilder#withFilter}, or {@code null} for none
+     * @return each row read, copied field by field into a new {@link GenericRow}
+     */
+    private List<GenericRow> readRowsWithStringClustering(Table targetTable, Predicate filter)
+            throws Exception {
+        ReadBuilder readBuilder = targetTable.newReadBuilder();
+        if (filter != null) {
+            readBuilder.withFilter(filter);
+        }
+        List<GenericRow> result = new ArrayList<>();
+        // Close the iterator once the scan is done instead of leaking it.
+        try (CloseableIterator<InternalRow> iterator =
+                readBuilder
+                        .newRead()
+                        .createReader(readBuilder.newScan().plan())
+                        .toCloseableIterator()) {
+            while (iterator.hasNext()) {
+                InternalRow row = iterator.next();
+                result.add(GenericRow.of(row.getInt(0), row.getString(1), row.getInt(2)));
+            }
+        }
+        return result;
+    }
+
private List<DataFileMeta> dataFiles(Table targetTable) {
List<DataFileMeta> result = new ArrayList<>();
for (Split split :
targetTable.newReadBuilder().newScan().plan().splits()) {