This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new d2709555c0 [spark] Fix required configuration check for Paimon SparkSession extensions (#7365)
d2709555c0 is described below

commit d2709555c0e459ad9e10945f96fd477617b2b09b
Author: LSM <[email protected]>
AuthorDate: Sun May 24 10:05:03 2026 +0800

    [spark] Fix required configuration check for Paimon SparkSession extensions (#7365)
---
 .../java/org/apache/paimon/spark/SparkCatalog.java |  2 +-
 .../paimon/spark/commands/PaimonSparkWriter.scala  |  2 +-
 .../org/apache/paimon/spark/util/OptionUtils.scala | 34 ++++++++++++-------
 .../apache/paimon/spark/sql/PaimonOptionTest.scala | 39 ++++++++++++++++++++++
 4 files changed, 63 insertions(+), 14 deletions(-)

diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index 451a5fae26..913d4f582a 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -127,8 +127,8 @@ public class SparkCatalog extends SparkBaseCatalog
 
     @Override
     public void initialize(String name, CaseInsensitiveStringMap options) {
-        checkRequiredConfigurations();
         SparkSession sparkSession = PaimonSparkSession$.MODULE$.active();
+        checkRequiredConfigurations(sparkSession);
         this.catalogName = name;
         CatalogContext catalogContext =
                 CatalogContext.create(
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index f2d0135580..b63de85117 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -316,7 +316,7 @@ case class PaimonSparkWriter(
         writeWithoutBucket(input)
 
       case HASH_FIXED =>
-        if (paimonExtensionEnabled && BucketFunction.supportsTable(table)) {
+        if (paimonExtensionEnabled(sparkSession) && 
BucketFunction.supportsTable(table)) {
           // Topology: input -> shuffle by partition & bucket
           val bucketNumber = coreOptions.bucket()
           val bucketKeyCol = tableSchema
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala
index 2e6014ff1a..7a6fa547c2 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala
@@ -24,6 +24,7 @@ import org.apache.paimon.spark.{SparkCatalogOptions, 
SparkConnectorOptions}
 import org.apache.paimon.table.Table
 
 import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.SQLConfHelper
 import org.apache.spark.sql.internal.StaticSQLConf
 
@@ -36,12 +37,14 @@ object OptionUtils extends SQLConfHelper with Logging {
 
   private val PAIMON_OPTION_PREFIX = "spark.paimon."
   private val SPARK_CATALOG_PREFIX = "spark.sql.catalog."
+  private val PAIMON_SPARK_SESSION_EXTENSIONS =
+    "org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions"
 
-  def paimonExtensionEnabled: Boolean = {
-    conf
+  def paimonExtensionEnabled(sparkSession: SparkSession): Boolean = {
+    sparkSession.sessionState.conf
       .getConf(StaticSQLConf.SPARK_SESSION_EXTENSIONS)
       .getOrElse(Seq.empty)
-      
.contains("org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions")
+      .contains(PAIMON_SPARK_SESSION_EXTENSIONS)
   }
 
   def getOptionString(option: ConfigOption[_]): String = {
@@ -63,15 +66,22 @@ object OptionUtils extends SQLConfHelper with Logging {
     }
   }
 
-  def checkRequiredConfigurations(): Unit = {
-    if 
(getOptionString(SparkConnectorOptions.REQUIRED_SPARK_CONFS_CHECK_ENABLED).toBoolean)
 {
-      if (!paimonExtensionEnabled) {
-        throw new RuntimeException(
-          """
-            |When using Paimon, it is necessary to configure 
`spark.sql.extensions` and ensure that it includes 
`org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions`.
-            |You can disable this check by configuring 
`spark.paimon.requiredSparkConfsCheck.enabled` to `false`, but it is strongly 
discouraged to do so.
-            |""".stripMargin)
-      }
+  private def requiredSparkConfsCheckEnabled(sparkSession: SparkSession): 
Boolean = {
+    sparkSession.sessionState.conf
+      .getConfString(
+        
s"$PAIMON_OPTION_PREFIX${SparkConnectorOptions.REQUIRED_SPARK_CONFS_CHECK_ENABLED.key()}",
+        
SparkConnectorOptions.REQUIRED_SPARK_CONFS_CHECK_ENABLED.defaultValue().toString
+      )
+      .toBoolean
+  }
+
+  def checkRequiredConfigurations(sparkSession: SparkSession): Unit = {
+    if (requiredSparkConfsCheckEnabled(sparkSession) && 
!paimonExtensionEnabled(sparkSession)) {
+      throw new RuntimeException(
+        """
+          |When using Paimon, it is necessary to configure 
`spark.sql.extensions` and ensure that it includes 
`org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions`.
+          |You can disable this check by configuring 
`spark.paimon.requiredSparkConfsCheck.enabled` to `false`, but it is strongly 
discouraged to do so.
+          |""".stripMargin)
     }
   }
 
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonOptionTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonOptionTest.scala
index 14351103f4..f602be161c 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonOptionTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonOptionTest.scala
@@ -19,10 +19,12 @@
 package org.apache.paimon.spark.sql
 
 import org.apache.paimon.spark.PaimonSparkTestBase
+import org.apache.paimon.spark.util.OptionUtils
 import org.apache.paimon.table.FileStoreTableFactory
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.paimon.Utils
 import org.junit.jupiter.api.Assertions
 
@@ -265,4 +267,41 @@ class PaimonConfigCheckTest extends SparkFunSuite {
       }
     }
   }
+
+  test("Paimon Option: required confs check with temporary SQLConf") {
+    val spark = SparkSession
+      .builder()
+      .master("local[2]")
+      .config("spark.sql.catalog.paimon", 
"org.apache.paimon.spark.SparkCatalog")
+      .config("spark.sql.catalog.paimon.warehouse", 
Utils.createTempDir.getCanonicalPath)
+      .config(
+        "spark.sql.extensions",
+        "org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions")
+      .config("spark.paimon.requiredSparkConfsCheck.enabled", "true")
+      .getOrCreate()
+    try {
+      SQLConf.withExistingConf(new SQLConf) {
+        OptionUtils.checkRequiredConfigurations(spark)
+      }
+    } finally {
+      spark.close()
+    }
+  }
+
+  test("Paimon Option: required confs switch with temporary SQLConf") {
+    val spark = SparkSession
+      .builder()
+      .master("local[2]")
+      .config("spark.sql.catalog.paimon", 
"org.apache.paimon.spark.SparkCatalog")
+      .config("spark.sql.catalog.paimon.warehouse", 
Utils.createTempDir.getCanonicalPath)
+      .config("spark.paimon.requiredSparkConfsCheck.enabled", "false")
+      .getOrCreate()
+    try {
+      SQLConf.withExistingConf(new SQLConf) {
+        OptionUtils.checkRequiredConfigurations(spark)
+      }
+    } finally {
+      spark.close()
+    }
+  }
 }

Reply via email to