This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new d2709555c0 [spark] Fix required configuration check for Paimon
SparkSession extensions (#7365)
d2709555c0 is described below
commit d2709555c0e459ad9e10945f96fd477617b2b09b
Author: LSM <[email protected]>
AuthorDate: Sun May 24 10:05:03 2026 +0800
[spark] Fix required configuration check for Paimon SparkSession extensions
(#7365)
---
.../java/org/apache/paimon/spark/SparkCatalog.java | 2 +-
.../paimon/spark/commands/PaimonSparkWriter.scala | 2 +-
.../org/apache/paimon/spark/util/OptionUtils.scala | 34 ++++++++++++-------
.../apache/paimon/spark/sql/PaimonOptionTest.scala | 39 ++++++++++++++++++++++
4 files changed, 63 insertions(+), 14 deletions(-)
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index 451a5fae26..913d4f582a 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -127,8 +127,8 @@ public class SparkCatalog extends SparkBaseCatalog
@Override
public void initialize(String name, CaseInsensitiveStringMap options) {
- checkRequiredConfigurations();
SparkSession sparkSession = PaimonSparkSession$.MODULE$.active();
+ checkRequiredConfigurations(sparkSession);
this.catalogName = name;
CatalogContext catalogContext =
CatalogContext.create(
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index f2d0135580..b63de85117 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -316,7 +316,7 @@ case class PaimonSparkWriter(
writeWithoutBucket(input)
case HASH_FIXED =>
- if (paimonExtensionEnabled && BucketFunction.supportsTable(table)) {
+ if (paimonExtensionEnabled(sparkSession) &&
BucketFunction.supportsTable(table)) {
// Topology: input -> shuffle by partition & bucket
val bucketNumber = coreOptions.bucket()
val bucketKeyCol = tableSchema
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala
index 2e6014ff1a..7a6fa547c2 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala
@@ -24,6 +24,7 @@ import org.apache.paimon.spark.{SparkCatalogOptions,
SparkConnectorOptions}
import org.apache.paimon.table.Table
import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.internal.StaticSQLConf
@@ -36,12 +37,14 @@ object OptionUtils extends SQLConfHelper with Logging {
private val PAIMON_OPTION_PREFIX = "spark.paimon."
private val SPARK_CATALOG_PREFIX = "spark.sql.catalog."
+ private val PAIMON_SPARK_SESSION_EXTENSIONS =
+ "org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions"
- def paimonExtensionEnabled: Boolean = {
- conf
+ def paimonExtensionEnabled(sparkSession: SparkSession): Boolean = {
+ sparkSession.sessionState.conf
.getConf(StaticSQLConf.SPARK_SESSION_EXTENSIONS)
.getOrElse(Seq.empty)
-
.contains("org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions")
+ .contains(PAIMON_SPARK_SESSION_EXTENSIONS)
}
def getOptionString(option: ConfigOption[_]): String = {
@@ -63,15 +66,22 @@ object OptionUtils extends SQLConfHelper with Logging {
}
}
- def checkRequiredConfigurations(): Unit = {
- if
(getOptionString(SparkConnectorOptions.REQUIRED_SPARK_CONFS_CHECK_ENABLED).toBoolean)
{
- if (!paimonExtensionEnabled) {
- throw new RuntimeException(
- """
- |When using Paimon, it is necessary to configure
`spark.sql.extensions` and ensure that it includes
`org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions`.
- |You can disable this check by configuring
`spark.paimon.requiredSparkConfsCheck.enabled` to `false`, but it is strongly
discouraged to do so.
- |""".stripMargin)
- }
+ private def requiredSparkConfsCheckEnabled(sparkSession: SparkSession):
Boolean = {
+ sparkSession.sessionState.conf
+ .getConfString(
+
s"$PAIMON_OPTION_PREFIX${SparkConnectorOptions.REQUIRED_SPARK_CONFS_CHECK_ENABLED.key()}",
+
SparkConnectorOptions.REQUIRED_SPARK_CONFS_CHECK_ENABLED.defaultValue().toString
+ )
+ .toBoolean
+ }
+
+ def checkRequiredConfigurations(sparkSession: SparkSession): Unit = {
+ if (requiredSparkConfsCheckEnabled(sparkSession) &&
!paimonExtensionEnabled(sparkSession)) {
+ throw new RuntimeException(
+ """
+ |When using Paimon, it is necessary to configure
`spark.sql.extensions` and ensure that it includes
`org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions`.
+ |You can disable this check by configuring
`spark.paimon.requiredSparkConfsCheck.enabled` to `false`, but it is strongly
discouraged to do so.
+ |""".stripMargin)
}
}
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonOptionTest.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonOptionTest.scala
index 14351103f4..f602be161c 100644
---
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonOptionTest.scala
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonOptionTest.scala
@@ -19,10 +19,12 @@
package org.apache.paimon.spark.sql
import org.apache.paimon.spark.PaimonSparkTestBase
+import org.apache.paimon.spark.util.OptionUtils
import org.apache.paimon.table.FileStoreTableFactory
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.paimon.Utils
import org.junit.jupiter.api.Assertions
@@ -265,4 +267,41 @@ class PaimonConfigCheckTest extends SparkFunSuite {
}
}
}
+
+ test("Paimon Option: required confs check with temporary SQLConf") {
+ val spark = SparkSession
+ .builder()
+ .master("local[2]")
+ .config("spark.sql.catalog.paimon",
"org.apache.paimon.spark.SparkCatalog")
+ .config("spark.sql.catalog.paimon.warehouse",
Utils.createTempDir.getCanonicalPath)
+ .config(
+ "spark.sql.extensions",
+ "org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions")
+ .config("spark.paimon.requiredSparkConfsCheck.enabled", "true")
+ .getOrCreate()
+ try {
+ SQLConf.withExistingConf(new SQLConf) {
+ OptionUtils.checkRequiredConfigurations(spark)
+ }
+ } finally {
+ spark.close()
+ }
+ }
+
+ test("Paimon Option: required confs switch with temporary SQLConf") {
+ val spark = SparkSession
+ .builder()
+ .master("local[2]")
+ .config("spark.sql.catalog.paimon",
"org.apache.paimon.spark.SparkCatalog")
+ .config("spark.sql.catalog.paimon.warehouse",
Utils.createTempDir.getCanonicalPath)
+ .config("spark.paimon.requiredSparkConfsCheck.enabled", "false")
+ .getOrCreate()
+ try {
+ SQLConf.withExistingConf(new SQLConf) {
+ OptionUtils.checkRequiredConfigurations(spark)
+ }
+ } finally {
+ spark.close()
+ }
+ }
}