This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 4eafdcc48f [spark] Align spark.paimon.branch option with explicit
branch syntax for chain tables (#8016)
4eafdcc48f is described below
commit 4eafdcc48ff4346609383d428d06d0db674d1ba9
Author: Juntao Zhang <[email protected]>
AuthorDate: Tue Jun 2 23:18:12 2026 +0800
[spark] Align spark.paimon.branch option with explicit branch syntax for
chain tables (#8016)
---
.../java/org/apache/paimon/spark/SparkCatalog.java | 5 +-
.../org/apache/paimon/spark/util/OptionUtils.scala | 37 +++++++++++--
.../apache/paimon/spark/SparkChainTableITCase.java | 64 ++++++++++++++++++++++
3 files changed, 100 insertions(+), 6 deletions(-)
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index b299248a81..165be98980 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -102,6 +102,7 @@ import static org.apache.paimon.spark.SparkTypeUtils.CURRENT_DEFAULT_COLUMN_META
import static org.apache.paimon.spark.SparkTypeUtils.toPaimonType;
import static org.apache.paimon.spark.util.OptionUtils.checkRequiredConfigurations;
import static org.apache.paimon.spark.util.OptionUtils.copyWithSQLConf;
+import static org.apache.paimon.spark.util.OptionUtils.withBranchFromOptions;
import static org.apache.paimon.spark.utils.CatalogUtils.checkNamespace;
import static org.apache.paimon.spark.utils.CatalogUtils.checkNoDefaultValue;
import static org.apache.paimon.spark.utils.CatalogUtils.isUpdateColumnDefaultValue;
@@ -785,7 +786,9 @@ public class SparkCatalog extends SparkBaseCatalog
protected org.apache.spark.sql.connector.catalog.Table loadSparkTable(
Identifier ident, Map<String, String> extraOptions) throws NoSuchTableException {
try {
- org.apache.paimon.catalog.Identifier tblIdent = toIdentifier(ident, catalogName);
+ org.apache.paimon.catalog.Identifier tblIdent =
+ withBranchFromOptions(
+ catalogName, toIdentifier(ident, catalogName), extraOptions);
org.apache.paimon.table.Table table =
copyWithSQLConf(
catalog.getTable(tblIdent), catalogName, tblIdent, extraOptions);
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala
index 66b75bf122..b21b83de26 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala
@@ -18,6 +18,7 @@
package org.apache.paimon.spark.util
+import org.apache.paimon.CoreOptions
import org.apache.paimon.catalog.Identifier
import org.apache.paimon.options.ConfigOption
import org.apache.paimon.spark.{SparkCatalogOptions, SparkConnectorOptions}
@@ -169,15 +170,41 @@ object OptionUtils extends SQLConfHelper with Logging {
catalogName: String = null,
ident: Identifier = null,
extraOptions: JMap[String, String] = new JHashMap[String, String]()): T = {
- val mergedOptions = if (catalogName != null && ident != null) {
- mergeSQLConfWithIdentifier(extraOptions, catalogName, ident)
- } else {
- mergeSQLConf(extraOptions)
- }
+ val mergedOptions = getMergedOptions(catalogName, ident, extraOptions)
if (mergedOptions.isEmpty) {
table
} else {
table.copy(mergedOptions).asInstanceOf[T]
}
}
+
+ private def getMergedOptions(
+ catalogName: String = null,
+ ident: Identifier = null,
+ extraOptions: JMap[String, String] = new JHashMap[String, String]()): JMap[String, String] = {
+ if (catalogName != null && ident != null) {
+ mergeSQLConfWithIdentifier(extraOptions, catalogName, ident)
+ } else {
+ mergeSQLConf(extraOptions)
+ }
+ }
+
+ def withBranchFromOptions(
+ catalogName: String = null,
+ identifier: Identifier = null,
+ extraOptions: JMap[String, String] = new JHashMap[String, String]()
+ ): Identifier = {
+ if (identifier != null && !identifier.isSystemTable) {
+ val branch =
+ getMergedOptions(catalogName, identifier, extraOptions).get(CoreOptions.BRANCH.key)
+ if (branch != null && identifier.getBranchName == null) {
+ logWarning(
+ s"Using deprecated 'spark.paimon.branch=$branch' to access table '${identifier.getTableName}'. " +
+ s"Please migrate to '${identifier.getTableName}$$branch_$branch' syntax, as 'spark.paimon.branch' " +
+ s"will be removed in a future version.")
+ return new Identifier(identifier.getDatabaseName, identifier.getTableName, branch)
+ }
+ }
+ identifier
+ }
}
diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkChainTableITCase.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkChainTableITCase.java
index 1907c7fcf3..c2833c223e 100644
--- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkChainTableITCase.java
+++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkChainTableITCase.java
@@ -2298,4 +2298,68 @@ public class SparkChainTableITCase {
spark.close();
}
+
+ @Test
+ public void testChainTableWithBranchOption(@TempDir java.nio.file.Path tempDir)
+ throws IOException {
+ Path warehousePath = new Path("file:" + tempDir.toString());
+ SparkSession.Builder builder = createSparkSessionBuilder(warehousePath);
+ SparkSession spark = builder.getOrCreate();
+ spark.sql("CREATE DATABASE IF NOT EXISTS my_db1");
+ spark.sql("USE spark_catalog.my_db1");
+ spark.sql("DROP TABLE IF EXISTS `my_db1`.`chain_test`;");
+ spark.sql(
+ "CREATE TABLE IF NOT EXISTS `chain_test` (\n"
+ + " `t1` BIGINT,\n"
+ + " `t2` BIGINT,\n"
+ + " `t3` STRING\n"
+ + ") PARTITIONED BY (`dt` STRING)\n"
+ + "TBLPROPERTIES (\n"
+ + " 'bucket-key' = 't1',\n"
+ + " 'primary-key' = 'dt,t1',\n"
+ + " 'partition.timestamp-pattern' = '$dt',\n"
+ + " 'partition.timestamp-formatter' = 'yyyyMMdd',\n"
+ + " 'chain-table.enabled' = 'true',\n"
+ + " 'bucket' = '1',\n"
+ + " 'merge-engine' = 'deduplicate',\n"
+ + " 'sequence.field' = 't2'\n"
+ + ")");
+ setupChainTableBranches(spark, "chain_test");
+ // Write main branch
+ spark.sql(
+ "INSERT OVERWRITE TABLE `my_db1`.`chain_test` PARTITION (dt = '20250810') VALUES (1, 3, '0')");
+ // Write delta branch
+ spark.sql("SET spark.paimon.branch = delta");
+ spark.sql(
+ "INSERT OVERWRITE TABLE `my_db1`.`chain_test` PARTITION (dt = '20250810') VALUES (1, 2, '1')");
+ spark.sql(
+ "INSERT OVERWRITE TABLE `my_db1`.`chain_test$branch_delta` PARTITION (dt = '20250811') VALUES (2, 2, '1')");
+ assertThat(spark.sql("SELECT * FROM `my_db1`.`chain_test$snapshots`").count()).isEqualTo(2);
+ spark.sql("RESET spark.paimon.branch");
+ assertThat(
+ spark.sql("SELECT * FROM `my_db1`.`chain_test` where dt = '20250811'")
+ .collectAsList().stream()
+ .map(Row::toString)
+ .collect(Collectors.toList()))
+ .containsExactlyInAnyOrder("[1,2,1,20250811]", "[2,2,1,20250811]");
+ assertThat(
+ spark
+ .sql(
+ "SELECT * FROM `my_db1`.`chain_test$branch_snapshot` WHERE dt = '20250811'")
+ .collectAsList().stream()
+ .map(Row::toString)
+ .collect(Collectors.toList()))
+ .isEmpty();
+
+ spark.sql("SET spark.paimon.branch = snapshot");
+ assertThat(
+ spark.sql("SELECT * FROM `my_db1`.`chain_test` where dt = '20250811'")
+ .collectAsList().stream()
+ .map(Row::toString)
+ .collect(Collectors.toList()))
+ .isEmpty();
+ assertThat(spark.sql("SELECT * FROM `my_db1`.`chain_test$snapshots`").count()).isEqualTo(2);
+ spark.sql("DROP TABLE IF EXISTS `my_db1`.`chain_test`;");
+ spark.close();
+ }
}