This is an automated email from the ASF dual-hosted git repository.

yangjie01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 92e832a20bd0 [SPARK-46118][SQL][SS][CONNECT] Use `SparkSession.sessionState.conf` instead of `SQLContext.conf` and mark `SQLContext.conf` as deprecated
92e832a20bd0 is described below

commit 92e832a20bd013af1c6820152d6e011353545f1c
Author: yangjie01 <yangji...@baidu.com>
AuthorDate: Tue Nov 28 00:43:22 2023 +0800

    [SPARK-46118][SQL][SS][CONNECT] Use `SparkSession.sessionState.conf` instead of `SQLContext.conf` and mark `SQLContext.conf` as deprecated

    ### What changes were proposed in this pull request?
    There are some calls to `SparkSession.sqlContext.conf` in the Spark code, which is equivalent to `SparkSession.sqlContext.sparkSession.sessionState.conf`. This PR changes them to call `SparkSession.sessionState.conf` directly, or expands them to `SQLContext.sparkSession.sessionState.conf`.

    At the same time, this PR marks the internal API `SQLContext.conf` as deprecated; `SparkSession.sessionState.conf` should be used instead.

    ### Why are the changes needed?
    1. `SparkSession.sessionState.conf` has a shallower call stack than `SparkSession.sqlContext.conf`.
    2. `SQLContext` has been marked as deprecated since Apache Spark 1.6, and its APIs should be avoided as much as possible in Spark's internal code.

    ### Does this PR introduce _any_ user-facing change?
    No

    ### How was this patch tested?
    Pass GitHub Actions

    ### Was this patch authored or co-authored using generative AI tooling?
    No

    Closes #44034 from LuciferYang/sc-conf.

    Lead-authored-by: yangjie01 <yangji...@baidu.com>
    Co-authored-by: YangJie <yangji...@baidu.com>
    Signed-off-by: yangjie01 <yangji...@baidu.com>
---
 .../spark/sql/connect/planner/SparkConnectPlanner.scala       |  2 +-
 sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala    |  4 ++--
 .../src/main/scala/org/apache/spark/sql/SQLContext.scala      |  1 +
 .../spark/sql/execution/command/createDataSourceTables.scala  |  2 +-
 .../scala/org/apache/spark/sql/execution/command/ddl.scala    |  4 ++--
 .../apache/spark/sql/execution/datasources/DataSource.scala   |  2 +-
 .../spark/sql/execution/datasources/FileStatusCache.scala     |  8 ++++----
 .../spark/sql/execution/datasources/HadoopFsRelation.scala    |  2 +-
 .../execution/datasources/PartitioningAwareFileIndex.scala    |  4 ++--
 .../execution/datasources/jdbc/JdbcRelationProvider.scala     |  9 +++++----
 .../spark/sql/execution/streaming/MicroBatchExecution.scala   |  6 +++---
 .../spark/sql/execution/streaming/ProgressReporter.scala      |  2 +-
 .../execution/streaming/continuous/EpochCoordinator.scala     |  2 +-
 .../main/scala/org/apache/spark/sql/sources/interfaces.scala  |  2 +-
 .../org/apache/spark/sql/streaming/DataStreamReader.scala     |  2 +-
 .../org/apache/spark/sql/streaming/DataStreamWriter.scala     |  2 +-
 .../execution/datasources/parquet/ParquetEncodingSuite.scala  |  6 +++---
 .../sql/execution/datasources/parquet/ParquetIOSuite.scala    | 10 +++++-----
 .../streaming/sources/RatePerMicroBatchProviderSuite.scala    |  2 +-
 .../streaming/sources/RateStreamProviderSuite.scala           |  4 ++--
 .../execution/streaming/sources/TextSocketStreamSuite.scala   |  2 +-
 .../streaming/state/SymmetricHashJoinStateManagerSuite.scala  |  2 +-
 .../src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala  |  2 +-
 .../scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala      |  2 +-
 .../org/apache/spark/sql/sources/BucketedReadSuite.scala      |  2 +-
 .../scala/org/apache/spark/sql/streaming/StreamSuite.scala    |  2 +-
 .../spark/sql/streaming/StreamingAggregationSuite.scala       |  2 +-
 .../org/apache/spark/sql/streaming/StreamingJoinSuite.scala   |  2 +-
 .../continuous/ContinuousQueuedDataReaderSuite.scala          |  4 ++--
 .../sql/streaming/sources/StreamingDataSourceV2Suite.scala    |  4 ++--
 .../hive/thriftserver/SparkExecuteStatementOperation.scala    | 12 +++++++-----
 .../spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala       | 10 ++++++----
 .../apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala   |  2 +-
 .../spark/sql/hive/thriftserver/SparkSQLSessionManager.scala  |  2 +-
 .../org/apache/spark/sql/hive/HiveMetastoreCatalog.scala      |  2 +-
 .../test/scala/org/apache/spark/sql/hive/test/TestHive.scala  |  4 +++-
 36 files changed, 70 insertions(+), 62 deletions(-)
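For readers skimming the patch below: the whole change is one call-path substitution plus one deprecation annotation. The following minimal sketch models that structure; `SparkSession`, `SQLContext`, `SessionState`, and `SQLConf` here are hypothetical stand-ins (the real `SQLContext.conf` is `private[sql]` and the real classes carry far more state), so treat it as an illustration of the delegation, not Spark's actual implementation.

```scala
// Hypothetical, simplified model of the delegation chain touched by this patch.
object ConfPathSketch {
  final class SQLConf {
    var shufflePartitions: Int = 200 // stand-in for a real setting
  }

  final class SessionState {
    val conf = new SQLConf
  }

  final class SparkSession {
    val sessionState = new SessionState
    val sqlContext = new SQLContext(this)
  }

  final class SQLContext(val sparkSession: SparkSession) {
    // `sqlContext.conf` just bounces back to the owning session's state,
    // which is why the patch can rewrite every call site mechanically.
    @deprecated("Use SparkSession.sessionState.conf instead", "4.0.0")
    def conf: SQLConf = sparkSession.sessionState.conf
  }

  def main(args: Array[String]): Unit = {
    val session = new SparkSession
    // Old path (now deprecated): session.sqlContext.conf
    // New path (one hop fewer):  session.sessionState.conf
    val conf = session.sessionState.conf
    println(conf.shufflePartitions)
  }
}
```

Because both paths resolve to the same `SQLConf` instance, the rewrite is behavior-preserving; any remaining caller of the old accessor keeps compiling but now gets a deprecation warning pointing at the shorter path.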
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index 95c5acc803d4..3ac093b5e0b4 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -2235,7 +2235,7 @@ class SparkConnectPlanner(
     JoinWith.typedJoinWith(
       joined,
-      session.sqlContext.conf.dataFrameSelfJoinAutoResolveAmbiguity,
+      session.sessionState.conf.dataFrameSelfJoinAutoResolveAmbiguity,
       session.sessionState.analyzer.resolver,
       rel.getJoinDataType.getIsLeftStruct,
       rel.getJoinDataType.getIsRightStruct)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index d36aaef55866..0c33f2c87fec 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -1244,7 +1244,7 @@ class Dataset[T] private[sql](
     withTypedPlan(JoinWith.typedJoinWith(
       joined,
-      sqlContext.conf.dataFrameSelfJoinAutoResolveAmbiguity,
+      sparkSession.sessionState.conf.dataFrameSelfJoinAutoResolveAmbiguity,
       sparkSession.sessionState.analyzer.resolver,
       this.exprEnc.isSerializedAsStructForTopLevel,
       other.exprEnc.isSerializedAsStructForTopLevel))
@@ -1450,7 +1450,7 @@ class Dataset[T] private[sql](
       case "*" =>
         Column(ResolvedStar(queryExecution.analyzed.output))
       case _ =>
-        if (sqlContext.conf.supportQuotedRegexColumnName) {
+        if (sparkSession.sessionState.conf.supportQuotedRegexColumnName) {
           colRegex(colName)
         } else {
           Column(addDataFrameIdToCol(resolve(colName)))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index a52de12e70c4..267581659d87 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -78,6 +78,7 @@ class SQLContext private[sql](val sparkSession: SparkSession)
   private[sql] def sessionState: SessionState = sparkSession.sessionState
   private[sql] def sharedState: SharedState = sparkSession.sharedState
+  @deprecated("Use SparkSession.sessionState.conf instead", "4.0.0")
   private[sql] def conf: SQLConf = sessionState.conf

   def sparkContext: SparkContext = sparkSession.sparkContext
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index a94140dae5c0..a8ec810fab3a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -194,7 +194,7 @@ case class CreateDataSourceTableAsSelectCommand(
     result match {
       case fs: HadoopFsRelation if table.partitionColumnNames.nonEmpty &&
-          sparkSession.sqlContext.conf.manageFilesourcePartitions =>
+          sparkSession.sessionState.conf.manageFilesourcePartitions =>
         // Need to recover partitions into the metastore so our saved data is visible.
         sessionState.executePlan(RepairTableCommand(
           table.identifier,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 130872b10bcd..7e001803592f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -715,7 +715,7 @@ case class RepairTableCommand(
     val total = partitionSpecsAndLocs.length
     logInfo(s"Found $total partitions in $root")

-    val partitionStats = if (spark.sqlContext.conf.gatherFastStats) {
+    val partitionStats = if (spark.sessionState.conf.gatherFastStats) {
       gatherPartitionStats(spark, partitionSpecsAndLocs, fs, pathFilter, threshold)
     } else {
       Map.empty[Path, PartitionStatistics]
@@ -957,7 +957,7 @@ object DDLUtils extends Logging {
   def verifyPartitionProviderIsHive(
       spark: SparkSession, table: CatalogTable, action: String): Unit = {
     val tableName = table.identifier.table
-    if (!spark.sqlContext.conf.manageFilesourcePartitions && isDatasourceTable(table)) {
+    if (!spark.sessionState.conf.manageFilesourcePartitions && isDatasourceTable(table)) {
       throw QueryCompilationErrors
         .actionNotAllowedOnTableWithFilesourcePartitionManagementDisabledError(action, tableName)
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 835308f3d024..cebc74af724d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -390,7 +390,7 @@ case class DataSource(
       // This is a non-streaming file based datasource.
       case (format: FileFormat, _) =>
-        val useCatalogFileIndex = sparkSession.sqlContext.conf.manageFilesourcePartitions &&
+        val useCatalogFileIndex = sparkSession.sessionState.conf.manageFilesourcePartitions &&
           catalogTable.isDefined && catalogTable.get.tracksPartitionsInCatalog &&
           catalogTable.get.partitionColumnNames.nonEmpty
         val (fileCatalog, dataSchema, partitionSchema) = if (useCatalogFileIndex) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
index e1fdb9570732..80002ecdaf8d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
@@ -41,12 +41,12 @@ object FileStatusCache {
    * shared across all clients.
    */
   def getOrCreate(session: SparkSession): FileStatusCache = synchronized {
-    if (session.sqlContext.conf.manageFilesourcePartitions &&
-      session.sqlContext.conf.filesourcePartitionFileCacheSize > 0) {
+    if (session.sessionState.conf.manageFilesourcePartitions &&
+      session.sessionState.conf.filesourcePartitionFileCacheSize > 0) {
       if (sharedCache == null) {
         sharedCache = new SharedInMemoryCache(
-          session.sqlContext.conf.filesourcePartitionFileCacheSize,
-          session.sqlContext.conf.metadataCacheTTL
+          session.sessionState.conf.filesourcePartitionFileCacheSize,
+          session.sessionState.conf.metadataCacheTTL
         )
       }
       sharedCache.createForNewClient()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
index fd1824055dcf..a87453d3fd53 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
@@ -65,7 +65,7 @@ case class HadoopFsRelation(
   }

   override def sizeInBytes: Long = {
-    val compressionFactor = sqlContext.conf.fileCompressionFactor
+    val compressionFactor = sparkSession.sessionState.conf.fileCompressionFactor
     (location.sizeInBytes * compressionFactor).toLong
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
index 37de04a59e4b..dc41afe226b8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
@@ -157,8 +157,8 @@ abstract class PartitioningAwareFileIndex(
         typeInference = sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled,
         basePaths = basePaths,
         userSpecifiedSchema = userSpecifiedSchema,
-        caseSensitive = sparkSession.sqlContext.conf.caseSensitiveAnalysis,
-        validatePartitionColumns = sparkSession.sqlContext.conf.validatePartitionColumns,
+        caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis,
+        validatePartitionColumns = sparkSession.sessionState.conf.validatePartitionColumns,
         timeZoneId = timeZoneId)
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
index 2760c7ac3019..d9be1a1e3f67 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
@@ -32,11 +32,12 @@ class JdbcRelationProvider extends CreatableRelationProvider
       sqlContext: SQLContext,
       parameters: Map[String, String]): BaseRelation = {
     val jdbcOptions = new JDBCOptions(parameters)
-    val resolver = sqlContext.conf.resolver
-    val timeZoneId = sqlContext.conf.sessionLocalTimeZone
+    val sparkSession = sqlContext.sparkSession
+    val resolver = sparkSession.sessionState.conf.resolver
+    val timeZoneId = sparkSession.sessionState.conf.sessionLocalTimeZone
     val schema = JDBCRelation.getSchema(resolver, jdbcOptions)
     val parts = JDBCRelation.columnPartition(schema, resolver, timeZoneId, jdbcOptions)
-    JDBCRelation(schema, parts, jdbcOptions)(sqlContext.sparkSession)
+    JDBCRelation(schema, parts, jdbcOptions)(sparkSession)
   }

   override def createRelation(
@@ -45,7 +46,7 @@ class JdbcRelationProvider extends CreatableRelationProvider
       parameters: Map[String, String],
       df: DataFrame): BaseRelation = {
     val options = new JdbcOptionsInWrite(parameters)
-    val isCaseSensitive = sqlContext.conf.caseSensitiveAnalysis
+    val isCaseSensitive = sqlContext.sparkSession.sessionState.conf.caseSensitiveAnalysis
     val dialect = JdbcDialects.get(options.url)
     val conn = dialect.createConnectionFactory(options)(-1)
     try {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index 3febce0fa445..1bd59e818be5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -66,7 +66,7 @@ class MicroBatchExecution(
     // When the flag is disabled, Spark will fall back to single batch execution, whenever
     // it figures out any source does not support Trigger.AvailableNow.
     // See SPARK-45178 for more details.
-    if (sparkSession.sqlContext.conf.getConf(
+    if (sparkSession.sessionState.conf.getConf(
       SQLConf.STREAMING_TRIGGER_AVAILABLE_NOW_WRAPPER_ENABLED)) {
       logInfo("Configured to use the wrapper of Trigger.AvailableNow for query " +
         s"$prettyIdString.")
@@ -113,7 +113,7 @@ class MicroBatchExecution(
     // transformation is responsible for replacing attributes with their final values.

     val disabledSources =
-      Utils.stringToSeq(sparkSession.sqlContext.conf.disabledV2StreamingMicroBatchReaders)
+      Utils.stringToSeq(sparkSession.sessionState.conf.disabledV2StreamingMicroBatchReaders)

     import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
     val _logicalPlan = analyzedPlan.transform {
@@ -144,7 +144,7 @@ class MicroBatchExecution(
         })
       } else if (v1.isEmpty) {
         throw QueryExecutionErrors.microBatchUnsupportedByDataSourceError(
-          srcName, sparkSession.sqlContext.conf.disabledV2StreamingMicroBatchReaders, table)
+          srcName, sparkSession.sessionState.conf.disabledV2StreamingMicroBatchReaders, table)
       } else {
         v2ToExecutionRelationMap.getOrElseUpdate(s, {
           // Materialize source to avoid creating it in every batch
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index e70e94001eee..ffdf9da6e581 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -145,7 +145,7 @@ trait ProgressReporter extends Logging {
   private def addNewProgress(newProgress: StreamingQueryProgress): Unit = {
     progressBuffer.synchronized {
       progressBuffer += newProgress
-      while (progressBuffer.length >= sparkSession.sqlContext.conf.streamingProgressRetention) {
+      while (progressBuffer.length >= sparkSession.sessionState.conf.streamingProgressRetention) {
         progressBuffer.dequeue()
       }
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala
index dbddab2e9acd..c1027db6ec77 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala
@@ -124,7 +124,7 @@ private[continuous] class EpochCoordinator(
   extends ThreadSafeRpcEndpoint with Logging {

   private val epochBacklogQueueSize =
-    session.sqlContext.conf.continuousStreamingEpochBacklogQueueSize
+    session.sessionState.conf.continuousStreamingEpochBacklogQueueSize

   private var queryWritesStopped: Boolean = false
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index d194ae77e968..2911dfae4622 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -206,7 +206,7 @@ abstract class BaseRelation {
    *
    * @since 1.3.0
    */
-  def sizeInBytes: Long = sqlContext.conf.defaultSizeInBytes
+  def sizeInBytes: Long = sqlContext.sparkSession.sessionState.conf.defaultSizeInBytes

   /**
    * Whether does it need to convert the objects in Row to internal representation, for example:
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index 36dd168992a1..905c96ff4cbb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -156,7 +156,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
       extraOptions + ("path" -> path.get)
     }

-    val ds = DataSource.lookupDataSource(source, sparkSession.sqlContext.conf).
+    val ds = DataSource.lookupDataSource(source, sparkSession.sessionState.conf).
       getConstructor().newInstance()
     // We need to generate the V1 data source so we can pass it to the V2 relation as a shim.
     // We can't be sure at this point whether we'll actually want to use V2, since we don't know the
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index 036afa62b488..f4665f8ac677 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -369,7 +369,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
     } else {
       val cls = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf)
       val disabledSources =
-        Utils.stringToSeq(df.sparkSession.sqlContext.conf.disabledV2StreamingWriters)
+        Utils.stringToSeq(df.sparkSession.sessionState.conf.disabledV2StreamingWriters)
       val useV1Source = disabledSources.contains(cls.getCanonicalName) ||
         // file source v2 does not support streaming yet.
         classOf[FileDataSourceV2].isAssignableFrom(cls)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
index a0d11e2ce7ae..cd6f41b4ef45 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
@@ -61,7 +61,7 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest with SharedSparkSess
       List.fill(n)(ROW).toDF().repartition(1).write.parquet(dir.getCanonicalPath)
       val file = TestUtils.listDirectory(dir).head

-      val conf = sqlContext.conf
+      val conf = spark.sessionState.conf
       val reader = new VectorizedParquetRecordReader(
         conf.offHeapColumnVectorEnabled, conf.parquetVectorizedReaderBatchSize)
       reader.initialize(file, null)
@@ -91,7 +91,7 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest with SharedSparkSess
       data.repartition(1).write.parquet(dir.getCanonicalPath)
       val file = TestUtils.listDirectory(dir).head

-      val conf = sqlContext.conf
+      val conf = spark.sessionState.conf
       val reader = new VectorizedParquetRecordReader(
         conf.offHeapColumnVectorEnabled, conf.parquetVectorizedReaderBatchSize)
       reader.initialize(file, null)
@@ -125,7 +125,7 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest with SharedSparkSess
       data.toDF("f").coalesce(1).write.parquet(dir.getCanonicalPath)
       val file = TestUtils.listDirectory(dir).head

-      val conf = sqlContext.conf
+      val conf = spark.sessionState.conf
       val reader = new VectorizedParquetRecordReader(
         conf.offHeapColumnVectorEnabled, conf.parquetVectorizedReaderBatchSize)
       reader.initialize(file, null /* set columns to null to project all columns */)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index c064f49c3122..1efa8221e41f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -1375,7 +1375,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
       spark.createDataFrame(data).repartition(1).write.parquet(dir.getCanonicalPath)
       val file = TestUtils.listDirectory(dir).head;
       {
-        val conf = sqlContext.conf
+        val conf = spark.sessionState.conf
         val reader = new VectorizedParquetRecordReader(
           conf.offHeapColumnVectorEnabled, conf.parquetVectorizedReaderBatchSize)
         try {
@@ -1394,7 +1394,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession

       // Project just one column
       {
-        val conf = sqlContext.conf
+        val conf = spark.sessionState.conf
         val reader = new VectorizedParquetRecordReader(
           conf.offHeapColumnVectorEnabled, conf.parquetVectorizedReaderBatchSize)
         try {
@@ -1412,7 +1412,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession

       // Project columns in opposite order
       {
-        val conf = sqlContext.conf
+        val conf = spark.sessionState.conf
         val reader = new VectorizedParquetRecordReader(
           conf.offHeapColumnVectorEnabled, conf.parquetVectorizedReaderBatchSize)
         try {
@@ -1431,7 +1431,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession

       // Empty projection
       {
-        val conf = sqlContext.conf
+        val conf = spark.sessionState.conf
         val reader = new VectorizedParquetRecordReader(
           conf.offHeapColumnVectorEnabled, conf.parquetVectorizedReaderBatchSize)
         try {
@@ -1473,7 +1473,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
       dataTypes.zip(constantValues).foreach { case (dt, v) =>
         val schema = StructType(StructField("pcol", dt) :: Nil)
-        val conf = sqlContext.conf
+        val conf = spark.sessionState.conf
         val vectorizedReader = new VectorizedParquetRecordReader(
           conf.offHeapColumnVectorEnabled, conf.parquetVectorizedReaderBatchSize)
         val partitionValues = new GenericInternalRow(Array(v))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchProviderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchProviderSuite.scala
index 48f90e34890c..01599bb92869 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchProviderSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchProviderSuite.scala
@@ -29,7 +29,7 @@ class RatePerMicroBatchProviderSuite extends StreamTest {
   import testImplicits._

   test("RatePerMicroBatchProvider in registry") {
-    val ds = DataSource.lookupDataSource("rate-micro-batch", spark.sqlContext.conf)
+    val ds = DataSource.lookupDataSource("rate-micro-batch", spark.sessionState.conf)
       .getConstructor().newInstance()
     assert(ds.isInstanceOf[RatePerMicroBatchProvider], "Could not find rate-micro-batch source")
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
index 556782d9c554..051cf9e17b78 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
@@ -56,7 +56,7 @@ class RateStreamProviderSuite extends StreamTest {
   }

   test("RateStreamProvider in registry") {
-    val ds = DataSource.lookupDataSource("rate", spark.sqlContext.conf)
+    val ds = DataSource.lookupDataSource("rate", spark.sessionState.conf)
       .getConstructor().newInstance()
     assert(ds.isInstanceOf[RateStreamProvider], "Could not find rate source")
   }
@@ -64,7 +64,7 @@ class RateStreamProviderSuite extends StreamTest {
   test("compatible with old path in registry") {
     val ds = DataSource.lookupDataSource(
       "org.apache.spark.sql.execution.streaming.RateSourceProvider",
-      spark.sqlContext.conf).getConstructor().newInstance()
+      spark.sessionState.conf).getConstructor().newInstance()
     assert(ds.isInstanceOf[RateStreamProvider], "Could not find rate source")
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
index 92dd3a996801..06cb5be2add6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
@@ -87,7 +87,7 @@ class TextSocketStreamSuite extends StreamTest with SharedSparkSession {
   test("backward compatibility with old path") {
     val ds = DataSource.lookupDataSource(
       "org.apache.spark.sql.execution.streaming.TextSocketSourceProvider",
-      spark.sqlContext.conf).getConstructor().newInstance()
+      spark.sessionState.conf).getConstructor().newInstance()
     assert(ds.isInstanceOf[TextSocketSourceProvider], "Could not find socket source")
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManagerSuite.scala
index b0abcbbe4d02..16f3e972c769 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManagerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManagerSuite.scala
@@ -317,7 +317,7 @@ class SymmetricHashJoinStateManagerSuite extends StreamTest with BeforeAndAfter
     withTempDir { file =>
       withSQLConf(SQLConf.STATE_STORE_SKIP_NULLS_FOR_STREAM_STREAM_JOINS.key ->
         skipNullsForStreamStreamJoins.toString) {
-        val storeConf = new StateStoreConf(spark.sqlContext.conf)
+        val storeConf = new StateStoreConf(spark.sessionState.conf)
         val stateInfo = StatefulOperatorStateInfo(file.getAbsolutePath, UUID.randomUUID, 0, 0, 5)
         val manager = new SymmetricHashJoinStateManager(
           LeftSide, inputValueAttribs, joinKeyExprs, Some(stateInfo), storeConf, new Configuration,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index e759ef01e2c7..8655c0a3c29c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -1329,7 +1329,7 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
     val df = spark.createDataset(Seq("a", "b", "c")).toDF("order")
     val schema = JdbcUtils.schemaString(
       df.schema,
-      df.sqlContext.conf.caseSensitiveAnalysis,
+      df.sparkSession.sessionState.conf.caseSensitiveAnalysis,
       "jdbc:mysql://localhost:3306/temp")
     assert(schema.contains("`order` LONGTEXT"))
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
index ccb202085910..f904d0e3d3c8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
@@ -417,7 +417,7 @@ class JDBCWriteSuite extends SharedSparkSession with BeforeAndAfter {

     assert(JdbcUtils.schemaString(
       schema,
-      spark.sqlContext.conf.caseSensitiveAnalysis,
+      spark.sessionState.conf.caseSensitiveAnalysis,
       url1,
       Option(createTableColTypes)) == expectedSchemaStr)
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index 746f289c3932..898e80df0207 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -449,7 +449,7 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
       joined.sort("bucketed_table1.k", "bucketed_table2.k"),
       df1.join(df2, joinCondition(df1, df2), joinType).sort("df1.k", "df2.k"))

-    val joinOperator = if (joined.sqlContext.conf.adaptiveExecutionEnabled) {
+    val joinOperator = if (joined.sparkSession.sessionState.conf.adaptiveExecutionEnabled) {
       val executedPlan =
         joined.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec].executedPlan
       assert(executedPlan.isInstanceOf[SortMergeJoinExec])
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index 953bbddf6abb..883f64ff7af4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -1317,7 +1317,7 @@ class StreamSuite extends StreamTest {
         .map(_.asInstanceOf[RepartitionByExpression].numPartitions)
       // Before the fix of SPARK-34482, the numPartition is the value of
       // `COALESCE_PARTITIONS_INITIAL_PARTITION_NUM`.
-      assert(numPartition.get === spark.sqlContext.conf.getConf(SQLConf.SHUFFLE_PARTITIONS))
+      assert(numPartition.get === spark.sessionState.conf.getConf(SQLConf.SHUFFLE_PARTITIONS))
     } finally {
       if (query != null) {
         query.stop()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
index be84640f4bf3..8d79cf4af771 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
@@ -708,7 +708,7 @@ class StreamingAggregationSuite extends StateStoreMetricsTest with Assertions {
       .groupBy("group")
       .agg(collect_list("value"))
     testStream(df, outputMode = OutputMode.Update)(
-      AddData(input, (1 to spark.sqlContext.conf.objectAggSortBasedFallbackThreshold): _*),
+      AddData(input, (1 to spark.sessionState.conf.objectAggSortBasedFallbackThreshold): _*),
      AssertOnQuery { q =>
        q.processAllAvailable()
        true
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
index 4692c685c80b..e05cb4d3c35c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
@@ -618,7 +618,7 @@ class StreamingInnerJoinSuite extends StreamingJoinSuite {
       }
     }

-    val numPartitions = spark.sqlContext.conf.getConf(SQLConf.SHUFFLE_PARTITIONS)
+    val numPartitions = spark.sessionState.conf.getConf(SQLConf.SHUFFLE_PARTITIONS)

    assert(query.lastExecution.executedPlan.collect {
      case j @ StreamingSymmetricHashJoinExec(_, _, _, _, _, _, _, _, _,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousQueuedDataReaderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousQueuedDataReaderSuite.scala
index 55b884573f64..dac9e760e4be 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousQueuedDataReaderSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousQueuedDataReaderSuite.scala
@@ -93,8 +93,8 @@ class ContinuousQueuedDataReaderSuite extends StreamTest with MockitoSugar {
       partitionReader,
       new StructType().add("i", "int"),
       mockContext,
-      dataQueueSize = sqlContext.conf.continuousStreamingExecutorQueueSize,
-      epochPollIntervalMs = sqlContext.conf.continuousStreamingExecutorPollIntervalMs)
+      dataQueueSize = spark.sessionState.conf.continuousStreamingExecutorQueueSize,
+      epochPollIntervalMs = spark.sessionState.conf.continuousStreamingExecutorPollIntervalMs)
     (queue, reader)
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
index 1a4862bf9781..e77ba92fe298 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
@@ -422,10 +422,10 @@ class StreamingDataSourceV2Suite extends StreamTest {

     for ((read, write, trigger) <- cases) {
       testQuietly(s"stream with read format $read, write format $write, trigger $trigger") {
-        val sourceTable = DataSource.lookupDataSource(read, spark.sqlContext.conf).getConstructor()
+        val sourceTable = DataSource.lookupDataSource(read, spark.sessionState.conf).getConstructor()
           .newInstance().asInstanceOf[SimpleTableProvider].getTable(CaseInsensitiveStringMap.empty())

-        val sinkTable = DataSource.lookupDataSource(write, spark.sqlContext.conf).getConstructor()
+        val sinkTable = DataSource.lookupDataSource(write, spark.sessionState.conf).getConstructor()
           .newInstance().asInstanceOf[SimpleTableProvider].getTable(CaseInsensitiveStringMap.empty())

         import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index e70d05820c34..f8c592a943a0 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -49,11 +49,13 @@ private[hive] class SparkExecuteStatementOperation(
   with SparkOperation
   with Logging {

+  val session = sqlContext.sparkSession
+
   // If a timeout value `queryTimeout` is specified by users and it is smaller than
   // a global timeout value, we use the user-specified value.
   // This code follows the Hive timeout behaviour (See #29933 for details).
   private val timeout = {
-    val globalTimeout = sqlContext.conf.getConf(SQLConf.THRIFTSERVER_QUERY_TIMEOUT)
+    val globalTimeout = session.sessionState.conf.getConf(SQLConf.THRIFTSERVER_QUERY_TIMEOUT)
     if (globalTimeout > 0 && (queryTimeout <= 0 || globalTimeout < queryTimeout)) {
       globalTimeout
     } else {
@@ -61,13 +63,13 @@ private[hive] class SparkExecuteStatementOperation(
     }
   }

-  private val forceCancel = sqlContext.conf.getConf(SQLConf.THRIFTSERVER_FORCE_CANCEL)
+  private val forceCancel = session.sessionState.conf.getConf(SQLConf.THRIFTSERVER_FORCE_CANCEL)

   private val redactedStatement = {
-    val substitutorStatement = SQLConf.withExistingConf(sqlContext.conf) {
+    val substitutorStatement = SQLConf.withExistingConf(session.sessionState.conf) {
       new VariableSubstitution().substitute(statement)
     }
-    SparkUtils.redact(sqlContext.conf.stringRedactionPattern, substitutorStatement)
+    SparkUtils.redact(session.sessionState.conf.stringRedactionPattern, substitutorStatement)
   }

   private var result: DataFrame = _
@@ -259,7 +261,7 @@ private[hive] class SparkExecuteStatementOperation(
         e match {
           case _: HiveSQLException => throw e
           case _ => throw HiveThriftServerErrors.runningQueryError(
-            e, sqlContext.conf.errorMessageFormat)
+            e, sqlContext.sparkSession.sessionState.conf.errorMessageFormat)
         }
       }
     } finally {
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
index 5b76cd653e37..73290a4d2592 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
@@ -284,7 +284,8 @@ private[hive] object SparkSQLCLIDriver extends Logging {
     var prefix = ""

     def currentDB = {
-      if (!SparkSQLEnv.sqlContext.conf.getConf(LEGACY_EMPTY_CURRENT_DB_IN_CLI)) {
+      if (!SparkSQLEnv.sqlContext.sparkSession.sessionState.conf
+          .getConf(LEGACY_EMPTY_CURRENT_DB_IN_CLI)) {
         s" (${SparkSQLEnv.sqlContext.sparkSession.catalog.currentDatabase})"
       } else {
         ReflectionUtils.invokeStatic(classOf[CliDriver], "getFormattedDb",
@@ -448,7 +449,8 @@ private[hive] class SparkSQLCLIDriver extends CliDriver with Logging {
   }

   override def setHiveVariables(hiveVariables: java.util.Map[String, String]): Unit = {
-    hiveVariables.asScala.foreach(kv => SparkSQLEnv.sqlContext.conf.setConfString(kv._1, kv._2))
+    hiveVariables.asScala.foreach(kv =>
+      SparkSQLEnv.sqlContext.sparkSession.sessionState.conf.setConfString(kv._1, kv._2))
   }

   def printMasterAndAppId(): Unit = {
@@ -504,7 +506,7 @@ private[hive] class SparkSQLCLIDriver extends CliDriver with Logging {
           ret = rc.getResponseCode

           if (ret != 0) {
-            val format = SparkSQLEnv.sqlContext.conf.errorMessageFormat
+            val format = SparkSQLEnv.sqlContext.sparkSession.sessionState.conf.errorMessageFormat
             val e = rc.getException
             val msg = e match {
               case st: SparkThrowable with Throwable => SparkThrowableHelper.getMessage(st, format)
@@ -523,7 +525,7 @@ private[hive] class SparkSQLCLIDriver extends CliDriver with Logging {
           val res = new JArrayList[String]()

           if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_CLI_PRINT_HEADER) ||
-              SparkSQLEnv.sqlContext.conf.cliPrintHeader) {
+              SparkSQLEnv.sqlContext.sparkSession.sessionState.conf.cliPrintHeader) {
             // Print the column names.
             Option(driver.getSchema.getFieldSchemas).foreach { fields =>
               out.println(fields.asScala.map(_.getName).mkString("\t"))
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
index 4834956f478d..5d9ec3051dc3 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
@@ -61,7 +61,7 @@ private[hive] class SparkSQLDriver(val context: SQLContext = SparkSQLEnv.sqlCont

   override def run(command: String): CommandProcessorResponse = {
     try {
-      val substitutorCommand = SQLConf.withExistingConf(context.conf) {
+      val substitutorCommand = SQLConf.withExistingConf(context.sparkSession.sessionState.conf) {
         new VariableSubstitution().substitute(command)
       }
       context.sparkContext.setJobDescription(substitutorCommand)
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
index b6528ac62419..7acc485b01e5 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
@@ -58,7 +58,7 @@ private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, sqlContext:
     val session = super.getSession(sessionHandle)
     HiveThriftServer2.eventManager.onSessionCreated(
       session.getIpAddress, sessionHandle.getSessionId.toString, session.getUsername)
-    val ctx = if (sqlContext.conf.hiveThriftServerSingleSession) {
+    val ctx = if (sqlContext.sparkSession.sessionState.conf.hiveThriftServerSingleSession) {
       sqlContext
     } else {
       sqlContext.newSession()
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 08f69aecdd2e..e7d03b82274c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -194,7 +194,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
     val tableIdentifier =
       QualifiedTableName(relation.tableMeta.database, relation.tableMeta.identifier.table)
-    val lazyPruningEnabled = sparkSession.sqlContext.conf.manageFilesourcePartitions
+    val lazyPruningEnabled = sparkSession.sessionState.conf.manageFilesourcePartitions
     val tablePath = new Path(relation.tableMeta.location)
     val fileFormat = fileFormatClass.getConstructor().newInstance()
     val bucketSpec = relation.tableMeta.bucketSpec
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
index a9861dafda72..d4847ee830f5 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -70,7 +70,9 @@ object TestHive
       //  LocalRelation will exercise the optimization rules better by disabling it as
       //  this rule may potentially block testing of other optimization rules such as
       //  ConstantPropagation etc.
-      .set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName)))
+      .set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName))) {
+  override def conf: SQLConf = sparkSession.sessionState.conf
+}

 case class TestHiveVersion(hiveClient: HiveClient)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org