This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 4eafdcc48f [spark] Align spark.paimon.branch option with explicit 
branch syntax for chain tables (#8016)
4eafdcc48f is described below

commit 4eafdcc48ff4346609383d428d06d0db674d1ba9
Author: Juntao Zhang <[email protected]>
AuthorDate: Tue Jun 2 23:18:12 2026 +0800

    [spark] Align spark.paimon.branch option with explicit branch syntax for 
chain tables (#8016)
---
 .../java/org/apache/paimon/spark/SparkCatalog.java |  5 +-
 .../org/apache/paimon/spark/util/OptionUtils.scala | 37 +++++++++++--
 .../apache/paimon/spark/SparkChainTableITCase.java | 64 ++++++++++++++++++++++
 3 files changed, 100 insertions(+), 6 deletions(-)

diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index b299248a81..165be98980 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -102,6 +102,7 @@ import static 
org.apache.paimon.spark.SparkTypeUtils.CURRENT_DEFAULT_COLUMN_META
 import static org.apache.paimon.spark.SparkTypeUtils.toPaimonType;
 import static 
org.apache.paimon.spark.util.OptionUtils.checkRequiredConfigurations;
 import static org.apache.paimon.spark.util.OptionUtils.copyWithSQLConf;
+import static org.apache.paimon.spark.util.OptionUtils.withBranchFromOptions;
 import static org.apache.paimon.spark.utils.CatalogUtils.checkNamespace;
 import static org.apache.paimon.spark.utils.CatalogUtils.checkNoDefaultValue;
 import static 
org.apache.paimon.spark.utils.CatalogUtils.isUpdateColumnDefaultValue;
@@ -785,7 +786,9 @@ public class SparkCatalog extends SparkBaseCatalog
     protected org.apache.spark.sql.connector.catalog.Table loadSparkTable(
             Identifier ident, Map<String, String> extraOptions) throws 
NoSuchTableException {
         try {
-            org.apache.paimon.catalog.Identifier tblIdent = 
toIdentifier(ident, catalogName);
+            org.apache.paimon.catalog.Identifier tblIdent =
+                    withBranchFromOptions(
+                            catalogName, toIdentifier(ident, catalogName), 
extraOptions);
             org.apache.paimon.table.Table table =
                     copyWithSQLConf(
                             catalog.getTable(tblIdent), catalogName, tblIdent, 
extraOptions);
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala
index 66b75bf122..b21b83de26 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.spark.util
 
+import org.apache.paimon.CoreOptions
 import org.apache.paimon.catalog.Identifier
 import org.apache.paimon.options.ConfigOption
 import org.apache.paimon.spark.{SparkCatalogOptions, SparkConnectorOptions}
@@ -169,15 +170,41 @@ object OptionUtils extends SQLConfHelper with Logging {
       catalogName: String = null,
       ident: Identifier = null,
       extraOptions: JMap[String, String] = new JHashMap[String, String]()): T 
= {
-    val mergedOptions = if (catalogName != null && ident != null) {
-      mergeSQLConfWithIdentifier(extraOptions, catalogName, ident)
-    } else {
-      mergeSQLConf(extraOptions)
-    }
+    val mergedOptions = getMergedOptions(catalogName, ident, extraOptions)
     if (mergedOptions.isEmpty) {
       table
     } else {
       table.copy(mergedOptions).asInstanceOf[T]
     }
   }
+
+  private def getMergedOptions(
+      catalogName: String = null,
+      ident: Identifier = null,
+      extraOptions: JMap[String, String] = new JHashMap[String, String]()): 
JMap[String, String] = {
+    if (catalogName != null && ident != null) {
+      mergeSQLConfWithIdentifier(extraOptions, catalogName, ident)
+    } else {
+      mergeSQLConf(extraOptions)
+    }
+  }
+
+  def withBranchFromOptions(
+      catalogName: String = null,
+      identifier: Identifier = null,
+      extraOptions: JMap[String, String] = new JHashMap[String, String]()
+  ): Identifier = {
+    if (identifier != null && !identifier.isSystemTable) {
+      val branch =
+        getMergedOptions(catalogName, identifier, 
extraOptions).get(CoreOptions.BRANCH.key)
+      if (branch != null && identifier.getBranchName == null) {
+        logWarning(
+          s"Using deprecated 'spark.paimon.branch=$branch' to access table 
'${identifier.getTableName}'. " +
+            s"Please migrate to '${identifier.getTableName}$$branch_$branch' 
syntax, as 'spark.paimon.branch' " +
+            s"will be removed in a future version.")
+        return new Identifier(identifier.getDatabaseName, 
identifier.getTableName, branch)
+      }
+    }
+    identifier
+  }
 }
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkChainTableITCase.java
 
b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkChainTableITCase.java
index 1907c7fcf3..c2833c223e 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkChainTableITCase.java
+++ 
b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkChainTableITCase.java
@@ -2298,4 +2298,68 @@ public class SparkChainTableITCase {
 
         spark.close();
     }
+
+    @Test
+    public void testChainTableWithBranchOption(@TempDir java.nio.file.Path 
tempDir)
+            throws IOException {
+        Path warehousePath = new Path("file:" + tempDir.toString());
+        SparkSession.Builder builder = 
createSparkSessionBuilder(warehousePath);
+        SparkSession spark = builder.getOrCreate();
+        spark.sql("CREATE DATABASE IF NOT EXISTS my_db1");
+        spark.sql("USE spark_catalog.my_db1");
+        spark.sql("DROP TABLE IF EXISTS `my_db1`.`chain_test`;");
+        spark.sql(
+                "CREATE TABLE IF NOT EXISTS `chain_test` (\n"
+                        + "  `t1` BIGINT,\n"
+                        + "  `t2` BIGINT,\n"
+                        + "  `t3` STRING\n"
+                        + ") PARTITIONED BY (`dt` STRING)\n"
+                        + "TBLPROPERTIES (\n"
+                        + "  'bucket-key' = 't1',\n"
+                        + "  'primary-key' = 'dt,t1',\n"
+                        + "  'partition.timestamp-pattern' = '$dt',\n"
+                        + "  'partition.timestamp-formatter' = 'yyyyMMdd',\n"
+                        + "  'chain-table.enabled' = 'true',\n"
+                        + "  'bucket' = '1',\n"
+                        + "  'merge-engine' = 'deduplicate',\n"
+                        + "  'sequence.field' = 't2'\n"
+                        + ")");
+        setupChainTableBranches(spark, "chain_test");
+        // Write main branch
+        spark.sql(
+                "INSERT OVERWRITE TABLE `my_db1`.`chain_test` PARTITION (dt = 
'20250810') VALUES (1, 3, '0')");
+        // Write delta branch
+        spark.sql("SET spark.paimon.branch = delta");
+        spark.sql(
+                "INSERT OVERWRITE TABLE `my_db1`.`chain_test` PARTITION (dt = 
'20250810') VALUES (1, 2, '1')");
+        spark.sql(
+                "INSERT OVERWRITE TABLE `my_db1`.`chain_test$branch_delta` 
PARTITION (dt = '20250811') VALUES (2, 2, '1')");
+        assertThat(spark.sql("SELECT * FROM 
`my_db1`.`chain_test$snapshots`").count()).isEqualTo(2);
+        spark.sql("RESET spark.paimon.branch");
+        assertThat(
+                        spark.sql("SELECT * FROM `my_db1`.`chain_test` where 
dt = '20250811'")
+                                .collectAsList().stream()
+                                .map(Row::toString)
+                                .collect(Collectors.toList()))
+                .containsExactlyInAnyOrder("[1,2,1,20250811]", 
"[2,2,1,20250811]");
+        assertThat(
+                        spark
+                                .sql(
+                                        "SELECT * FROM 
`my_db1`.`chain_test$branch_snapshot` WHERE dt = '20250811'")
+                                .collectAsList().stream()
+                                .map(Row::toString)
+                                .collect(Collectors.toList()))
+                .isEmpty();
+
+        spark.sql("SET spark.paimon.branch = snapshot");
+        assertThat(
+                        spark.sql("SELECT * FROM `my_db1`.`chain_test` where 
dt = '20250811'")
+                                .collectAsList().stream()
+                                .map(Row::toString)
+                                .collect(Collectors.toList()))
+                .isEmpty();
+        assertThat(spark.sql("SELECT * FROM 
`my_db1`.`chain_test$snapshots`").count()).isEqualTo(2);
+        spark.sql("DROP TABLE IF EXISTS `my_db1`.`chain_test`;");
+        spark.close();
+    }
 }

Reply via email to