This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 6508aba459 [core] Fix chain table anchor lookup for multi partition
keys (#8154)
6508aba459 is described below
commit 6508aba45912c637586313aed678b1b2503253c9
Author: Juntao Zhang <[email protected]>
AuthorDate: Mon Jun 8 08:58:46 2026 +0800
[core] Fix chain table anchor lookup for multi partition keys (#8154)
---
.../apache/paimon/table/ChainGroupReadTable.java | 14 ++----
.../org/apache/paimon/utils/ChainTableUtils.java | 16 ++++--
.../apache/paimon/utils/ChainTableUtilsTest.java | 46 +++++++++++++++++
.../apache/paimon/spark/SparkChainTableITCase.java | 58 ++++++++++++++++++++++
4 files changed, 119 insertions(+), 15 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/ChainGroupReadTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/ChainGroupReadTable.java
index 915f46cd8e..75ba5f4097 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/ChainGroupReadTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/ChainGroupReadTable.java
@@ -285,8 +285,8 @@ public class ChainGroupReadTable extends
FallbackReadFileStoreTable {
deltaPartitionsInGroup.sort(
(a, b) ->
chainPartitionComparator.compare(
-
partitionProjector.chainPartitionForCompare(a),
-
partitionProjector.chainPartitionForCompare(b)));
+
partitionProjector.extractChainPartition(a),
+
partitionProjector.extractChainPartition(b)));
// Build a targeted snapshot-anchor predicate:
// group fields exact-match AND chain < maxChainInGroup
@@ -308,15 +308,7 @@ public class ChainGroupReadTable extends
FallbackReadFileStoreTable {
// List snapshot partitions for this group, sorted by
chain dimension.
List<BinaryRow> snapshotPartitionsInGroup =
newChainPartitionListingScan(true,
snapshotAnchorPredicate)
- .listPartitions().stream()
- .sorted(
- (a, b) ->
-
chainPartitionComparator.compare(
- partitionProjector
-
.chainPartitionForCompare(a),
- partitionProjector
-
.chainPartitionForCompare(b)))
- .collect(Collectors.toList());
+ .listPartitions();
// Find delta → snapshot mapping (for each delta
partition, find the nearest
// earlier
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/ChainTableUtils.java
b/paimon-core/src/main/java/org/apache/paimon/utils/ChainTableUtils.java
index 99e25cc2ce..e6d4e4aadf 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/ChainTableUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/ChainTableUtils.java
@@ -250,22 +250,30 @@ public class ChainTableUtils {
RecordComparator chainComparator,
ChainPartitionProjector projector) {
+ sortedTargetPartitions.sort(
+ (a, b) ->
+ chainComparator.compare(
+ projector.extractChainPartition(b),
+ projector.extractChainPartition(a)));
+
Map<BinaryRow, BinaryRow> partitionMapping = new HashMap<>();
- int targetIndex = 0;
for (BinaryRow sourceRow : sortedSourcePartitions) {
+ int targetIndex = 0;
BinaryRow sourceChain = projector.extractChainPartition(sourceRow);
while (targetIndex < sortedTargetPartitions.size()) {
BinaryRow targetChain =
projector.extractChainPartition(sortedTargetPartitions.get(targetIndex));
if (chainComparator.compare(targetChain, sourceChain) < 0) {
- targetIndex++;
- } else {
break;
+ } else {
+ targetIndex++;
}
}
BinaryRow firstSmaller =
- (targetIndex > 0) ? sortedTargetPartitions.get(targetIndex
- 1) : null;
+ (targetIndex < sortedTargetPartitions.size())
+ ? sortedTargetPartitions.get(targetIndex)
+ : null;
partitionMapping.put(sourceRow, firstSmaller);
}
return partitionMapping;
diff --git
a/paimon-core/src/test/java/org/apache/paimon/utils/ChainTableUtilsTest.java
b/paimon-core/src/test/java/org/apache/paimon/utils/ChainTableUtilsTest.java
index 051323e180..971407b928 100644
--- a/paimon-core/src/test/java/org/apache/paimon/utils/ChainTableUtilsTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/utils/ChainTableUtilsTest.java
@@ -721,4 +721,50 @@ public class ChainTableUtilsTest {
// (US, Beijing, 20250731) → country mismatch → no match
assertThat(predicate.test(row(Lists.newArrayList("US", "Beijing",
"20250731")))).isFalse();
}
+
+ @Test
+ public void testFindFirstLatestPartitionsWithMultiChainKeys() {
+ // partition keys: (dt, hour), chain keys: (dt, hour)
+ RowType fullType =
+ RowType.builder()
+ .field("dt", DataTypes.STRING().notNull())
+ .field("hour", DataTypes.STRING().notNull())
+ .build();
+
+ ChainPartitionProjector projector = new
ChainPartitionProjector(fullType, 2);
+
+ // Snapshot partitions with same dt but different hour
+ BinaryRow snap1 = row(Lists.newArrayList("20250809", "01"));
+ BinaryRow snap2 = row(Lists.newArrayList("20250809", "02"));
+ List<BinaryRow> snapshotPartitions = Arrays.asList(snap1, snap2);
+
+ // Delta partitions
+ BinaryRow delta1 = row(Lists.newArrayList("20250810", "03"));
+ BinaryRow delta2 = row(Lists.newArrayList("20250810", "05"));
+ List<BinaryRow> deltaPartitions = Arrays.asList(delta1, delta2);
+
+ RecordComparator chainComparator =
+ (a, b) -> {
+ int cmp = a.getString(0).compareTo(b.getString(0));
+ if (cmp != 0) {
+ return cmp;
+ }
+ return a.getString(1).compareTo(b.getString(1));
+ };
+ Map<BinaryRow, BinaryRow> mapping =
+ ChainTableUtils.findFirstLatestPartitionsWithProjector(
+ deltaPartitions, snapshotPartitions, chainComparator,
projector);
+
+ // delta (20250810,03) → snapshot (20250809,02) (nearest earlier)
+ BinaryRow matched1 = mapping.get(delta1);
+ assertThat(matched1).isNotNull();
+ assertThat(getString(matched1, 0)).isEqualTo("20250809");
+ assertThat(getString(matched1, 1)).isEqualTo("02");
+
+ // delta (20250810,05) → snapshot (20250809,02) (nearest earlier)
+ BinaryRow matched2 = mapping.get(delta2);
+ assertThat(matched2).isNotNull();
+ assertThat(getString(matched2, 0)).isEqualTo("20250809");
+ assertThat(getString(matched2, 1)).isEqualTo("02");
+ }
}
diff --git
a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkChainTableITCase.java
b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkChainTableITCase.java
index c2833c223e..af457ac63f 100644
---
a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkChainTableITCase.java
+++
b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkChainTableITCase.java
@@ -2362,4 +2362,62 @@ public class SparkChainTableITCase {
spark.sql("DROP TABLE IF EXISTS `my_db1`.`chain_test`;");
spark.close();
}
+
+ @Test
+ public void testChainTableWithMultiChainKeys(@TempDir java.nio.file.Path
tempDir)
+ throws IOException {
+ Path warehousePath = new Path("file:" + tempDir.toString());
+ SparkSession.Builder builder =
createSparkSessionBuilder(warehousePath);
+ SparkSession spark = builder.getOrCreate();
+ spark.sql("CREATE DATABASE IF NOT EXISTS my_db1");
+ spark.sql("USE spark_catalog.my_db1");
+
+ spark.sql(
+ "CREATE TABLE `chain_test` (\n"
+ + " `t1` BIGINT COMMENT 't1',\n"
+ + " `t2` BIGINT COMMENT 't2',\n"
+ + " `t3` STRING COMMENT 't3'\n"
+ + ") PARTITIONED BY (`dt` STRING, `hr` STRING)\n"
+ + "TBLPROPERTIES (\n"
+ + " 'bucket-key' = 't1',\n"
+ + " 'primary-key' = 'dt,hr,t1',\n"
+ + " 'partition.timestamp-pattern' = '$dt
$hr:00:00',\n"
+ + " 'partition.timestamp-formatter' = 'yyyyMMdd
HH:mm:ss',\n"
+ + " 'chain-table.enabled' = 'true',\n"
+ + " 'bucket' = '1',\n"
+ + " 'merge-engine' = 'deduplicate',\n"
+ + " 'sequence.field' = 't2',\n"
+ + " 'chain-table.chain-partition-keys' = 'dt,hr'\n"
+ + ")");
+
+ setupChainTableBranches(spark, "chain_test");
+
+ // Write snapshot branch
+ spark.sql(
+ "INSERT INTO TABLE `my_db1`.`chain_test$branch_snapshot`
PARTITION (dt = '20250809', hr='01') VALUES (3, 1, '3');");
+ spark.sql(
+ "INSERT INTO TABLE `my_db1`.`chain_test$branch_snapshot`
PARTITION (dt = '20250809', hr='02') VALUES (4, 1, '4');");
+
+ // Write delta branch
+ spark.sql(
+ "INSERT INTO TABLE `my_db1`.`chain_test$branch_delta`
PARTITION (dt = '20250810', hr='03') VALUES (5, 1, '5');");
+ spark.sql(
+ "INSERT INTO TABLE `my_db1`.`chain_test$branch_delta`
PARTITION (dt = '20250810', hr='05') VALUES (6, 1, '6');");
+
+ // Query dt='20250810' and hr='05'
+ // Expected: snapshot(20250809/02) + delta(20250810/03) + delta(20250810/05)
+ // Because chain key = (dt, hr), anchor is the nearest earlier
(20250809, 02)
+ assertThat(
+ spark
+ .sql(
+ "SELECT * FROM `my_db1`.`chain_test`
WHERE dt='20250810' AND hr='05'")
+ .collectAsList().stream()
+ .map(Row::toString)
+ .collect(Collectors.toList()))
+ .containsExactlyInAnyOrder(
+ "[4,1,4,20250810,05]", "[5,1,5,20250810,05]",
"[6,1,6,20250810,05]");
+
+ spark.sql("DROP TABLE IF EXISTS `my_db1`.`chain_test`;");
+ spark.close();
+ }
}