This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 7c5f5bd7e6 [core] Fix ChainSplit NPE after branch table cache invalidation (#7300)
7c5f5bd7e6 is described below
commit 7c5f5bd7e69c9dd240205516235afaa86700a34b
Author: Juntao Zhang <[email protected]>
AuthorDate: Wed Feb 25 20:09:15 2026 +0800
[core] Fix ChainSplit NPE after branch table cache invalidation (#7300)
---
.../org/apache/paimon/catalog/CachingCatalog.java | 7 +++++
.../apache/paimon/spark/SparkChainTableITCase.java | 35 ++++++++++++++++++++++
2 files changed, 42 insertions(+)
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
index 9dfb32e888..aad5d16a11 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
@@ -324,6 +324,13 @@ public class CachingCatalog extends DelegateCatalog {
if (partitionCache != null) {
partitionCache.invalidate(identifier);
}
+ // clear all branches of this table
+ for (Identifier i : tableCache.asMap().keySet()) {
+            if (identifier.getTableName().equals(i.getTableName())
+                    && identifier.getDatabaseName().equals(i.getDatabaseName())) {
+ tableCache.invalidate(i);
+ }
+ }
}
// ================================== Cache Public API
diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkChainTableITCase.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkChainTableITCase.java
index 5d14f0a3cb..2465a925d4 100644
--- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkChainTableITCase.java
+++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkChainTableITCase.java
@@ -754,4 +754,39 @@ public class SparkChainTableITCase {
spark.close();
}
+
+ @Test
+    public void testChainTableCacheInvalidation(@TempDir java.nio.file.Path tempDir)
+            throws IOException {
+ Path warehousePath = new Path("file:" + tempDir.toString());
+        SparkSession.Builder builder = createSparkSessionBuilder(warehousePath);
+ SparkSession spark = builder.getOrCreate();
+ spark.sql("CREATE DATABASE IF NOT EXISTS my_db1");
+ spark.sql("USE spark_catalog.my_db1");
+ spark.sql(
+ "CREATE TABLE chain_test_t ("
+ + " `t1` string ,"
+ + " `t2` string ,"
+ + " `t3` string"
+ + ") PARTITIONED BY (`date` string)"
+ + "TBLPROPERTIES ("
+ + " 'chain-table.enabled' = 'true'"
+ + " ,'primary-key' = 'date,t1'"
+ + " ,'sequence.field' = 't2'"
+ + " ,'bucket-key' = 't1'"
+ + " ,'bucket' = '1'"
+ + " ,'partition.timestamp-pattern' = '$date'"
+ + " ,'partition.timestamp-formatter' = 'yyyyMMdd'"
+ + ")");
+ setupChainTableBranches(spark, "chain_test_t");
+        spark.sql(
+                "insert overwrite `chain_test_t$branch_delta` partition (date = '20260224') values ('1', '1', '1');");
+        assertThat(
+                spark.sql("SELECT * FROM `chain_test_t`").collectAsList().stream()
+ .map(Row::toString)
+ .collect(Collectors.toList()))
+ .containsExactlyInAnyOrder("[1,1,1,20260224]");
+ spark.sql("DROP TABLE IF EXISTS `my_db1`.`chain_test_t`;");
+ spark.close();
+ }
}