This is an automated email from the ASF dual-hosted git repository.

yangjie01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 92e832a20bd0 [SPARK-46118][SQL][SS][CONNECT] Use 
`SparkSession.sessionState.conf` instead of `SQLContext.conf` and mark 
`SQLContext.conf` as deprecated
92e832a20bd0 is described below

commit 92e832a20bd013af1c6820152d6e011353545f1c
Author: yangjie01 <yangji...@baidu.com>
AuthorDate: Tue Nov 28 00:43:22 2023 +0800

    [SPARK-46118][SQL][SS][CONNECT] Use `SparkSession.sessionState.conf` 
instead of `SQLContext.conf` and mark `SQLContext.conf` as deprecated
    
    ### What changes were proposed in this pull request?
    There are some calls to `SparkSession.sqlContext.conf` in the Spark code, 
which are equivalent to 
`SparkSession.sqlContext.sparkSession.sessionState.conf`. This PR changes them 
to directly call `SparkSession.sessionState.conf` or expand to 
`SQLContext.sparkSession.sessionState.conf`.
    
    At the same time, this PR marks the internal API `SQLContext.conf` as 
deprecated, and `SparkSession.sessionState.conf` should be used directly.
    
    ### Why are the changes needed?
    1. `SparkSession.sessionState.conf` has a shallower call stack compared to 
`SparkSession.sqlContext.conf`
    2. `SQLContext` has been marked as deprecated since Apache Spark 1.6, and 
its APIs should be avoided as much as possible in Spark's internal code.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Pass GitHub Actions
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #44034 from LuciferYang/sc-conf.
    
    Lead-authored-by: yangjie01 <yangji...@baidu.com>
    Co-authored-by: YangJie <yangji...@baidu.com>
    Signed-off-by: yangjie01 <yangji...@baidu.com>
---
 .../spark/sql/connect/planner/SparkConnectPlanner.scala      |  2 +-
 sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala   |  4 ++--
 .../src/main/scala/org/apache/spark/sql/SQLContext.scala     |  1 +
 .../spark/sql/execution/command/createDataSourceTables.scala |  2 +-
 .../scala/org/apache/spark/sql/execution/command/ddl.scala   |  4 ++--
 .../apache/spark/sql/execution/datasources/DataSource.scala  |  2 +-
 .../spark/sql/execution/datasources/FileStatusCache.scala    |  8 ++++----
 .../spark/sql/execution/datasources/HadoopFsRelation.scala   |  2 +-
 .../execution/datasources/PartitioningAwareFileIndex.scala   |  4 ++--
 .../execution/datasources/jdbc/JdbcRelationProvider.scala    |  9 +++++----
 .../spark/sql/execution/streaming/MicroBatchExecution.scala  |  6 +++---
 .../spark/sql/execution/streaming/ProgressReporter.scala     |  2 +-
 .../execution/streaming/continuous/EpochCoordinator.scala    |  2 +-
 .../main/scala/org/apache/spark/sql/sources/interfaces.scala |  2 +-
 .../org/apache/spark/sql/streaming/DataStreamReader.scala    |  2 +-
 .../org/apache/spark/sql/streaming/DataStreamWriter.scala    |  2 +-
 .../execution/datasources/parquet/ParquetEncodingSuite.scala |  6 +++---
 .../sql/execution/datasources/parquet/ParquetIOSuite.scala   | 10 +++++-----
 .../streaming/sources/RatePerMicroBatchProviderSuite.scala   |  2 +-
 .../streaming/sources/RateStreamProviderSuite.scala          |  4 ++--
 .../execution/streaming/sources/TextSocketStreamSuite.scala  |  2 +-
 .../streaming/state/SymmetricHashJoinStateManagerSuite.scala |  2 +-
 .../src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala |  2 +-
 .../scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala     |  2 +-
 .../org/apache/spark/sql/sources/BucketedReadSuite.scala     |  2 +-
 .../scala/org/apache/spark/sql/streaming/StreamSuite.scala   |  2 +-
 .../spark/sql/streaming/StreamingAggregationSuite.scala      |  2 +-
 .../org/apache/spark/sql/streaming/StreamingJoinSuite.scala  |  2 +-
 .../continuous/ContinuousQueuedDataReaderSuite.scala         |  4 ++--
 .../sql/streaming/sources/StreamingDataSourceV2Suite.scala   |  4 ++--
 .../hive/thriftserver/SparkExecuteStatementOperation.scala   | 12 +++++++-----
 .../spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala      | 10 ++++++----
 .../apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala  |  2 +-
 .../spark/sql/hive/thriftserver/SparkSQLSessionManager.scala |  2 +-
 .../org/apache/spark/sql/hive/HiveMetastoreCatalog.scala     |  2 +-
 .../test/scala/org/apache/spark/sql/hive/test/TestHive.scala |  4 +++-
 36 files changed, 70 insertions(+), 62 deletions(-)

diff --git 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index 95c5acc803d4..3ac093b5e0b4 100644
--- 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -2235,7 +2235,7 @@ class SparkConnectPlanner(
 
     JoinWith.typedJoinWith(
       joined,
-      session.sqlContext.conf.dataFrameSelfJoinAutoResolveAmbiguity,
+      session.sessionState.conf.dataFrameSelfJoinAutoResolveAmbiguity,
       session.sessionState.analyzer.resolver,
       rel.getJoinDataType.getIsLeftStruct,
       rel.getJoinDataType.getIsRightStruct)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index d36aaef55866..0c33f2c87fec 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -1244,7 +1244,7 @@ class Dataset[T] private[sql](
 
       withTypedPlan(JoinWith.typedJoinWith(
         joined,
-        sqlContext.conf.dataFrameSelfJoinAutoResolveAmbiguity,
+        sparkSession.sessionState.conf.dataFrameSelfJoinAutoResolveAmbiguity,
         sparkSession.sessionState.analyzer.resolver,
         this.exprEnc.isSerializedAsStructForTopLevel,
         other.exprEnc.isSerializedAsStructForTopLevel))
@@ -1450,7 +1450,7 @@ class Dataset[T] private[sql](
     case "*" =>
       Column(ResolvedStar(queryExecution.analyzed.output))
     case _ =>
-      if (sqlContext.conf.supportQuotedRegexColumnName) {
+      if (sparkSession.sessionState.conf.supportQuotedRegexColumnName) {
         colRegex(colName)
       } else {
         Column(addDataFrameIdToCol(resolve(colName)))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index a52de12e70c4..267581659d87 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -78,6 +78,7 @@ class SQLContext private[sql](val sparkSession: SparkSession)
 
   private[sql] def sessionState: SessionState = sparkSession.sessionState
   private[sql] def sharedState: SharedState = sparkSession.sharedState
+  @deprecated("Use SparkSession.sessionState.conf instead", "4.0.0")
   private[sql] def conf: SQLConf = sessionState.conf
 
   def sparkContext: SparkContext = sparkSession.sparkContext
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index a94140dae5c0..a8ec810fab3a 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -194,7 +194,7 @@ case class CreateDataSourceTableAsSelectCommand(
 
       result match {
         case fs: HadoopFsRelation if table.partitionColumnNames.nonEmpty &&
-            sparkSession.sqlContext.conf.manageFilesourcePartitions =>
+            sparkSession.sessionState.conf.manageFilesourcePartitions =>
           // Need to recover partitions into the metastore so our saved data 
is visible.
           sessionState.executePlan(RepairTableCommand(
             table.identifier,
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 130872b10bcd..7e001803592f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -715,7 +715,7 @@ case class RepairTableCommand(
       val total = partitionSpecsAndLocs.length
       logInfo(s"Found $total partitions in $root")
 
-      val partitionStats = if (spark.sqlContext.conf.gatherFastStats) {
+      val partitionStats = if (spark.sessionState.conf.gatherFastStats) {
         gatherPartitionStats(spark, partitionSpecsAndLocs, fs, pathFilter, 
threshold)
       } else {
         Map.empty[Path, PartitionStatistics]
@@ -957,7 +957,7 @@ object DDLUtils extends Logging {
   def verifyPartitionProviderIsHive(
       spark: SparkSession, table: CatalogTable, action: String): Unit = {
     val tableName = table.identifier.table
-    if (!spark.sqlContext.conf.manageFilesourcePartitions && 
isDatasourceTable(table)) {
+    if (!spark.sessionState.conf.manageFilesourcePartitions && 
isDatasourceTable(table)) {
       throw QueryCompilationErrors
         
.actionNotAllowedOnTableWithFilesourcePartitionManagementDisabledError(action, 
tableName)
     }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 835308f3d024..cebc74af724d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -390,7 +390,7 @@ case class DataSource(
 
       // This is a non-streaming file based datasource.
       case (format: FileFormat, _) =>
-        val useCatalogFileIndex = 
sparkSession.sqlContext.conf.manageFilesourcePartitions &&
+        val useCatalogFileIndex = 
sparkSession.sessionState.conf.manageFilesourcePartitions &&
           catalogTable.isDefined && catalogTable.get.tracksPartitionsInCatalog 
&&
           catalogTable.get.partitionColumnNames.nonEmpty
         val (fileCatalog, dataSchema, partitionSchema) = if 
(useCatalogFileIndex) {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
index e1fdb9570732..80002ecdaf8d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
@@ -41,12 +41,12 @@ object FileStatusCache {
    *         shared across all clients.
    */
   def getOrCreate(session: SparkSession): FileStatusCache = synchronized {
-    if (session.sqlContext.conf.manageFilesourcePartitions &&
-      session.sqlContext.conf.filesourcePartitionFileCacheSize > 0) {
+    if (session.sessionState.conf.manageFilesourcePartitions &&
+      session.sessionState.conf.filesourcePartitionFileCacheSize > 0) {
       if (sharedCache == null) {
         sharedCache = new SharedInMemoryCache(
-          session.sqlContext.conf.filesourcePartitionFileCacheSize,
-          session.sqlContext.conf.metadataCacheTTL
+          session.sessionState.conf.filesourcePartitionFileCacheSize,
+          session.sessionState.conf.metadataCacheTTL
         )
       }
       sharedCache.createForNewClient()
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
index fd1824055dcf..a87453d3fd53 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
@@ -65,7 +65,7 @@ case class HadoopFsRelation(
   }
 
   override def sizeInBytes: Long = {
-    val compressionFactor = sqlContext.conf.fileCompressionFactor
+    val compressionFactor = 
sparkSession.sessionState.conf.fileCompressionFactor
     (location.sizeInBytes * compressionFactor).toLong
   }
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
index 37de04a59e4b..dc41afe226b8 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
@@ -157,8 +157,8 @@ abstract class PartitioningAwareFileIndex(
         typeInference = 
sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled,
         basePaths = basePaths,
         userSpecifiedSchema = userSpecifiedSchema,
-        caseSensitive = sparkSession.sqlContext.conf.caseSensitiveAnalysis,
-        validatePartitionColumns = 
sparkSession.sqlContext.conf.validatePartitionColumns,
+        caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis,
+        validatePartitionColumns = 
sparkSession.sessionState.conf.validatePartitionColumns,
         timeZoneId = timeZoneId)
     }
   }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
index 2760c7ac3019..d9be1a1e3f67 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
@@ -32,11 +32,12 @@ class JdbcRelationProvider extends CreatableRelationProvider
       sqlContext: SQLContext,
       parameters: Map[String, String]): BaseRelation = {
     val jdbcOptions = new JDBCOptions(parameters)
-    val resolver = sqlContext.conf.resolver
-    val timeZoneId = sqlContext.conf.sessionLocalTimeZone
+    val sparkSession = sqlContext.sparkSession
+    val resolver = sparkSession.sessionState.conf.resolver
+    val timeZoneId = sparkSession.sessionState.conf.sessionLocalTimeZone
     val schema = JDBCRelation.getSchema(resolver, jdbcOptions)
     val parts = JDBCRelation.columnPartition(schema, resolver, timeZoneId, 
jdbcOptions)
-    JDBCRelation(schema, parts, jdbcOptions)(sqlContext.sparkSession)
+    JDBCRelation(schema, parts, jdbcOptions)(sparkSession)
   }
 
   override def createRelation(
@@ -45,7 +46,7 @@ class JdbcRelationProvider extends CreatableRelationProvider
       parameters: Map[String, String],
       df: DataFrame): BaseRelation = {
     val options = new JdbcOptionsInWrite(parameters)
-    val isCaseSensitive = sqlContext.conf.caseSensitiveAnalysis
+    val isCaseSensitive = 
sqlContext.sparkSession.sessionState.conf.caseSensitiveAnalysis
     val dialect = JdbcDialects.get(options.url)
     val conn = dialect.createConnectionFactory(options)(-1)
     try {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index 3febce0fa445..1bd59e818be5 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -66,7 +66,7 @@ class MicroBatchExecution(
         // When the flag is disabled, Spark will fall back to single batch 
execution, whenever
         // it figures out any source does not support Trigger.AvailableNow.
         // See SPARK-45178 for more details.
-        if (sparkSession.sqlContext.conf.getConf(
+        if (sparkSession.sessionState.conf.getConf(
             SQLConf.STREAMING_TRIGGER_AVAILABLE_NOW_WRAPPER_ENABLED)) {
           logInfo("Configured to use the wrapper of Trigger.AvailableNow for 
query " +
             s"$prettyIdString.")
@@ -113,7 +113,7 @@ class MicroBatchExecution(
     // transformation is responsible for replacing attributes with their final 
values.
 
     val disabledSources =
-      
Utils.stringToSeq(sparkSession.sqlContext.conf.disabledV2StreamingMicroBatchReaders)
+      
Utils.stringToSeq(sparkSession.sessionState.conf.disabledV2StreamingMicroBatchReaders)
 
     import 
org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
     val _logicalPlan = analyzedPlan.transform {
@@ -144,7 +144,7 @@ class MicroBatchExecution(
           })
         } else if (v1.isEmpty) {
           throw QueryExecutionErrors.microBatchUnsupportedByDataSourceError(
-            srcName, 
sparkSession.sqlContext.conf.disabledV2StreamingMicroBatchReaders, table)
+            srcName, 
sparkSession.sessionState.conf.disabledV2StreamingMicroBatchReaders, table)
         } else {
           v2ToExecutionRelationMap.getOrElseUpdate(s, {
             // Materialize source to avoid creating it in every batch
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index e70e94001eee..ffdf9da6e581 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -145,7 +145,7 @@ trait ProgressReporter extends Logging {
   private def addNewProgress(newProgress: StreamingQueryProgress): Unit = {
     progressBuffer.synchronized {
       progressBuffer += newProgress
-      while (progressBuffer.length >= 
sparkSession.sqlContext.conf.streamingProgressRetention) {
+      while (progressBuffer.length >= 
sparkSession.sessionState.conf.streamingProgressRetention) {
         progressBuffer.dequeue()
       }
     }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala
index dbddab2e9acd..c1027db6ec77 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala
@@ -124,7 +124,7 @@ private[continuous] class EpochCoordinator(
   extends ThreadSafeRpcEndpoint with Logging {
 
   private val epochBacklogQueueSize =
-    session.sqlContext.conf.continuousStreamingEpochBacklogQueueSize
+    session.sessionState.conf.continuousStreamingEpochBacklogQueueSize
 
   private var queryWritesStopped: Boolean = false
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index d194ae77e968..2911dfae4622 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -206,7 +206,7 @@ abstract class BaseRelation {
    *
    * @since 1.3.0
    */
-  def sizeInBytes: Long = sqlContext.conf.defaultSizeInBytes
+  def sizeInBytes: Long = 
sqlContext.sparkSession.sessionState.conf.defaultSizeInBytes
 
   /**
    * Whether does it need to convert the objects in Row to internal 
representation, for example:
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index 36dd168992a1..905c96ff4cbb 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -156,7 +156,7 @@ final class DataStreamReader private[sql](sparkSession: 
SparkSession) extends Lo
       extraOptions + ("path" -> path.get)
     }
 
-    val ds = DataSource.lookupDataSource(source, sparkSession.sqlContext.conf).
+    val ds = DataSource.lookupDataSource(source, 
sparkSession.sessionState.conf).
       getConstructor().newInstance()
     // We need to generate the V1 data source so we can pass it to the V2 
relation as a shim.
     // We can't be sure at this point whether we'll actually want to use V2, 
since we don't know the
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index 036afa62b488..f4665f8ac677 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -369,7 +369,7 @@ final class DataStreamWriter[T] private[sql](ds: 
Dataset[T]) {
     } else {
       val cls = DataSource.lookupDataSource(source, 
df.sparkSession.sessionState.conf)
       val disabledSources =
-        
Utils.stringToSeq(df.sparkSession.sqlContext.conf.disabledV2StreamingWriters)
+        
Utils.stringToSeq(df.sparkSession.sessionState.conf.disabledV2StreamingWriters)
       val useV1Source = disabledSources.contains(cls.getCanonicalName) ||
         // file source v2 does not support streaming yet.
         classOf[FileDataSourceV2].isAssignableFrom(cls)
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
index a0d11e2ce7ae..cd6f41b4ef45 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
@@ -61,7 +61,7 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest 
with SharedSparkSess
         
List.fill(n)(ROW).toDF().repartition(1).write.parquet(dir.getCanonicalPath)
         val file = TestUtils.listDirectory(dir).head
 
-        val conf = sqlContext.conf
+        val conf = spark.sessionState.conf
         val reader = new VectorizedParquetRecordReader(
           conf.offHeapColumnVectorEnabled, 
conf.parquetVectorizedReaderBatchSize)
         reader.initialize(file, null)
@@ -91,7 +91,7 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest 
with SharedSparkSess
         data.repartition(1).write.parquet(dir.getCanonicalPath)
         val file = TestUtils.listDirectory(dir).head
 
-        val conf = sqlContext.conf
+        val conf = spark.sessionState.conf
         val reader = new VectorizedParquetRecordReader(
           conf.offHeapColumnVectorEnabled, 
conf.parquetVectorizedReaderBatchSize)
         reader.initialize(file, null)
@@ -125,7 +125,7 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest 
with SharedSparkSess
         data.toDF("f").coalesce(1).write.parquet(dir.getCanonicalPath)
         val file = TestUtils.listDirectory(dir).head
 
-        val conf = sqlContext.conf
+        val conf = spark.sessionState.conf
         val reader = new VectorizedParquetRecordReader(
           conf.offHeapColumnVectorEnabled, 
conf.parquetVectorizedReaderBatchSize)
         reader.initialize(file, null /* set columns to null to project all 
columns */)
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index c064f49c3122..1efa8221e41f 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -1375,7 +1375,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest 
with SharedSparkSession
       
spark.createDataFrame(data).repartition(1).write.parquet(dir.getCanonicalPath)
       val file = TestUtils.listDirectory(dir).head;
       {
-        val conf = sqlContext.conf
+        val conf = spark.sessionState.conf
         val reader = new VectorizedParquetRecordReader(
           conf.offHeapColumnVectorEnabled, 
conf.parquetVectorizedReaderBatchSize)
         try {
@@ -1394,7 +1394,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest 
with SharedSparkSession
 
       // Project just one column
       {
-        val conf = sqlContext.conf
+        val conf = spark.sessionState.conf
         val reader = new VectorizedParquetRecordReader(
           conf.offHeapColumnVectorEnabled, 
conf.parquetVectorizedReaderBatchSize)
         try {
@@ -1412,7 +1412,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest 
with SharedSparkSession
 
       // Project columns in opposite order
       {
-        val conf = sqlContext.conf
+        val conf = spark.sessionState.conf
         val reader = new VectorizedParquetRecordReader(
           conf.offHeapColumnVectorEnabled, 
conf.parquetVectorizedReaderBatchSize)
         try {
@@ -1431,7 +1431,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest 
with SharedSparkSession
 
       // Empty projection
       {
-        val conf = sqlContext.conf
+        val conf = spark.sessionState.conf
         val reader = new VectorizedParquetRecordReader(
           conf.offHeapColumnVectorEnabled, 
conf.parquetVectorizedReaderBatchSize)
         try {
@@ -1473,7 +1473,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest 
with SharedSparkSession
 
       dataTypes.zip(constantValues).foreach { case (dt, v) =>
         val schema = StructType(StructField("pcol", dt) :: Nil)
-        val conf = sqlContext.conf
+        val conf = spark.sessionState.conf
         val vectorizedReader = new VectorizedParquetRecordReader(
           conf.offHeapColumnVectorEnabled, 
conf.parquetVectorizedReaderBatchSize)
         val partitionValues = new GenericInternalRow(Array(v))
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchProviderSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchProviderSuite.scala
index 48f90e34890c..01599bb92869 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchProviderSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchProviderSuite.scala
@@ -29,7 +29,7 @@ class RatePerMicroBatchProviderSuite extends StreamTest {
   import testImplicits._
 
   test("RatePerMicroBatchProvider in registry") {
-    val ds = DataSource.lookupDataSource("rate-micro-batch", 
spark.sqlContext.conf)
+    val ds = DataSource.lookupDataSource("rate-micro-batch", 
spark.sessionState.conf)
       .getConstructor().newInstance()
     assert(ds.isInstanceOf[RatePerMicroBatchProvider], "Could not find 
rate-micro-batch source")
   }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
index 556782d9c554..051cf9e17b78 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
@@ -56,7 +56,7 @@ class RateStreamProviderSuite extends StreamTest {
   }
 
   test("RateStreamProvider in registry") {
-    val ds = DataSource.lookupDataSource("rate", spark.sqlContext.conf)
+    val ds = DataSource.lookupDataSource("rate", spark.sessionState.conf)
       .getConstructor().newInstance()
     assert(ds.isInstanceOf[RateStreamProvider], "Could not find rate source")
   }
@@ -64,7 +64,7 @@ class RateStreamProviderSuite extends StreamTest {
   test("compatible with old path in registry") {
     val ds = DataSource.lookupDataSource(
       "org.apache.spark.sql.execution.streaming.RateSourceProvider",
-      spark.sqlContext.conf).getConstructor().newInstance()
+      spark.sessionState.conf).getConstructor().newInstance()
     assert(ds.isInstanceOf[RateStreamProvider], "Could not find rate source")
   }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
index 92dd3a996801..06cb5be2add6 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
@@ -87,7 +87,7 @@ class TextSocketStreamSuite extends StreamTest with 
SharedSparkSession {
   test("backward compatibility with old path") {
     val ds = DataSource.lookupDataSource(
       "org.apache.spark.sql.execution.streaming.TextSocketSourceProvider",
-      spark.sqlContext.conf).getConstructor().newInstance()
+      spark.sessionState.conf).getConstructor().newInstance()
     assert(ds.isInstanceOf[TextSocketSourceProvider], "Could not find socket 
source")
   }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManagerSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManagerSuite.scala
index b0abcbbe4d02..16f3e972c769 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManagerSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManagerSuite.scala
@@ -317,7 +317,7 @@ class SymmetricHashJoinStateManagerSuite extends StreamTest 
with BeforeAndAfter
     withTempDir { file =>
       withSQLConf(SQLConf.STATE_STORE_SKIP_NULLS_FOR_STREAM_STREAM_JOINS.key ->
         skipNullsForStreamStreamJoins.toString) {
-        val storeConf = new StateStoreConf(spark.sqlContext.conf)
+        val storeConf = new StateStoreConf(spark.sessionState.conf)
         val stateInfo = StatefulOperatorStateInfo(file.getAbsolutePath, 
UUID.randomUUID, 0, 0, 5)
         val manager = new SymmetricHashJoinStateManager(
           LeftSide, inputValueAttribs, joinKeyExprs, Some(stateInfo), 
storeConf, new Configuration,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index e759ef01e2c7..8655c0a3c29c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -1329,7 +1329,7 @@ class JDBCSuite extends QueryTest with SharedSparkSession 
{
     val df = spark.createDataset(Seq("a", "b", "c")).toDF("order")
     val schema = JdbcUtils.schemaString(
       df.schema,
-      df.sqlContext.conf.caseSensitiveAnalysis,
+      df.sparkSession.sessionState.conf.caseSensitiveAnalysis,
       "jdbc:mysql://localhost:3306/temp")
     assert(schema.contains("`order` LONGTEXT"))
   }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
index ccb202085910..f904d0e3d3c8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
@@ -417,7 +417,7 @@ class JDBCWriteSuite extends SharedSparkSession with 
BeforeAndAfter {
 
       assert(JdbcUtils.schemaString(
         schema,
-        spark.sqlContext.conf.caseSensitiveAnalysis,
+        spark.sessionState.conf.caseSensitiveAnalysis,
         url1,
         Option(createTableColTypes)) == expectedSchemaStr)
     }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index 746f289c3932..898e80df0207 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -449,7 +449,7 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils with Adapti
           joined.sort("bucketed_table1.k", "bucketed_table2.k"),
           df1.join(df2, joinCondition(df1, df2), joinType).sort("df1.k", 
"df2.k"))
 
-        val joinOperator = if 
(joined.sqlContext.conf.adaptiveExecutionEnabled) {
+        val joinOperator = if 
(joined.sparkSession.sessionState.conf.adaptiveExecutionEnabled) {
           val executedPlan =
             
joined.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec].executedPlan
           assert(executedPlan.isInstanceOf[SortMergeJoinExec])
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index 953bbddf6abb..883f64ff7af4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -1317,7 +1317,7 @@ class StreamSuite extends StreamTest {
           .map(_.asInstanceOf[RepartitionByExpression].numPartitions)
         // Before the fix of SPARK-34482, the numPartition is the value of
         // `COALESCE_PARTITIONS_INITIAL_PARTITION_NUM`.
-        assert(numPartition.get === 
spark.sqlContext.conf.getConf(SQLConf.SHUFFLE_PARTITIONS))
+        assert(numPartition.get === 
spark.sessionState.conf.getConf(SQLConf.SHUFFLE_PARTITIONS))
       } finally {
         if (query != null) {
           query.stop()
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
index be84640f4bf3..8d79cf4af771 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
@@ -708,7 +708,7 @@ class StreamingAggregationSuite extends 
StateStoreMetricsTest with Assertions {
       .groupBy("group")
       .agg(collect_list("value"))
     testStream(df, outputMode = OutputMode.Update)(
-      AddData(input, (1 to 
spark.sqlContext.conf.objectAggSortBasedFallbackThreshold): _*),
+      AddData(input, (1 to 
spark.sessionState.conf.objectAggSortBasedFallbackThreshold): _*),
       AssertOnQuery { q =>
         q.processAllAvailable()
         true
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
index 4692c685c80b..e05cb4d3c35c 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
@@ -618,7 +618,7 @@ class StreamingInnerJoinSuite extends StreamingJoinSuite {
           }
         }
 
-        val numPartitions = 
spark.sqlContext.conf.getConf(SQLConf.SHUFFLE_PARTITIONS)
+        val numPartitions = 
spark.sessionState.conf.getConf(SQLConf.SHUFFLE_PARTITIONS)
 
         assert(query.lastExecution.executedPlan.collect {
           case j @ StreamingSymmetricHashJoinExec(_, _, _, _, _, _, _, _, _,
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousQueuedDataReaderSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousQueuedDataReaderSuite.scala
index 55b884573f64..dac9e760e4be 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousQueuedDataReaderSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousQueuedDataReaderSuite.scala
@@ -93,8 +93,8 @@ class ContinuousQueuedDataReaderSuite extends StreamTest with 
MockitoSugar {
       partitionReader,
       new StructType().add("i", "int"),
       mockContext,
-      dataQueueSize = sqlContext.conf.continuousStreamingExecutorQueueSize,
-      epochPollIntervalMs = 
sqlContext.conf.continuousStreamingExecutorPollIntervalMs)
+      dataQueueSize = 
spark.sessionState.conf.continuousStreamingExecutorQueueSize,
+      epochPollIntervalMs = 
spark.sessionState.conf.continuousStreamingExecutorPollIntervalMs)
 
     (queue, reader)
   }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
index 1a4862bf9781..e77ba92fe298 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
@@ -422,10 +422,10 @@ class StreamingDataSourceV2Suite extends StreamTest {
 
   for ((read, write, trigger) <- cases) {
     testQuietly(s"stream with read format $read, write format $write, trigger 
$trigger") {
-      val sourceTable = DataSource.lookupDataSource(read, 
spark.sqlContext.conf).getConstructor()
+      val sourceTable = DataSource.lookupDataSource(read, 
spark.sessionState.conf).getConstructor()
         
.newInstance().asInstanceOf[SimpleTableProvider].getTable(CaseInsensitiveStringMap.empty())
 
-      val sinkTable = DataSource.lookupDataSource(write, 
spark.sqlContext.conf).getConstructor()
+      val sinkTable = DataSource.lookupDataSource(write, 
spark.sessionState.conf).getConstructor()
         
.newInstance().asInstanceOf[SimpleTableProvider].getTable(CaseInsensitiveStringMap.empty())
 
       import 
org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
diff --git 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index e70d05820c34..f8c592a943a0 100644
--- 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -49,11 +49,13 @@ private[hive] class SparkExecuteStatementOperation(
   with SparkOperation
   with Logging {
 
+  val session = sqlContext.sparkSession
+
   // If a timeout value `queryTimeout` is specified by users and it is smaller 
than
   // a global timeout value, we use the user-specified value.
   // This code follows the Hive timeout behaviour (See #29933 for details).
   private val timeout = {
-    val globalTimeout = 
sqlContext.conf.getConf(SQLConf.THRIFTSERVER_QUERY_TIMEOUT)
+    val globalTimeout = 
session.sessionState.conf.getConf(SQLConf.THRIFTSERVER_QUERY_TIMEOUT)
     if (globalTimeout > 0 && (queryTimeout <= 0 || globalTimeout < 
queryTimeout)) {
       globalTimeout
     } else {
@@ -61,13 +63,13 @@ private[hive] class SparkExecuteStatementOperation(
     }
   }
 
-  private val forceCancel = 
sqlContext.conf.getConf(SQLConf.THRIFTSERVER_FORCE_CANCEL)
+  private val forceCancel = 
session.sessionState.conf.getConf(SQLConf.THRIFTSERVER_FORCE_CANCEL)
 
   private val redactedStatement = {
-    val substitutorStatement = SQLConf.withExistingConf(sqlContext.conf) {
+    val substitutorStatement = 
SQLConf.withExistingConf(session.sessionState.conf) {
       new VariableSubstitution().substitute(statement)
     }
-    SparkUtils.redact(sqlContext.conf.stringRedactionPattern, 
substitutorStatement)
+    SparkUtils.redact(session.sessionState.conf.stringRedactionPattern, 
substitutorStatement)
   }
 
   private var result: DataFrame = _
@@ -259,7 +261,7 @@ private[hive] class SparkExecuteStatementOperation(
           e match {
             case _: HiveSQLException => throw e
             case _ => throw HiveThriftServerErrors.runningQueryError(
-              e, sqlContext.conf.errorMessageFormat)
+              e, sqlContext.sparkSession.sessionState.conf.errorMessageFormat)
           }
         }
     } finally {
diff --git 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
index 5b76cd653e37..73290a4d2592 100644
--- 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
+++ 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
@@ -284,7 +284,8 @@ private[hive] object SparkSQLCLIDriver extends Logging {
     var prefix = ""
 
     def currentDB = {
-      if 
(!SparkSQLEnv.sqlContext.conf.getConf(LEGACY_EMPTY_CURRENT_DB_IN_CLI)) {
+      if (!SparkSQLEnv.sqlContext.sparkSession.sessionState.conf
+        .getConf(LEGACY_EMPTY_CURRENT_DB_IN_CLI)) {
         s" (${SparkSQLEnv.sqlContext.sparkSession.catalog.currentDatabase})"
       } else {
         ReflectionUtils.invokeStatic(classOf[CliDriver], "getFormattedDb",
@@ -448,7 +449,8 @@ private[hive] class SparkSQLCLIDriver extends CliDriver 
with Logging {
   }
 
   override def setHiveVariables(hiveVariables: java.util.Map[String, String]): 
Unit = {
-    hiveVariables.asScala.foreach(kv => 
SparkSQLEnv.sqlContext.conf.setConfString(kv._1, kv._2))
+    hiveVariables.asScala.foreach(kv =>
+      
SparkSQLEnv.sqlContext.sparkSession.sessionState.conf.setConfString(kv._1, 
kv._2))
   }
 
   def printMasterAndAppId(): Unit = {
@@ -504,7 +506,7 @@ private[hive] class SparkSQLCLIDriver extends CliDriver 
with Logging {
 
           ret = rc.getResponseCode
           if (ret != 0) {
-            val format = SparkSQLEnv.sqlContext.conf.errorMessageFormat
+            val format = 
SparkSQLEnv.sqlContext.sparkSession.sessionState.conf.errorMessageFormat
             val e = rc.getException
             val msg = e match {
               case st: SparkThrowable with Throwable => 
SparkThrowableHelper.getMessage(st, format)
@@ -523,7 +525,7 @@ private[hive] class SparkSQLCLIDriver extends CliDriver 
with Logging {
           val res = new JArrayList[String]()
 
           if (HiveConf.getBoolVar(conf, 
HiveConf.ConfVars.HIVE_CLI_PRINT_HEADER) ||
-              SparkSQLEnv.sqlContext.conf.cliPrintHeader) {
+              
SparkSQLEnv.sqlContext.sparkSession.sessionState.conf.cliPrintHeader) {
             // Print the column names.
             Option(driver.getSchema.getFieldSchemas).foreach { fields =>
               out.println(fields.asScala.map(_.getName).mkString("\t"))
diff --git 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
index 4834956f478d..5d9ec3051dc3 100644
--- 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
+++ 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
@@ -61,7 +61,7 @@ private[hive] class SparkSQLDriver(val context: SQLContext = 
SparkSQLEnv.sqlCont
 
   override def run(command: String): CommandProcessorResponse = {
     try {
-      val substitutorCommand = SQLConf.withExistingConf(context.conf) {
+      val substitutorCommand = 
SQLConf.withExistingConf(context.sparkSession.sessionState.conf) {
         new VariableSubstitution().substitute(command)
       }
       context.sparkContext.setJobDescription(substitutorCommand)
diff --git 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
index b6528ac62419..7acc485b01e5 100644
--- 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
+++ 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
@@ -58,7 +58,7 @@ private[hive] class SparkSQLSessionManager(hiveServer: 
HiveServer2, sqlContext:
       val session = super.getSession(sessionHandle)
       HiveThriftServer2.eventManager.onSessionCreated(
         session.getIpAddress, sessionHandle.getSessionId.toString, 
session.getUsername)
-      val ctx = if (sqlContext.conf.hiveThriftServerSingleSession) {
+      val ctx = if 
(sqlContext.sparkSession.sessionState.conf.hiveThriftServerSingleSession) {
         sqlContext
       } else {
         sqlContext.newSession()
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 08f69aecdd2e..e7d03b82274c 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -194,7 +194,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: 
SparkSession) extends Log
     val tableIdentifier =
       QualifiedTableName(relation.tableMeta.database, 
relation.tableMeta.identifier.table)
 
-    val lazyPruningEnabled = 
sparkSession.sqlContext.conf.manageFilesourcePartitions
+    val lazyPruningEnabled = 
sparkSession.sessionState.conf.manageFilesourcePartitions
     val tablePath = new Path(relation.tableMeta.location)
     val fileFormat = fileFormatClass.getConstructor().newInstance()
     val bucketSpec = relation.tableMeta.bucketSpec
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
index a9861dafda72..d4847ee830f5 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -70,7 +70,9 @@ object TestHive
         // LocalRelation will exercise the optimization rules better by 
disabling it as
         // this rule may potentially block testing of other optimization rules 
such as
         // ConstantPropagation etc.
-        .set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, 
ConvertToLocalRelation.ruleName)))
+        .set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, 
ConvertToLocalRelation.ruleName))) {
+  override def conf: SQLConf = sparkSession.sessionState.conf
+}
 
 
 case class TestHiveVersion(hiveClient: HiveClient)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org


Reply via email to