This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 6fc176f4f34d [SPARK-49413][CONNECT][SQL] Create a shared RuntimeConfig interface
6fc176f4f34d is described below

commit 6fc176f4f34d73d6f6975836951562243343ba9a
Author: Herman van Hovell <[email protected]>
AuthorDate: Tue Sep 17 17:09:09 2024 -0400

    [SPARK-49413][CONNECT][SQL] Create a shared RuntimeConfig interface
    
    ### What changes were proposed in this pull request?
    This PR introduces a shared RuntimeConfig interface.
    
    ### Why are the changes needed?
    We are creating a shared Scala Spark SQL interface for Classic and Connect.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Existing tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #47980 from hvanhovell/SPARK-49413.
    
    Authored-by: Herman van Hovell <[email protected]>
    Signed-off-by: Herman van Hovell <[email protected]>
---
 .../scala/org/apache/spark/sql/SparkSession.scala  |  14 +--
 .../ConnectRuntimeConfig.scala}                    |  70 +++-----------
 .../sql/kafka010/KafkaMicroBatchSourceSuite.scala  |   4 +-
 project/MimaExcludes.scala                         |   4 +
 .../scala/org/apache/spark/sql/RuntimeConfig.scala | 105 +++++++++++++++++++++
 .../org/apache/spark/sql/api/SparkSession.scala    |  13 ++-
 .../execution/ExecuteGrpcResponseSender.scala      |   8 +-
 .../spark/sql/connect/service/SessionHolder.scala  |   4 +-
 .../spark/sql/connect/utils/ErrorUtils.scala       |   9 +-
 .../service/SparkConnectSessionHolderSuite.scala   |   2 +-
 .../main/scala/org/apache/spark/sql/Dataset.scala  |   4 +-
 .../scala/org/apache/spark/sql/SparkSession.scala  |  19 ++--
 .../spark/sql/artifact/ArtifactManager.scala       |   2 +-
 .../apache/spark/sql/execution/CacheManager.scala  |   2 +-
 .../apache/spark/sql/execution/SQLExecution.scala  |   2 +-
 .../apache/spark/sql/execution/command/ddl.scala   |   2 +-
 .../apache/spark/sql/execution/command/views.scala |   2 +-
 .../sql/execution/datasources/DataSource.scala     |   3 +-
 .../datasources/binaryfile/BinaryFileFormat.scala  |   2 +-
 .../spark/sql/execution/datasources/ddl.scala      |   2 +-
 .../datasources/v2/state/StateDataSource.scala     |   8 +-
 .../sql/execution/streaming/AsyncLogPurge.scala    |   3 +-
 .../execution/streaming/MicroBatchExecution.scala  |   2 +-
 .../spark/sql/execution/streaming/OffsetSeq.scala  |  23 +++--
 .../sql/execution/streaming/StreamExecution.scala  |   2 +-
 .../sql/execution/streaming/WatermarkTracker.scala |   3 +-
 .../RuntimeConfigImpl.scala}                       | 101 +++-----------------
 .../sql/streaming/StreamingQueryManager.scala      |   2 +-
 .../spark/sql/FileBasedDataSourceSuite.scala       |   2 +-
 .../scala/org/apache/spark/sql/GenTPCDSData.scala  |   2 +-
 .../scala/org/apache/spark/sql/JoinSuite.scala     |   4 +-
 .../org/apache/spark/sql/RuntimeConfigSuite.scala  |   3 +-
 .../scala/org/apache/spark/sql/SQLQuerySuite.scala |   7 +-
 .../spark/sql/SparkSessionBuilderSuite.scala       |  20 ++--
 .../spark/sql/SparkSessionExtensionSuite.scala     |   6 +-
 .../spark/sql/StatisticsCollectionTestBase.scala   |   4 +-
 .../spark/sql/connector/DataSourceV2SQLSuite.scala |   8 +-
 .../execution/CoalesceShufflePartitionsSuite.scala |   4 +-
 .../columnar/InMemoryColumnarQuerySuite.scala      |   2 +-
 .../columnar/PartitionBatchPruningSuite.scala      |   4 +-
 .../execution/datasources/ReadSchemaSuite.scala    |  10 +-
 .../binaryfile/BinaryFileFormatSuite.scala         |   2 +-
 .../sql/execution/datasources/orc/OrcTest.scala    |   2 +-
 .../datasources/parquet/ParquetIOSuite.scala       |   4 +-
 .../state/StateDataSourceChangeDataReadSuite.scala |   2 +-
 .../v2/state/StateDataSourceReadSuite.scala        |   4 +-
 .../state/RocksDBStateStoreIntegrationSuite.scala  |   2 +-
 .../execution/streaming/state/RocksDBSuite.scala   |  11 ++-
 .../sql/expressions/ExpressionInfoSuite.scala      |   2 +-
 .../apache/spark/sql/internal/SQLConfSuite.scala   |  88 ++++++++---------
 .../spark/sql/sources/BucketedReadSuite.scala      |   2 +-
 .../spark/sql/streaming/FileStreamSinkSuite.scala  |   2 +-
 .../sql/streaming/FileStreamSourceSuite.scala      |   4 +-
 .../streaming/FlatMapGroupsWithStateSuite.scala    |   2 +-
 .../apache/spark/sql/streaming/StreamTest.scala    |   5 +-
 .../sql/streaming/TriggerAvailableNowSuite.scala   |   2 +-
 .../apache/spark/sql/test/SharedSparkSession.scala |   2 +
 .../spark/sql/hive/HiveSharedStateSuite.scala      |   2 +-
 .../spark/sql/hive/HiveSparkSubmitSuite.scala      |   2 +-
 .../hive/execution/HiveSerDeReadWriteSuite.scala   |   2 +-
 .../spark/sql/hive/execution/SQLQuerySuite.scala   |   4 +-
 61 files changed, 317 insertions(+), 322 deletions(-)

diff --git 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 209ec88618c4..989a7e0c174c 100644
--- 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -41,7 +41,7 @@ import org.apache.spark.sql.connect.client.{ClassFinder, 
CloseableIterator, Spar
 import org.apache.spark.sql.connect.client.SparkConnectClient.Configuration
 import org.apache.spark.sql.connect.client.arrow.ArrowSerializer
 import org.apache.spark.sql.functions.lit
-import org.apache.spark.sql.internal.{CatalogImpl, SessionCleaner, SqlApiConf}
+import org.apache.spark.sql.internal.{CatalogImpl, ConnectRuntimeConfig, 
SessionCleaner, SqlApiConf}
 import org.apache.spark.sql.internal.ColumnNodeToProtoConverter.{toExpr, 
toTypedExpr}
 import org.apache.spark.sql.streaming.DataStreamReader
 import org.apache.spark.sql.streaming.StreamingQueryManager
@@ -88,16 +88,8 @@ class SparkSession private[sql] (
     client.hijackServerSideSessionIdForTesting(suffix)
   }
 
-  /**
-   * Runtime configuration interface for Spark.
-   *
-   * This is the interface through which the user can get and set all Spark 
configurations that
-   * are relevant to Spark SQL. When getting the value of a config, his 
defaults to the value set
-   * in server, if any.
-   *
-   * @since 3.4.0
-   */
-  val conf: RuntimeConfig = new RuntimeConfig(client)
+  /** @inheritdoc */
+  val conf: RuntimeConfig = new ConnectRuntimeConfig(client)
 
   /** @inheritdoc */
   @transient
diff --git 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala
 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/internal/ConnectRuntimeConfig.scala
similarity index 68%
rename from 
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala
rename to 
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/internal/ConnectRuntimeConfig.scala
index f77dd512ef25..7578e2424fb4 100644
--- 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala
+++ 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/internal/ConnectRuntimeConfig.scala
@@ -14,10 +14,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.spark.sql
+package org.apache.spark.sql.internal
 
 import org.apache.spark.connect.proto.{ConfigRequest, ConfigResponse, KeyValue}
 import org.apache.spark.internal.Logging
+import org.apache.spark.sql.RuntimeConfig
 import org.apache.spark.sql.connect.client.SparkConnectClient
 
 /**
@@ -25,61 +26,31 @@ import 
org.apache.spark.sql.connect.client.SparkConnectClient
  *
  * @since 3.4.0
  */
-class RuntimeConfig private[sql] (client: SparkConnectClient) extends Logging {
+class ConnectRuntimeConfig private[sql] (client: SparkConnectClient)
+    extends RuntimeConfig
+    with Logging {
 
-  /**
-   * Sets the given Spark runtime configuration property.
-   *
-   * @since 3.4.0
-   */
+  /** @inheritdoc */
   def set(key: String, value: String): Unit = {
     executeConfigRequest { builder =>
       builder.getSetBuilder.addPairsBuilder().setKey(key).setValue(value)
     }
   }
 
-  /**
-   * Sets the given Spark runtime configuration property.
-   *
-   * @since 3.4.0
-   */
-  def set(key: String, value: Boolean): Unit = set(key, String.valueOf(value))
-
-  /**
-   * Sets the given Spark runtime configuration property.
-   *
-   * @since 3.4.0
-   */
-  def set(key: String, value: Long): Unit = set(key, String.valueOf(value))
-
-  /**
-   * Returns the value of Spark runtime configuration property for the given 
key.
-   *
-   * @throws java.util.NoSuchElementException
-   *   if the key is not set and does not have a default value
-   * @since 3.4.0
-   */
+  /** @inheritdoc */
   @throws[NoSuchElementException]("if the key is not set")
   def get(key: String): String = getOption(key).getOrElse {
     throw new NoSuchElementException(key)
   }
 
-  /**
-   * Returns the value of Spark runtime configuration property for the given 
key.
-   *
-   * @since 3.4.0
-   */
+  /** @inheritdoc */
   def get(key: String, default: String): String = {
     executeConfigRequestSingleValue { builder =>
       
builder.getGetWithDefaultBuilder.addPairsBuilder().setKey(key).setValue(default)
     }
   }
 
-  /**
-   * Returns all properties set in this conf.
-   *
-   * @since 3.4.0
-   */
+  /** @inheritdoc */
   def getAll: Map[String, String] = {
     val response = executeConfigRequest { builder =>
       builder.getGetAllBuilder
@@ -92,11 +63,7 @@ class RuntimeConfig private[sql] (client: 
SparkConnectClient) extends Logging {
     builder.result()
   }
 
-  /**
-   * Returns the value of Spark runtime configuration property for the given 
key.
-   *
-   * @since 3.4.0
-   */
+  /** @inheritdoc */
   def getOption(key: String): Option[String] = {
     val pair = executeConfigRequestSinglePair { builder =>
       builder.getGetOptionBuilder.addKeys(key)
@@ -108,27 +75,14 @@ class RuntimeConfig private[sql] (client: 
SparkConnectClient) extends Logging {
     }
   }
 
-  /**
-   * Resets the configuration property for the given key.
-   *
-   * @since 3.4.0
-   */
+  /** @inheritdoc */
   def unset(key: String): Unit = {
     executeConfigRequest { builder =>
       builder.getUnsetBuilder.addKeys(key)
     }
   }
 
-  /**
-   * Indicates whether the configuration property with the given key is 
modifiable in the current
-   * session.
-   *
-   * @return
-   *   `true` if the configuration property is modifiable. For static SQL, 
Spark Core, invalid
-   *   (not existing) and other non-modifiable configuration properties, the 
returned value is
-   *   `false`.
-   * @since 3.4.0
-   */
+  /** @inheritdoc */
   def isModifiable(key: String): Boolean = {
     val modifiable = executeConfigRequestSingleValue { builder =>
       builder.getIsModifiableBuilder.addKeys(key)
diff --git 
a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
 
b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index 9ae6a9290f80..1d119de43970 100644
--- 
a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ 
b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -1156,7 +1156,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends 
KafkaSourceSuiteBase with
 
   test("allow group.id prefix") {
     // Group ID prefix is only supported by consumer based offset reader
-    if (spark.conf.get(SQLConf.USE_DEPRECATED_KAFKA_OFFSET_FETCHING)) {
+    if (sqlConf.getConf(SQLConf.USE_DEPRECATED_KAFKA_OFFSET_FETCHING)) {
       testGroupId("groupIdPrefix", (expected, actual) => {
         assert(actual.exists(_.startsWith(expected)) && !actual.exists(_ === 
expected),
           "Valid consumer groups don't contain the expected group id - " +
@@ -1167,7 +1167,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends 
KafkaSourceSuiteBase with
 
   test("allow group.id override") {
     // Group ID override is only supported by consumer based offset reader
-    if (spark.conf.get(SQLConf.USE_DEPRECATED_KAFKA_OFFSET_FETCHING)) {
+    if (sqlConf.getConf(SQLConf.USE_DEPRECATED_KAFKA_OFFSET_FETCHING)) {
       testGroupId("kafka.group.id", (expected, actual) => {
         assert(actual.exists(_ === expected), "Valid consumer groups don't " +
           s"contain the expected group id - Valid consumer groups: $actual / " 
+
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 6eee1e759e5e..68433b501bcc 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -160,6 +160,10 @@ object MimaExcludes {
     
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.DataFrameWriterV2"),
     
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.WriteConfigMethods"),
 
+    // SPARK-49413: Create a shared RuntimeConfig interface.
+    
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.RuntimeConfig"),
+    
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.RuntimeConfig$"),
+
     // SPARK-49287: Shared Streaming interfaces
     
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.scheduler.SparkListenerEvent"),
     
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.ForeachWriter"),
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala 
b/sql/api/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala
new file mode 100644
index 000000000000..23a2774ebc3a
--- /dev/null
+++ b/sql/api/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql
+
+import org.apache.spark.annotation.Stable
+
+/**
+ * Runtime configuration interface for Spark. To access this, use `SparkSession.conf`.
+ *
+ * Options set here are automatically propagated to the Hadoop configuration during I/O.
+ *
+ * @since 2.0.0
+ */
+@Stable
+abstract class RuntimeConfig {
+
+  /**
+   * Sets the given Spark runtime configuration property.
+   *
+   * @since 2.0.0
+   */
+  def set(key: String, value: String): Unit
+
+  /**
+   * Sets the given Spark runtime configuration property.
+   *
+   * @since 2.0.0
+   */
+  def set(key: String, value: Boolean): Unit = {
+    set(key, value.toString)
+  }
+
+  /**
+   * Sets the given Spark runtime configuration property.
+   *
+   * @since 2.0.0
+   */
+  def set(key: String, value: Long): Unit = {
+    set(key, value.toString)
+  }
+
+  /**
+   * Returns the value of Spark runtime configuration property for the given key.
+   *
+   * @throws java.util.NoSuchElementException
+   *   if the key is not set and does not have a default value
+   * @since 2.0.0
+   */
+  @throws[NoSuchElementException]("if the key is not set")
+  def get(key: String): String
+
+  /**
+   * Returns the value of Spark runtime configuration property for the given key.
+   *
+   * @since 2.0.0
+   */
+  def get(key: String, default: String): String
+
+  /**
+   * Returns all properties set in this conf.
+   *
+   * @since 2.0.0
+   */
+  def getAll: Map[String, String]
+
+  /**
+   * Returns the value of Spark runtime configuration property for the given key.
+   *
+   * @since 2.0.0
+   */
+  def getOption(key: String): Option[String]
+
+  /**
+   * Resets the configuration property for the given key.
+   *
+   * @since 2.0.0
+   */
+  def unset(key: String): Unit
+
+  /**
+   * Indicates whether the configuration property with the given key is modifiable in the current
+   * session.
+   *
+   * @return
+   *   `true` if the configuration property is modifiable. For static SQL, Spark Core, invalid
+   *   (not existing) and other non-modifiable configuration properties, the returned value is
+   *   `false`.
+   * @since 2.4.0
+   */
+  def isModifiable(key: String): Boolean
+}
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/api/SparkSession.scala 
b/sql/api/src/main/scala/org/apache/spark/sql/api/SparkSession.scala
index cf502c746d24..0580931620aa 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/api/SparkSession.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/api/SparkSession.scala
@@ -26,7 +26,7 @@ import _root_.java.net.URI
 import _root_.java.util
 
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
-import org.apache.spark.sql.{Encoder, Row}
+import org.apache.spark.sql.{Encoder, Row, RuntimeConfig}
 import org.apache.spark.sql.types.StructType
 
 /**
@@ -58,6 +58,17 @@ abstract class SparkSession[DS[U] <: Dataset[U, DS]] extends 
Serializable with C
    */
   def version: String
 
+  /**
+   * Runtime configuration interface for Spark.
+   *
+   * This is the interface through which the user can get and set all Spark and Hadoop
+   * configurations that are relevant to Spark SQL. When getting the value of a config, this
+   * defaults to the value set in the underlying `SparkContext`, if any.
+   *
+   * @since 2.0.0
+   */
+  def conf: RuntimeConfig
+
   /**
    * A collection of methods for registering user-defined functions (UDF).
    *
diff --git 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala
 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala
index 3e360372d560..051093fcad27 100644
--- 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala
+++ 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala
@@ -142,7 +142,9 @@ private[connect] class ExecuteGrpcResponseSender[T <: 
Message](
    * client, but rather enqueued to in the response observer.
    */
   private def enqueueProgressMessage(force: Boolean = false): Unit = {
-    if 
(executeHolder.sessionHolder.session.conf.get(CONNECT_PROGRESS_REPORT_INTERVAL) 
> 0) {
+    val progressReportInterval = 
executeHolder.sessionHolder.session.sessionState.conf
+      .getConf(CONNECT_PROGRESS_REPORT_INTERVAL)
+    if (progressReportInterval > 0) {
       SparkConnectService.executionListener.foreach { listener =>
         // It is possible, that the tracker is no longer available and in this
         // case we simply ignore it and do not send any progress message. This 
avoids
@@ -240,8 +242,8 @@ private[connect] class ExecuteGrpcResponseSender[T <: 
Message](
           // monitor, and will notify upon state change.
           if (response.isEmpty) {
             // Wake up more frequently to send the progress updates.
-            val progressTimeout =
-              
executeHolder.sessionHolder.session.conf.get(CONNECT_PROGRESS_REPORT_INTERVAL)
+            val progressTimeout = 
executeHolder.sessionHolder.session.sessionState.conf
+              .getConf(CONNECT_PROGRESS_REPORT_INTERVAL)
             // If the progress feature is disabled, wait for the deadline.
             val timeout = if (progressTimeout > 0) {
               progressTimeout
diff --git 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala
 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala
index 0cb820b39e87..e56d66da3050 100644
--- 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala
+++ 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala
@@ -444,8 +444,8 @@ case class SessionHolder(userId: String, sessionId: String, 
session: SparkSessio
    */
   private[connect] def usePlanCache(rel: proto.Relation, cachePlan: Boolean)(
       transform: proto.Relation => LogicalPlan): LogicalPlan = {
-    val planCacheEnabled =
-      
Option(session).forall(_.conf.get(Connect.CONNECT_SESSION_PLAN_CACHE_ENABLED, 
true))
+    val planCacheEnabled = Option(session)
+      
.forall(_.sessionState.conf.getConf(Connect.CONNECT_SESSION_PLAN_CACHE_ENABLED, 
true))
     // We only cache plans that have a plan ID.
     val hasPlanId = rel.hasCommon && rel.getCommon.hasPlanId
 
diff --git 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala
 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala
index 355048cf3036..f1636ed1ef09 100644
--- 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala
+++ 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala
@@ -205,7 +205,9 @@ private[connect] object ErrorUtils extends Logging {
       case _ =>
     }
 
-    if 
(sessionHolderOpt.exists(_.session.conf.get(Connect.CONNECT_ENRICH_ERROR_ENABLED)))
 {
+    val enrichErrorEnabled = sessionHolderOpt.exists(
+      
_.session.sessionState.conf.getConf(Connect.CONNECT_ENRICH_ERROR_ENABLED))
+    if (enrichErrorEnabled) {
       // Generate a new unique key for this exception.
       val errorId = UUID.randomUUID().toString
 
@@ -216,9 +218,10 @@ private[connect] object ErrorUtils extends Logging {
     }
 
     lazy val stackTrace = Option(ExceptionUtils.getStackTrace(st))
+    val stackTraceEnabled = sessionHolderOpt.exists(
+      
_.session.sessionState.conf.getConf(SQLConf.PYSPARK_JVM_STACKTRACE_ENABLED))
     val withStackTrace =
-      if (sessionHolderOpt.exists(
-          _.session.conf.get(SQLConf.PYSPARK_JVM_STACKTRACE_ENABLED) && 
stackTrace.nonEmpty)) {
+      if (stackTraceEnabled && stackTrace.nonEmpty) {
         val maxSize = Math.min(
           SparkEnv.get.conf.get(Connect.CONNECT_JVM_STACK_TRACE_MAX_SIZE),
           maxMetadataSize)
diff --git 
a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionHolderSuite.scala
 
b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionHolderSuite.scala
index beebe5d2e2dc..ed2f60afb009 100644
--- 
a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionHolderSuite.scala
+++ 
b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionHolderSuite.scala
@@ -399,7 +399,7 @@ class SparkConnectSessionHolderSuite extends 
SharedSparkSession {
   test("Test session plan cache - disabled") {
     val sessionHolder = SparkConnectTestUtils.createDummySessionHolder(spark)
     // Disable plan cache of the session
-    sessionHolder.session.conf.set(Connect.CONNECT_SESSION_PLAN_CACHE_ENABLED, 
false)
+    
sessionHolder.session.conf.set(Connect.CONNECT_SESSION_PLAN_CACHE_ENABLED.key, 
false)
     val planner = new SparkConnectPlanner(sessionHolder)
 
     val query = buildRelation("select 1")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 0fab60a94842..6e5dcc24e29d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -243,7 +243,7 @@ class Dataset[T] private[sql](
 
   @transient private[sql] val logicalPlan: LogicalPlan = {
     val plan = queryExecution.commandExecuted
-    if (sparkSession.conf.get(SQLConf.FAIL_AMBIGUOUS_SELF_JOIN_ENABLED)) {
+    if 
(sparkSession.sessionState.conf.getConf(SQLConf.FAIL_AMBIGUOUS_SELF_JOIN_ENABLED))
 {
       val dsIds = plan.getTagValue(Dataset.DATASET_ID_TAG).getOrElse(new 
HashSet[Long])
       dsIds.add(id)
       plan.setTagValue(Dataset.DATASET_ID_TAG, dsIds)
@@ -772,7 +772,7 @@ class Dataset[T] private[sql](
   private def addDataFrameIdToCol(expr: NamedExpression): NamedExpression = {
     val newExpr = expr transform {
       case a: AttributeReference
-        if sparkSession.conf.get(SQLConf.FAIL_AMBIGUOUS_SELF_JOIN_ENABLED) =>
+        if 
sparkSession.sessionState.conf.getConf(SQLConf.FAIL_AMBIGUOUS_SELF_JOIN_ENABLED)
 =>
         val metadata = new MetadataBuilder()
           .withMetadata(a.metadata)
           .putLong(Dataset.DATASET_ID_KEY, id)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index a7fb71d95d14..5746b942341f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -173,16 +173,8 @@ class SparkSession private(
   @transient
   val sqlContext: SQLContext = new SQLContext(this)
 
-  /**
-   * Runtime configuration interface for Spark.
-   *
-   * This is the interface through which the user can get and set all Spark 
and Hadoop
-   * configurations that are relevant to Spark SQL. When getting the value of 
a config,
-   * this defaults to the value set in the underlying `SparkContext`, if any.
-   *
-   * @since 2.0.0
-   */
-  @transient lazy val conf: RuntimeConfig = new 
RuntimeConfig(sessionState.conf)
+  /** @inheritdoc */
+  @transient lazy val conf: RuntimeConfig = new 
RuntimeConfigImpl(sessionState.conf)
 
   /**
    * An interface to register custom 
[[org.apache.spark.sql.util.QueryExecutionListener]]s
@@ -745,7 +737,8 @@ class SparkSession private(
   }
 
   private[sql] def leafNodeDefaultParallelism: Int = {
-    
conf.get(SQLConf.LEAF_NODE_DEFAULT_PARALLELISM).getOrElse(sparkContext.defaultParallelism)
+    sessionState.conf.getConf(SQLConf.LEAF_NODE_DEFAULT_PARALLELISM)
+      .getOrElse(sparkContext.defaultParallelism)
   }
 
   private[sql] object Converter extends ColumnNodeToExpressionConverter with 
Serializable {
@@ -1110,13 +1103,13 @@ object SparkSession extends Logging {
   private[sql] def getOrCloneSessionWithConfigsOff(
       session: SparkSession,
       configurations: Seq[ConfigEntry[Boolean]]): SparkSession = {
-    val configsEnabled = configurations.filter(session.conf.get[Boolean])
+    val configsEnabled = 
configurations.filter(session.sessionState.conf.getConf[Boolean])
     if (configsEnabled.isEmpty) {
       session
     } else {
       val newSession = session.cloneSession()
       configsEnabled.foreach(conf => {
-        newSession.conf.set(conf, false)
+        newSession.sessionState.conf.setConf(conf, false)
       })
       newSession
     }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/artifact/ArtifactManager.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/artifact/ArtifactManager.scala
index 4eb7d4fa17ee..1ee960622fc2 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/artifact/ArtifactManager.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/artifact/ArtifactManager.scala
@@ -324,7 +324,7 @@ class ArtifactManager(session: SparkSession) extends 
Logging {
     val fs = destFSPath.getFileSystem(hadoopConf)
     if (fs.isInstanceOf[LocalFileSystem]) {
       val allowDestLocalConf =
-        
session.conf.get(SQLConf.ARTIFACT_COPY_FROM_LOCAL_TO_FS_ALLOW_DEST_LOCAL)
+        
session.sessionState.conf.getConf(SQLConf.ARTIFACT_COPY_FROM_LOCAL_TO_FS_ALLOW_DEST_LOCAL)
           .getOrElse(
             
session.conf.get("spark.connect.copyFromLocalToFs.allowDestLocal").contains("true"))
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index aae424afcb8a..1bf6f4e4d7d9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -474,7 +474,7 @@ class CacheManager extends Logging with 
AdaptiveSparkPlanHelper {
     // Bucketed scan only has one time overhead but can have multi-times 
benefits in cache,
     // so we always do bucketed scan in a cached plan.
     var disableConfigs = Seq(SQLConf.AUTO_BUCKETED_SCAN_ENABLED)
-    if (!session.conf.get(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING)) 
{
+    if 
(!session.sessionState.conf.getConf(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING))
 {
       // Allowing changing cached plan output partitioning might lead to 
regression as it introduces
       // extra shuffle
       disableConfigs =
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
index 58fff2d4a1a2..12ff649b621e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -87,7 +87,7 @@ object SQLExecution extends Logging {
     executionIdToQueryExecution.put(executionId, queryExecution)
     val originalInterruptOnCancel = 
sc.getLocalProperty(SPARK_JOB_INTERRUPT_ON_CANCEL)
     if (originalInterruptOnCancel == null) {
-      val interruptOnCancel = 
sparkSession.conf.get(SQLConf.INTERRUPT_ON_CANCEL)
+      val interruptOnCancel = 
sparkSession.sessionState.conf.getConf(SQLConf.INTERRUPT_ON_CANCEL)
       sc.setInterruptOnCancel(interruptOnCancel)
     }
     try {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 3f221bfa5305..814e56b204f9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -861,7 +861,7 @@ case class RepairTableCommand(
     // Hive metastore may not have enough memory to handle millions of 
partitions in single RPC,
     // we should split them into smaller batches. Since Hive client is not 
thread safe, we cannot
     // do this in parallel.
-    val batchSize = spark.conf.get(SQLConf.ADD_PARTITION_BATCH_SIZE)
+    val batchSize = 
spark.sessionState.conf.getConf(SQLConf.ADD_PARTITION_BATCH_SIZE)
     partitionSpecsAndLocs.iterator.grouped(batchSize).foreach { batch =>
       val now = MILLISECONDS.toSeconds(System.currentTimeMillis())
       val parts = batch.map { case (spec, location) =>
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index e1061a46db7b..071e3826b20a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -135,7 +135,7 @@ case class CreateViewCommand(
         referredTempFunctions)
       catalog.createTempView(name.table, tableDefinition, overrideIfExists = 
replace)
     } else if (viewType == GlobalTempView) {
-      val db = sparkSession.conf.get(StaticSQLConf.GLOBAL_TEMP_DATABASE)
+      val db = 
sparkSession.sessionState.conf.getConf(StaticSQLConf.GLOBAL_TEMP_DATABASE)
       val viewIdent = TableIdentifier(name.table, Option(db))
       val aliasedPlan = aliasPlan(sparkSession, analyzedPlan)
       val tableDefinition = createTemporaryViewRelation(
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index d88b5ee8877d..968c204841e4 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -267,7 +267,8 @@ case class DataSource(
             checkAndGlobPathIfNecessary(checkEmptyGlobPath = false, 
checkFilesExist = false)
           createInMemoryFileIndex(globbedPaths)
         })
-        val forceNullable = 
sparkSession.conf.get(SQLConf.FILE_SOURCE_SCHEMA_FORCE_NULLABLE)
+        val forceNullable = sparkSession.sessionState.conf
+          .getConf(SQLConf.FILE_SOURCE_SCHEMA_FORCE_NULLABLE)
         val sourceDataSchema = if (forceNullable) dataSchema.asNullable else 
dataSchema
         SourceInfo(
           s"FileSource[$path]",
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala
index cbff526592f9..54c100282e2d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala
@@ -98,7 +98,7 @@ class BinaryFileFormat extends FileFormat with 
DataSourceRegister {
     val broadcastedHadoopConf =
       sparkSession.sparkContext.broadcast(new 
SerializableConfiguration(hadoopConf))
     val filterFuncs = filters.flatMap(filter => createFilterFunction(filter))
-    val maxLength = sparkSession.conf.get(SOURCES_BINARY_FILE_MAX_LENGTH)
+    val maxLength = 
sparkSession.sessionState.conf.getConf(SOURCES_BINARY_FILE_MAX_LENGTH)
 
     file: PartitionedFile => {
       val path = file.toPath
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
index fc6cba786c4e..d9367d92d462 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
@@ -115,7 +115,7 @@ case class CreateTempViewUsing(
       }.logicalPlan
 
     if (global) {
-      val db = sparkSession.conf.get(StaticSQLConf.GLOBAL_TEMP_DATABASE)
+      val db = 
sparkSession.sessionState.conf.getConf(StaticSQLConf.GLOBAL_TEMP_DATABASE)
       val viewIdent = TableIdentifier(tableIdent.table, Option(db))
       val viewDefinition = createTemporaryViewRelation(
         viewIdent,
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala
index 83399e2cac01..50b90641d309 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala
@@ -25,7 +25,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{RuntimeConfig, SparkSession}
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.DataSourceOptions
 import org.apache.spark.sql.connector.catalog.{Table, TableProvider}
 import org.apache.spark.sql.connector.expressions.Transform
@@ -119,9 +119,9 @@ class StateDataSource extends TableProvider with 
DataSourceRegister with Logging
           throw StateDataSourceErrors.offsetMetadataLogUnavailable(batchId, 
checkpointLocation)
         )
 
-        val clonedRuntimeConf = new 
RuntimeConfig(session.sessionState.conf.clone())
-        OffsetSeqMetadata.setSessionConf(metadata, clonedRuntimeConf)
-        StateStoreConf(clonedRuntimeConf.sqlConf)
+        val clonedSqlConf = session.sessionState.conf.clone()
+        OffsetSeqMetadata.setSessionConf(metadata, clonedSqlConf)
+        StateStoreConf(clonedSqlConf)
 
       case _ =>
         throw StateDataSourceErrors.offsetLogUnavailable(batchId, 
checkpointLocation)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncLogPurge.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncLogPurge.scala
index 06fdc6c53bc4..cb7e71bda84d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncLogPurge.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncLogPurge.scala
@@ -49,7 +49,8 @@ trait AsyncLogPurge extends Logging {
   // which are written per run.
   protected def purgeStatefulMetadata(plan: SparkPlan): Unit
 
-  protected lazy val useAsyncPurge: Boolean = 
sparkSession.conf.get(SQLConf.ASYNC_LOG_PURGE)
+  protected lazy val useAsyncPurge: Boolean = sparkSession.sessionState.conf
+    .getConf(SQLConf.ASYNC_LOG_PURGE)
 
   protected def purgeAsync(batchId: Long): Unit = {
     if (purgeRunning.compareAndSet(false, true)) {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index 285494543533..053aef6ced3a 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -477,7 +477,7 @@ class MicroBatchExecution(
 
         // update offset metadata
         nextOffsets.metadata.foreach { metadata =>
-          OffsetSeqMetadata.setSessionConf(metadata, 
sparkSessionToRunBatches.conf)
+          OffsetSeqMetadata.setSessionConf(metadata, 
sparkSessionToRunBatches.sessionState.conf)
           execCtx.offsetSeqMetadata = OffsetSeqMetadata(
             metadata.batchWatermarkMs, metadata.batchTimestampMs, 
sparkSessionToRunBatches.conf)
           watermarkTracker = WatermarkTracker(sparkSessionToRunBatches.conf)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
index f0be33ad9a9d..d5facc245e72 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
@@ -26,6 +26,7 @@ import org.apache.spark.io.CompressionCodec
 import org.apache.spark.sql.RuntimeConfig
 import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2, 
SparkDataStream}
 import 
org.apache.spark.sql.execution.streaming.state.{FlatMapGroupsWithStateExecHelper,
 StreamingAggregationStateManager, SymmetricHashJoinStateManager}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf._
 
 
@@ -135,20 +136,21 @@ object OffsetSeqMetadata extends Logging {
   }
 
   /** Set the SparkSession configuration with the values in the metadata */
-  def setSessionConf(metadata: OffsetSeqMetadata, sessionConf: RuntimeConfig): 
Unit = {
+  def setSessionConf(metadata: OffsetSeqMetadata, sessionConf: SQLConf): Unit 
= {
+    val configs = sessionConf.getAllConfs
     OffsetSeqMetadata.relevantSQLConfs.map(_.key).foreach { confKey =>
 
       metadata.conf.get(confKey) match {
 
         case Some(valueInMetadata) =>
           // Config value exists in the metadata, update the session config 
with this value
-          val optionalValueInSession = sessionConf.getOption(confKey)
-          if (optionalValueInSession.isDefined && optionalValueInSession.get 
!= valueInMetadata) {
+          val optionalValueInSession = sessionConf.getConfString(confKey, null)
+          if (optionalValueInSession != null && optionalValueInSession != 
valueInMetadata) {
             logWarning(log"Updating the value of conf '${MDC(CONFIG, 
confKey)}' in current " +
-              log"session from '${MDC(OLD_VALUE, optionalValueInSession.get)}' 
" +
+              log"session from '${MDC(OLD_VALUE, optionalValueInSession)}' " +
               log"to '${MDC(NEW_VALUE, valueInMetadata)}'.")
           }
-          sessionConf.set(confKey, valueInMetadata)
+          sessionConf.setConfString(confKey, valueInMetadata)
 
         case None =>
           // For backward compatibility, if a config was not recorded in the 
offset log,
@@ -157,14 +159,17 @@ object OffsetSeqMetadata extends Logging {
           relevantSQLConfDefaultValues.get(confKey) match {
 
             case Some(defaultValue) =>
-              sessionConf.set(confKey, defaultValue)
+              sessionConf.setConfString(confKey, defaultValue)
               logWarning(log"Conf '${MDC(CONFIG, confKey)}' was not found in 
the offset log, " +
                 log"using default value '${MDC(DEFAULT_VALUE, defaultValue)}'")
 
             case None =>
-              val valueStr = sessionConf.getOption(confKey).map { v =>
-                s" Using existing session conf value '$v'."
-              }.getOrElse { " No value set in session conf." }
+              val value = sessionConf.getConfString(confKey, null)
+              val valueStr = if (value != null) {
+                s" Using existing session conf value '$value'."
+              } else {
+                " No value set in session conf."
+              }
               logWarning(log"Conf '${MDC(CONFIG, confKey)}' was not found in 
the offset log. " +
                 log"${MDC(TIP, valueStr)}")
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 4b1b9e02a242..8f030884ad33 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -483,7 +483,7 @@ abstract class StreamExecution(
   @throws[TimeoutException]
   protected def interruptAndAwaitExecutionThreadTermination(): Unit = {
     val timeout = math.max(
-      sparkSession.conf.get(SQLConf.STREAMING_STOP_TIMEOUT), 0)
+      sparkSession.sessionState.conf.getConf(SQLConf.STREAMING_STOP_TIMEOUT), 
0)
     queryExecutionThread.interrupt()
     queryExecutionThread.join(timeout)
     if (queryExecutionThread.isAlive) {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/WatermarkTracker.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/WatermarkTracker.scala
index 54c47ec4e6ed..3e6f122f463d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/WatermarkTracker.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/WatermarkTracker.scala
@@ -135,7 +135,8 @@ object WatermarkTracker {
     // saved in the checkpoint (e.g., old checkpoints), then the default `min` 
policy is enforced
     // through defaults specified in OffsetSeqMetadata.setSessionConf().
     val policyName = conf.get(
-      SQLConf.STREAMING_MULTIPLE_WATERMARK_POLICY, 
MultipleWatermarkPolicy.DEFAULT_POLICY_NAME)
+      SQLConf.STREAMING_MULTIPLE_WATERMARK_POLICY.key,
+      MultipleWatermarkPolicy.DEFAULT_POLICY_NAME)
     new WatermarkTracker(MultipleWatermarkPolicy(policyName))
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/RuntimeConfigImpl.scala
similarity index 51%
rename from sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala
rename to 
sql/core/src/main/scala/org/apache/spark/sql/internal/RuntimeConfigImpl.scala
index ed8cf4f121f0..ca439cdb8995 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/RuntimeConfigImpl.scala
@@ -15,15 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql
+package org.apache.spark.sql.internal
 
 import scala.jdk.CollectionConverters._
 
 import org.apache.spark.SPARK_DOC_ROOT
 import org.apache.spark.annotation.Stable
-import org.apache.spark.internal.config.{ConfigEntry, OptionalConfigEntry}
+import org.apache.spark.internal.config.ConfigEntry
+import org.apache.spark.sql.RuntimeConfig
 import org.apache.spark.sql.errors.QueryCompilationErrors
-import org.apache.spark.sql.internal.SQLConf
 
 /**
  * Runtime configuration interface for Spark. To access this, use 
`SparkSession.conf`.
@@ -33,89 +33,26 @@ import org.apache.spark.sql.internal.SQLConf
  * @since 2.0.0
  */
 @Stable
-class RuntimeConfig private[sql](val sqlConf: SQLConf = new SQLConf) {
+class RuntimeConfigImpl private[sql](val sqlConf: SQLConf = new SQLConf) 
extends RuntimeConfig {
 
-  /**
-   * Sets the given Spark runtime configuration property.
-   *
-   * @since 2.0.0
-   */
+  /** @inheritdoc */
   def set(key: String, value: String): Unit = {
     requireNonStaticConf(key)
     sqlConf.setConfString(key, value)
   }
 
-  /**
-   * Sets the given Spark runtime configuration property.
-   *
-   * @since 2.0.0
-   */
-  def set(key: String, value: Boolean): Unit = {
-    set(key, value.toString)
-  }
-
-  /**
-   * Sets the given Spark runtime configuration property.
-   *
-   * @since 2.0.0
-   */
-  def set(key: String, value: Long): Unit = {
-    set(key, value.toString)
-  }
-
-  /**
-   * Sets the given Spark runtime configuration property.
-   */
-  private[sql] def set[T](entry: ConfigEntry[T], value: T): Unit = {
-    requireNonStaticConf(entry.key)
-    sqlConf.setConf(entry, value)
-  }
-
-  /**
-   * Returns the value of Spark runtime configuration property for the given 
key.
-   *
-   * @throws java.util.NoSuchElementException if the key is not set and does 
not have a default
-   *                                          value
-   * @since 2.0.0
-   */
+  /** @inheritdoc */
   @throws[NoSuchElementException]("if the key is not set")
   def get(key: String): String = {
     sqlConf.getConfString(key)
   }
 
-  /**
-   * Returns the value of Spark runtime configuration property for the given 
key.
-   *
-   * @since 2.0.0
-   */
+  /** @inheritdoc */
   def get(key: String, default: String): String = {
     sqlConf.getConfString(key, default)
   }
 
-  /**
-   * Returns the value of Spark runtime configuration property for the given 
key.
-   */
-  @throws[NoSuchElementException]("if the key is not set")
-  private[sql] def get[T](entry: ConfigEntry[T]): T = {
-    sqlConf.getConf(entry)
-  }
-
-  private[sql] def get[T](entry: OptionalConfigEntry[T]): Option[T] = {
-    sqlConf.getConf(entry)
-  }
-
-  /**
-   * Returns the value of Spark runtime configuration property for the given 
key.
-   */
-  private[sql] def get[T](entry: ConfigEntry[T], default: T): T = {
-    sqlConf.getConf(entry, default)
-  }
-
-  /**
-   * Returns all properties set in this conf.
-   *
-   * @since 2.0.0
-   */
+  /** @inheritdoc */
   def getAll: Map[String, String] = {
     sqlConf.getAllConfs
   }
@@ -124,36 +61,20 @@ class RuntimeConfig private[sql](val sqlConf: SQLConf = 
new SQLConf) {
     getAll.asJava
   }
 
-  /**
-   * Returns the value of Spark runtime configuration property for the given 
key.
-   *
-   * @since 2.0.0
-   */
+  /** @inheritdoc */
   def getOption(key: String): Option[String] = {
     try Option(get(key)) catch {
       case _: NoSuchElementException => None
     }
   }
 
-  /**
-   * Resets the configuration property for the given key.
-   *
-   * @since 2.0.0
-   */
+  /** @inheritdoc */
   def unset(key: String): Unit = {
     requireNonStaticConf(key)
     sqlConf.unsetConf(key)
   }
 
-  /**
-   * Indicates whether the configuration property with the given key
-   * is modifiable in the current session.
-   *
-   * @return `true` if the configuration property is modifiable. For static 
SQL, Spark Core,
-   *         invalid (not existing) and other non-modifiable configuration 
properties,
-   *         the returned value is `false`.
-   * @since 2.4.0
-   */
+  /** @inheritdoc */
   def isModifiable(key: String): Boolean = sqlConf.isModifiable(key)
 
   /**
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index 55d2e639a56b..3ab6d02f6b51 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
@@ -364,7 +364,7 @@ class StreamingQueryManager private[sql] (
         .orElse(activeQueries.get(query.id)) // shouldn't be needed but 
paranoia ...
 
       val shouldStopActiveRun =
-        sparkSession.conf.get(SQLConf.STREAMING_STOP_ACTIVE_RUN_ON_RESTART)
+        
sparkSession.sessionState.conf.getConf(SQLConf.STREAMING_STOP_ACTIVE_RUN_ON_RESTART)
       if (activeOption.isDefined) {
         if (shouldStopActiveRun) {
           val oldQuery = activeOption.get
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index 2fe6a83427bc..e44bd5de4f4c 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -52,7 +52,7 @@ class FileBasedDataSourceSuite extends QueryTest
 
   override def beforeAll(): Unit = {
     super.beforeAll()
-    spark.conf.set(SQLConf.ORC_IMPLEMENTATION, "native")
+    spark.conf.set(SQLConf.ORC_IMPLEMENTATION.key, "native")
   }
 
   override def afterAll(): Unit = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/GenTPCDSData.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/GenTPCDSData.scala
index 48a16f01d574..6cd8ade41da1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/GenTPCDSData.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/GenTPCDSData.scala
@@ -225,7 +225,7 @@ class TPCDSTables(spark: SparkSession, dsdgenDir: String, 
scaleFactor: Int)
           // datagen speed files will be truncated to maxRecordsPerFile value, 
so the final
           // result will be the same.
           val numRows = data.count()
-          val maxRecordPerFile = spark.conf.get(SQLConf.MAX_RECORDS_PER_FILE)
+          val maxRecordPerFile = 
spark.sessionState.conf.getConf(SQLConf.MAX_RECORDS_PER_FILE)
 
           if (maxRecordPerFile > 0 && numRows > maxRecordPerFile) {
             val numFiles = (numRows.toDouble/maxRecordPerFile).ceil.toInt
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index fcb937d82ba4..0f5582def82d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -597,10 +597,10 @@ class JoinSuite extends QueryTest with SharedSparkSession 
with AdaptiveSparkPlan
       SQLConf.CROSS_JOINS_ENABLED.key -> "true") {
 
       assert(statisticSizeInByte(spark.table("testData2")) >
-        spark.conf.get[Long](SQLConf.AUTO_BROADCASTJOIN_THRESHOLD))
+        sqlConf.getConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD))
 
       assert(statisticSizeInByte(spark.table("testData")) <
-        spark.conf.get[Long](SQLConf.AUTO_BROADCASTJOIN_THRESHOLD))
+        sqlConf.getConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD))
 
       Seq(
         ("SELECT * FROM testData LEFT SEMI JOIN testData2 ON key = a",
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/RuntimeConfigSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/RuntimeConfigSuite.scala
index 405213072081..352197f96acb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/RuntimeConfigSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/RuntimeConfigSuite.scala
@@ -19,12 +19,13 @@ package org.apache.spark.sql
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.internal.config
+import org.apache.spark.sql.internal.RuntimeConfigImpl
 import org.apache.spark.sql.internal.SQLConf.CHECKPOINT_LOCATION
 import org.apache.spark.sql.internal.StaticSQLConf.GLOBAL_TEMP_DATABASE
 
 class RuntimeConfigSuite extends SparkFunSuite {
 
-  private def newConf(): RuntimeConfig = new RuntimeConfig
+  private def newConf(): RuntimeConfig = new RuntimeConfigImpl()
 
   test("set and get") {
     val conf = newConf()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 9beceda26379..ce88f7dc475d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -2568,20 +2568,21 @@ class SQLQuerySuite extends QueryTest with 
SharedSparkSession with AdaptiveSpark
       Seq(1 -> "a").toDF("i", "j").write.parquet(path.getCanonicalPath)
 
       val newSession = spark.newSession()
+      val newSqlConf = newSession.sessionState.conf
       val originalValue = newSession.sessionState.conf.runSQLonFile
 
       try {
-        newSession.conf.set(SQLConf.RUN_SQL_ON_FILES, false)
+        newSqlConf.setConf(SQLConf.RUN_SQL_ON_FILES, false)
         intercept[AnalysisException] {
           newSession.sql(s"SELECT i, j FROM 
parquet.`${path.getCanonicalPath}`")
         }
 
-        newSession.conf.set(SQLConf.RUN_SQL_ON_FILES, true)
+        newSqlConf.setConf(SQLConf.RUN_SQL_ON_FILES, true)
         checkAnswer(
           newSession.sql(s"SELECT i, j FROM 
parquet.`${path.getCanonicalPath}`"),
           Row(1, "a"))
       } finally {
-        newSession.conf.set(SQLConf.RUN_SQL_ON_FILES, originalValue)
+        newSqlConf.setConf(SQLConf.RUN_SQL_ON_FILES, originalValue)
       }
     }
   }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala
index 4ac05373e5a3..d3117ec411fe 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala
@@ -201,10 +201,10 @@ class SparkSessionBuilderSuite extends SparkFunSuite with 
Eventually {
       .getOrCreate()
 
     assert(session.conf.get("spark.app.name") === "test-app-SPARK-31234")
-    assert(session.conf.get(GLOBAL_TEMP_DATABASE) === 
"globaltempdb-spark-31234")
+    assert(session.conf.get(GLOBAL_TEMP_DATABASE.key) === 
"globalTempDB-SPARK-31234")
     session.sql("RESET")
     assert(session.conf.get("spark.app.name") === "test-app-SPARK-31234")
-    assert(session.conf.get(GLOBAL_TEMP_DATABASE) === 
"globaltempdb-spark-31234")
+    assert(session.conf.get(GLOBAL_TEMP_DATABASE.key) === 
"globalTempDB-SPARK-31234")
   }
 
   test("SPARK-31354: SparkContext only register one SparkSession 
ApplicationEnd listener") {
@@ -244,8 +244,8 @@ class SparkSessionBuilderSuite extends SparkFunSuite with 
Eventually {
       .builder()
       .config(GLOBAL_TEMP_DATABASE.key, "globalTempDB-SPARK-31532-1")
       .getOrCreate()
-    assert(session.conf.get(GLOBAL_TEMP_DATABASE) === 
"globaltempdb-spark-31532")
-    assert(session1.conf.get(GLOBAL_TEMP_DATABASE) === 
"globaltempdb-spark-31532")
+    assert(session.conf.get(GLOBAL_TEMP_DATABASE.key) === 
"globalTempDB-SPARK-31532")
+    assert(session1.conf.get(GLOBAL_TEMP_DATABASE.key) === 
"globalTempDB-SPARK-31532")
 
     // do not propagate static sql configs to the existing default session
     SparkSession.clearActiveSession()
@@ -255,9 +255,9 @@ class SparkSessionBuilderSuite extends SparkFunSuite with 
Eventually {
       .config(GLOBAL_TEMP_DATABASE.key, value = "globalTempDB-SPARK-31532-2")
       .getOrCreate()
 
-    assert(!session.conf.get(WAREHOUSE_PATH).contains("SPARK-31532-db"))
-    assert(session.conf.get(WAREHOUSE_PATH) === 
session2.conf.get(WAREHOUSE_PATH))
-    assert(session2.conf.get(GLOBAL_TEMP_DATABASE) === 
"globaltempdb-spark-31532")
+    assert(!session.conf.get(WAREHOUSE_PATH.key).contains("SPARK-31532-db"))
+    assert(session.conf.get(WAREHOUSE_PATH.key) === 
session2.conf.get(WAREHOUSE_PATH.key))
+    assert(session2.conf.get(GLOBAL_TEMP_DATABASE.key) === 
"globalTempDB-SPARK-31532")
   }
 
   test("SPARK-31532: propagate static sql configs if no existing 
SparkSession") {
@@ -275,8 +275,8 @@ class SparkSessionBuilderSuite extends SparkFunSuite with 
Eventually {
       .config(WAREHOUSE_PATH.key, "SPARK-31532-db-2")
       .getOrCreate()
     assert(session.conf.get("spark.app.name") === "test-app-SPARK-31532-2")
-    assert(session.conf.get(GLOBAL_TEMP_DATABASE) === 
"globaltempdb-spark-31532-2")
-    assert(session.conf.get(WAREHOUSE_PATH) contains "SPARK-31532-db-2")
+    assert(session.conf.get(GLOBAL_TEMP_DATABASE.key) === 
"globalTempDB-SPARK-31532-2")
+    assert(session.conf.get(WAREHOUSE_PATH.key) contains "SPARK-31532-db-2")
   }
 
   test("SPARK-32062: reset listenerRegistered in SparkSession") {
@@ -461,7 +461,7 @@ class SparkSessionBuilderSuite extends SparkFunSuite with 
Eventually {
       val expected = 
path.getFileSystem(hadoopConf).makeQualified(path).toString
       // session related configs
       assert(hadoopConf.get("hive.metastore.warehouse.dir") === expected)
-      assert(session.conf.get(WAREHOUSE_PATH) === expected)
+      assert(session.conf.get(WAREHOUSE_PATH.key) === expected)
       assert(session.sessionState.conf.warehousePath === expected)
 
       // shared configs
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
index 322210bf5b59..ba87028a7147 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
@@ -178,7 +178,7 @@ class SparkSessionExtensionSuite extends SparkFunSuite with 
SQLHelper with Adapt
         MyColumnarRule(MyNewQueryStageRule(), MyNewQueryStageRule()))
     }
     withSession(extensions) { session =>
-      session.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED, true)
+      session.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, true)
       assert(session.sessionState.adaptiveRulesHolder.queryStagePrepRules
         .contains(MyQueryStagePrepRule()))
       assert(session.sessionState.columnarRules.contains(
@@ -221,7 +221,7 @@ class SparkSessionExtensionSuite extends SparkFunSuite with 
SQLHelper with Adapt
         MyColumnarRule(PreRuleReplaceAddWithBrokenVersion(), MyPostRule()))
     }
     withSession(extensions) { session =>
-      session.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED, true)
+      session.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, true)
       session.conf.set(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key, "-1")
       assert(session.sessionState.columnarRules.contains(
         MyColumnarRule(PreRuleReplaceAddWithBrokenVersion(), MyPostRule())))
@@ -280,7 +280,7 @@ class SparkSessionExtensionSuite extends SparkFunSuite with 
SQLHelper with Adapt
         MyColumnarRule(PreRuleReplaceAddWithBrokenVersion(), MyPostRule()))
     }
     withSession(extensions) { session =>
-      session.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED, enableAQE)
+      session.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, enableAQE)
       assert(session.sessionState.columnarRules.contains(
         MyColumnarRule(PreRuleReplaceAddWithBrokenVersion(), MyPostRule())))
       import session.implicits._
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
index ef8b66566f24..7fa29dd38fd9 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
@@ -366,7 +366,7 @@ abstract class StatisticsCollectionTestBase extends 
QueryTest with SQLTestUtils
       val stats = spark.table("ds_tbl").queryExecution.optimizedPlan.stats
       assert(stats.sizeInBytes > 0, "non-empty partitioned table should not 
report zero size.")
 
-      if (spark.conf.get(StaticSQLConf.CATALOG_IMPLEMENTATION) == "hive") {
+      if (spark.conf.get(StaticSQLConf.CATALOG_IMPLEMENTATION.key) == "hive") {
         sql("CREATE TABLE hive_tbl(i int) PARTITIONED BY (j int)")
         sql("INSERT INTO hive_tbl PARTITION(j=1) SELECT 1")
         val stats2 = spark.table("hive_tbl").queryExecution.optimizedPlan.stats
@@ -381,7 +381,7 @@ abstract class StatisticsCollectionTestBase extends 
QueryTest with SQLTestUtils
       // Test data source table
       checkStatsConversion(tableName = "ds_tbl", isDatasourceTable = true)
       // Test hive serde table
-      if (spark.conf.get(StaticSQLConf.CATALOG_IMPLEMENTATION) == "hive") {
+      if (spark.conf.get(StaticSQLConf.CATALOG_IMPLEMENTATION.key) == "hive") {
         checkStatsConversion(tableName = "hive_tbl", isDatasourceTable = false)
       }
     }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 5df7b62cfb28..7aaec6d500ba 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -2481,7 +2481,7 @@ class DataSourceV2SQLSuiteV1Filter
   }
 
   test("global temp view should not be masked by v2 catalog") {
-    val globalTempDB = spark.conf.get(StaticSQLConf.GLOBAL_TEMP_DATABASE)
+    val globalTempDB = spark.conf.get(StaticSQLConf.GLOBAL_TEMP_DATABASE.key)
     registerCatalog(globalTempDB, classOf[InMemoryTableCatalog])
 
     try {
@@ -2495,7 +2495,7 @@ class DataSourceV2SQLSuiteV1Filter
   }
 
   test("SPARK-30104: global temp db is used as a table name under v2 catalog") 
{
-    val globalTempDB = spark.conf.get(StaticSQLConf.GLOBAL_TEMP_DATABASE)
+    val globalTempDB = spark.conf.get(StaticSQLConf.GLOBAL_TEMP_DATABASE.key)
     val t = s"testcat.$globalTempDB"
     withTable(t) {
       sql(s"CREATE TABLE $t (id bigint, data string) USING foo")
@@ -2506,7 +2506,7 @@ class DataSourceV2SQLSuiteV1Filter
   }
 
   test("SPARK-30104: v2 catalog named global_temp will be masked") {
-    val globalTempDB = spark.conf.get(StaticSQLConf.GLOBAL_TEMP_DATABASE)
+    val globalTempDB = spark.conf.get(StaticSQLConf.GLOBAL_TEMP_DATABASE.key)
     registerCatalog(globalTempDB, classOf[InMemoryTableCatalog])
     checkError(
       exception = intercept[AnalysisException] {
@@ -2712,7 +2712,7 @@ class DataSourceV2SQLSuiteV1Filter
       parameters = Map("relationName" -> "`testcat`.`abc`"),
       context = ExpectedContext(fragment = "testcat.abc", start = 17, stop = 
27))
 
-    val globalTempDB = spark.conf.get(StaticSQLConf.GLOBAL_TEMP_DATABASE)
+    val globalTempDB = spark.conf.get(StaticSQLConf.GLOBAL_TEMP_DATABASE.key)
     registerCatalog(globalTempDB, classOf[InMemoryTableCatalog])
     withTempView("v") {
       sql("create global temp view v as select 1")
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala
index dc72b4a092ae..9ed4f1a006b2 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala
@@ -317,7 +317,7 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite 
with SQLConfHelper {
       import spark.implicits._
       spark.conf.set(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key, "1KB")
       spark.conf.set(SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key, "10KB")
-      spark.conf.set(SQLConf.SKEW_JOIN_SKEWED_PARTITION_FACTOR, 2.0)
+      spark.conf.set(SQLConf.SKEW_JOIN_SKEWED_PARTITION_FACTOR.key, "2.0")
       val df00 = spark.range(0, 1000, 2)
         .selectExpr("id as key", "id as value")
         .union(Seq.fill(100000)((600, 600)).toDF("key", "value"))
@@ -345,7 +345,7 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite 
with SQLConfHelper {
       import spark.implicits._
       spark.conf.set(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key, "-1")
       spark.conf.set(SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key, "100B")
-      spark.conf.set(SQLConf.SKEW_JOIN_SKEWED_PARTITION_FACTOR, 2.0)
+      spark.conf.set(SQLConf.SKEW_JOIN_SKEWED_PARTITION_FACTOR.key, "2.0")
       val df00 = spark.range(0, 10, 2)
         .selectExpr("id as key", "id as value")
         .union(Seq.fill(1000)((600, 600)).toDF("key", "value"))
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
index ad755bf22ab0..0ba55382cd9a 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
@@ -150,7 +150,7 @@ class InMemoryColumnarQuerySuite extends QueryTest
       spark.catalog.cacheTable("sizeTst")
       assert(
         spark.table("sizeTst").queryExecution.analyzed.stats.sizeInBytes >
-          spark.conf.get(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD))
+          sqlConf.getConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD))
     }
   }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala
index 885286843a14..88ff51d0ff4c 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala
@@ -27,9 +27,9 @@ class PartitionBatchPruningSuite extends SharedSparkSession 
with AdaptiveSparkPl
 
   import testImplicits._
 
-  private lazy val originalColumnBatchSize = 
spark.conf.get(SQLConf.COLUMN_BATCH_SIZE)
+  private lazy val originalColumnBatchSize = 
spark.conf.get(SQLConf.COLUMN_BATCH_SIZE.key)
   private lazy val originalInMemoryPartitionPruning =
-    spark.conf.get(SQLConf.IN_MEMORY_PARTITION_PRUNING)
+    spark.conf.get(SQLConf.IN_MEMORY_PARTITION_PRUNING.key)
   private val testArrayData = (1 to 100).map { key =>
     Tuple1(Array.fill(key)(key))
   }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ReadSchemaSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ReadSchemaSuite.scala
index fefb16a351fd..c798196c4f0e 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ReadSchemaSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ReadSchemaSuite.scala
@@ -101,7 +101,7 @@ class OrcReadSchemaSuite
 
   override def beforeAll(): Unit = {
     super.beforeAll()
-    originalConf = spark.conf.get(SQLConf.ORC_VECTORIZED_READER_ENABLED)
+    originalConf = sqlConf.getConf(SQLConf.ORC_VECTORIZED_READER_ENABLED)
     spark.conf.set(SQLConf.ORC_VECTORIZED_READER_ENABLED.key, "false")
   }
 
@@ -126,7 +126,7 @@ class VectorizedOrcReadSchemaSuite
 
   override def beforeAll(): Unit = {
     super.beforeAll()
-    originalConf = spark.conf.get(SQLConf.ORC_VECTORIZED_READER_ENABLED)
+    originalConf = sqlConf.getConf(SQLConf.ORC_VECTORIZED_READER_ENABLED)
     spark.conf.set(SQLConf.ORC_VECTORIZED_READER_ENABLED.key, "true")
   }
 
@@ -169,7 +169,7 @@ class ParquetReadSchemaSuite
 
   override def beforeAll(): Unit = {
     super.beforeAll()
-    originalConf = spark.conf.get(SQLConf.PARQUET_VECTORIZED_READER_ENABLED)
+    originalConf = sqlConf.getConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED)
     spark.conf.set(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key, "false")
   }
 
@@ -193,7 +193,7 @@ class VectorizedParquetReadSchemaSuite
 
   override def beforeAll(): Unit = {
     super.beforeAll()
-    originalConf = spark.conf.get(SQLConf.PARQUET_VECTORIZED_READER_ENABLED)
+    originalConf = sqlConf.getConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED)
     spark.conf.set(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key, "true")
   }
 
@@ -217,7 +217,7 @@ class MergedParquetReadSchemaSuite
 
   override def beforeAll(): Unit = {
     super.beforeAll()
-    originalConf = spark.conf.get(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED)
+    originalConf = sqlConf.getConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED)
     spark.conf.set(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key, "true")
   }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala
index 3dec1b9ff5cf..deb62eb3ac23 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala
@@ -346,7 +346,7 @@ class BinaryFileFormatSuite extends QueryTest with 
SharedSparkSession {
   }
 
   test("fail fast and do not attempt to read if a file is too big") {
-    assert(spark.conf.get(SOURCES_BINARY_FILE_MAX_LENGTH) === Int.MaxValue)
+    assert(sqlConf.getConf(SOURCES_BINARY_FILE_MAX_LENGTH) === Int.MaxValue)
     withTempPath { file =>
       val path = file.getPath
       val content = "123".getBytes
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala
index 48b4f8d4bc01..b8669ee4d1ef 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala
@@ -63,7 +63,7 @@ trait OrcTest extends QueryTest with FileBasedDataSourceTest 
with BeforeAndAfter
 
   protected override def beforeAll(): Unit = {
     super.beforeAll()
-    originalConfORCImplementation = spark.conf.get(ORC_IMPLEMENTATION)
+    originalConfORCImplementation = 
spark.sessionState.conf.getConf(ORC_IMPLEMENTATION)
     spark.conf.set(ORC_IMPLEMENTATION.key, orcImp)
   }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 6c2f5a2d134d..0afa545595c7 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -846,7 +846,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest 
with SharedSparkSession
     def checkCompressionCodec(codec: ParquetCompressionCodec): Unit = {
       withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> codec.name()) {
         withParquetFile(data) { path =>
-          
assertResult(spark.conf.get(SQLConf.PARQUET_COMPRESSION).toUpperCase(Locale.ROOT))
 {
+          
assertResult(spark.conf.get(SQLConf.PARQUET_COMPRESSION.key).toUpperCase(Locale.ROOT))
 {
             compressionCodecFor(path, codec.name())
           }
         }
@@ -855,7 +855,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest 
with SharedSparkSession
 
     // Checks default compression codec
     checkCompressionCodec(
-      
ParquetCompressionCodec.fromString(spark.conf.get(SQLConf.PARQUET_COMPRESSION)))
+      
ParquetCompressionCodec.fromString(spark.conf.get(SQLConf.PARQUET_COMPRESSION.key)))
 
     
ParquetCompressionCodec.availableCodecs.asScala.foreach(checkCompressionCodec(_))
   }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceChangeDataReadSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceChangeDataReadSuite.scala
index 2858d356d4c9..4833b8630134 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceChangeDataReadSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceChangeDataReadSuite.scala
@@ -58,7 +58,7 @@ abstract class StateDataSourceChangeDataReaderSuite extends 
StateDataSourceTestB
 
   override def beforeAll(): Unit = {
     super.beforeAll()
-    spark.conf.set(SQLConf.STREAMING_NO_DATA_MICRO_BATCHES_ENABLED, false)
+    spark.conf.set(SQLConf.STREAMING_NO_DATA_MICRO_BATCHES_ENABLED.key, false)
     spark.conf.set(SQLConf.STATE_STORE_PROVIDER_CLASS.key, 
newStateStoreProvider().getClass.getName)
   }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala
index 97c88037a717..af0770756950 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala
@@ -942,7 +942,7 @@ abstract class StateDataSourceReadSuite extends 
StateDataSourceTestBase with Ass
         // skip version and operator ID to test out functionalities
         .load()
 
-      val numShufflePartitions = spark.conf.get(SQLConf.SHUFFLE_PARTITIONS)
+      val numShufflePartitions = sqlConf.getConf(SQLConf.SHUFFLE_PARTITIONS)
 
       val resultDf = stateReadDf
         .selectExpr("key.value AS key_value", "value.count AS value_count", 
"partition_id")
@@ -966,7 +966,7 @@ abstract class StateDataSourceReadSuite extends 
StateDataSourceTestBase with Ass
   }
 
   test("partition_id column with stream-stream join") {
-    val numShufflePartitions = spark.conf.get(SQLConf.SHUFFLE_PARTITIONS)
+    val numShufflePartitions = sqlConf.getConf(SQLConf.SHUFFLE_PARTITIONS)
 
     withTempDir { tempDir =>
       runStreamStreamJoinQueryWithOneThousandInputs(tempDir.getAbsolutePath)
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala
index 8fcd6edf1abb..d20cfb04f8e8 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala
@@ -119,7 +119,7 @@ class RocksDBStateStoreIntegrationSuite extends StreamTest
 
   private def getFormatVersion(query: StreamingQuery): Int = {
     
query.asInstanceOf[StreamingQueryWrapper].streamingQuery.lastExecution.sparkSession
-      .conf.get(SQLConf.STATE_STORE_ROCKSDB_FORMAT_VERSION)
+      .sessionState.conf.getConf(SQLConf.STATE_STORE_ROCKSDB_FORMAT_VERSION)
   }
 
   testWithColumnFamilies("SPARK-36519: store RocksDB format version in the 
checkpoint",
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index 7ac574db98d4..691f18451af2 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -34,7 +34,7 @@ import org.rocksdb.CompressionType
 import org.scalactic.source.Position
 import org.scalatest.Tag
 
-import org.apache.spark.SparkException
+import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.sql.catalyst.util.quietly
 import org.apache.spark.sql.execution.streaming.{CreateAtomicTestManager, 
FileSystemBasedCheckpointFileManager}
 import 
org.apache.spark.sql.execution.streaming.CheckpointFileManager.{CancellableFSDataOutputStream,
 RenameBasedFSDataOutputStream}
@@ -167,7 +167,10 @@ trait AlsoTestWithChangelogCheckpointingEnabled
 @SlowSQLTest
 class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with 
SharedSparkSession {
 
-  sqlConf.setConf(SQLConf.STATE_STORE_PROVIDER_CLASS, 
classOf[RocksDBStateStoreProvider].getName)
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf
+      .set(SQLConf.STATE_STORE_PROVIDER_CLASS, 
classOf[RocksDBStateStoreProvider].getName)
+  }
 
   testWithColumnFamilies(
     "RocksDB: check changelog and snapshot version",
@@ -2157,9 +2160,7 @@ class RocksDBSuite extends 
AlsoTestWithChangelogCheckpointingEnabled with Shared
     }
   }
 
-  private def sqlConf = SQLConf.get.clone()
-
-  private def dbConf = RocksDBConf(StateStoreConf(sqlConf))
+  private def dbConf = RocksDBConf(StateStoreConf(SQLConf.get.clone()))
 
   def withDB[T](
       remoteDir: String,
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala
index 898aeec22ad1..6eff610433c9 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala
@@ -243,7 +243,7 @@ class ExpressionInfoSuite extends SparkFunSuite with 
SharedSparkSession {
       // Examples can change settings. We clone the session to prevent tests 
clashing.
       val clonedSpark = spark.cloneSession()
       // Coalescing partitions can change result order, so disable it.
-      clonedSpark.conf.set(SQLConf.COALESCE_PARTITIONS_ENABLED, false)
+      clonedSpark.conf.set(SQLConf.COALESCE_PARTITIONS_ENABLED.key, false)
       val info = clonedSpark.sessionState.catalog.lookupFunctionInfo(funcId)
       val className = info.getClassName
       if (!ignoreSet.contains(className)) {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
index d0d4dc6b344f..82795e551b6b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
@@ -47,7 +47,7 @@ class SQLConfSuite extends QueryTest with SharedSparkSession {
     // Set a conf first.
     spark.conf.set(testKey, testVal)
     // Clear the conf.
-    spark.sessionState.conf.clear()
+    sqlConf.clear()
     // After clear, only overrideConfs used by unit test should be in the 
SQLConf.
     assert(spark.conf.getAll === TestSQLContext.overrideConfs)
 
@@ -62,11 +62,11 @@ class SQLConfSuite extends QueryTest with 
SharedSparkSession {
     assert(spark.conf.get(testKey, testVal + "_") === testVal)
     assert(spark.conf.getAll.contains(testKey))
 
-    spark.sessionState.conf.clear()
+    sqlConf.clear()
   }
 
   test("parse SQL set commands") {
-    spark.sessionState.conf.clear()
+    sqlConf.clear()
     sql(s"set $testKey=$testVal")
     assert(spark.conf.get(testKey, testVal + "_") === testVal)
     assert(spark.conf.get(testKey, testVal + "_") === testVal)
@@ -84,11 +84,11 @@ class SQLConfSuite extends QueryTest with 
SharedSparkSession {
     sql(s"set $key=")
     assert(spark.conf.get(key, "0") === "")
 
-    spark.sessionState.conf.clear()
+    sqlConf.clear()
   }
 
   test("set command for display") {
-    spark.sessionState.conf.clear()
+    sqlConf.clear()
     checkAnswer(
       sql("SET").where("key = 'spark.sql.groupByOrdinal'").select("key", 
"value"),
       Nil)
@@ -109,11 +109,11 @@ class SQLConfSuite extends QueryTest with 
SharedSparkSession {
   }
 
   test("deprecated property") {
-    spark.sessionState.conf.clear()
-    val original = spark.conf.get(SQLConf.SHUFFLE_PARTITIONS)
+    sqlConf.clear()
+    val original = sqlConf.getConf(SQLConf.SHUFFLE_PARTITIONS)
     try {
       sql(s"set ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS}=10")
-      assert(spark.conf.get(SQLConf.SHUFFLE_PARTITIONS) === 10)
+      assert(sqlConf.getConf(SQLConf.SHUFFLE_PARTITIONS) === 10)
     } finally {
       sql(s"set ${SQLConf.SHUFFLE_PARTITIONS.key}=$original")
     }
@@ -146,18 +146,18 @@ class SQLConfSuite extends QueryTest with 
SharedSparkSession {
   }
 
   test("reset - public conf") {
-    spark.sessionState.conf.clear()
-    val original = spark.conf.get(SQLConf.GROUP_BY_ORDINAL)
+    sqlConf.clear()
+    val original = sqlConf.getConf(SQLConf.GROUP_BY_ORDINAL)
     try {
-      assert(spark.conf.get(SQLConf.GROUP_BY_ORDINAL))
+      assert(sqlConf.getConf(SQLConf.GROUP_BY_ORDINAL))
       sql(s"set ${SQLConf.GROUP_BY_ORDINAL.key}=false")
-      assert(spark.conf.get(SQLConf.GROUP_BY_ORDINAL) === false)
+      assert(sqlConf.getConf(SQLConf.GROUP_BY_ORDINAL) === false)
       assert(sql(s"set").where(s"key = 
'${SQLConf.GROUP_BY_ORDINAL.key}'").count() == 1)
-      assert(spark.conf.get(SQLConf.OPTIMIZER_EXCLUDED_RULES).isEmpty)
+      assert(sqlConf.getConf(SQLConf.OPTIMIZER_EXCLUDED_RULES).isEmpty)
       sql(s"reset")
-      assert(spark.conf.get(SQLConf.GROUP_BY_ORDINAL))
+      assert(sqlConf.getConf(SQLConf.GROUP_BY_ORDINAL))
       assert(sql(s"set").where(s"key = 
'${SQLConf.GROUP_BY_ORDINAL.key}'").count() == 0)
-      assert(spark.conf.get(SQLConf.OPTIMIZER_EXCLUDED_RULES) ===
+      assert(sqlConf.getConf(SQLConf.OPTIMIZER_EXCLUDED_RULES) ===
         Some("org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation"))
     } finally {
       sql(s"set ${SQLConf.GROUP_BY_ORDINAL.key}=$original")
@@ -165,15 +165,15 @@ class SQLConfSuite extends QueryTest with 
SharedSparkSession {
   }
 
   test("reset - internal conf") {
-    spark.sessionState.conf.clear()
-    val original = spark.conf.get(SQLConf.OPTIMIZER_MAX_ITERATIONS)
+    sqlConf.clear()
+    val original = sqlConf.getConf(SQLConf.OPTIMIZER_MAX_ITERATIONS)
     try {
-      assert(spark.conf.get(SQLConf.OPTIMIZER_MAX_ITERATIONS) === 100)
+      assert(sqlConf.getConf(SQLConf.OPTIMIZER_MAX_ITERATIONS) === 100)
       sql(s"set ${SQLConf.OPTIMIZER_MAX_ITERATIONS.key}=10")
-      assert(spark.conf.get(SQLConf.OPTIMIZER_MAX_ITERATIONS) === 10)
+      assert(sqlConf.getConf(SQLConf.OPTIMIZER_MAX_ITERATIONS) === 10)
       assert(sql(s"set").where(s"key = 
'${SQLConf.OPTIMIZER_MAX_ITERATIONS.key}'").count() == 1)
       sql(s"reset")
-      assert(spark.conf.get(SQLConf.OPTIMIZER_MAX_ITERATIONS) === 100)
+      assert(sqlConf.getConf(SQLConf.OPTIMIZER_MAX_ITERATIONS) === 100)
       assert(sql(s"set").where(s"key = 
'${SQLConf.OPTIMIZER_MAX_ITERATIONS.key}'").count() == 0)
     } finally {
       sql(s"set ${SQLConf.OPTIMIZER_MAX_ITERATIONS.key}=$original")
@@ -181,7 +181,7 @@ class SQLConfSuite extends QueryTest with 
SharedSparkSession {
   }
 
   test("reset - user-defined conf") {
-    spark.sessionState.conf.clear()
+    sqlConf.clear()
     val userDefinedConf = "x.y.z.reset"
     try {
       assert(spark.conf.getOption(userDefinedConf).isEmpty)
@@ -196,7 +196,7 @@ class SQLConfSuite extends QueryTest with 
SharedSparkSession {
   }
 
   test("SPARK-32406: reset - single configuration") {
-    spark.sessionState.conf.clear()
+    sqlConf.clear()
     // spark core conf w/o entry registered
     val appId = spark.sparkContext.getConf.getAppId
     sql("RESET spark.app.id")
@@ -216,19 +216,19 @@ class SQLConfSuite extends QueryTest with 
SharedSparkSession {
     sql("RESET spark.abc") // ignore nonexistent keys
 
     // runtime sql configs
-    val original = spark.conf.get(SQLConf.GROUP_BY_ORDINAL)
+    val original = sqlConf.getConf(SQLConf.GROUP_BY_ORDINAL)
     sql(s"SET ${SQLConf.GROUP_BY_ORDINAL.key}=false")
     sql(s"RESET ${SQLConf.GROUP_BY_ORDINAL.key}")
-    assert(spark.conf.get(SQLConf.GROUP_BY_ORDINAL) === original)
+    assert(sqlConf.getConf(SQLConf.GROUP_BY_ORDINAL) === original)
 
     // runtime sql configs with optional defaults
-    assert(spark.conf.get(SQLConf.OPTIMIZER_EXCLUDED_RULES).isEmpty)
+    assert(sqlConf.getConf(SQLConf.OPTIMIZER_EXCLUDED_RULES).isEmpty)
     sql(s"RESET ${SQLConf.OPTIMIZER_EXCLUDED_RULES.key}")
-    assert(spark.conf.get(SQLConf.OPTIMIZER_EXCLUDED_RULES) ===
+    assert(sqlConf.getConf(SQLConf.OPTIMIZER_EXCLUDED_RULES) ===
       Some("org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation"))
     sql(s"SET ${SQLConf.PLAN_CHANGE_LOG_RULES.key}=abc")
     sql(s"RESET ${SQLConf.PLAN_CHANGE_LOG_RULES.key}")
-    assert(spark.conf.get(SQLConf.PLAN_CHANGE_LOG_RULES).isEmpty)
+    assert(sqlConf.getConf(SQLConf.PLAN_CHANGE_LOG_RULES).isEmpty)
 
     // static sql configs
     checkError(
@@ -247,19 +247,19 @@ class SQLConfSuite extends QueryTest with 
SharedSparkSession {
   }
 
   test("Test ADVISORY_PARTITION_SIZE_IN_BYTES's method") {
-    spark.sessionState.conf.clear()
+    sqlConf.clear()
 
     spark.conf.set(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key, "100")
-    assert(spark.conf.get(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES) === 100)
+    assert(sqlConf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES) === 100)
 
     spark.conf.set(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key, "1k")
-    assert(spark.conf.get(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES) === 1024)
+    assert(sqlConf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES) === 1024)
 
     spark.conf.set(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key, "1M")
-    assert(spark.conf.get(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES) === 
1048576)
+    assert(sqlConf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES) === 
1048576)
 
     spark.conf.set(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key, "1g")
-    assert(spark.conf.get(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES) === 
1073741824)
+    assert(sqlConf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES) === 
1073741824)
 
     // test negative value
     intercept[IllegalArgumentException] {
@@ -277,7 +277,7 @@ class SQLConfSuite extends QueryTest with 
SharedSparkSession {
       spark.conf.set(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key, 
"-90000000000g")
     }
 
-    spark.sessionState.conf.clear()
+    sqlConf.clear()
   }
 
   test("SparkSession can access configs set in SparkConf") {
@@ -305,7 +305,7 @@ class SQLConfSuite extends QueryTest with 
SharedSparkSession {
     try {
       sparkContext.conf.set(GLOBAL_TEMP_DATABASE, "a")
       val newSession = new SparkSession(sparkContext)
-      assert(newSession.conf.get(GLOBAL_TEMP_DATABASE) == "a")
+      assert(newSession.sessionState.conf.getConf(GLOBAL_TEMP_DATABASE) == "a")
       checkAnswer(
         newSession.sql(s"SET ${GLOBAL_TEMP_DATABASE.key}"),
         Row(GLOBAL_TEMP_DATABASE.key, "a"))
@@ -338,16 +338,16 @@ class SQLConfSuite extends QueryTest with 
SharedSparkSession {
   }
 
   test("SPARK-10365: PARQUET_OUTPUT_TIMESTAMP_TYPE") {
-    spark.sessionState.conf.clear()
+    sqlConf.clear()
 
     // check default value
     assert(spark.sessionState.conf.parquetOutputTimestampType ==
       SQLConf.ParquetOutputTimestampType.INT96)
 
-    spark.conf.set(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE, "timestamp_micros")
+    sqlConf.setConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE, "timestamp_micros")
     assert(spark.sessionState.conf.parquetOutputTimestampType ==
       SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS)
-    spark.conf.set(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE, "int96")
+    sqlConf.setConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE, "int96")
     assert(spark.sessionState.conf.parquetOutputTimestampType ==
       SQLConf.ParquetOutputTimestampType.INT96)
 
@@ -356,7 +356,7 @@ class SQLConfSuite extends QueryTest with 
SharedSparkSession {
       spark.conf.set(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key, "invalid")
     }
 
-    spark.sessionState.conf.clear()
+    sqlConf.clear()
   }
 
   test("SPARK-22779: correctly compute default value for fallback configs") {
@@ -373,10 +373,10 @@ class SQLConfSuite extends QueryTest with 
SharedSparkSession {
       .get
     assert(displayValue === fallback.defaultValueString)
 
-    spark.conf.set(SQLConf.PARQUET_COMPRESSION, GZIP.lowerCaseName())
+    sqlConf.setConf(SQLConf.PARQUET_COMPRESSION, GZIP.lowerCaseName())
     assert(spark.conf.get(fallback.key) === GZIP.lowerCaseName())
 
-    spark.conf.set(fallback, LZO.lowerCaseName())
+    sqlConf.setConf(fallback, LZO.lowerCaseName())
     assert(spark.conf.get(fallback.key) === LZO.lowerCaseName())
 
     val newDisplayValue = spark.sessionState.conf.getAllDefinedConfs
@@ -459,10 +459,10 @@ class SQLConfSuite extends QueryTest with 
SharedSparkSession {
   test("set time zone") {
     TimeZone.getAvailableIDs().foreach { zid =>
       sql(s"set time zone '$zid'")
-      assert(spark.conf.get(SQLConf.SESSION_LOCAL_TIMEZONE) === zid)
+      assert(sqlConf.getConf(SQLConf.SESSION_LOCAL_TIMEZONE) === zid)
     }
     sql("set time zone local")
-    assert(spark.conf.get(SQLConf.SESSION_LOCAL_TIMEZONE) === 
TimeZone.getDefault.getID)
+    assert(sqlConf.getConf(SQLConf.SESSION_LOCAL_TIMEZONE) === 
TimeZone.getDefault.getID)
 
     val tz = "Invalid TZ"
     checkError(
@@ -476,7 +476,7 @@ class SQLConfSuite extends QueryTest with 
SharedSparkSession {
 
     (-18 to 18).map(v => (v, s"interval '$v' hours")).foreach { case (i, 
interval) =>
       sql(s"set time zone $interval")
-      val zone = spark.conf.get(SQLConf.SESSION_LOCAL_TIMEZONE)
+      val zone = sqlConf.getConf(SQLConf.SESSION_LOCAL_TIMEZONE)
       if (i == 0) {
         assert(zone === "Z")
       } else {
@@ -504,7 +504,7 @@ class SQLConfSuite extends QueryTest with 
SharedSparkSession {
   test("SPARK-47765: set collation") {
     Seq("UNICODE", "UNICODE_CI", "utf8_lcase", "utf8_binary").foreach { 
collation =>
       sql(s"set collation $collation")
-      assert(spark.conf.get(SQLConf.DEFAULT_COLLATION) === 
collation.toUpperCase(Locale.ROOT))
+      assert(sqlConf.getConf(SQLConf.DEFAULT_COLLATION) === 
collation.toUpperCase(Locale.ROOT))
     }
 
     checkError(
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index 8b11e0c69fa7..24732223c669 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -54,7 +54,7 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils with Adapti
 
   protected override def beforeAll(): Unit = {
     super.beforeAll()
-    spark.conf.set(SQLConf.LEGACY_BUCKETED_TABLE_SCAN_OUTPUT_ORDERING, true)
+    spark.conf.set(SQLConf.LEGACY_BUCKETED_TABLE_SCAN_OUTPUT_ORDERING.key, 
true)
   }
 
   protected override def afterAll(): Unit = {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index 168b6b862992..e27ec32e287e 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -50,7 +50,7 @@ abstract class FileStreamSinkSuite extends StreamTest {
 
   override def beforeAll(): Unit = {
     super.beforeAll()
-    spark.conf.set(SQLConf.ORC_IMPLEMENTATION, "native")
+    spark.conf.set(SQLConf.ORC_IMPLEMENTATION.key, "native")
   }
 
   override def afterAll(): Unit = {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 56c4aecb2377..773be0cc08e3 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -262,7 +262,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
 
   override def beforeAll(): Unit = {
     super.beforeAll()
-    spark.conf.set(SQLConf.ORC_IMPLEMENTATION, "native")
+    spark.conf.set(SQLConf.ORC_IMPLEMENTATION.key, "native")
   }
 
   override def afterAll(): Unit = {
@@ -1504,7 +1504,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
 
     // This is to avoid running a spark job to list of files in parallel
     // by the InMemoryFileIndex.
-    spark.conf.set(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD, numFiles * 
2)
+    spark.conf.set(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD.key, 
numFiles * 2)
 
     withTempDirs { case (root, tmp) =>
       val src = new File(root, "a=1")
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
index f3ef73c6af5f..f7ff39622ed4 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
@@ -1163,7 +1163,7 @@ class FlatMapGroupsWithStateSuite extends 
StateStoreMetricsTest {
       func: (Int, Iterator[Int], GroupState[Int]) => Iterator[Int],
       timeoutType: GroupStateTimeout = GroupStateTimeout.NoTimeout,
       batchTimestampMs: Long = NO_TIMESTAMP): FlatMapGroupsWithStateExec = {
-    val stateFormatVersion = 
spark.conf.get(SQLConf.FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION)
+    val stateFormatVersion = 
sqlConf.getConf(SQLConf.FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION)
     val emptyRdd = spark.sparkContext.emptyRDD[InternalRow]
     MemoryStream[Int]
       .toDS()
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index 7ab45e25799b..68436c4e355b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -542,10 +542,7 @@ trait StreamTest extends QueryTest with SharedSparkSession 
with TimeLimits with
           val metadataRoot = 
Option(checkpointLocation).getOrElse(defaultCheckpointLocation)
 
           additionalConfs.foreach(pair => {
-            val value =
-              if (sparkSession.conf.contains(pair._1)) {
-                Some(sparkSession.conf.get(pair._1))
-              } else None
+            val value = sparkSession.conf.getOption(pair._1)
             resetConfValues(pair._1) = value
             sparkSession.conf.set(pair._1, pair._2)
           })
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TriggerAvailableNowSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TriggerAvailableNowSuite.scala
index defd5fd110de..a47c2f839692 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TriggerAvailableNowSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TriggerAvailableNowSuite.scala
@@ -265,7 +265,7 @@ class TriggerAvailableNowSuite extends FileStreamSourceTest 
{
   private def assertQueryUsingRightBatchExecutor(
       testSource: TestDataFrameProvider,
       query: StreamingQuery): Unit = {
-    val useWrapper = query.sparkSession.conf.get(
+    val useWrapper = query.sparkSession.sessionState.conf.getConf(
       SQLConf.STREAMING_TRIGGER_AVAILABLE_NOW_WRAPPER_ENABLED)
 
     if (useWrapper) {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
index ff1473fea369..4d4cc44eb3e7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
@@ -103,6 +103,8 @@ trait SharedSparkSessionBase
     new TestSparkSession(sparkConf)
   }
 
+  protected def sqlConf: SQLConf = _spark.sessionState.conf
+
   /**
    * Initialize the [[TestSparkSession]].  Generally, this is just called from
    * beforeAll; however, in test using styles other than FunSuite, there is
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala
index d84b9f796023..8c6113fb5569 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala
@@ -86,7 +86,7 @@ class HiveSharedStateSuite extends SparkFunSuite {
     assert(ss2.sparkContext.hadoopConfiguration.get("hive.metastore.warehouse.dir") !==
       invalidPath, "warehouse conf in session options can't affect application wide hadoop conf")
     assert(ss.conf.get("spark.foo") === "bar2222", "session level conf should be passed to catalog")
-    assert(!ss.conf.get(WAREHOUSE_PATH).contains(invalidPath),
+    assert(!ss.conf.get(WAREHOUSE_PATH.key).contains(invalidPath),
       "session level conf should be passed to catalog")
   }
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
index 69abb1d1673e..865ce81e151c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
@@ -828,7 +828,7 @@ object SPARK_18360 {
       .enableHiveSupport().getOrCreate()
 
     val defaultDbLocation = spark.catalog.getDatabase("default").locationUri
-    assert(new Path(defaultDbLocation) == new Path(spark.conf.get(WAREHOUSE_PATH)))
+    assert(new Path(defaultDbLocation) == new Path(spark.conf.get(WAREHOUSE_PATH.key)))
 
     val hiveClient =
       spark.sharedState.externalCatalog.unwrapped.asInstanceOf[HiveExternalCatalog].client
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeReadWriteSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeReadWriteSuite.scala
index aafc4764d246..1922144a92ef 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeReadWriteSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeReadWriteSuite.scala
@@ -44,7 +44,7 @@ class HiveSerDeReadWriteSuite extends QueryTest with SQLTestUtils with TestHiveS
     super.beforeAll()
     originalConvertMetastoreParquet = spark.conf.get(CONVERT_METASTORE_PARQUET.key)
     originalConvertMetastoreORC = spark.conf.get(CONVERT_METASTORE_ORC.key)
-    originalORCImplementation = spark.conf.get(ORC_IMPLEMENTATION)
+    originalORCImplementation = spark.conf.get(ORC_IMPLEMENTATION.key)
 
     spark.conf.set(CONVERT_METASTORE_PARQUET.key, "false")
     spark.conf.set(CONVERT_METASTORE_ORC.key, "false")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 3deb355e0e4a..594c097de2c7 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -79,13 +79,13 @@ abstract class SQLQuerySuiteBase extends QueryTest with SQLTestUtils with TestHi
   test("query global temp view") {
     val df = Seq(1).toDF("i1")
     df.createGlobalTempView("tbl1")
-    val global_temp_db = spark.conf.get(GLOBAL_TEMP_DATABASE)
+    val global_temp_db = spark.conf.get(GLOBAL_TEMP_DATABASE.key)
     checkAnswer(spark.sql(s"select * from ${global_temp_db}.tbl1"), Row(1))
     spark.sql(s"drop view ${global_temp_db}.tbl1")
   }
 
   test("non-existent global temp view") {
-    val global_temp_db = spark.conf.get(GLOBAL_TEMP_DATABASE)
+    val global_temp_db = spark.conf.get(GLOBAL_TEMP_DATABASE.key)
     val e = intercept[AnalysisException] {
       spark.sql(s"select * from ${global_temp_db}.nonexistentview")
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to