This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 6508aba459 [core] Fix chain table anchor lookup for multi partition keys (#8154)
6508aba459 is described below

commit 6508aba45912c637586313aed678b1b2503253c9
Author: Juntao Zhang <[email protected]>
AuthorDate: Mon Jun 8 08:58:46 2026 +0800

    [core] Fix chain table anchor lookup for multi partition keys (#8154)
---
 .../apache/paimon/table/ChainGroupReadTable.java   | 14 ++----
 .../org/apache/paimon/utils/ChainTableUtils.java   | 16 ++++--
 .../apache/paimon/utils/ChainTableUtilsTest.java   | 46 +++++++++++++++++
 .../apache/paimon/spark/SparkChainTableITCase.java | 58 ++++++++++++++++++++++
 4 files changed, 119 insertions(+), 15 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ChainGroupReadTable.java b/paimon-core/src/main/java/org/apache/paimon/table/ChainGroupReadTable.java
index 915f46cd8e..75ba5f4097 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/ChainGroupReadTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/ChainGroupReadTable.java
@@ -285,8 +285,8 @@ public class ChainGroupReadTable extends FallbackReadFileStoreTable {
                     deltaPartitionsInGroup.sort(
                             (a, b) ->
                                     chainPartitionComparator.compare(
-                                            
partitionProjector.chainPartitionForCompare(a),
-                                            
partitionProjector.chainPartitionForCompare(b)));
+                                            
partitionProjector.extractChainPartition(a),
+                                            
partitionProjector.extractChainPartition(b)));
 
                     // Build a targeted snapshot-anchor predicate:
                     //   group fields exact-match  AND  chain < maxChainInGroup
@@ -308,15 +308,7 @@ public class ChainGroupReadTable extends FallbackReadFileStoreTable {
                     // List snapshot partitions for this group, sorted by 
chain dimension.
                     List<BinaryRow> snapshotPartitionsInGroup =
                             newChainPartitionListingScan(true, 
snapshotAnchorPredicate)
-                                    .listPartitions().stream()
-                                    .sorted(
-                                            (a, b) ->
-                                                    
chainPartitionComparator.compare(
-                                                            partitionProjector
-                                                                    
.chainPartitionForCompare(a),
-                                                            partitionProjector
-                                                                    
.chainPartitionForCompare(b)))
-                                    .collect(Collectors.toList());
+                                    .listPartitions();
 
                     // Find delta → snapshot mapping (for each delta 
partition, find the nearest
                     // earlier
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/ChainTableUtils.java b/paimon-core/src/main/java/org/apache/paimon/utils/ChainTableUtils.java
index 99e25cc2ce..e6d4e4aadf 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/ChainTableUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/ChainTableUtils.java
@@ -250,22 +250,30 @@ public class ChainTableUtils {
             RecordComparator chainComparator,
             ChainPartitionProjector projector) {
 
+        sortedTargetPartitions.sort(
+                (a, b) ->
+                        chainComparator.compare(
+                                projector.extractChainPartition(b),
+                                projector.extractChainPartition(a)));
+
         Map<BinaryRow, BinaryRow> partitionMapping = new HashMap<>();
-        int targetIndex = 0;
 
         for (BinaryRow sourceRow : sortedSourcePartitions) {
+            int targetIndex = 0;
             BinaryRow sourceChain = projector.extractChainPartition(sourceRow);
             while (targetIndex < sortedTargetPartitions.size()) {
                 BinaryRow targetChain =
                         
projector.extractChainPartition(sortedTargetPartitions.get(targetIndex));
                 if (chainComparator.compare(targetChain, sourceChain) < 0) {
-                    targetIndex++;
-                } else {
                     break;
+                } else {
+                    targetIndex++;
                 }
             }
             BinaryRow firstSmaller =
-                    (targetIndex > 0) ? sortedTargetPartitions.get(targetIndex 
- 1) : null;
+                    (targetIndex < sortedTargetPartitions.size())
+                            ? sortedTargetPartitions.get(targetIndex)
+                            : null;
             partitionMapping.put(sourceRow, firstSmaller);
         }
         return partitionMapping;
diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/ChainTableUtilsTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/ChainTableUtilsTest.java
index 051323e180..971407b928 100644
--- a/paimon-core/src/test/java/org/apache/paimon/utils/ChainTableUtilsTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/utils/ChainTableUtilsTest.java
@@ -721,4 +721,50 @@ public class ChainTableUtilsTest {
         // (US, Beijing, 20250731) → country mismatch → no match
         assertThat(predicate.test(row(Lists.newArrayList("US", "Beijing", 
"20250731")))).isFalse();
     }
+
+    @Test
+    public void testFindFirstLatestPartitionsWithMultiChainKeys() {
+        // partition keys: (dt, hour), chain keys: (dt, hour)
+        RowType fullType =
+                RowType.builder()
+                        .field("dt", DataTypes.STRING().notNull())
+                        .field("hour", DataTypes.STRING().notNull())
+                        .build();
+
+        ChainPartitionProjector projector = new 
ChainPartitionProjector(fullType, 2);
+
+        // Snapshot partitions with same dt but different hour
+        BinaryRow snap1 = row(Lists.newArrayList("20250809", "01"));
+        BinaryRow snap2 = row(Lists.newArrayList("20250809", "02"));
+        List<BinaryRow> snapshotPartitions = Arrays.asList(snap1, snap2);
+
+        // Delta partitions
+        BinaryRow delta1 = row(Lists.newArrayList("20250810", "03"));
+        BinaryRow delta2 = row(Lists.newArrayList("20250810", "05"));
+        List<BinaryRow> deltaPartitions = Arrays.asList(delta1, delta2);
+
+        RecordComparator chainComparator =
+                (a, b) -> {
+                    int cmp = a.getString(0).compareTo(b.getString(0));
+                    if (cmp != 0) {
+                        return cmp;
+                    }
+                    return a.getString(1).compareTo(b.getString(1));
+                };
+        Map<BinaryRow, BinaryRow> mapping =
+                ChainTableUtils.findFirstLatestPartitionsWithProjector(
+                        deltaPartitions, snapshotPartitions, chainComparator, 
projector);
+
+        // delta (20250810,03) → snapshot (20250809,02) (nearest earlier)
+        BinaryRow matched1 = mapping.get(delta1);
+        assertThat(matched1).isNotNull();
+        assertThat(getString(matched1, 0)).isEqualTo("20250809");
+        assertThat(getString(matched1, 1)).isEqualTo("02");
+
+        // delta (20250810,05) → snapshot (20250809,02) (nearest earlier)
+        BinaryRow matched2 = mapping.get(delta2);
+        assertThat(matched2).isNotNull();
+        assertThat(getString(matched2, 0)).isEqualTo("20250809");
+        assertThat(getString(matched2, 1)).isEqualTo("02");
+    }
 }
diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkChainTableITCase.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkChainTableITCase.java
index c2833c223e..af457ac63f 100644
--- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkChainTableITCase.java
+++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkChainTableITCase.java
@@ -2362,4 +2362,62 @@ public class SparkChainTableITCase {
         spark.sql("DROP TABLE IF EXISTS `my_db1`.`chain_test`;");
         spark.close();
     }
+
+    @Test
+    public void testChainTableWithMultiChainKeys(@TempDir java.nio.file.Path 
tempDir)
+            throws IOException {
+        Path warehousePath = new Path("file:" + tempDir.toString());
+        SparkSession.Builder builder = 
createSparkSessionBuilder(warehousePath);
+        SparkSession spark = builder.getOrCreate();
+        spark.sql("CREATE DATABASE IF NOT EXISTS my_db1");
+        spark.sql("USE spark_catalog.my_db1");
+
+        spark.sql(
+                "CREATE TABLE `chain_test` (\n"
+                        + "  `t1` BIGINT COMMENT 't1',\n"
+                        + "  `t2` BIGINT COMMENT 't2',\n"
+                        + "  `t3` STRING COMMENT 't3'\n"
+                        + ") PARTITIONED BY (`dt` STRING, `hr` STRING)\n"
+                        + "TBLPROPERTIES (\n"
+                        + "  'bucket-key' = 't1',\n"
+                        + "  'primary-key' = 'dt,hr,t1',\n"
+                        + "  'partition.timestamp-pattern' = '$dt 
$hr:00:00',\n"
+                        + "  'partition.timestamp-formatter' = 'yyyyMMdd 
HH:mm:ss',\n"
+                        + "  'chain-table.enabled' = 'true',\n"
+                        + "  'bucket' = '1',\n"
+                        + "  'merge-engine' = 'deduplicate',\n"
+                        + "  'sequence.field' = 't2',\n"
+                        + "  'chain-table.chain-partition-keys' = 'dt,hr'\n"
+                        + ")");
+
+        setupChainTableBranches(spark, "chain_test");
+
+        // Write snapshot branch
+        spark.sql(
+                "INSERT INTO TABLE `my_db1`.`chain_test$branch_snapshot` 
PARTITION (dt = '20250809', hr='01') VALUES (3, 1, '3');");
+        spark.sql(
+                "INSERT INTO TABLE `my_db1`.`chain_test$branch_snapshot` 
PARTITION (dt = '20250809', hr='02') VALUES (4, 1, '4');");
+
+        // Write delta branch
+        spark.sql(
+                "INSERT INTO TABLE `my_db1`.`chain_test$branch_delta` 
PARTITION (dt = '20250810', hr='03') VALUES (5, 1, '5');");
+        spark.sql(
+                "INSERT INTO TABLE `my_db1`.`chain_test$branch_delta` 
PARTITION (dt = '20250810', hr='05') VALUES (6, 1, '6');");
+
+        // Query dt='20250810' and hr='05'
+        // Expected: snapshot(20250809/02) + delta(20250810/05)
+        // Because chain key = (dt, hr), anchor is the nearest earlier 
(20250809, 02)
+        assertThat(
+                        spark
+                                .sql(
+                                        "SELECT * FROM `my_db1`.`chain_test` 
WHERE dt='20250810' AND hr='05'")
+                                .collectAsList().stream()
+                                .map(Row::toString)
+                                .collect(Collectors.toList()))
+                .containsExactlyInAnyOrder(
+                        "[4,1,4,20250810,05]", "[5,1,5,20250810,05]", 
"[6,1,6,20250810,05]");
+
+        spark.sql("DROP TABLE IF EXISTS `my_db1`.`chain_test`;");
+        spark.close();
+    }
 }

Reply via email to