This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 42b80ae  [SPARK-28257][SQL] Use ConfigEntry for hardcoded configs in SQL
42b80ae is described below

commit 42b80ae128ab1aa8a87c1376fe88e2cde52e6e4f
Author: wangguangxin.cn <[email protected]>
AuthorDate: Thu Jul 11 22:36:07 2019 -0700

    [SPARK-28257][SQL] Use ConfigEntry for hardcoded configs in SQL
    
    ## What changes were proposed in this pull request?
    
    Several SQL configs are still referenced through hardcoded key strings; this change replaces them with the corresponding ConfigEntry definitions (see the sketch below).
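    
    As a rough sketch of the pattern (assuming the surrounding SQLConf object and the SQLHelper test context; the entry shown is the one this diff adds, and the withSQLConf call site is only illustrative):
    
        // Register the key once as a typed, documented ConfigEntry in SQLConf ...
        val ENABLE_VECTORIZED_HASH_MAP =
          buildConf("spark.sql.codegen.aggregate.map.vectorized.enable")
            .internal()
            .doc("Enable vectorized aggregate hash map. This is for testing/benchmarking only.")
            .booleanConf
            .createWithDefault(false)
    
        // ... then reference the entry's key instead of repeating the raw string in code and tests.
        withSQLConf(SQLConf.ENABLE_VECTORIZED_HASH_MAP.key -> "true") {
          // test body
        }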
    
    ## How was this patch tested?
    
    Existing UT
    
    Closes #25059 from WangGuangxin/ConfigEntry.
    
    Authored-by: wangguangxin.cn <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../org/apache/spark/sql/avro/AvroSuite.scala      |  2 +-
 .../kafka010/KafkaDontFailOnDataLossSuite.scala    |  2 +-
 .../sql/kafka010/KafkaMicroBatchSourceSuite.scala  |  2 +-
 .../org/apache/spark/sql/internal/SQLConf.scala    | 18 +++++++++++++
 .../sql/catalyst/analysis/TypeCoercionSuite.scala  | 10 ++++----
 .../execution/aggregate/HashAggregateExec.scala    | 14 ++++------
 .../execution/columnar/InMemoryTableScanExec.scala |  3 +--
 .../apache/spark/sql/AggregateHashMapSuite.scala   | 29 +++++++++++----------
 .../org/apache/spark/sql/DataFrameSuite.scala      |  2 +-
 .../spark/sql/FileBasedDataSourceSuite.scala       |  4 +--
 .../scala/org/apache/spark/sql/JoinSuite.scala     |  8 +++---
 .../org/apache/spark/sql/RuntimeConfigSuite.scala  |  6 +++--
 .../scala/org/apache/spark/sql/SQLQuerySuite.scala |  8 +++---
 .../spark/sql/SparkSessionExtensionSuite.scala     |  9 ++++---
 .../apache/spark/sql/execution/SQLViewSuite.scala  |  7 ++---
 .../sql/execution/SQLWindowFunctionSuite.scala     |  5 ++--
 .../sql/execution/arrow/ArrowConvertersSuite.scala |  4 +--
 .../execution/benchmark/AggregateBenchmark.scala   | 20 +++++++--------
 .../BuiltInDataSourceWriteBenchmark.scala          |  6 +++--
 .../columnar/InMemoryColumnarQuerySuite.scala      |  2 +-
 .../columnar/PartitionBatchPruningSuite.scala      |  2 +-
 .../execution/command/PlanResolutionSuite.scala    |  3 ++-
 .../datasources/FileSourceStrategySuite.scala      |  4 +--
 .../sql/execution/datasources/csv/CSVSuite.scala   |  4 +--
 .../sql/execution/datasources/json/JsonSuite.scala |  4 +--
 .../datasources/parquet/ParquetIOSuite.scala       |  4 +--
 .../parquet/ParquetPartitionDiscoverySuite.scala   |  2 +-
 .../sql/execution/metric/SQLMetricsSuite.scala     |  6 ++---
 .../sql/execution/metric/SQLMetricsTestUtils.scala |  3 ++-
 .../state/StateStoreCoordinatorSuite.scala         |  3 ++-
 .../streaming/state/StateStoreSuite.scala          |  2 +-
 .../execution/ui/SQLAppStatusListenerSuite.scala   |  2 +-
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala      |  2 +-
 .../spark/sql/sources/BucketedWriteSuite.scala     |  2 +-
 .../sql/sources/CreateTableAsSelectSuite.scala     |  3 ++-
 .../sql/sources/v2/DataSourceV2SQLSuite.scala      |  5 ++--
 .../sql/streaming/EventTimeWatermarkSuite.scala    |  2 +-
 .../sql/streaming/FileStreamSourceSuite.scala      |  2 +-
 .../apache/spark/sql/streaming/StreamSuite.scala   |  6 ++---
 .../StreamingQueryListenersConfSuite.scala         |  3 ++-
 .../spark/sql/streaming/StreamingQuerySuite.scala  |  8 +++---
 .../continuous/ContinuousAggregationSuite.scala    |  5 ++--
 .../sql/streaming/continuous/ContinuousSuite.scala |  4 +--
 .../sources/StreamingDataSourceV2Suite.scala       |  2 +-
 .../test/DataStreamReaderWriterSuite.scala         |  2 +-
 .../sql/test/DataFrameReaderWriterSuite.scala      |  2 +-
 .../thriftserver/HiveThriftServer2Suites.scala     | 11 ++++----
 .../hive/HiveExternalCatalogVersionsSuite.scala    | 23 +++++++++--------
 .../spark/sql/hive/HiveSparkSubmitSuite.scala      | 30 ++++++++++++----------
 .../sql/hive/execution/AggregationQuerySuite.scala |  2 +-
 .../org/apache/spark/sql/hive/test/TestHive.scala  |  8 +++---
 51 files changed, 178 insertions(+), 144 deletions(-)

diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index 40bf3b1..924bf37 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -50,7 +50,7 @@ abstract class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUti
 
   override protected def beforeAll(): Unit = {
     super.beforeAll()
-    spark.conf.set("spark.sql.files.maxPartitionBytes", 1024)
+    spark.conf.set(SQLConf.FILES_MAX_PARTITION_BYTES.key, 1024)
   }
 
   def checkReloadMatchesSaved(originalFile: String, newFile: String): Unit = {
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDontFailOnDataLossSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDontFailOnDataLossSuite.scala
index e089e36..ba8340e 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDontFailOnDataLossSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDontFailOnDataLossSuite.scala
@@ -135,7 +135,7 @@ class KafkaDontFailOnDataLossSuite extends StreamTest with KafkaMissingOffsetsTe
 
   test("failOnDataLoss=false should not return duplicated records: microbatch 
v1") {
     withSQLConf(
-      "spark.sql.streaming.disabledV2MicroBatchReaders" ->
+      SQLConf.DISABLED_V2_STREAMING_MICROBATCH_READERS.key ->
         classOf[KafkaSourceProvider].getCanonicalName) {
       verifyMissingOffsetsDontCauseDuplicatedRecords(testStreamingQuery = true) { (df, table) =>
         val query = df.writeStream.format("memory").queryName(table).start()
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index 3d14ebe..bb9b369 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -1066,7 +1066,7 @@ class KafkaMicroBatchV1SourceSuite extends KafkaMicroBatchSourceSuiteBase {
   override def beforeAll(): Unit = {
     super.beforeAll()
     spark.conf.set(
-      "spark.sql.streaming.disabledV2MicroBatchReaders",
+      SQLConf.DISABLED_V2_STREAMING_MICROBATCH_READERS.key,
       classOf[KafkaSourceProvider].getCanonicalName)
   }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index feb3b46..94d197b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -234,6 +234,13 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
+  val IN_MEMORY_TABLE_SCAN_STATISTICS_ENABLED =
+    buildConf("spark.sql.inMemoryTableScanStatistics.enable")
+      .internal()
+      .doc("When true, enable in-memory table scan accumulators.")
+      .booleanConf
+      .createWithDefault(false)
+
   val CACHE_VECTORIZED_READER_ENABLED =
     buildConf("spark.sql.inMemoryColumnarStorage.enableVectorizedReader")
       .doc("Enables vectorized reader for columnar caching.")
@@ -1024,6 +1031,13 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
+  val ENABLE_VECTORIZED_HASH_MAP =
+    buildConf("spark.sql.codegen.aggregate.map.vectorized.enable")
+      .internal()
+      .doc("Enable vectorized aggregate hash map. This is for 
testing/benchmarking only.")
+      .booleanConf
+      .createWithDefault(false)
+
   val MAX_NESTED_VIEW_DEPTH =
     buildConf("spark.sql.view.maxNestedViewDepth")
       .internal()
@@ -2109,6 +2123,8 @@ class SQLConf extends Serializable with Logging {
 
   def inMemoryPartitionPruning: Boolean = getConf(IN_MEMORY_PARTITION_PRUNING)
 
+  def inMemoryTableScanStatisticsEnabled: Boolean = getConf(IN_MEMORY_TABLE_SCAN_STATISTICS_ENABLED)
+
   def offHeapColumnVectorEnabled: Boolean = getConf(COLUMN_VECTOR_OFFHEAP_ENABLED)
 
   def columnNameOfCorruptRecord: String = getConf(COLUMN_NAME_OF_CORRUPT_RECORD)
@@ -2148,6 +2164,8 @@ class SQLConf extends Serializable with Logging {
 
   def enableTwoLevelAggMap: Boolean = getConf(ENABLE_TWOLEVEL_AGG_MAP)
 
+  def enableVectorizedHashMap: Boolean = getConf(ENABLE_VECTORIZED_HASH_MAP)
+
   def useObjectHashAggregation: Boolean = getConf(USE_OBJECT_HASH_AGG)
 
   def objectAggSortBasedFallbackThreshold: Int = getConf(OBJECT_AGG_SORT_BASED_FALLBACK_THRESHOLD)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala
index 2c3ba1b..a725e4b 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala
@@ -1126,14 +1126,14 @@ class TypeCoercionSuite extends AnalysisTest {
       Concat(Seq(Cast(Literal(new java.sql.Date(0)), StringType),
         Cast(Literal(new Timestamp(0)), StringType))))
 
-    withSQLConf("spark.sql.function.concatBinaryAsString" -> "true") {
+    withSQLConf(SQLConf.CONCAT_BINARY_AS_STRING.key -> "true") {
       ruleTest(rule,
         Concat(Seq(Literal("123".getBytes), Literal("456".getBytes))),
         Concat(Seq(Cast(Literal("123".getBytes), StringType),
           Cast(Literal("456".getBytes), StringType))))
     }
 
-    withSQLConf("spark.sql.function.concatBinaryAsString" -> "false") {
+    withSQLConf(SQLConf.CONCAT_BINARY_AS_STRING.key -> "false") {
       ruleTest(rule,
         Concat(Seq(Literal("123".getBytes), Literal("456".getBytes))),
         Concat(Seq(Literal("123".getBytes), Literal("456".getBytes))))
@@ -1180,14 +1180,14 @@ class TypeCoercionSuite extends AnalysisTest {
       Elt(Seq(Literal(2), Cast(Literal(new java.sql.Date(0)), StringType),
         Cast(Literal(new Timestamp(0)), StringType))))
 
-    withSQLConf("spark.sql.function.eltOutputAsString" -> "true") {
+    withSQLConf(SQLConf.ELT_OUTPUT_AS_STRING.key -> "true") {
       ruleTest(rule,
         Elt(Seq(Literal(1), Literal("123".getBytes), Literal("456".getBytes))),
         Elt(Seq(Literal(1), Cast(Literal("123".getBytes), StringType),
           Cast(Literal("456".getBytes), StringType))))
     }
 
-    withSQLConf("spark.sql.function.eltOutputAsString" -> "false") {
+    withSQLConf(SQLConf.ELT_OUTPUT_AS_STRING.key -> "false") {
       ruleTest(rule,
         Elt(Seq(Literal(1), Literal("123".getBytes), Literal("456".getBytes))),
         Elt(Seq(Literal(1), Literal("123".getBytes), Literal("456".getBytes))))
@@ -1498,7 +1498,7 @@ class TypeCoercionSuite extends AnalysisTest {
         DoubleType)))
     Seq(true, false).foreach { convertToTS =>
       withSQLConf(
-        "spark.sql.legacy.compareDateTimestampInTimestamp" -> 
convertToTS.toString) {
+        SQLConf.COMPARE_DATE_TIMESTAMP_IN_TIMESTAMP.key -> 
convertToTS.toString) {
         val date0301 = Literal(java.sql.Date.valueOf("2017-03-01"))
         val timestamp0301000000 = Literal(Timestamp.valueOf("2017-03-01 00:00:00"))
         val timestamp0301000001 = Literal(Timestamp.valueOf("2017-03-01 00:00:01"))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
index 25ff658..4a95f76 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.execution.vectorized.MutableColumnarRow
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{DecimalType, StringType, StructType}
 import org.apache.spark.unsafe.KVIterator
 import org.apache.spark.util.Utils
@@ -559,7 +560,7 @@ case class HashAggregateExec(
   private def enableTwoLevelHashMap(ctx: CodegenContext): Unit = {
     if (!checkIfFastHashMapSupported(ctx)) {
       if (modes.forall(mode => mode == Partial || mode == PartialMerge) && !Utils.isTesting) {
-        logInfo("spark.sql.codegen.aggregate.map.twolevel.enabled is set to true, but"
+        logInfo(s"${SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key} is set to true, but"
           + " current version of codegened fast hashmap does not support this aggregate.")
       }
     } else {
@@ -567,8 +568,7 @@ case class HashAggregateExec(
 
       // This is for testing/benchmarking only.
       // We enforce to first level to be a vectorized hashmap, instead of the default row-based one.
-      isVectorizedHashMapEnabled = sqlContext.getConf(
-        "spark.sql.codegen.aggregate.map.vectorized.enable", "false") == "true"
+      isVectorizedHashMapEnabled = sqlContext.conf.enableVectorizedHashMap
     }
   }
 
@@ -576,12 +576,8 @@ case class HashAggregateExec(
     val initAgg = ctx.addMutableState(CodeGenerator.JAVA_BOOLEAN, "initAgg")
     if (sqlContext.conf.enableTwoLevelAggMap) {
       enableTwoLevelHashMap(ctx)
-    } else {
-      sqlContext.getConf("spark.sql.codegen.aggregate.map.vectorized.enable", 
null) match {
-        case "true" =>
-          logWarning("Two level hashmap is disabled but vectorized hashmap is 
enabled.")
-        case _ =>
-      }
+    } else if (sqlContext.conf.enableVectorizedHashMap) {
+      logWarning("Two level hashmap is disabled but vectorized hashmap is 
enabled.")
     }
     val bitMaxCapacity = sqlContext.conf.fastHashAggregateRowMaxCapacityBit
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
index 7a8c6d6..3566ab1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
@@ -297,8 +297,7 @@ case class InMemoryTableScanExec(
     }
   }
 
-  lazy val enableAccumulatorsForTest: Boolean =
-    sqlContext.getConf("spark.sql.inMemoryTableScanStatistics.enable", 
"false").toBoolean
+  lazy val enableAccumulatorsForTest: Boolean = 
sqlContext.conf.inMemoryTableScanStatisticsEnabled
 
   // Accumulators used for testing purposes
   lazy val readPartitions = sparkContext.longAccumulator
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/AggregateHashMapSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/AggregateHashMapSuite.scala
index 938d76c..b253c4a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/AggregateHashMapSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/AggregateHashMapSuite.scala
@@ -20,33 +20,34 @@ package org.apache.spark.sql
 import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.SparkConf
+import org.apache.spark.sql.internal.SQLConf
 
 class SingleLevelAggregateHashMapSuite extends DataFrameAggregateSuite with BeforeAndAfter {
   override protected def sparkConf: SparkConf = super.sparkConf
-    .set("spark.sql.codegen.fallback", "false")
-    .set("spark.sql.codegen.aggregate.map.twolevel.enabled", "false")
+    .set(SQLConf.CODEGEN_FALLBACK.key, "false")
+    .set(SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key, "false")
 
   // adding some checking after each test is run, assuring that the configs are not changed
   // in test code
   after {
-    assert(sparkConf.get("spark.sql.codegen.fallback") == "false",
+    assert(sparkConf.get(SQLConf.CODEGEN_FALLBACK.key) == "false",
       "configuration parameter changed in test body")
-    assert(sparkConf.get("spark.sql.codegen.aggregate.map.twolevel.enabled") 
== "false",
+    assert(sparkConf.get(SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key) == "false",
       "configuration parameter changed in test body")
   }
 }
 
 class TwoLevelAggregateHashMapSuite extends DataFrameAggregateSuite with BeforeAndAfter {
   override protected def sparkConf: SparkConf = super.sparkConf
-    .set("spark.sql.codegen.fallback", "false")
-    .set("spark.sql.codegen.aggregate.map.twolevel.enabled", "true")
+    .set(SQLConf.CODEGEN_FALLBACK.key, "false")
+    .set(SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key, "true")
 
   // adding some checking after each test is run, assuring that the configs are not changed
   // in test code
   after {
-    assert(sparkConf.get("spark.sql.codegen.fallback") == "false",
+    assert(sparkConf.get(SQLConf.CODEGEN_FALLBACK.key) == "false",
       "configuration parameter changed in test body")
-    assert(sparkConf.get("spark.sql.codegen.aggregate.map.twolevel.enabled") 
== "true",
+    assert(sparkConf.get(SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key) == "true",
       "configuration parameter changed in test body")
   }
 }
@@ -56,18 +57,18 @@ class TwoLevelAggregateHashMapWithVectorizedMapSuite
   with BeforeAndAfter {
 
   override protected def sparkConf: SparkConf = super.sparkConf
-    .set("spark.sql.codegen.fallback", "false")
-    .set("spark.sql.codegen.aggregate.map.twolevel.enabled", "true")
-    .set("spark.sql.codegen.aggregate.map.vectorized.enable", "true")
+    .set(SQLConf.CODEGEN_FALLBACK.key, "false")
+    .set(SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key, "true")
+    .set(SQLConf.ENABLE_VECTORIZED_HASH_MAP.key, "true")
 
   // adding some checking after each test is run, assuring that the configs are not changed
   // in test code
   after {
-    assert(sparkConf.get("spark.sql.codegen.fallback") == "false",
+    assert(sparkConf.get(SQLConf.CODEGEN_FALLBACK.key) == "false",
       "configuration parameter changed in test body")
-    assert(sparkConf.get("spark.sql.codegen.aggregate.map.twolevel.enabled") 
== "true",
+    assert(sparkConf.get(SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key) == "true",
       "configuration parameter changed in test body")
-    assert(sparkConf.get("spark.sql.codegen.aggregate.map.vectorized.enable") 
== "true",
+    assert(sparkConf.get(SQLConf.ENABLE_VECTORIZED_HASH_MAP.key) == "true",
       "configuration parameter changed in test body")
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 9893670..e8ddd4e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -1672,7 +1672,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
   }
 
   test("reuse exchange") {
-    withSQLConf("spark.sql.autoBroadcastJoinThreshold" -> "2") {
+    withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "2") {
       val df = spark.range(100).toDF()
       val join = df.join(df, "id")
       val plan = join.queryExecution.executedPlan
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index fffe52d..8919528 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -671,8 +671,8 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo
 
   test("SPARK-22790,SPARK-27668: spark.sql.sources.compressionFactor takes 
effect") {
     Seq(1.0, 0.5).foreach { compressionFactor =>
-      withSQLConf("spark.sql.sources.fileCompressionFactor" -> 
compressionFactor.toString,
-        "spark.sql.autoBroadcastJoinThreshold" -> "250") {
+      withSQLConf(SQLConf.FILE_COMRESSION_FACTOR.key -> 
compressionFactor.toString,
+        SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "250") {
         withTempPath { workDir =>
           // the file size is 486 bytes
           val workDirPath = workDir.getAbsolutePath
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index 32cddc9..531cc86 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -72,7 +72,7 @@ class JoinSuite extends QueryTest with SharedSQLContext {
   test("join operator selection") {
     spark.sharedState.cacheManager.clearCache()
 
-    withSQLConf("spark.sql.autoBroadcastJoinThreshold" -> "0",
+    withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0",
       SQLConf.CROSS_JOINS_ENABLED.key -> "true") {
       Seq(
         ("SELECT * FROM testData LEFT SEMI JOIN testData2 ON key = a",
@@ -651,7 +651,7 @@ class JoinSuite extends QueryTest with SharedSQLContext {
 
   test("test SortMergeJoin (without spill)") {
     withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1",
-      "spark.sql.sortMergeJoinExec.buffer.spill.threshold" -> 
Int.MaxValue.toString) {
+      SQLConf.SORT_MERGE_JOIN_EXEC_BUFFER_SPILL_THRESHOLD.key -> 
Int.MaxValue.toString) {
 
       assertNotSpilled(sparkContext, "inner join") {
         checkAnswer(
@@ -708,8 +708,8 @@ class JoinSuite extends QueryTest with SharedSQLContext {
 
   test("test SortMergeJoin (with spill)") {
     withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1",
-      "spark.sql.sortMergeJoinExec.buffer.in.memory.threshold" -> "0",
-      "spark.sql.sortMergeJoinExec.buffer.spill.threshold" -> "1") {
+      SQLConf.SORT_MERGE_JOIN_EXEC_BUFFER_IN_MEMORY_THRESHOLD.key -> "0",
+      SQLConf.SORT_MERGE_JOIN_EXEC_BUFFER_SPILL_THRESHOLD.key -> "1") {
 
       assertSpilled(sparkContext, "inner join") {
         checkAnswer(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/RuntimeConfigSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/RuntimeConfigSuite.scala
index 3284231..720d570 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/RuntimeConfigSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/RuntimeConfigSuite.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.internal.config
+import org.apache.spark.sql.internal.SQLConf.CHECKPOINT_LOCATION
+import org.apache.spark.sql.internal.StaticSQLConf.SCHEMA_STRING_LENGTH_THRESHOLD
 
 class RuntimeConfigSuite extends SparkFunSuite {
 
@@ -60,8 +62,8 @@ class RuntimeConfigSuite extends SparkFunSuite {
     val conf = newConf()
 
     // SQL configs
-    assert(!conf.isModifiable("spark.sql.sources.schemaStringLengthThreshold"))
-    assert(conf.isModifiable("spark.sql.streaming.checkpointLocation"))
+    assert(!conf.isModifiable(SCHEMA_STRING_LENGTH_THRESHOLD.key))
+    assert(conf.isModifiable(CHECKPOINT_LOCATION.key))
     // Core configs
     assert(!conf.isModifiable(config.CPUS_PER_TASK.key))
     assert(!conf.isModifiable("spark.executor.cores"))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 2cc1be9..9729506 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -1896,7 +1896,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
   }
 
   test("Star Expansion - group by") {
-    withSQLConf("spark.sql.retainGroupColumns" -> "false") {
+    withSQLConf(SQLConf.DATAFRAME_RETAIN_GROUP_COLUMNS.key -> "false") {
       checkAnswer(
         testData2.groupBy($"a", $"b").agg($"*"),
         sql("SELECT * FROM testData2 group by a, b"))
@@ -1936,7 +1936,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
 
   test("Common subexpression elimination") {
     // TODO: support subexpression elimination in whole stage codegen
-    withSQLConf("spark.sql.codegen.wholeStage" -> "false") {
+    withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
       // select from a table to prevent constant folding.
       val df = sql("SELECT a, b from testData2 limit 1")
       checkAnswer(df, Row(1, 1))
@@ -1985,9 +1985,9 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
         df.selectExpr("testUdf(a + 1) + testUdf(1 + a)", "testUdf(a + 1)"), Row(4, 2), 1)
 
       // Try disabling it via configuration.
-      spark.conf.set("spark.sql.subexpressionElimination.enabled", "false")
+      spark.conf.set(SQLConf.SUBEXPRESSION_ELIMINATION_ENABLED.key, "false")
       verifyCallCount(df.selectExpr("testUdf(a)", "testUdf(a)"), Row(1, 1), 2)
-      spark.conf.set("spark.sql.subexpressionElimination.enabled", "true")
+      spark.conf.set(SQLConf.SUBEXPRESSION_ELIMINATION_ENABLED.key, "true")
       verifyCallCount(df.selectExpr("testUdf(a)", "testUdf(a)"), Row(1, 1), 1)
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
index 2e2e61b..74341f9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
+import org.apache.spark.sql.internal.StaticSQLConf.SPARK_SESSION_EXTENSIONS
 import org.apache.spark.sql.types.{DataType, Decimal, IntegerType, LongType, Metadata, StructType}
 import org.apache.spark.sql.vectorized.{ColumnarArray, ColumnarBatch, ColumnarMap, ColumnVector}
 import org.apache.spark.unsafe.types.UTF8String
@@ -152,7 +153,7 @@ class SparkSessionExtensionSuite extends SparkFunSuite {
   test("use custom class for extensions") {
     val session = SparkSession.builder()
       .master("local[1]")
-      .config("spark.sql.extensions", classOf[MyExtensions].getCanonicalName)
+      .config(SPARK_SESSION_EXTENSIONS.key, classOf[MyExtensions].getCanonicalName)
       .getOrCreate()
     try {
       assert(session.sessionState.planner.strategies.contains(MySparkStrategy(session)))
@@ -173,7 +174,7 @@ class SparkSessionExtensionSuite extends SparkFunSuite {
   test("use multiple custom class for extensions in the specified order") {
     val session = SparkSession.builder()
       .master("local[1]")
-      .config("spark.sql.extensions", Seq(
+      .config(SPARK_SESSION_EXTENSIONS.key, Seq(
         classOf[MyExtensions2].getCanonicalName,
         classOf[MyExtensions].getCanonicalName).mkString(","))
       .getOrCreate()
@@ -201,7 +202,7 @@ class SparkSessionExtensionSuite extends SparkFunSuite {
   test("allow an extension to be duplicated") {
     val session = SparkSession.builder()
       .master("local[1]")
-      .config("spark.sql.extensions", Seq(
+      .config(SPARK_SESSION_EXTENSIONS.key, Seq(
         classOf[MyExtensions].getCanonicalName,
         classOf[MyExtensions].getCanonicalName).mkString(","))
       .getOrCreate()
@@ -228,7 +229,7 @@ class SparkSessionExtensionSuite extends SparkFunSuite {
   test("use the last registered function name when there are duplicates") {
     val session = SparkSession.builder()
       .master("local[1]")
-      .config("spark.sql.extensions", Seq(
+      .config(SPARK_SESSION_EXTENSIONS.key, Seq(
         classOf[MyExtensions2].getCanonicalName,
         classOf[MyExtensions2Duplicate].getCanonicalName).mkString(","))
       .getOrCreate()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
index 8269d4d..64e305c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
+import org.apache.spark.sql.internal.SQLConf.MAX_NESTED_VIEW_DEPTH
 import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
 
 class SimpleSQLViewSuite extends SQLViewSuite with SharedSQLContext
@@ -665,17 +666,17 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils {
         sql(s"CREATE VIEW view${idx + 1} AS SELECT * FROM view$idx")
       }
 
-      withSQLConf("spark.sql.view.maxNestedViewDepth" -> "10") {
+      withSQLConf(MAX_NESTED_VIEW_DEPTH.key -> "10") {
         val e = intercept[AnalysisException] {
           sql("SELECT * FROM view10")
         }.getMessage
         assert(e.contains("The depth of view `default`.`view0` exceeds the 
maximum view " +
           "resolution depth (10). Analysis is aborted to avoid errors. 
Increase the value " +
-          "of spark.sql.view.maxNestedViewDepth to work around this."))
+          s"of ${MAX_NESTED_VIEW_DEPTH.key} to work around this."))
       }
 
       val e = intercept[IllegalArgumentException] {
-        withSQLConf("spark.sql.view.maxNestedViewDepth" -> "0") {}
+        withSQLConf(MAX_NESTED_VIEW_DEPTH.key -> "0") {}
       }.getMessage
       assert(e.contains("The maximum depth of a view reference in a nested 
view must be " +
         "positive."))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
index 1c6fc35..971fd84 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution
 
 import org.apache.spark.TestUtils.assertSpilled
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
+import org.apache.spark.sql.internal.SQLConf.{WINDOW_EXEC_BUFFER_IN_MEMORY_THRESHOLD, WINDOW_EXEC_BUFFER_SPILL_THRESHOLD}
 import org.apache.spark.sql.test.SharedSQLContext
 
 case class WindowData(month: Int, area: String, product: Int)
@@ -477,8 +478,8 @@ class SQLWindowFunctionSuite extends QueryTest with SharedSQLContext {
         |WINDOW w1 AS (ORDER BY x ROWS BETWEEN UNBOUNDED PRECEDiNG AND CURRENT RoW)
       """.stripMargin)
 
-    withSQLConf("spark.sql.windowExec.buffer.in.memory.threshold" -> "1",
-      "spark.sql.windowExec.buffer.spill.threshold" -> "2") {
+    withSQLConf(WINDOW_EXEC_BUFFER_IN_MEMORY_THRESHOLD.key -> "1",
+      WINDOW_EXEC_BUFFER_SPILL_THRESHOLD.key -> "2") {
       assertSpilled(sparkContext, "test with low buffer spill threshold") {
         checkAnswer(actual, expected)
       }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala
index 86874b9..67c3fa0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala
@@ -1191,7 +1191,7 @@ class ArrowConvertersSuite extends SharedSQLContext with BeforeAndAfterAll {
   test("max records in batch conf") {
     val totalRecords = 10
     val maxRecordsPerBatch = 3
-    spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", 
maxRecordsPerBatch)
+    spark.conf.set(SQLConf.ARROW_EXECUTION_MAX_RECORDS_PER_BATCH.key, 
maxRecordsPerBatch)
     val df = spark.sparkContext.parallelize(1 to totalRecords, 2).toDF("i")
     val arrowBatches = df.toArrowBatchRdd.collect()
     assert(arrowBatches.length >= 4)
@@ -1206,7 +1206,7 @@ class ArrowConvertersSuite extends SharedSQLContext with BeforeAndAfterAll {
     }
     assert(recordCount == totalRecords)
     allocator.close()
-    spark.conf.unset("spark.sql.execution.arrow.maxRecordsPerBatch")
+    spark.conf.unset(SQLConf.ARROW_EXECUTION_MAX_RECORDS_PER_BATCH.key)
   }
 
   testQuietly("unsupported types") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala
index 81158d9..2776bc3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala
@@ -83,7 +83,7 @@ object AggregateBenchmark extends SqlBasedBenchmark {
         withSQLConf(
           SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true",
           SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key -> "false",
-          "spark.sql.codegen.aggregate.map.vectorized.enable" -> "false") {
+          SQLConf.ENABLE_VECTORIZED_HASH_MAP.key -> "false") {
           f()
         }
       }
@@ -92,7 +92,7 @@ object AggregateBenchmark extends SqlBasedBenchmark {
         withSQLConf(
           SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true",
           SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key -> "true",
-          "spark.sql.codegen.aggregate.map.vectorized.enable" -> "true") {
+          SQLConf.ENABLE_VECTORIZED_HASH_MAP.key -> "true") {
           f()
         }
       }
@@ -119,7 +119,7 @@ object AggregateBenchmark extends SqlBasedBenchmark {
         withSQLConf(
           SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true",
           SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key -> "false",
-          "spark.sql.codegen.aggregate.map.vectorized.enable" -> "false") {
+          SQLConf.ENABLE_VECTORIZED_HASH_MAP.key -> "false") {
           f()
         }
       }
@@ -128,7 +128,7 @@ object AggregateBenchmark extends SqlBasedBenchmark {
         withSQLConf(
           SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true",
           SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key -> "true",
-          "spark.sql.codegen.aggregate.map.vectorized.enable" -> "true") {
+          SQLConf.ENABLE_VECTORIZED_HASH_MAP.key -> "true") {
           f()
         }
       }
@@ -154,7 +154,7 @@ object AggregateBenchmark extends SqlBasedBenchmark {
         withSQLConf(
           SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true",
           SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key -> "false",
-          "spark.sql.codegen.aggregate.map.vectorized.enable" -> "false") {
+          SQLConf.ENABLE_VECTORIZED_HASH_MAP.key -> "false") {
           f()
         }
       }
@@ -163,7 +163,7 @@ object AggregateBenchmark extends SqlBasedBenchmark {
         withSQLConf(
           SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true",
           SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key -> "true",
-          "spark.sql.codegen.aggregate.map.vectorized.enable" -> "true") {
+          SQLConf.ENABLE_VECTORIZED_HASH_MAP.key -> "true") {
           f()
         }
       }
@@ -189,7 +189,7 @@ object AggregateBenchmark extends SqlBasedBenchmark {
         withSQLConf(
           SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true",
           SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key -> "false",
-          "spark.sql.codegen.aggregate.map.vectorized.enable" -> "false") {
+          SQLConf.ENABLE_VECTORIZED_HASH_MAP.key -> "false") {
           f()
         }
       }
@@ -198,7 +198,7 @@ object AggregateBenchmark extends SqlBasedBenchmark {
         withSQLConf(
           SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true",
           SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key -> "true",
-          "spark.sql.codegen.aggregate.map.vectorized.enable" -> "true") {
+          SQLConf.ENABLE_VECTORIZED_HASH_MAP.key -> "true") {
           f()
         }
       }
@@ -234,7 +234,7 @@ object AggregateBenchmark extends SqlBasedBenchmark {
         withSQLConf(
           SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true",
           SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key -> "false",
-          "spark.sql.codegen.aggregate.map.vectorized.enable" -> "false") {
+          SQLConf.ENABLE_VECTORIZED_HASH_MAP.key -> "false") {
           f()
         }
       }
@@ -243,7 +243,7 @@ object AggregateBenchmark extends SqlBasedBenchmark {
         withSQLConf(
           SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true",
           SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key -> "true",
-          "spark.sql.codegen.aggregate.map.vectorized.enable" -> "true") {
+          SQLConf.ENABLE_VECTORIZED_HASH_MAP.key -> "true") {
           f()
         }
       }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BuiltInDataSourceWriteBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BuiltInDataSourceWriteBenchmark.scala
index cd97324..6925bdd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BuiltInDataSourceWriteBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BuiltInDataSourceWriteBenchmark.scala
@@ -16,6 +16,8 @@
  */
 package org.apache.spark.sql.execution.benchmark
 
+import org.apache.spark.sql.internal.SQLConf
+
 /**
  * Benchmark to measure built-in data sources write performance.
  * To run this benchmark:
@@ -45,8 +47,8 @@ object BuiltInDataSourceWriteBenchmark extends DataSourceWriteBenchmark {
       mainArgs
     }
 
-    spark.conf.set("spark.sql.parquet.compression.codec", "snappy")
-    spark.conf.set("spark.sql.orc.compression.codec", "snappy")
+    spark.conf.set(SQLConf.PARQUET_COMPRESSION.key, "snappy")
+    spark.conf.set(SQLConf.ORC_COMPRESSION.key, "snappy")
 
     formats.foreach { format =>
       runBenchmark(s"$format writer benchmark") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
index 466baf2..711ecf1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
@@ -437,7 +437,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
   }
 
   test("SPARK-20356: pruned InMemoryTableScanExec should have correct ordering 
and partitioning") {
-    withSQLConf("spark.sql.shuffle.partitions" -> "200") {
+    withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "200") {
       val df1 = Seq(("a", 1), ("b", 1), ("c", 2)).toDF("item", "group")
       val df2 = Seq(("a", 1), ("b", 2), ("c", 3)).toDF("item", "id")
       val df3 = df1.join(df2, Seq("item")).select($"id", $"group".as("item")).distinct()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala
index b3a5c68..e740992 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala
@@ -50,7 +50,7 @@ class PartitionBatchPruningSuite
     // Enable in-memory partition pruning
     spark.conf.set(SQLConf.IN_MEMORY_PARTITION_PRUNING.key, true)
     // Enable in-memory table scan accumulators
-    spark.conf.set("spark.sql.inMemoryTableScanStatistics.enable", "true")
+    spark.conf.set(SQLConf.IN_MEMORY_TABLE_SCAN_STATISTICS_ENABLED.key, "true")
   }
 
   override protected def afterAll(): Unit = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
index 7df0dab..ce20966 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, CreateV2Table, DropTable, LogicalPlan}
 import org.apache.spark.sql.execution.datasources.{CreateTable, DataSourceResolution}
 import org.apache.spark.sql.execution.datasources.v2.orc.OrcDataSourceV2
+import org.apache.spark.sql.internal.SQLConf.DEFAULT_V2_CATALOG
 import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, StringType, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
@@ -77,7 +78,7 @@ class PlanResolutionSuite extends AnalysisTest {
 
   def parseAndResolve(query: String, withDefault: Boolean = false): LogicalPlan = {
     val newConf = conf.copy()
-    newConf.setConfString("spark.sql.default.catalog", "testcat")
+    newConf.setConfString(DEFAULT_V2_CATALOG.key, "testcat")
     DataSourceResolution(newConf, if (withDefault) lookupWithDefault else lookupWithoutDefault)
         .apply(parsePlan(query))
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index af524c7..eaff5a2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -201,7 +201,7 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
   }
 
   test("partitioned table - case insensitive") {
-    withSQLConf("spark.sql.caseSensitive" -> "false") {
+    withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
       val table =
         createTable(
           files = Seq(
@@ -437,7 +437,7 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
   }
 
   test("[SPARK-16818] exchange reuse respects differences in partition 
pruning") {
-    spark.conf.set("spark.sql.exchange.reuse", true)
+    spark.conf.set(SQLConf.EXCHANGE_REUSE_ENABLED.key, true)
     withTempPath { path =>
       val tempDir = path.getCanonicalPath
       spark.range(10)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 2e7d682..fdb50a6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -1399,8 +1399,8 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
     // that whole test file is mapped to only one partition. This will guarantee
     // reliable sampling of the input file.
     withSQLConf(
-      "spark.sql.files.maxPartitionBytes" -> (128 * 1024 * 1024).toString,
-      "spark.sql.files.openCostInBytes" -> (4 * 1024 * 1024).toString
+      SQLConf.FILES_MAX_PARTITION_BYTES.key -> (128 * 1024 * 1024).toString,
+      SQLConf.FILES_OPEN_COST_IN_BYTES.key -> (4 * 1024 * 1024).toString
     )(withTempPath { path =>
       val ds = sampledTestData.coalesce(1)
       ds.write.text(path.getAbsolutePath)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 6316e89..34b44be 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -2041,8 +2041,8 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     // that whole test file is mapped to only one partition. This will guarantee
     // reliable sampling of the input file.
     withSQLConf(
-      "spark.sql.files.maxPartitionBytes" -> (128 * 1024 * 1024).toString,
-      "spark.sql.files.openCostInBytes" -> (4 * 1024 * 1024).toString
+      SQLConf.FILES_MAX_PARTITION_BYTES.key -> (128 * 1024 * 1024).toString,
+      SQLConf.FILES_OPEN_COST_IN_BYTES.key -> (4 * 1024 * 1024).toString
     )(withTempPath { path =>
       val ds = sampledTestData.coalesce(1)
       ds.write.text(path.getAbsolutePath)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 6b05b9c..6f2218b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -475,7 +475,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
         classOf[SQLHadoopMapReduceCommitProtocol].getCanonicalName) {
       val extraOptions = Map(
         SQLConf.OUTPUT_COMMITTER_CLASS.key -> classOf[ParquetOutputCommitter].getCanonicalName,
-        "spark.sql.parquet.output.committer.class" ->
+        SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key ->
           classOf[JobCommitFailureParquetOutputCommitter].getCanonicalName
       )
       withTempPath { dir =>
@@ -505,7 +505,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
       // Using a output committer that always fail when committing a task, so that both
       // `commitTask()` and `abortTask()` are invoked.
       val extraOptions = Map[String, String](
-        "spark.sql.parquet.output.committer.class" ->
+        SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key ->
           classOf[TaskCommitFailureParquetOutputCommitter].getCanonicalName
       )
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index 6f3ed3d..04ace0a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -953,7 +953,7 @@ abstract class ParquetPartitionDiscoverySuite
 
       withSQLConf(
           ParquetOutputFormat.JOB_SUMMARY_LEVEL -> "ALL",
-          "spark.sql.sources.commitProtocolClass" ->
+          SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key ->
             classOf[SQLHadoopMapReduceCommitProtocol].getCanonicalName) {
         spark.range(3).write.parquet(s"$path/p0=0/p1=0")
       }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index b260f5d..dc4a299 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -277,9 +277,9 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
   }
 
   test("ShuffledHashJoin metrics") {
-    withSQLConf("spark.sql.autoBroadcastJoinThreshold" -> "40",
-        "spark.sql.shuffle.partitions" -> "2",
-        "spark.sql.join.preferSortMergeJoin" -> "false") {
+    withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "40",
+        SQLConf.SHUFFLE_PARTITIONS.key -> "2",
+        SQLConf.PREFER_SORTMERGEJOIN.key -> "false") {
       val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value")
       val df2 = (1 to 10).map(i => (i, i.toString)).toSeq.toDF("key", "value")
       // Assume the execution plan is
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
index f12eeaa..8f26c04 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.SparkPlanInfo
 import org.apache.spark.sql.execution.ui.{SparkPlanGraph, SQLAppStatusStore}
+import org.apache.spark.sql.internal.SQLConf.WHOLESTAGE_CODEGEN_ENABLED
 import org.apache.spark.sql.test.SQLTestUtils
 
 
@@ -154,7 +155,7 @@ trait SQLMetricsTestUtils extends SQLTestUtils {
        expectedNodeIds: Set[Long],
       enableWholeStage: Boolean = false): Option[Map[Long, (String, Map[String, Any])]] = {
     val previousExecutionIds = currentExecutionIds()
-    withSQLConf("spark.sql.codegen.wholeStage" -> enableWholeStage.toString) {
+    withSQLConf(WHOLESTAGE_CODEGEN_ENABLED.key -> enableWholeStage.toString) {
       df.collect()
     }
     sparkContext.listenerBus.waitUntilEmpty(10000)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala
index 2a1e7d6..7bca225 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala
@@ -27,6 +27,7 @@ import org.apache.spark.scheduler.ExecutorCacheTaskLocation
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.execution.streaming.{MemoryStream, StreamingQueryWrapper}
 import org.apache.spark.sql.functions.count
+import org.apache.spark.sql.internal.SQLConf.SHUFFLE_PARTITIONS
 import org.apache.spark.util.Utils
 
 class StateStoreCoordinatorSuite extends SparkFunSuite with SharedSparkContext {
@@ -124,7 +125,7 @@ class StateStoreCoordinatorSuite extends SparkFunSuite with SharedSparkContext {
       import spark.implicits._
       coordRef = spark.streams.stateStoreCoordinator
       implicit val sqlContext = spark.sqlContext
-      spark.conf.set("spark.sql.shuffle.partitions", "1")
+      spark.conf.set(SHUFFLE_PARTITIONS.key, "1")
 
       // Start a query and run a batch to load state stores
       val inputData = MemoryStream[Int]
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index af4369d..a84d107 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -569,7 +569,7 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
       val spark = SparkSession.builder().master("local[2]").getOrCreate()
       SparkSession.setActiveSession(spark)
       implicit val sqlContext = spark.sqlContext
-      spark.conf.set("spark.sql.shuffle.partitions", "1")
+      spark.conf.set(SQLConf.SHUFFLE_PARTITIONS.key, "1")
       import spark.implicits._
       val inputData = MemoryStream[Int]
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
index e3e5ddf..8edbb87 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
@@ -647,7 +647,7 @@ class SQLAppStatusListenerMemoryLeakSuite extends SparkFunSuite {
       .setMaster("local")
       .setAppName("test")
       .set(config.TASK_MAX_FAILURES, 1) // Don't retry the tasks to run this test quickly
-      .set("spark.sql.ui.retainedExecutions", "50") // Set it to 50 to run this test quickly
+      .set(UI_RETAINED_EXECUTIONS.key, "50") // Set it to 50 to run this test quickly
       .set(ASYNC_TRACKING_ENABLED, false)
     withSpark(new SparkContext(conf)) { sc =>
       quietly {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 5f27e75..a28f4e4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -1322,7 +1322,7 @@ class JDBCSuite extends QueryTest
 
     testJdbcParitionColumn("THEID", "THEID")
     testJdbcParitionColumn("\"THEID\"", "THEID")
-    withSQLConf("spark.sql.caseSensitive" -> "false") {
+    withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
       testJdbcParitionColumn("ThEiD", "THEID")
     }
     testJdbcParitionColumn("THE ID", "THE ID")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
index fc61050..75f68de 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
@@ -63,7 +63,7 @@ abstract class BucketedWriteSuite extends QueryTest with SQLTestUtils {
     val maxNrBuckets: Int = 200000
     val catalog = spark.sessionState.catalog
 
-    withSQLConf("spark.sql.sources.bucketing.maxBuckets" -> 
maxNrBuckets.toString) {
+    withSQLConf(SQLConf.BUCKETING_MAX_BUCKETS.key -> maxNrBuckets.toString) {
       // within the new limit
       Seq(100001, maxNrBuckets).foreach(numBuckets => {
         withTable("t") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
index d46029e..5f98566 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.parser.ParseException
+import org.apache.spark.sql.internal.SQLConf.BUCKETING_MAX_BUCKETS
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.util.Utils
 
@@ -252,7 +253,7 @@ class CreateTableAsSelectSuite
 
     val maxNrBuckets: Int = 200000
     val catalog = spark.sessionState.catalog
-    withSQLConf("spark.sql.sources.bucketing.maxBuckets" -> 
maxNrBuckets.toString) {
+    withSQLConf(BUCKETING_MAX_BUCKETS.key -> maxNrBuckets.toString) {
 
       // Within the new limit
       Seq(100001, maxNrBuckets).foreach(numBuckets => {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala
index 70d0624..c90090a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalog.v2.Identifier
 import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, TableAlreadyExistsException}
 import org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog
 import org.apache.spark.sql.execution.datasources.v2.orc.OrcDataSourceV2
+import org.apache.spark.sql.internal.SQLConf.V2_SESSION_CATALOG
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types.{ArrayType, DoubleType, IntegerType, LongType, MapType, StringType, StructField, StructType, TimestampType}
 
@@ -39,7 +40,7 @@ class DataSourceV2SQLSuite extends QueryTest with SharedSQLContext with BeforeAn
   before {
     spark.conf.set("spark.sql.catalog.testcat", 
classOf[TestInMemoryTableCatalog].getName)
     spark.conf.set("spark.sql.catalog.testcat2", 
classOf[TestInMemoryTableCatalog].getName)
-    spark.conf.set("spark.sql.catalog.session", 
classOf[TestInMemoryTableCatalog].getName)
+    spark.conf.set(V2_SESSION_CATALOG.key, 
classOf[TestInMemoryTableCatalog].getName)
 
     val df = spark.createDataFrame(Seq((1L, "a"), (2L, "b"), (3L, 
"c"))).toDF("id", "data")
     df.createOrReplaceTempView("source")
@@ -281,7 +282,7 @@ class DataSourceV2SQLSuite extends QueryTest with SharedSQLContext with BeforeAn
 
   test("CreateTableAsSelect: v2 session catalog can load v1 source table") {
     val sparkSession = spark.newSession()
-    sparkSession.conf.set("spark.sql.catalog.session", 
classOf[V2SessionCatalog].getName)
+    sparkSession.conf.set(V2_SESSION_CATALOG.key, 
classOf[V2SessionCatalog].getName)
 
     val df = sparkSession.createDataFrame(Seq((1L, "a"), (2L, "b"), (3L, 
"c"))).toDF("id", "data")
     df.createOrReplaceTempView("source")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
index 4bf49ff..92ec2a0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
@@ -305,7 +305,7 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
 
   test("update mode") {
     val inputData = MemoryStream[Int]
-    spark.conf.set("spark.sql.shuffle.partitions", "10")
+    spark.conf.set(SQLConf.SHUFFLE_PARTITIONS.key, "10")
 
     val windowedAggregation = inputData.toDF()
       .withColumn("eventTime", $"value".cast("timestamp"))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 2b8d773..72f8938 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -1310,7 +1310,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
               val start = startId.map(new FileStreamSourceOffset(_))
               val end = FileStreamSourceOffset(endId)
 
-              withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> 
"false") {
+              withSQLConf(SQLConf.UNSUPPORTED_OPERATION_CHECK_ENABLED.key -> 
"false") {
                 assert(fileSource.getBatch(start, end).as[String].collect().toSeq === expected)
               }
             }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index f2f5fad..1ed2599 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -871,7 +871,7 @@ class StreamSuite extends StreamTest {
 
   testQuietly("specify custom state store provider") {
     val providerClassName = classOf[TestStateStoreProvider].getCanonicalName
-    withSQLConf("spark.sql.streaming.stateStore.providerClass" -> 
providerClassName) {
+    withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key -> providerClassName) {
       val input = MemoryStream[Int]
       val df = input.toDS().groupBy().count()
       val query = df.writeStream.outputMode("complete").format("memory").queryName("name").start()
@@ -888,9 +888,9 @@ class StreamSuite extends StreamTest {
   testQuietly("custom state store provider read from offset log") {
     val input = MemoryStream[Int]
     val df = input.toDS().groupBy().count()
-    val providerConf1 = "spark.sql.streaming.stateStore.providerClass" ->
+    val providerConf1 = SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
       "org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider"
-    val providerConf2 = "spark.sql.streaming.stateStore.providerClass" ->
+    val providerConf2 = SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
       classOf[TestStateStoreProvider].getCanonicalName
 
     def runQuery(queryName: String, checkpointLoc: String): Unit = {
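
    Note: for the state store provider hunks above, a minimal sketch of the
    resulting test setup (TestStateStoreProvider is the suite's own helper
    class):

        import org.apache.spark.sql.internal.SQLConf

        // Deriving the pair from the entry keeps both branches of the test
        // pointing at the same key.
        val providerConf = SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
          classOf[TestStateStoreProvider].getCanonicalName
        withSQLConf(providerConf) {
          // ... run the streaming query against the custom provider ...
        }
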
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenersConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenersConfSuite.scala
index 88f510c..da2f221 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenersConfSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenersConfSuite.scala
@@ -21,6 +21,7 @@ import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.internal.StaticSQLConf.STREAMING_QUERY_LISTENERS
 import org.apache.spark.sql.streaming.StreamingQueryListener._
 
 
@@ -29,7 +30,7 @@ class StreamingQueryListenersConfSuite extends StreamTest with BeforeAndAfter {
   import testImplicits._
 
   override protected def sparkConf: SparkConf =
-    super.sparkConf.set("spark.sql.streaming.streamingQueryListeners",
+    super.sparkConf.set(STREAMING_QUERY_LISTENERS.key,
       "org.apache.spark.sql.streaming.TestListener")
 
   test("test if the configured query lister is loaded") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index a5cb25c..e6b56e5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -413,9 +413,9 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
       sources.nonEmpty
     }
     // Disabled by default
-    assert(spark.conf.get("spark.sql.streaming.metricsEnabled").toBoolean === 
false)
+    assert(spark.conf.get(SQLConf.STREAMING_METRICS_ENABLED.key).toBoolean === 
false)
 
-    withSQLConf("spark.sql.streaming.metricsEnabled" -> "false") {
+    withSQLConf(SQLConf.STREAMING_METRICS_ENABLED.key -> "false") {
       testStream(inputData.toDF)(
         AssertOnQuery { q => !isMetricsRegistered(q) },
         StopStream,
@@ -424,7 +424,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
     }
 
     // Registered when enabled
-    withSQLConf("spark.sql.streaming.metricsEnabled" -> "true") {
+    withSQLConf(SQLConf.STREAMING_METRICS_ENABLED.key -> "true") {
       testStream(inputData.toDF)(
         AssertOnQuery { q => isMetricsRegistered(q) },
         StopStream,
@@ -434,7 +434,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
   }
 
   test("SPARK-22975: MetricsReporter defaults when there was no progress 
reported") {
-    withSQLConf("spark.sql.streaming.metricsEnabled" -> "true") {
+    withSQLConf(SQLConf.STREAMING_METRICS_ENABLED.key -> "true") {
       BlockingSource.latch = new CountDownLatch(1)
       withTempDir { tempDir =>
         val sq = spark.readStream
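
    Note: likewise for the streaming metrics flag. A small sketch of reading
    the default and overriding it per test, as the suite above does:

        import org.apache.spark.sql.internal.SQLConf

        // The flag defaults to false, so no metrics source is registered.
        assert(!spark.conf.get(SQLConf.STREAMING_METRICS_ENABLED.key).toBoolean)

        withSQLConf(SQLConf.STREAMING_METRICS_ENABLED.key -> "true") {
          // ... queries started here report metrics ...
        }
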
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousAggregationSuite.scala
index c5b95fa..3ec4750 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousAggregationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousAggregationSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.streaming.continuous
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.execution.streaming.sources.ContinuousMemoryStream
 import org.apache.spark.sql.functions._
+import org.apache.spark.sql.internal.SQLConf.UNSUPPORTED_OPERATION_CHECK_ENABLED
 import org.apache.spark.sql.streaming.OutputMode
 
 class ContinuousAggregationSuite extends ContinuousSuiteBase {
@@ -36,7 +37,7 @@ class ContinuousAggregationSuite extends ContinuousSuiteBase {
   }
 
   test("basic") {
-    withSQLConf(("spark.sql.streaming.unsupportedOperationCheck", "false")) {
+    withSQLConf((UNSUPPORTED_OPERATION_CHECK_ENABLED.key, "false")) {
       val input = ContinuousMemoryStream.singlePartition[Int]
 
       testStream(input.toDF().agg(max('value)), OutputMode.Complete)(
@@ -112,7 +113,7 @@ class ContinuousAggregationSuite extends ContinuousSuiteBase {
   }
 
   test("repeated restart") {
-    withSQLConf(("spark.sql.streaming.unsupportedOperationCheck", "false")) {
+    withSQLConf((UNSUPPORTED_OPERATION_CHECK_ENABLED.key, "false")) {
       val input = ContinuousMemoryStream.singlePartition[Int]
 
       testStream(input.toDF().agg(max('value)), OutputMode.Complete)(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
index 9840c7f..dca4520 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.continuous._
 import org.apache.spark.sql.execution.streaming.sources.ContinuousMemoryStream
 import org.apache.spark.sql.functions._
-import org.apache.spark.sql.internal.SQLConf.CONTINUOUS_STREAMING_EPOCH_BACKLOG_QUEUE_SIZE
+import org.apache.spark.sql.internal.SQLConf.{CONTINUOUS_STREAMING_EPOCH_BACKLOG_QUEUE_SIZE, MIN_BATCHES_TO_RETAIN}
 import org.apache.spark.sql.streaming.{StreamTest, Trigger}
 import org.apache.spark.sql.test.TestSparkSession
 
@@ -307,7 +307,7 @@ class ContinuousMetaSuite extends ContinuousSuiteBase {
       "local[10]",
       "continuous-stream-test-sql-context",
       sparkConf.set("spark.sql.testkey", "true")
-        .set("spark.sql.streaming.minBatchesToRetain", "2")))
+        .set(MIN_BATCHES_TO_RETAIN.key, "2")))
 
   test("SPARK-24351: check offsetLog/commitLog retained in the checkpoint 
directory") {
     withTempDir { checkpointDir =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
index 7b2c1a5..79016b5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
@@ -242,7 +242,7 @@ class StreamingDataSourceV2Suite extends StreamTest {
   override def beforeAll(): Unit = {
     super.beforeAll()
     val fakeCheckpoint = Utils.createTempDir()
-    spark.conf.set("spark.sql.streaming.checkpointLocation", 
fakeCheckpoint.getCanonicalPath)
+    spark.conf.set(SQLConf.CHECKPOINT_LOCATION.key, 
fakeCheckpoint.getCanonicalPath)
   }
 
   override def afterEach(): Unit = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
index 8fb1400..c630f14 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
@@ -203,7 +203,7 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
       .stop()
     assert(LastOptions.partitionColumns == Seq("a"))
 
-    withSQLConf("spark.sql.caseSensitive" -> "false") {
+    withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
       df.writeStream
         .format("org.apache.spark.sql.streaming.test")
         .option("checkpointLocation", newMetadataDir)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
index e9ab628..126e23e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -409,7 +409,7 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
 
   test("write path implements onTaskCommit API correctly") {
     withSQLConf(
-        "spark.sql.sources.commitProtocolClass" ->
+        SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key ->
           classOf[MessageCapturingCommitProtocol].getCanonicalName) {
       withTempDir { dir =>
         val path = dir.getCanonicalPath
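
    Note: the commit protocol change above follows the same shape. A sketch,
    assuming the suite's MessageCapturingCommitProtocol helper:

        import org.apache.spark.sql.internal.SQLConf

        withSQLConf(
            SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key ->
              classOf[MessageCapturingCommitProtocol].getCanonicalName) {
          // ... write a DataFrame and assert the onTaskCommit messages ...
        }
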
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
index b06856b..dd18add 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
@@ -44,6 +44,7 @@ import org.apache.spark.{SparkException, SparkFunSuite}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.hive.HiveUtils
 import org.apache.spark.sql.hive.test.HiveTestUtils
+import org.apache.spark.sql.internal.StaticSQLConf.HIVE_THRIFT_SERVER_SINGLESESSION
 import org.apache.spark.sql.test.ProcessTestUtils.ProcessOutputCapturer
 import org.apache.spark.util.{ThreadUtils, Utils}
 
@@ -536,9 +537,9 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
       }
 
       if (HiveUtils.isHive23) {
-        assert(conf.get("spark.sql.hive.version") === Some("2.3.5"))
+        assert(conf.get(HiveUtils.FAKE_HIVE_VERSION.key) === Some("2.3.5"))
       } else {
-        assert(conf.get("spark.sql.hive.version") === Some("1.2.1"))
+        assert(conf.get(HiveUtils.FAKE_HIVE_VERSION.key) === Some("1.2.1"))
       }
     }
   }
@@ -553,9 +554,9 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
       }
 
       if (HiveUtils.isHive23) {
-        assert(conf.get("spark.sql.hive.version") === Some("2.3.5"))
+        assert(conf.get(HiveUtils.FAKE_HIVE_VERSION.key) === Some("2.3.5"))
       } else {
-        assert(conf.get("spark.sql.hive.version") === Some("1.2.1"))
+        assert(conf.get(HiveUtils.FAKE_HIVE_VERSION.key) === Some("1.2.1"))
       }
     }
   }
@@ -659,7 +660,7 @@ class SingleSessionSuite extends HiveThriftJdbcTest {
   override def mode: ServerMode.Value = ServerMode.binary
 
   override protected def extraConf: Seq[String] =
-    "--conf spark.sql.hive.thriftServer.singleSession=true" :: Nil
+    s"--conf ${HIVE_THRIFT_SERVER_SINGLESESSION.key}=true" :: Nil
 
   test("share the temporary functions across JDBC connections") {
     withMultipleConnectionJdbcStatement()(
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala
index 4351dc7..9bc0be8 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala
@@ -27,9 +27,12 @@ import scala.util.control.NonFatal
 import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark.{SecurityManager, SparkConf, TestUtils}
+import org.apache.spark.internal.config.MASTER_REST_SERVER_ENABLED
+import org.apache.spark.internal.config.UI.UI_ENABLED
 import org.apache.spark.sql.{QueryTest, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.CatalogTableType
+import org.apache.spark.sql.internal.StaticSQLConf.WAREHOUSE_PATH
 import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.util.Utils
 
@@ -184,11 +187,11 @@ class HiveExternalCatalogVersionsSuite extends SparkSubmitTestUtils {
       val args = Seq(
         "--name", "prepare testing tables",
         "--master", "local[2]",
-        "--conf", "spark.ui.enabled=false",
-        "--conf", "spark.master.rest.enabled=false",
-        "--conf", "spark.sql.hive.metastore.version=1.2.1",
-        "--conf", "spark.sql.hive.metastore.jars=maven",
-        "--conf", s"spark.sql.warehouse.dir=${wareHousePath.getCanonicalPath}",
+        "--conf", s"${UI_ENABLED.key}=false",
+        "--conf", s"${MASTER_REST_SERVER_ENABLED.key}=false",
+        "--conf", s"${HiveUtils.HIVE_METASTORE_VERSION.key}=1.2.1",
+        "--conf", s"${HiveUtils.HIVE_METASTORE_JARS.key}=maven",
+        "--conf", s"${WAREHOUSE_PATH.key}=${wareHousePath.getCanonicalPath}",
         "--conf", s"spark.sql.test.version.index=$index",
         "--driver-java-options", 
s"-Dderby.system.home=${wareHousePath.getCanonicalPath}",
         tempPyFile.getCanonicalPath)
@@ -203,11 +206,11 @@ class HiveExternalCatalogVersionsSuite extends SparkSubmitTestUtils {
       "--class", PROCESS_TABLES.getClass.getName.stripSuffix("$"),
       "--name", "HiveExternalCatalog backward compatibility test",
       "--master", "local[2]",
-      "--conf", "spark.ui.enabled=false",
-      "--conf", "spark.master.rest.enabled=false",
-      "--conf", "spark.sql.hive.metastore.version=1.2.1",
-      "--conf", "spark.sql.hive.metastore.jars=maven",
-      "--conf", s"spark.sql.warehouse.dir=${wareHousePath.getCanonicalPath}",
+      "--conf", s"${UI_ENABLED.key}=false",
+      "--conf", s"${MASTER_REST_SERVER_ENABLED.key}=false",
+      "--conf", s"${HiveUtils.HIVE_METASTORE_VERSION.key}=1.2.1",
+      "--conf", s"${HiveUtils.HIVE_METASTORE_JARS.key}=maven",
+      "--conf", s"${WAREHOUSE_PATH.key}=${wareHousePath.getCanonicalPath}",
       "--driver-java-options", 
s"-Dderby.system.home=${wareHousePath.getCanonicalPath}",
       unusedJar.toString)
     runSparkSubmit(args)
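
    Note: where configs are passed on the spark-submit command line, the entry
    keys are interpolated into the --conf strings instead of being spelled out.
    A hedged sketch (the warehouse path is a placeholder):

        import org.apache.spark.internal.config.MASTER_REST_SERVER_ENABLED
        import org.apache.spark.internal.config.UI.UI_ENABLED
        import org.apache.spark.sql.internal.StaticSQLConf.WAREHOUSE_PATH

        val warehouseDir = "/tmp/spark-warehouse"  // placeholder path
        val submitArgs = Seq(
          "--conf", s"${UI_ENABLED.key}=false",
          "--conf", s"${MASTER_REST_SERVER_ENABLED.key}=false",
          "--conf", s"${WAREHOUSE_PATH.key}=$warehouseDir")
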
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
index 0ff2215..e2ddec3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
@@ -33,6 +33,8 @@ import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.expressions.Window
 import org.apache.spark.sql.hive.test.{HiveTestUtils, TestHiveContext}
+import org.apache.spark.sql.internal.SQLConf.SHUFFLE_PARTITIONS
+import org.apache.spark.sql.internal.StaticSQLConf.WAREHOUSE_PATH
 import org.apache.spark.sql.types.{DecimalType, StructType}
 import org.apache.spark.tags.ExtendedHiveTest
 import org.apache.spark.util.{ResetSystemProperties, Utils}
@@ -338,10 +340,10 @@ object SetMetastoreURLTest extends Logging {
     val builder = SparkSession.builder()
       .config(sparkConf)
       .config(UI_ENABLED.key, "false")
-      .config("spark.sql.hive.metastore.version", "0.13.1")
+      .config(HiveUtils.HIVE_METASTORE_VERSION.key, "0.13.1")
       // The issue described in SPARK-16901 only appear when
       // spark.sql.hive.metastore.jars is not set to builtin.
-      .config("spark.sql.hive.metastore.jars", "maven")
+      .config(HiveUtils.HIVE_METASTORE_JARS.key, "maven")
       .enableHiveSupport()
 
     val spark = builder.getOrCreate()
@@ -392,16 +394,16 @@ object SetWarehouseLocationTest extends Logging {
         // We are expecting that the value of spark.sql.warehouse.dir will override the
         // value of hive.metastore.warehouse.dir.
         val session = new TestHiveContext(new SparkContext(sparkConf
-          .set("spark.sql.warehouse.dir", warehouseLocation.toString)
+          .set(WAREHOUSE_PATH.key, warehouseLocation.toString)
           .set("hive.metastore.warehouse.dir", 
hiveWarehouseLocation.toString)))
           .sparkSession
         (session, warehouseLocation.toString)
 
     }
 
-    if (sparkSession.conf.get("spark.sql.warehouse.dir") != 
expectedWarehouseLocation) {
+    if (sparkSession.conf.get(WAREHOUSE_PATH.key) != 
expectedWarehouseLocation) {
       throw new Exception(
-        "spark.sql.warehouse.dir is not set to the expected warehouse location 
" +
+        s"${WAREHOUSE_PATH.key} is not set to the expected warehouse location 
" +
         s"$expectedWarehouseLocation.")
     }
 
@@ -564,7 +566,7 @@ object SparkSubmitClassLoaderTest extends Logging {
     val conf = new SparkConf()
     val hiveWarehouseLocation = Utils.createTempDir()
     conf.set(UI_ENABLED, false)
-    conf.set("spark.sql.warehouse.dir", hiveWarehouseLocation.toString)
+    conf.set(WAREHOUSE_PATH.key, hiveWarehouseLocation.toString)
     val sc = new SparkContext(conf)
     val hiveContext = new TestHiveContext(sc)
     val df = hiveContext.createDataFrame((1 to 100).map(i => (i, i))).toDF("i", "j")
@@ -642,14 +644,14 @@ object SparkSQLConfTest extends Logging {
     val conf = new SparkConf() {
       override def getAll: Array[(String, String)] = {
         def isMetastoreSetting(conf: String): Boolean = {
-          conf == "spark.sql.hive.metastore.version" || conf == 
"spark.sql.hive.metastore.jars"
+          conf == HiveUtils.HIVE_METASTORE_VERSION.key || conf == 
HiveUtils.HIVE_METASTORE_JARS.key
         }
         // If there is any metastore settings, remove them.
         val filteredSettings = super.getAll.filterNot(e => isMetastoreSetting(e._1))
 
         // Always add these two metastore settings at the beginning.
-        ("spark.sql.hive.metastore.version" -> "0.12") +:
-        ("spark.sql.hive.metastore.jars" -> "maven") +:
+        (HiveUtils.HIVE_METASTORE_VERSION.key -> "0.12") +:
+        (HiveUtils.HIVE_METASTORE_JARS.key -> "maven") +:
         filteredSettings
       }
 
@@ -676,10 +678,10 @@ object SPARK_9757 extends QueryTest {
     val hiveWarehouseLocation = Utils.createTempDir()
     val sparkContext = new SparkContext(
       new SparkConf()
-        .set("spark.sql.hive.metastore.version", "0.13.1")
-        .set("spark.sql.hive.metastore.jars", "maven")
+        .set(HiveUtils.HIVE_METASTORE_VERSION.key, "0.13.1")
+        .set(HiveUtils.HIVE_METASTORE_JARS.key, "maven")
         .set(UI_ENABLED, false)
-        .set("spark.sql.warehouse.dir", hiveWarehouseLocation.toString))
+        .set(WAREHOUSE_PATH.key, hiveWarehouseLocation.toString))
 
     val hiveContext = new TestHiveContext(sparkContext)
     spark = hiveContext.sparkSession
@@ -725,7 +727,7 @@ object SPARK_11009 extends QueryTest {
     val sparkContext = new SparkContext(
       new SparkConf()
         .set(UI_ENABLED, false)
-        .set("spark.sql.shuffle.partitions", "100"))
+        .set(SHUFFLE_PARTITIONS.key, "100"))
 
     val hiveContext = new TestHiveContext(sparkContext)
     spark = hiveContext.sparkSession
@@ -756,7 +758,7 @@ object SPARK_14244 extends QueryTest {
     val sparkContext = new SparkContext(
       new SparkConf()
         .set(UI_ENABLED, false)
-        .set("spark.sql.shuffle.partitions", "100"))
+        .set(SHUFFLE_PARTITIONS.key, "100"))
 
     val hiveContext = new TestHiveContext(sparkContext)
     spark = hiveContext.sparkSession
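
    Note: for plain SparkConf construction in these hive submit tests, the
    entry key replaces the literal in the same way. A minimal sketch (path is
    a placeholder; versions mirror the values used above):

        import org.apache.spark.SparkConf
        import org.apache.spark.sql.hive.HiveUtils
        import org.apache.spark.sql.internal.SQLConf.SHUFFLE_PARTITIONS
        import org.apache.spark.sql.internal.StaticSQLConf.WAREHOUSE_PATH

        val conf = new SparkConf()
          .set(SHUFFLE_PARTITIONS.key, "100")
          .set(WAREHOUSE_PATH.key, "/tmp/spark-warehouse")  // placeholder path
          .set(HiveUtils.HIVE_METASTORE_VERSION.key, "0.13.1")
          .set(HiveUtils.HIVE_METASTORE_JARS.key, "maven")
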
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
index ecd4287..d06cc1c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
@@ -1028,7 +1028,7 @@ class HashAggregationQueryWithControlledFallbackSuite extends AggregationQuerySu
 
   override protected def checkAnswer(actual: => DataFrame, expectedAnswer: Seq[Row]): Unit = {
     Seq("true", "false").foreach { enableTwoLevelMaps =>
-      withSQLConf("spark.sql.codegen.aggregate.map.twolevel.enabled" ->
+      withSQLConf(SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key ->
         enableTwoLevelMaps) {
         (1 to 3).foreach { fallbackStartsAt =>
           withSQLConf("spark.sql.TungstenAggregate.testFallbackStartsAt" ->
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 8cdb8dd..d68a470 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -45,7 +45,7 @@ import org.apache.spark.sql.execution.command.CacheTableCommand
 import org.apache.spark.sql.hive._
 import org.apache.spark.sql.hive.client.HiveClient
 import org.apache.spark.sql.internal.{SessionState, SharedState, SQLConf, WithTestConf}
-import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
+import org.apache.spark.sql.internal.StaticSQLConf.{CATALOG_IMPLEMENTATION, WAREHOUSE_PATH}
 import org.apache.spark.util.{ShutdownHookManager, Utils}
 
 // SPARK-3729: Test key required to check for initialization errors with config.
@@ -57,9 +57,9 @@ object TestHive
       new SparkConf()
         .set("spark.sql.test", "")
         .set(SQLConf.CODEGEN_FALLBACK.key, "false")
-        .set("spark.sql.hive.metastore.barrierPrefixes",
+        .set(HiveUtils.HIVE_METASTORE_BARRIER_PREFIXES.key,
           "org.apache.spark.sql.hive.execution.PairSerDe")
-        .set("spark.sql.warehouse.dir", 
TestHiveContext.makeWarehouseDir().toURI.getPath)
+        .set(WAREHOUSE_PATH.key, 
TestHiveContext.makeWarehouseDir().toURI.getPath)
         // SPARK-8910
         .set(UI_ENABLED, false)
         .set(config.UNSAFE_EXCEPTION_ON_MEMORY_LEAK, true)
@@ -534,7 +534,7 @@ private[hive] class TestHiveSparkSession(
       }
 
       // Clean out the Hive warehouse between each suite
-      val warehouseDir = new File(new URI(sparkContext.conf.get("spark.sql.warehouse.dir")).getPath)
+      val warehouseDir = new File(new URI(sparkContext.conf.get(WAREHOUSE_PATH.key)).getPath)
       Utils.deleteRecursively(warehouseDir)
       warehouseDir.mkdir()
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
