This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 42b80ae [SPARK-28257][SQL] Use ConfigEntry for hardcoded configs in SQL
42b80ae is described below
commit 42b80ae128ab1aa8a87c1376fe88e2cde52e6e4f
Author: wangguangxin.cn <[email protected]>
AuthorDate: Thu Jul 11 22:36:07 2019 -0700
[SPARK-28257][SQL] Use ConfigEntry for hardcoded configs in SQL
## What changes were proposed in this pull request?
There are still some configs referenced by hardcoded strings; this patch replaces them with their `ConfigEntry` definitions (see the sketch below).
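For readers unfamiliar with the pattern, a minimal sketch follows: each config is declared once as a `ConfigEntry` in `SQLConf` via the internal `buildConf` DSL, and call sites reference the entry (its `.key` or a typed accessor) instead of repeating the raw string. The snippet reuses the `IN_MEMORY_TABLE_SCAN_STATISTICS_ENABLED` entry added in this patch; it is a sketch against Spark internals, not standalone runnable code.

```scala
// Declared once in org.apache.spark.sql.internal.SQLConf (see the hunk below):
val IN_MEMORY_TABLE_SCAN_STATISTICS_ENABLED =
  buildConf("spark.sql.inMemoryTableScanStatistics.enable")
    .internal()
    .doc("When true, enable in-memory table scan accumulators.")
    .booleanConf
    .createWithDefault(false)

// Call sites reference the entry instead of the raw string:
// before: spark.conf.set("spark.sql.inMemoryTableScanStatistics.enable", "true")
spark.conf.set(SQLConf.IN_MEMORY_TABLE_SCAN_STATISTICS_ENABLED.key, "true")

// ...and internal code reads it through a typed accessor on SQLConf:
// before: sqlContext.getConf("spark.sql.inMemoryTableScanStatistics.enable", "false").toBoolean
val enabled: Boolean = sqlContext.conf.inMemoryTableScanStatisticsEnabled
```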
## How was this patch tested?
Existing UT
Closes #25059 from WangGuangxin/ConfigEntry.
Authored-by: wangguangxin.cn <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../org/apache/spark/sql/avro/AvroSuite.scala | 2 +-
.../kafka010/KafkaDontFailOnDataLossSuite.scala | 2 +-
.../sql/kafka010/KafkaMicroBatchSourceSuite.scala | 2 +-
.../org/apache/spark/sql/internal/SQLConf.scala | 18 +++++++++++++
.../sql/catalyst/analysis/TypeCoercionSuite.scala | 10 ++++----
.../execution/aggregate/HashAggregateExec.scala | 14 ++++------
.../execution/columnar/InMemoryTableScanExec.scala | 3 +--
.../apache/spark/sql/AggregateHashMapSuite.scala | 29 +++++++++++----------
.../org/apache/spark/sql/DataFrameSuite.scala | 2 +-
.../spark/sql/FileBasedDataSourceSuite.scala | 4 +--
.../scala/org/apache/spark/sql/JoinSuite.scala | 8 +++---
.../org/apache/spark/sql/RuntimeConfigSuite.scala | 6 +++--
.../scala/org/apache/spark/sql/SQLQuerySuite.scala | 8 +++---
.../spark/sql/SparkSessionExtensionSuite.scala | 9 ++++---
.../apache/spark/sql/execution/SQLViewSuite.scala | 7 ++---
.../sql/execution/SQLWindowFunctionSuite.scala | 5 ++--
.../sql/execution/arrow/ArrowConvertersSuite.scala | 4 +--
.../execution/benchmark/AggregateBenchmark.scala | 20 +++++++--------
.../BuiltInDataSourceWriteBenchmark.scala | 6 +++--
.../columnar/InMemoryColumnarQuerySuite.scala | 2 +-
.../columnar/PartitionBatchPruningSuite.scala | 2 +-
.../execution/command/PlanResolutionSuite.scala | 3 ++-
.../datasources/FileSourceStrategySuite.scala | 4 +--
.../sql/execution/datasources/csv/CSVSuite.scala | 4 +--
.../sql/execution/datasources/json/JsonSuite.scala | 4 +--
.../datasources/parquet/ParquetIOSuite.scala | 4 +--
.../parquet/ParquetPartitionDiscoverySuite.scala | 2 +-
.../sql/execution/metric/SQLMetricsSuite.scala | 6 ++---
.../sql/execution/metric/SQLMetricsTestUtils.scala | 3 ++-
.../state/StateStoreCoordinatorSuite.scala | 3 ++-
.../streaming/state/StateStoreSuite.scala | 2 +-
.../execution/ui/SQLAppStatusListenerSuite.scala | 2 +-
.../org/apache/spark/sql/jdbc/JDBCSuite.scala | 2 +-
.../spark/sql/sources/BucketedWriteSuite.scala | 2 +-
.../sql/sources/CreateTableAsSelectSuite.scala | 3 ++-
.../sql/sources/v2/DataSourceV2SQLSuite.scala | 5 ++--
.../sql/streaming/EventTimeWatermarkSuite.scala | 2 +-
.../sql/streaming/FileStreamSourceSuite.scala | 2 +-
.../apache/spark/sql/streaming/StreamSuite.scala | 6 ++---
.../StreamingQueryListenersConfSuite.scala | 3 ++-
.../spark/sql/streaming/StreamingQuerySuite.scala | 8 +++---
.../continuous/ContinuousAggregationSuite.scala | 5 ++--
.../sql/streaming/continuous/ContinuousSuite.scala | 4 +--
.../sources/StreamingDataSourceV2Suite.scala | 2 +-
.../test/DataStreamReaderWriterSuite.scala | 2 +-
.../sql/test/DataFrameReaderWriterSuite.scala | 2 +-
.../thriftserver/HiveThriftServer2Suites.scala | 11 ++++----
.../hive/HiveExternalCatalogVersionsSuite.scala | 23 +++++++++--------
.../spark/sql/hive/HiveSparkSubmitSuite.scala | 30 ++++++++++++----------
.../sql/hive/execution/AggregationQuerySuite.scala | 2 +-
.../org/apache/spark/sql/hive/test/TestHive.scala | 8 +++---
51 files changed, 178 insertions(+), 144 deletions(-)
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index 40bf3b1..924bf37 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -50,7 +50,7 @@ abstract class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUti
override protected def beforeAll(): Unit = {
super.beforeAll()
- spark.conf.set("spark.sql.files.maxPartitionBytes", 1024)
+ spark.conf.set(SQLConf.FILES_MAX_PARTITION_BYTES.key, 1024)
}
def checkReloadMatchesSaved(originalFile: String, newFile: String): Unit = {
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDontFailOnDataLossSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDontFailOnDataLossSuite.scala
index e089e36..ba8340e 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDontFailOnDataLossSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDontFailOnDataLossSuite.scala
@@ -135,7 +135,7 @@ class KafkaDontFailOnDataLossSuite extends StreamTest with KafkaMissingOffsetsTe
test("failOnDataLoss=false should not return duplicated records: microbatch v1") {
withSQLConf(
- "spark.sql.streaming.disabledV2MicroBatchReaders" ->
+ SQLConf.DISABLED_V2_STREAMING_MICROBATCH_READERS.key ->
classOf[KafkaSourceProvider].getCanonicalName) {
verifyMissingOffsetsDontCauseDuplicatedRecords(testStreamingQuery = true) { (df, table) =>
val query = df.writeStream.format("memory").queryName(table).start()
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index 3d14ebe..bb9b369 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -1066,7 +1066,7 @@ class KafkaMicroBatchV1SourceSuite extends KafkaMicroBatchSourceSuiteBase {
override def beforeAll(): Unit = {
super.beforeAll()
spark.conf.set(
- "spark.sql.streaming.disabledV2MicroBatchReaders",
+ SQLConf.DISABLED_V2_STREAMING_MICROBATCH_READERS.key,
classOf[KafkaSourceProvider].getCanonicalName)
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index feb3b46..94d197b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -234,6 +234,13 @@ object SQLConf {
.booleanConf
.createWithDefault(true)
+ val IN_MEMORY_TABLE_SCAN_STATISTICS_ENABLED =
+ buildConf("spark.sql.inMemoryTableScanStatistics.enable")
+ .internal()
+ .doc("When true, enable in-memory table scan accumulators.")
+ .booleanConf
+ .createWithDefault(false)
+
val CACHE_VECTORIZED_READER_ENABLED =
buildConf("spark.sql.inMemoryColumnarStorage.enableVectorizedReader")
.doc("Enables vectorized reader for columnar caching.")
@@ -1024,6 +1031,13 @@ object SQLConf {
.booleanConf
.createWithDefault(true)
+ val ENABLE_VECTORIZED_HASH_MAP =
+ buildConf("spark.sql.codegen.aggregate.map.vectorized.enable")
+ .internal()
+ .doc("Enable vectorized aggregate hash map. This is for
testing/benchmarking only.")
+ .booleanConf
+ .createWithDefault(false)
+
val MAX_NESTED_VIEW_DEPTH =
buildConf("spark.sql.view.maxNestedViewDepth")
.internal()
@@ -2109,6 +2123,8 @@ class SQLConf extends Serializable with Logging {
def inMemoryPartitionPruning: Boolean = getConf(IN_MEMORY_PARTITION_PRUNING)
+ def inMemoryTableScanStatisticsEnabled: Boolean = getConf(IN_MEMORY_TABLE_SCAN_STATISTICS_ENABLED)
+
def offHeapColumnVectorEnabled: Boolean = getConf(COLUMN_VECTOR_OFFHEAP_ENABLED)
def columnNameOfCorruptRecord: String = getConf(COLUMN_NAME_OF_CORRUPT_RECORD)
@@ -2148,6 +2164,8 @@ class SQLConf extends Serializable with Logging {
def enableTwoLevelAggMap: Boolean = getConf(ENABLE_TWOLEVEL_AGG_MAP)
+ def enableVectorizedHashMap: Boolean = getConf(ENABLE_VECTORIZED_HASH_MAP)
+
def useObjectHashAggregation: Boolean = getConf(USE_OBJECT_HASH_AGG)
def objectAggSortBasedFallbackThreshold: Int = getConf(OBJECT_AGG_SORT_BASED_FALLBACK_THRESHOLD)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala
index 2c3ba1b..a725e4b 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala
@@ -1126,14 +1126,14 @@ class TypeCoercionSuite extends AnalysisTest {
Concat(Seq(Cast(Literal(new java.sql.Date(0)), StringType),
Cast(Literal(new Timestamp(0)), StringType))))
- withSQLConf("spark.sql.function.concatBinaryAsString" -> "true") {
+ withSQLConf(SQLConf.CONCAT_BINARY_AS_STRING.key -> "true") {
ruleTest(rule,
Concat(Seq(Literal("123".getBytes), Literal("456".getBytes))),
Concat(Seq(Cast(Literal("123".getBytes), StringType),
Cast(Literal("456".getBytes), StringType))))
}
- withSQLConf("spark.sql.function.concatBinaryAsString" -> "false") {
+ withSQLConf(SQLConf.CONCAT_BINARY_AS_STRING.key -> "false") {
ruleTest(rule,
Concat(Seq(Literal("123".getBytes), Literal("456".getBytes))),
Concat(Seq(Literal("123".getBytes), Literal("456".getBytes))))
@@ -1180,14 +1180,14 @@ class TypeCoercionSuite extends AnalysisTest {
Elt(Seq(Literal(2), Cast(Literal(new java.sql.Date(0)), StringType),
Cast(Literal(new Timestamp(0)), StringType))))
- withSQLConf("spark.sql.function.eltOutputAsString" -> "true") {
+ withSQLConf(SQLConf.ELT_OUTPUT_AS_STRING.key -> "true") {
ruleTest(rule,
Elt(Seq(Literal(1), Literal("123".getBytes), Literal("456".getBytes))),
Elt(Seq(Literal(1), Cast(Literal("123".getBytes), StringType),
Cast(Literal("456".getBytes), StringType))))
}
- withSQLConf("spark.sql.function.eltOutputAsString" -> "false") {
+ withSQLConf(SQLConf.ELT_OUTPUT_AS_STRING.key -> "false") {
ruleTest(rule,
Elt(Seq(Literal(1), Literal("123".getBytes), Literal("456".getBytes))),
Elt(Seq(Literal(1), Literal("123".getBytes), Literal("456".getBytes))))
@@ -1498,7 +1498,7 @@ class TypeCoercionSuite extends AnalysisTest {
DoubleType)))
Seq(true, false).foreach { convertToTS =>
withSQLConf(
- "spark.sql.legacy.compareDateTimestampInTimestamp" ->
convertToTS.toString) {
+ SQLConf.COMPARE_DATE_TIMESTAMP_IN_TIMESTAMP.key ->
convertToTS.toString) {
val date0301 = Literal(java.sql.Date.valueOf("2017-03-01"))
val timestamp0301000000 = Literal(Timestamp.valueOf("2017-03-01
00:00:00"))
val timestamp0301000001 = Literal(Timestamp.valueOf("2017-03-01
00:00:01"))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
index 25ff658..4a95f76 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.execution.vectorized.MutableColumnarRow
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DecimalType, StringType, StructType}
import org.apache.spark.unsafe.KVIterator
import org.apache.spark.util.Utils
@@ -559,7 +560,7 @@ case class HashAggregateExec(
private def enableTwoLevelHashMap(ctx: CodegenContext): Unit = {
if (!checkIfFastHashMapSupported(ctx)) {
if (modes.forall(mode => mode == Partial || mode == PartialMerge) && !Utils.isTesting) {
- logInfo("spark.sql.codegen.aggregate.map.twolevel.enabled is set to true, but"
+ logInfo(s"${SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key} is set to true, but"
+ " current version of codegened fast hashmap does not support this aggregate.")
}
} else {
@@ -567,8 +568,7 @@ case class HashAggregateExec(
// This is for testing/benchmarking only.
// We enforce to first level to be a vectorized hashmap, instead of the default row-based one.
- isVectorizedHashMapEnabled = sqlContext.getConf(
- "spark.sql.codegen.aggregate.map.vectorized.enable", "false") == "true"
+ isVectorizedHashMapEnabled = sqlContext.conf.enableVectorizedHashMap
}
}
@@ -576,12 +576,8 @@ case class HashAggregateExec(
val initAgg = ctx.addMutableState(CodeGenerator.JAVA_BOOLEAN, "initAgg")
if (sqlContext.conf.enableTwoLevelAggMap) {
enableTwoLevelHashMap(ctx)
- } else {
- sqlContext.getConf("spark.sql.codegen.aggregate.map.vectorized.enable",
null) match {
- case "true" =>
- logWarning("Two level hashmap is disabled but vectorized hashmap is
enabled.")
- case _ =>
- }
+ } else if (sqlContext.conf.enableVectorizedHashMap) {
+ logWarning("Two level hashmap is disabled but vectorized hashmap is
enabled.")
}
val bitMaxCapacity = sqlContext.conf.fastHashAggregateRowMaxCapacityBit
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
index 7a8c6d6..3566ab1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
@@ -297,8 +297,7 @@ case class InMemoryTableScanExec(
}
}
- lazy val enableAccumulatorsForTest: Boolean =
- sqlContext.getConf("spark.sql.inMemoryTableScanStatistics.enable", "false").toBoolean
+ lazy val enableAccumulatorsForTest: Boolean = sqlContext.conf.inMemoryTableScanStatisticsEnabled
// Accumulators used for testing purposes
lazy val readPartitions = sparkContext.longAccumulator
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/AggregateHashMapSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/AggregateHashMapSuite.scala
index 938d76c..b253c4a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/AggregateHashMapSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/AggregateHashMapSuite.scala
@@ -20,33 +20,34 @@ package org.apache.spark.sql
import org.scalatest.BeforeAndAfter
import org.apache.spark.SparkConf
+import org.apache.spark.sql.internal.SQLConf
class SingleLevelAggregateHashMapSuite extends DataFrameAggregateSuite with BeforeAndAfter {
override protected def sparkConf: SparkConf = super.sparkConf
- .set("spark.sql.codegen.fallback", "false")
- .set("spark.sql.codegen.aggregate.map.twolevel.enabled", "false")
+ .set(SQLConf.CODEGEN_FALLBACK.key, "false")
+ .set(SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key, "false")
// adding some checking after each test is run, assuring that the configs are not changed
// in test code
after {
- assert(sparkConf.get("spark.sql.codegen.fallback") == "false",
+ assert(sparkConf.get(SQLConf.CODEGEN_FALLBACK.key) == "false",
"configuration parameter changed in test body")
- assert(sparkConf.get("spark.sql.codegen.aggregate.map.twolevel.enabled")
== "false",
+ assert(sparkConf.get(SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key) == "false",
"configuration parameter changed in test body")
}
}
class TwoLevelAggregateHashMapSuite extends DataFrameAggregateSuite with BeforeAndAfter {
override protected def sparkConf: SparkConf = super.sparkConf
- .set("spark.sql.codegen.fallback", "false")
- .set("spark.sql.codegen.aggregate.map.twolevel.enabled", "true")
+ .set(SQLConf.CODEGEN_FALLBACK.key, "false")
+ .set(SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key, "true")
// adding some checking after each test is run, assuring that the configs are not changed
// in test code
after {
- assert(sparkConf.get("spark.sql.codegen.fallback") == "false",
+ assert(sparkConf.get(SQLConf.CODEGEN_FALLBACK.key) == "false",
"configuration parameter changed in test body")
- assert(sparkConf.get("spark.sql.codegen.aggregate.map.twolevel.enabled")
== "true",
+ assert(sparkConf.get(SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key) == "true",
"configuration parameter changed in test body")
}
}
@@ -56,18 +57,18 @@ class TwoLevelAggregateHashMapWithVectorizedMapSuite
with BeforeAndAfter {
override protected def sparkConf: SparkConf = super.sparkConf
- .set("spark.sql.codegen.fallback", "false")
- .set("spark.sql.codegen.aggregate.map.twolevel.enabled", "true")
- .set("spark.sql.codegen.aggregate.map.vectorized.enable", "true")
+ .set(SQLConf.CODEGEN_FALLBACK.key, "false")
+ .set(SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key, "true")
+ .set(SQLConf.ENABLE_VECTORIZED_HASH_MAP.key, "true")
// adding some checking after each test is run, assuring that the configs are not changed
// in test code
after {
- assert(sparkConf.get("spark.sql.codegen.fallback") == "false",
+ assert(sparkConf.get(SQLConf.CODEGEN_FALLBACK.key) == "false",
"configuration parameter changed in test body")
- assert(sparkConf.get("spark.sql.codegen.aggregate.map.twolevel.enabled")
== "true",
+ assert(sparkConf.get(SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key) == "true",
"configuration parameter changed in test body")
- assert(sparkConf.get("spark.sql.codegen.aggregate.map.vectorized.enable")
== "true",
+ assert(sparkConf.get(SQLConf.ENABLE_VECTORIZED_HASH_MAP.key) == "true",
"configuration parameter changed in test body")
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 9893670..e8ddd4e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -1672,7 +1672,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
}
test("reuse exchange") {
- withSQLConf("spark.sql.autoBroadcastJoinThreshold" -> "2") {
+ withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "2") {
val df = spark.range(100).toDF()
val join = df.join(df, "id")
val plan = join.queryExecution.executedPlan
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index fffe52d..8919528 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -671,8 +671,8 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo
test("SPARK-22790,SPARK-27668: spark.sql.sources.compressionFactor takes effect") {
Seq(1.0, 0.5).foreach { compressionFactor =>
- withSQLConf("spark.sql.sources.fileCompressionFactor" -> compressionFactor.toString,
- "spark.sql.autoBroadcastJoinThreshold" -> "250") {
+ withSQLConf(SQLConf.FILE_COMRESSION_FACTOR.key -> compressionFactor.toString,
+ SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "250") {
withTempPath { workDir =>
// the file size is 486 bytes
val workDirPath = workDir.getAbsolutePath
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index 32cddc9..531cc86 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -72,7 +72,7 @@ class JoinSuite extends QueryTest with SharedSQLContext {
test("join operator selection") {
spark.sharedState.cacheManager.clearCache()
- withSQLConf("spark.sql.autoBroadcastJoinThreshold" -> "0",
+ withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0",
SQLConf.CROSS_JOINS_ENABLED.key -> "true") {
Seq(
("SELECT * FROM testData LEFT SEMI JOIN testData2 ON key = a",
@@ -651,7 +651,7 @@ class JoinSuite extends QueryTest with SharedSQLContext {
test("test SortMergeJoin (without spill)") {
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1",
- "spark.sql.sortMergeJoinExec.buffer.spill.threshold" ->
Int.MaxValue.toString) {
+ SQLConf.SORT_MERGE_JOIN_EXEC_BUFFER_SPILL_THRESHOLD.key ->
Int.MaxValue.toString) {
assertNotSpilled(sparkContext, "inner join") {
checkAnswer(
@@ -708,8 +708,8 @@ class JoinSuite extends QueryTest with SharedSQLContext {
test("test SortMergeJoin (with spill)") {
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1",
- "spark.sql.sortMergeJoinExec.buffer.in.memory.threshold" -> "0",
- "spark.sql.sortMergeJoinExec.buffer.spill.threshold" -> "1") {
+ SQLConf.SORT_MERGE_JOIN_EXEC_BUFFER_IN_MEMORY_THRESHOLD.key -> "0",
+ SQLConf.SORT_MERGE_JOIN_EXEC_BUFFER_SPILL_THRESHOLD.key -> "1") {
assertSpilled(sparkContext, "inner join") {
checkAnswer(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/RuntimeConfigSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/RuntimeConfigSuite.scala
index 3284231..720d570 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/RuntimeConfigSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/RuntimeConfigSuite.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql
import org.apache.spark.SparkFunSuite
import org.apache.spark.internal.config
+import org.apache.spark.sql.internal.SQLConf.CHECKPOINT_LOCATION
+import org.apache.spark.sql.internal.StaticSQLConf.SCHEMA_STRING_LENGTH_THRESHOLD
class RuntimeConfigSuite extends SparkFunSuite {
@@ -60,8 +62,8 @@ class RuntimeConfigSuite extends SparkFunSuite {
val conf = newConf()
// SQL configs
- assert(!conf.isModifiable("spark.sql.sources.schemaStringLengthThreshold"))
- assert(conf.isModifiable("spark.sql.streaming.checkpointLocation"))
+ assert(!conf.isModifiable(SCHEMA_STRING_LENGTH_THRESHOLD.key))
+ assert(conf.isModifiable(CHECKPOINT_LOCATION.key))
// Core configs
assert(!conf.isModifiable(config.CPUS_PER_TASK.key))
assert(!conf.isModifiable("spark.executor.cores"))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 2cc1be9..9729506 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -1896,7 +1896,7 @@ class SQLQuerySuite extends QueryTest with
SharedSQLContext {
}
test("Star Expansion - group by") {
- withSQLConf("spark.sql.retainGroupColumns" -> "false") {
+ withSQLConf(SQLConf.DATAFRAME_RETAIN_GROUP_COLUMNS.key -> "false") {
checkAnswer(
testData2.groupBy($"a", $"b").agg($"*"),
sql("SELECT * FROM testData2 group by a, b"))
@@ -1936,7 +1936,7 @@ class SQLQuerySuite extends QueryTest with
SharedSQLContext {
test("Common subexpression elimination") {
// TODO: support subexpression elimination in whole stage codegen
- withSQLConf("spark.sql.codegen.wholeStage" -> "false") {
+ withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
// select from a table to prevent constant folding.
val df = sql("SELECT a, b from testData2 limit 1")
checkAnswer(df, Row(1, 1))
@@ -1985,9 +1985,9 @@ class SQLQuerySuite extends QueryTest with
SharedSQLContext {
df.selectExpr("testUdf(a + 1) + testUdf(1 + a)", "testUdf(a + 1)"),
Row(4, 2), 1)
// Try disabling it via configuration.
- spark.conf.set("spark.sql.subexpressionElimination.enabled", "false")
+ spark.conf.set(SQLConf.SUBEXPRESSION_ELIMINATION_ENABLED.key, "false")
verifyCallCount(df.selectExpr("testUdf(a)", "testUdf(a)"), Row(1, 1), 2)
- spark.conf.set("spark.sql.subexpressionElimination.enabled", "true")
+ spark.conf.set(SQLConf.SUBEXPRESSION_ELIMINATION_ENABLED.key, "true")
verifyCallCount(df.selectExpr("testUdf(a)", "testUdf(a)"), Row(1, 1), 1)
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
index 2e2e61b..74341f9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
+import org.apache.spark.sql.internal.StaticSQLConf.SPARK_SESSION_EXTENSIONS
import org.apache.spark.sql.types.{DataType, Decimal, IntegerType, LongType,
Metadata, StructType}
import org.apache.spark.sql.vectorized.{ColumnarArray, ColumnarBatch,
ColumnarMap, ColumnVector}
import org.apache.spark.unsafe.types.UTF8String
@@ -152,7 +153,7 @@ class SparkSessionExtensionSuite extends SparkFunSuite {
test("use custom class for extensions") {
val session = SparkSession.builder()
.master("local[1]")
- .config("spark.sql.extensions", classOf[MyExtensions].getCanonicalName)
+ .config(SPARK_SESSION_EXTENSIONS.key,
classOf[MyExtensions].getCanonicalName)
.getOrCreate()
try {
assert(session.sessionState.planner.strategies.contains(MySparkStrategy(session)))
@@ -173,7 +174,7 @@ class SparkSessionExtensionSuite extends SparkFunSuite {
test("use multiple custom class for extensions in the specified order") {
val session = SparkSession.builder()
.master("local[1]")
- .config("spark.sql.extensions", Seq(
+ .config(SPARK_SESSION_EXTENSIONS.key, Seq(
classOf[MyExtensions2].getCanonicalName,
classOf[MyExtensions].getCanonicalName).mkString(","))
.getOrCreate()
@@ -201,7 +202,7 @@ class SparkSessionExtensionSuite extends SparkFunSuite {
test("allow an extension to be duplicated") {
val session = SparkSession.builder()
.master("local[1]")
- .config("spark.sql.extensions", Seq(
+ .config(SPARK_SESSION_EXTENSIONS.key, Seq(
classOf[MyExtensions].getCanonicalName,
classOf[MyExtensions].getCanonicalName).mkString(","))
.getOrCreate()
@@ -228,7 +229,7 @@ class SparkSessionExtensionSuite extends SparkFunSuite {
test("use the last registered function name when there are duplicates") {
val session = SparkSession.builder()
.master("local[1]")
- .config("spark.sql.extensions", Seq(
+ .config(SPARK_SESSION_EXTENSIONS.key, Seq(
classOf[MyExtensions2].getCanonicalName,
classOf[MyExtensions2Duplicate].getCanonicalName).mkString(","))
.getOrCreate()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
index 8269d4d..64e305c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
+import org.apache.spark.sql.internal.SQLConf.MAX_NESTED_VIEW_DEPTH
import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
class SimpleSQLViewSuite extends SQLViewSuite with SharedSQLContext
@@ -665,17 +666,17 @@ abstract class SQLViewSuite extends QueryTest with
SQLTestUtils {
sql(s"CREATE VIEW view${idx + 1} AS SELECT * FROM view$idx")
}
- withSQLConf("spark.sql.view.maxNestedViewDepth" -> "10") {
+ withSQLConf(MAX_NESTED_VIEW_DEPTH.key -> "10") {
val e = intercept[AnalysisException] {
sql("SELECT * FROM view10")
}.getMessage
assert(e.contains("The depth of view `default`.`view0` exceeds the
maximum view " +
"resolution depth (10). Analysis is aborted to avoid errors.
Increase the value " +
- "of spark.sql.view.maxNestedViewDepth to work around this."))
+ s"of ${MAX_NESTED_VIEW_DEPTH.key} to work around this."))
}
val e = intercept[IllegalArgumentException] {
- withSQLConf("spark.sql.view.maxNestedViewDepth" -> "0") {}
+ withSQLConf(MAX_NESTED_VIEW_DEPTH.key -> "0") {}
}.getMessage
assert(e.contains("The maximum depth of a view reference in a nested
view must be " +
"positive."))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
index 1c6fc35..971fd84 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution
import org.apache.spark.TestUtils.assertSpilled
import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
+import org.apache.spark.sql.internal.SQLConf.{WINDOW_EXEC_BUFFER_IN_MEMORY_THRESHOLD,
WINDOW_EXEC_BUFFER_SPILL_THRESHOLD}
import org.apache.spark.sql.test.SharedSQLContext
case class WindowData(month: Int, area: String, product: Int)
@@ -477,8 +478,8 @@ class SQLWindowFunctionSuite extends QueryTest with
SharedSQLContext {
|WINDOW w1 AS (ORDER BY x ROWS BETWEEN UNBOUNDED PRECEDiNG AND CURRENT
RoW)
""".stripMargin)
- withSQLConf("spark.sql.windowExec.buffer.in.memory.threshold" -> "1",
- "spark.sql.windowExec.buffer.spill.threshold" -> "2") {
+ withSQLConf(WINDOW_EXEC_BUFFER_IN_MEMORY_THRESHOLD.key -> "1",
+ WINDOW_EXEC_BUFFER_SPILL_THRESHOLD.key -> "2") {
assertSpilled(sparkContext, "test with low buffer spill threshold") {
checkAnswer(actual, expected)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala
index 86874b9..67c3fa0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala
@@ -1191,7 +1191,7 @@ class ArrowConvertersSuite extends SharedSQLContext with
BeforeAndAfterAll {
test("max records in batch conf") {
val totalRecords = 10
val maxRecordsPerBatch = 3
- spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch",
maxRecordsPerBatch)
+ spark.conf.set(SQLConf.ARROW_EXECUTION_MAX_RECORDS_PER_BATCH.key,
maxRecordsPerBatch)
val df = spark.sparkContext.parallelize(1 to totalRecords, 2).toDF("i")
val arrowBatches = df.toArrowBatchRdd.collect()
assert(arrowBatches.length >= 4)
@@ -1206,7 +1206,7 @@ class ArrowConvertersSuite extends SharedSQLContext with
BeforeAndAfterAll {
}
assert(recordCount == totalRecords)
allocator.close()
- spark.conf.unset("spark.sql.execution.arrow.maxRecordsPerBatch")
+ spark.conf.unset(SQLConf.ARROW_EXECUTION_MAX_RECORDS_PER_BATCH.key)
}
testQuietly("unsupported types") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala
index 81158d9..2776bc3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala
@@ -83,7 +83,7 @@ object AggregateBenchmark extends SqlBasedBenchmark {
withSQLConf(
SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true",
SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key -> "false",
- "spark.sql.codegen.aggregate.map.vectorized.enable" -> "false") {
+ SQLConf.ENABLE_VECTORIZED_HASH_MAP.key -> "false") {
f()
}
}
@@ -92,7 +92,7 @@ object AggregateBenchmark extends SqlBasedBenchmark {
withSQLConf(
SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true",
SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key -> "true",
- "spark.sql.codegen.aggregate.map.vectorized.enable" -> "true") {
+ SQLConf.ENABLE_VECTORIZED_HASH_MAP.key -> "true") {
f()
}
}
@@ -119,7 +119,7 @@ object AggregateBenchmark extends SqlBasedBenchmark {
withSQLConf(
SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true",
SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key -> "false",
- "spark.sql.codegen.aggregate.map.vectorized.enable" -> "false") {
+ SQLConf.ENABLE_VECTORIZED_HASH_MAP.key -> "false") {
f()
}
}
@@ -128,7 +128,7 @@ object AggregateBenchmark extends SqlBasedBenchmark {
withSQLConf(
SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true",
SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key -> "true",
- "spark.sql.codegen.aggregate.map.vectorized.enable" -> "true") {
+ SQLConf.ENABLE_VECTORIZED_HASH_MAP.key -> "true") {
f()
}
}
@@ -154,7 +154,7 @@ object AggregateBenchmark extends SqlBasedBenchmark {
withSQLConf(
SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true",
SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key -> "false",
- "spark.sql.codegen.aggregate.map.vectorized.enable" -> "false") {
+ SQLConf.ENABLE_VECTORIZED_HASH_MAP.key -> "false") {
f()
}
}
@@ -163,7 +163,7 @@ object AggregateBenchmark extends SqlBasedBenchmark {
withSQLConf(
SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true",
SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key -> "true",
- "spark.sql.codegen.aggregate.map.vectorized.enable" -> "true") {
+ SQLConf.ENABLE_VECTORIZED_HASH_MAP.key -> "true") {
f()
}
}
@@ -189,7 +189,7 @@ object AggregateBenchmark extends SqlBasedBenchmark {
withSQLConf(
SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true",
SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key -> "false",
- "spark.sql.codegen.aggregate.map.vectorized.enable" -> "false") {
+ SQLConf.ENABLE_VECTORIZED_HASH_MAP.key -> "false") {
f()
}
}
@@ -198,7 +198,7 @@ object AggregateBenchmark extends SqlBasedBenchmark {
withSQLConf(
SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true",
SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key -> "true",
- "spark.sql.codegen.aggregate.map.vectorized.enable" -> "true") {
+ SQLConf.ENABLE_VECTORIZED_HASH_MAP.key -> "true") {
f()
}
}
@@ -234,7 +234,7 @@ object AggregateBenchmark extends SqlBasedBenchmark {
withSQLConf(
SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true",
SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key -> "false",
- "spark.sql.codegen.aggregate.map.vectorized.enable" -> "false") {
+ SQLConf.ENABLE_VECTORIZED_HASH_MAP.key -> "false") {
f()
}
}
@@ -243,7 +243,7 @@ object AggregateBenchmark extends SqlBasedBenchmark {
withSQLConf(
SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true",
SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key -> "true",
- "spark.sql.codegen.aggregate.map.vectorized.enable" -> "true") {
+ SQLConf.ENABLE_VECTORIZED_HASH_MAP.key -> "true") {
f()
}
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BuiltInDataSourceWriteBenchmark.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BuiltInDataSourceWriteBenchmark.scala
index cd97324..6925bdd 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BuiltInDataSourceWriteBenchmark.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BuiltInDataSourceWriteBenchmark.scala
@@ -16,6 +16,8 @@
*/
package org.apache.spark.sql.execution.benchmark
+import org.apache.spark.sql.internal.SQLConf
+
/**
* Benchmark to measure built-in data sources write performance.
* To run this benchmark:
@@ -45,8 +47,8 @@ object BuiltInDataSourceWriteBenchmark extends
DataSourceWriteBenchmark {
mainArgs
}
- spark.conf.set("spark.sql.parquet.compression.codec", "snappy")
- spark.conf.set("spark.sql.orc.compression.codec", "snappy")
+ spark.conf.set(SQLConf.PARQUET_COMPRESSION.key, "snappy")
+ spark.conf.set(SQLConf.ORC_COMPRESSION.key, "snappy")
formats.foreach { format =>
runBenchmark(s"$format writer benchmark") {
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
index 466baf2..711ecf1 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
@@ -437,7 +437,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with
SharedSQLContext {
}
test("SPARK-20356: pruned InMemoryTableScanExec should have correct ordering
and partitioning") {
- withSQLConf("spark.sql.shuffle.partitions" -> "200") {
+ withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "200") {
val df1 = Seq(("a", 1), ("b", 1), ("c", 2)).toDF("item", "group")
val df2 = Seq(("a", 1), ("b", 2), ("c", 3)).toDF("item", "id")
val df3 = df1.join(df2, Seq("item")).select($"id",
$"group".as("item")).distinct()
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala
index b3a5c68..e740992 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala
@@ -50,7 +50,7 @@ class PartitionBatchPruningSuite
// Enable in-memory partition pruning
spark.conf.set(SQLConf.IN_MEMORY_PARTITION_PRUNING.key, true)
// Enable in-memory table scan accumulators
- spark.conf.set("spark.sql.inMemoryTableScanStatistics.enable", "true")
+ spark.conf.set(SQLConf.IN_MEMORY_TABLE_SCAN_STATISTICS_ENABLED.key, "true")
}
override protected def afterAll(): Unit = {
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
index 7df0dab..ce20966 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect,
CreateV2Table, DropTable, LogicalPlan}
import org.apache.spark.sql.execution.datasources.{CreateTable,
DataSourceResolution}
import org.apache.spark.sql.execution.datasources.v2.orc.OrcDataSourceV2
+import org.apache.spark.sql.internal.SQLConf.DEFAULT_V2_CATALOG
import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType,
StringType, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -77,7 +78,7 @@ class PlanResolutionSuite extends AnalysisTest {
def parseAndResolve(query: String, withDefault: Boolean = false):
LogicalPlan = {
val newConf = conf.copy()
- newConf.setConfString("spark.sql.default.catalog", "testcat")
+ newConf.setConfString(DEFAULT_V2_CATALOG.key, "testcat")
DataSourceResolution(newConf, if (withDefault) lookupWithDefault else
lookupWithoutDefault)
.apply(parsePlan(query))
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index af524c7..eaff5a2 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -201,7 +201,7 @@ class FileSourceStrategySuite extends QueryTest with
SharedSQLContext with Predi
}
test("partitioned table - case insensitive") {
- withSQLConf("spark.sql.caseSensitive" -> "false") {
+ withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
val table =
createTable(
files = Seq(
@@ -437,7 +437,7 @@ class FileSourceStrategySuite extends QueryTest with
SharedSQLContext with Predi
}
test("[SPARK-16818] exchange reuse respects differences in partition
pruning") {
- spark.conf.set("spark.sql.exchange.reuse", true)
+ spark.conf.set(SQLConf.EXCHANGE_REUSE_ENABLED.key, true)
withTempPath { path =>
val tempDir = path.getCanonicalPath
spark.range(10)
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 2e7d682..fdb50a6 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -1399,8 +1399,8 @@ class CSVSuite extends QueryTest with SharedSQLContext
with SQLTestUtils with Te
// that whole test file is mapped to only one partition. This will
guarantee
// reliable sampling of the input file.
withSQLConf(
- "spark.sql.files.maxPartitionBytes" -> (128 * 1024 * 1024).toString,
- "spark.sql.files.openCostInBytes" -> (4 * 1024 * 1024).toString
+ SQLConf.FILES_MAX_PARTITION_BYTES.key -> (128 * 1024 * 1024).toString,
+ SQLConf.FILES_OPEN_COST_IN_BYTES.key -> (4 * 1024 * 1024).toString
)(withTempPath { path =>
val ds = sampledTestData.coalesce(1)
ds.write.text(path.getAbsolutePath)
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 6316e89..34b44be 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -2041,8 +2041,8 @@ class JsonSuite extends QueryTest with SharedSQLContext
with TestJsonData {
// that whole test file is mapped to only one partition. This will
guarantee
// reliable sampling of the input file.
withSQLConf(
- "spark.sql.files.maxPartitionBytes" -> (128 * 1024 * 1024).toString,
- "spark.sql.files.openCostInBytes" -> (4 * 1024 * 1024).toString
+ SQLConf.FILES_MAX_PARTITION_BYTES.key -> (128 * 1024 * 1024).toString,
+ SQLConf.FILES_OPEN_COST_IN_BYTES.key -> (4 * 1024 * 1024).toString
)(withTempPath { path =>
val ds = sampledTestData.coalesce(1)
ds.write.text(path.getAbsolutePath)
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 6b05b9c..6f2218b 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -475,7 +475,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest
with SharedSQLContext {
classOf[SQLHadoopMapReduceCommitProtocol].getCanonicalName) {
val extraOptions = Map(
SQLConf.OUTPUT_COMMITTER_CLASS.key ->
classOf[ParquetOutputCommitter].getCanonicalName,
- "spark.sql.parquet.output.committer.class" ->
+ SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key ->
classOf[JobCommitFailureParquetOutputCommitter].getCanonicalName
)
withTempPath { dir =>
@@ -505,7 +505,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest
with SharedSQLContext {
// Using a output committer that always fail when committing a task, so
that both
// `commitTask()` and `abortTask()` are invoked.
val extraOptions = Map[String, String](
- "spark.sql.parquet.output.committer.class" ->
+ SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key ->
classOf[TaskCommitFailureParquetOutputCommitter].getCanonicalName
)
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index 6f3ed3d..04ace0a 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -953,7 +953,7 @@ abstract class ParquetPartitionDiscoverySuite
withSQLConf(
ParquetOutputFormat.JOB_SUMMARY_LEVEL -> "ALL",
- "spark.sql.sources.commitProtocolClass" ->
+ SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key ->
classOf[SQLHadoopMapReduceCommitProtocol].getCanonicalName) {
spark.range(3).write.parquet(s"$path/p0=0/p1=0")
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index b260f5d..dc4a299 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -277,9 +277,9 @@ class SQLMetricsSuite extends SparkFunSuite with
SQLMetricsTestUtils with Shared
}
test("ShuffledHashJoin metrics") {
- withSQLConf("spark.sql.autoBroadcastJoinThreshold" -> "40",
- "spark.sql.shuffle.partitions" -> "2",
- "spark.sql.join.preferSortMergeJoin" -> "false") {
+ withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "40",
+ SQLConf.SHUFFLE_PARTITIONS.key -> "2",
+ SQLConf.PREFER_SORTMERGEJOIN.key -> "false") {
val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value")
val df2 = (1 to 10).map(i => (i, i.toString)).toSeq.toDF("key", "value")
// Assume the execution plan is
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
index f12eeaa..8f26c04 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.SparkPlanInfo
import org.apache.spark.sql.execution.ui.{SparkPlanGraph, SQLAppStatusStore}
+import org.apache.spark.sql.internal.SQLConf.WHOLESTAGE_CODEGEN_ENABLED
import org.apache.spark.sql.test.SQLTestUtils
@@ -154,7 +155,7 @@ trait SQLMetricsTestUtils extends SQLTestUtils {
expectedNodeIds: Set[Long],
enableWholeStage: Boolean = false): Option[Map[Long, (String,
Map[String, Any])]] = {
val previousExecutionIds = currentExecutionIds()
- withSQLConf("spark.sql.codegen.wholeStage" -> enableWholeStage.toString) {
+ withSQLConf(WHOLESTAGE_CODEGEN_ENABLED.key -> enableWholeStage.toString) {
df.collect()
}
sparkContext.listenerBus.waitUntilEmpty(10000)
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala
index 2a1e7d6..7bca225 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala
@@ -27,6 +27,7 @@ import org.apache.spark.scheduler.ExecutorCacheTaskLocation
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.{MemoryStream,
StreamingQueryWrapper}
import org.apache.spark.sql.functions.count
+import org.apache.spark.sql.internal.SQLConf.SHUFFLE_PARTITIONS
import org.apache.spark.util.Utils
class StateStoreCoordinatorSuite extends SparkFunSuite with SharedSparkContext
{
@@ -124,7 +125,7 @@ class StateStoreCoordinatorSuite extends SparkFunSuite with
SharedSparkContext {
import spark.implicits._
coordRef = spark.streams.stateStoreCoordinator
implicit val sqlContext = spark.sqlContext
- spark.conf.set("spark.sql.shuffle.partitions", "1")
+ spark.conf.set(SHUFFLE_PARTITIONS.key, "1")
// Start a query and run a batch to load state stores
val inputData = MemoryStream[Int]
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index af4369d..a84d107 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -569,7 +569,7 @@ class StateStoreSuite extends
StateStoreSuiteBase[HDFSBackedStateStoreProvider]
val spark = SparkSession.builder().master("local[2]").getOrCreate()
SparkSession.setActiveSession(spark)
implicit val sqlContext = spark.sqlContext
- spark.conf.set("spark.sql.shuffle.partitions", "1")
+ spark.conf.set(SQLConf.SHUFFLE_PARTITIONS.key, "1")
import spark.implicits._
val inputData = MemoryStream[Int]
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
index e3e5ddf..8edbb87 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
@@ -647,7 +647,7 @@ class SQLAppStatusListenerMemoryLeakSuite extends
SparkFunSuite {
.setMaster("local")
.setAppName("test")
.set(config.TASK_MAX_FAILURES, 1) // Don't retry the tasks to run this
test quickly
- .set("spark.sql.ui.retainedExecutions", "50") // Set it to 50 to run
this test quickly
+ .set(UI_RETAINED_EXECUTIONS.key, "50") // Set it to 50 to run this test
quickly
.set(ASYNC_TRACKING_ENABLED, false)
withSpark(new SparkContext(conf)) { sc =>
quietly {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 5f27e75..a28f4e4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -1322,7 +1322,7 @@ class JDBCSuite extends QueryTest
testJdbcParitionColumn("THEID", "THEID")
testJdbcParitionColumn("\"THEID\"", "THEID")
- withSQLConf("spark.sql.caseSensitive" -> "false") {
+ withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
testJdbcParitionColumn("ThEiD", "THEID")
}
testJdbcParitionColumn("THE ID", "THE ID")
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
index fc61050..75f68de 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
@@ -63,7 +63,7 @@ abstract class BucketedWriteSuite extends QueryTest with
SQLTestUtils {
val maxNrBuckets: Int = 200000
val catalog = spark.sessionState.catalog
- withSQLConf("spark.sql.sources.bucketing.maxBuckets" ->
maxNrBuckets.toString) {
+ withSQLConf(SQLConf.BUCKETING_MAX_BUCKETS.key -> maxNrBuckets.toString) {
// within the new limit
Seq(100001, maxNrBuckets).foreach(numBuckets => {
withTable("t") {
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
index d46029e..5f98566 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.parser.ParseException
+import org.apache.spark.sql.internal.SQLConf.BUCKETING_MAX_BUCKETS
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.util.Utils
@@ -252,7 +253,7 @@ class CreateTableAsSelectSuite
val maxNrBuckets: Int = 200000
val catalog = spark.sessionState.catalog
- withSQLConf("spark.sql.sources.bucketing.maxBuckets" ->
maxNrBuckets.toString) {
+ withSQLConf(BUCKETING_MAX_BUCKETS.key -> maxNrBuckets.toString) {
// Within the new limit
Seq(100001, maxNrBuckets).foreach(numBuckets => {
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala
index 70d0624..c90090a 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalog.v2.Identifier
import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException,
TableAlreadyExistsException}
import org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog
import org.apache.spark.sql.execution.datasources.v2.orc.OrcDataSourceV2
+import org.apache.spark.sql.internal.SQLConf.V2_SESSION_CATALOG
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types.{ArrayType, DoubleType, IntegerType,
LongType, MapType, StringType, StructField, StructType, TimestampType}
@@ -39,7 +40,7 @@ class DataSourceV2SQLSuite extends QueryTest with
SharedSQLContext with BeforeAn
before {
spark.conf.set("spark.sql.catalog.testcat",
classOf[TestInMemoryTableCatalog].getName)
spark.conf.set("spark.sql.catalog.testcat2",
classOf[TestInMemoryTableCatalog].getName)
- spark.conf.set("spark.sql.catalog.session",
classOf[TestInMemoryTableCatalog].getName)
+ spark.conf.set(V2_SESSION_CATALOG.key,
classOf[TestInMemoryTableCatalog].getName)
val df = spark.createDataFrame(Seq((1L, "a"), (2L, "b"), (3L,
"c"))).toDF("id", "data")
df.createOrReplaceTempView("source")
@@ -281,7 +282,7 @@ class DataSourceV2SQLSuite extends QueryTest with
SharedSQLContext with BeforeAn
test("CreateTableAsSelect: v2 session catalog can load v1 source table") {
val sparkSession = spark.newSession()
- sparkSession.conf.set("spark.sql.catalog.session",
classOf[V2SessionCatalog].getName)
+ sparkSession.conf.set(V2_SESSION_CATALOG.key,
classOf[V2SessionCatalog].getName)
val df = sparkSession.createDataFrame(Seq((1L, "a"), (2L, "b"), (3L,
"c"))).toDF("id", "data")
df.createOrReplaceTempView("source")
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
index 4bf49ff..92ec2a0 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
@@ -305,7 +305,7 @@ class EventTimeWatermarkSuite extends StreamTest with
BeforeAndAfter with Matche
test("update mode") {
val inputData = MemoryStream[Int]
- spark.conf.set("spark.sql.shuffle.partitions", "10")
+ spark.conf.set(SQLConf.SHUFFLE_PARTITIONS.key, "10")
val windowedAggregation = inputData.toDF()
.withColumn("eventTime", $"value".cast("timestamp"))
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 2b8d773..72f8938 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -1310,7 +1310,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
val start = startId.map(new FileStreamSourceOffset(_))
val end = FileStreamSourceOffset(endId)
- withSQLConf("spark.sql.streaming.unsupportedOperationCheck" ->
"false") {
+ withSQLConf(SQLConf.UNSUPPORTED_OPERATION_CHECK_ENABLED.key ->
"false") {
assert(fileSource.getBatch(start,
end).as[String].collect().toSeq === expected)
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index f2f5fad..1ed2599 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -871,7 +871,7 @@ class StreamSuite extends StreamTest {
testQuietly("specify custom state store provider") {
val providerClassName = classOf[TestStateStoreProvider].getCanonicalName
- withSQLConf("spark.sql.streaming.stateStore.providerClass" ->
providerClassName) {
+ withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key -> providerClassName) {
val input = MemoryStream[Int]
val df = input.toDS().groupBy().count()
val query =
df.writeStream.outputMode("complete").format("memory").queryName("name").start()
@@ -888,9 +888,9 @@ class StreamSuite extends StreamTest {
testQuietly("custom state store provider read from offset log") {
val input = MemoryStream[Int]
val df = input.toDS().groupBy().count()
- val providerConf1 = "spark.sql.streaming.stateStore.providerClass" ->
+ val providerConf1 = SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider"
- val providerConf2 = "spark.sql.streaming.stateStore.providerClass" ->
+ val providerConf2 = SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
classOf[TestStateStoreProvider].getCanonicalName
def runQuery(queryName: String, checkpointLoc: String): Unit = {
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenersConfSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenersConfSuite.scala
index 88f510c..da2f221 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenersConfSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenersConfSuite.scala
@@ -21,6 +21,7 @@ import org.scalatest.BeforeAndAfter
import org.apache.spark.SparkConf
import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.internal.StaticSQLConf.STREAMING_QUERY_LISTENERS
import org.apache.spark.sql.streaming.StreamingQueryListener._
@@ -29,7 +30,7 @@ class StreamingQueryListenersConfSuite extends StreamTest with BeforeAndAfter {
import testImplicits._
override protected def sparkConf: SparkConf =
- super.sparkConf.set("spark.sql.streaming.streamingQueryListeners",
+ super.sparkConf.set(STREAMING_QUERY_LISTENERS.key,
"org.apache.spark.sql.streaming.TestListener")
test("test if the configured query lister is loaded") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index a5cb25c..e6b56e5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -413,9 +413,9 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
sources.nonEmpty
}
// Disabled by default
- assert(spark.conf.get("spark.sql.streaming.metricsEnabled").toBoolean ===
false)
+ assert(spark.conf.get(SQLConf.STREAMING_METRICS_ENABLED.key).toBoolean ===
false)
- withSQLConf("spark.sql.streaming.metricsEnabled" -> "false") {
+ withSQLConf(SQLConf.STREAMING_METRICS_ENABLED.key -> "false") {
testStream(inputData.toDF)(
AssertOnQuery { q => !isMetricsRegistered(q) },
StopStream,
@@ -424,7 +424,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
}
// Registered when enabled
- withSQLConf("spark.sql.streaming.metricsEnabled" -> "true") {
+ withSQLConf(SQLConf.STREAMING_METRICS_ENABLED.key -> "true") {
testStream(inputData.toDF)(
AssertOnQuery { q => isMetricsRegistered(q) },
StopStream,
@@ -434,7 +434,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
}
test("SPARK-22975: MetricsReporter defaults when there was no progress
reported") {
- withSQLConf("spark.sql.streaming.metricsEnabled" -> "true") {
+ withSQLConf(SQLConf.STREAMING_METRICS_ENABLED.key -> "true") {
BlockingSource.latch = new CountDownLatch(1)
withTempDir { tempDir =>
val sq = spark.readStream
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousAggregationSuite.scala
index c5b95fa..3ec4750 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousAggregationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousAggregationSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.streaming.continuous
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.execution.streaming.sources.ContinuousMemoryStream
import org.apache.spark.sql.functions._
+import org.apache.spark.sql.internal.SQLConf.UNSUPPORTED_OPERATION_CHECK_ENABLED
import org.apache.spark.sql.streaming.OutputMode
class ContinuousAggregationSuite extends ContinuousSuiteBase {
@@ -36,7 +37,7 @@ class ContinuousAggregationSuite extends ContinuousSuiteBase {
}
test("basic") {
- withSQLConf(("spark.sql.streaming.unsupportedOperationCheck", "false")) {
+ withSQLConf((UNSUPPORTED_OPERATION_CHECK_ENABLED.key, "false")) {
val input = ContinuousMemoryStream.singlePartition[Int]
testStream(input.toDF().agg(max('value)), OutputMode.Complete)(
@@ -112,7 +113,7 @@ class ContinuousAggregationSuite extends ContinuousSuiteBase {
}
test("repeated restart") {
- withSQLConf(("spark.sql.streaming.unsupportedOperationCheck", "false")) {
+ withSQLConf((UNSUPPORTED_OPERATION_CHECK_ENABLED.key, "false")) {
val input = ContinuousMemoryStream.singlePartition[Int]
testStream(input.toDF().agg(max('value)), OutputMode.Complete)(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
index 9840c7f..dca4520 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.continuous._
import org.apache.spark.sql.execution.streaming.sources.ContinuousMemoryStream
import org.apache.spark.sql.functions._
-import org.apache.spark.sql.internal.SQLConf.CONTINUOUS_STREAMING_EPOCH_BACKLOG_QUEUE_SIZE
+import org.apache.spark.sql.internal.SQLConf.{CONTINUOUS_STREAMING_EPOCH_BACKLOG_QUEUE_SIZE, MIN_BATCHES_TO_RETAIN}
import org.apache.spark.sql.streaming.{StreamTest, Trigger}
import org.apache.spark.sql.test.TestSparkSession
@@ -307,7 +307,7 @@ class ContinuousMetaSuite extends ContinuousSuiteBase {
"local[10]",
"continuous-stream-test-sql-context",
sparkConf.set("spark.sql.testkey", "true")
- .set("spark.sql.streaming.minBatchesToRetain", "2")))
+ .set(MIN_BATCHES_TO_RETAIN.key, "2")))
test("SPARK-24351: check offsetLog/commitLog retained in the checkpoint
directory") {
withTempDir { checkpointDir =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
index 7b2c1a5..79016b5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
@@ -242,7 +242,7 @@ class StreamingDataSourceV2Suite extends StreamTest {
override def beforeAll(): Unit = {
super.beforeAll()
val fakeCheckpoint = Utils.createTempDir()
- spark.conf.set("spark.sql.streaming.checkpointLocation",
fakeCheckpoint.getCanonicalPath)
+ spark.conf.set(SQLConf.CHECKPOINT_LOCATION.key,
fakeCheckpoint.getCanonicalPath)
}
override def afterEach(): Unit = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
index 8fb1400..c630f14 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
@@ -203,7 +203,7 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
.stop()
assert(LastOptions.partitionColumns == Seq("a"))
- withSQLConf("spark.sql.caseSensitive" -> "false") {
+ withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
df.writeStream
.format("org.apache.spark.sql.streaming.test")
.option("checkpointLocation", newMetadataDir)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
index e9ab628..126e23e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -409,7 +409,7 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
test("write path implements onTaskCommit API correctly") {
withSQLConf(
- "spark.sql.sources.commitProtocolClass" ->
+ SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key ->
classOf[MessageCapturingCommitProtocol].getCanonicalName) {
withTempDir { dir =>
val path = dir.getCanonicalPath
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
index b06856b..dd18add 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
@@ -44,6 +44,7 @@ import org.apache.spark.{SparkException, SparkFunSuite}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.sql.hive.test.HiveTestUtils
+import org.apache.spark.sql.internal.StaticSQLConf.HIVE_THRIFT_SERVER_SINGLESESSION
import org.apache.spark.sql.test.ProcessTestUtils.ProcessOutputCapturer
import org.apache.spark.util.{ThreadUtils, Utils}
@@ -536,9 +537,9 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
}
if (HiveUtils.isHive23) {
- assert(conf.get("spark.sql.hive.version") === Some("2.3.5"))
+ assert(conf.get(HiveUtils.FAKE_HIVE_VERSION.key) === Some("2.3.5"))
} else {
- assert(conf.get("spark.sql.hive.version") === Some("1.2.1"))
+ assert(conf.get(HiveUtils.FAKE_HIVE_VERSION.key) === Some("1.2.1"))
}
}
}
@@ -553,9 +554,9 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
}
if (HiveUtils.isHive23) {
- assert(conf.get("spark.sql.hive.version") === Some("2.3.5"))
+ assert(conf.get(HiveUtils.FAKE_HIVE_VERSION.key) === Some("2.3.5"))
} else {
- assert(conf.get("spark.sql.hive.version") === Some("1.2.1"))
+ assert(conf.get(HiveUtils.FAKE_HIVE_VERSION.key) === Some("1.2.1"))
}
}
}
@@ -659,7 +660,7 @@ class SingleSessionSuite extends HiveThriftJdbcTest {
override def mode: ServerMode.Value = ServerMode.binary
override protected def extraConf: Seq[String] =
- "--conf spark.sql.hive.thriftServer.singleSession=true" :: Nil
+ s"--conf ${HIVE_THRIFT_SERVER_SINGLESESSION.key}=true" :: Nil
test("share the temporary functions across JDBC connections") {
withMultipleConnectionJdbcStatement()(
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala
index 4351dc7..9bc0be8 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala
@@ -27,9 +27,12 @@ import scala.util.control.NonFatal
import org.apache.hadoop.conf.Configuration
import org.apache.spark.{SecurityManager, SparkConf, TestUtils}
+import org.apache.spark.internal.config.MASTER_REST_SERVER_ENABLED
+import org.apache.spark.internal.config.UI.UI_ENABLED
import org.apache.spark.sql.{QueryTest, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTableType
+import org.apache.spark.sql.internal.StaticSQLConf.WAREHOUSE_PATH
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.util.Utils
@@ -184,11 +187,11 @@ class HiveExternalCatalogVersionsSuite extends SparkSubmitTestUtils {
val args = Seq(
"--name", "prepare testing tables",
"--master", "local[2]",
- "--conf", "spark.ui.enabled=false",
- "--conf", "spark.master.rest.enabled=false",
- "--conf", "spark.sql.hive.metastore.version=1.2.1",
- "--conf", "spark.sql.hive.metastore.jars=maven",
- "--conf", s"spark.sql.warehouse.dir=${wareHousePath.getCanonicalPath}",
+ "--conf", s"${UI_ENABLED.key}=false",
+ "--conf", s"${MASTER_REST_SERVER_ENABLED.key}=false",
+ "--conf", s"${HiveUtils.HIVE_METASTORE_VERSION.key}=1.2.1",
+ "--conf", s"${HiveUtils.HIVE_METASTORE_JARS.key}=maven",
+ "--conf", s"${WAREHOUSE_PATH.key}=${wareHousePath.getCanonicalPath}",
"--conf", s"spark.sql.test.version.index=$index",
"--driver-java-options",
s"-Dderby.system.home=${wareHousePath.getCanonicalPath}",
tempPyFile.getCanonicalPath)
@@ -203,11 +206,11 @@ class HiveExternalCatalogVersionsSuite extends SparkSubmitTestUtils {
"--class", PROCESS_TABLES.getClass.getName.stripSuffix("$"),
"--name", "HiveExternalCatalog backward compatibility test",
"--master", "local[2]",
- "--conf", "spark.ui.enabled=false",
- "--conf", "spark.master.rest.enabled=false",
- "--conf", "spark.sql.hive.metastore.version=1.2.1",
- "--conf", "spark.sql.hive.metastore.jars=maven",
- "--conf", s"spark.sql.warehouse.dir=${wareHousePath.getCanonicalPath}",
+ "--conf", s"${UI_ENABLED.key}=false",
+ "--conf", s"${MASTER_REST_SERVER_ENABLED.key}=false",
+ "--conf", s"${HiveUtils.HIVE_METASTORE_VERSION.key}=1.2.1",
+ "--conf", s"${HiveUtils.HIVE_METASTORE_JARS.key}=maven",
+ "--conf", s"${WAREHOUSE_PATH.key}=${wareHousePath.getCanonicalPath}",
"--driver-java-options",
s"-Dderby.system.home=${wareHousePath.getCanonicalPath}",
unusedJar.toString)
runSparkSubmit(args)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
index 0ff2215..e2ddec3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
@@ -33,6 +33,8 @@ import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.hive.test.{HiveTestUtils, TestHiveContext}
+import org.apache.spark.sql.internal.SQLConf.SHUFFLE_PARTITIONS
+import org.apache.spark.sql.internal.StaticSQLConf.WAREHOUSE_PATH
import org.apache.spark.sql.types.{DecimalType, StructType}
import org.apache.spark.tags.ExtendedHiveTest
import org.apache.spark.util.{ResetSystemProperties, Utils}
@@ -338,10 +340,10 @@ object SetMetastoreURLTest extends Logging {
val builder = SparkSession.builder()
.config(sparkConf)
.config(UI_ENABLED.key, "false")
- .config("spark.sql.hive.metastore.version", "0.13.1")
+ .config(HiveUtils.HIVE_METASTORE_VERSION.key, "0.13.1")
// The issue described in SPARK-16901 only appear when
// spark.sql.hive.metastore.jars is not set to builtin.
- .config("spark.sql.hive.metastore.jars", "maven")
+ .config(HiveUtils.HIVE_METASTORE_JARS.key, "maven")
.enableHiveSupport()
val spark = builder.getOrCreate()
@@ -392,16 +394,16 @@ object SetWarehouseLocationTest extends Logging {
// We are expecting that the value of spark.sql.warehouse.dir will override the
// value of hive.metastore.warehouse.dir.
val session = new TestHiveContext(new SparkContext(sparkConf
- .set("spark.sql.warehouse.dir", warehouseLocation.toString)
+ .set(WAREHOUSE_PATH.key, warehouseLocation.toString)
.set("hive.metastore.warehouse.dir",
hiveWarehouseLocation.toString)))
.sparkSession
(session, warehouseLocation.toString)
}
- if (sparkSession.conf.get("spark.sql.warehouse.dir") !=
expectedWarehouseLocation) {
+ if (sparkSession.conf.get(WAREHOUSE_PATH.key) !=
expectedWarehouseLocation) {
throw new Exception(
- "spark.sql.warehouse.dir is not set to the expected warehouse location
" +
+ s"${WAREHOUSE_PATH.key} is not set to the expected warehouse location
" +
s"$expectedWarehouseLocation.")
}
@@ -564,7 +566,7 @@ object SparkSubmitClassLoaderTest extends Logging {
val conf = new SparkConf()
val hiveWarehouseLocation = Utils.createTempDir()
conf.set(UI_ENABLED, false)
- conf.set("spark.sql.warehouse.dir", hiveWarehouseLocation.toString)
+ conf.set(WAREHOUSE_PATH.key, hiveWarehouseLocation.toString)
val sc = new SparkContext(conf)
val hiveContext = new TestHiveContext(sc)
val df = hiveContext.createDataFrame((1 to 100).map(i => (i, i))).toDF("i", "j")
@@ -642,14 +644,14 @@ object SparkSQLConfTest extends Logging {
val conf = new SparkConf() {
override def getAll: Array[(String, String)] = {
def isMetastoreSetting(conf: String): Boolean = {
- conf == "spark.sql.hive.metastore.version" || conf ==
"spark.sql.hive.metastore.jars"
+ conf == HiveUtils.HIVE_METASTORE_VERSION.key || conf ==
HiveUtils.HIVE_METASTORE_JARS.key
}
// If there is any metastore settings, remove them.
val filteredSettings = super.getAll.filterNot(e => isMetastoreSetting(e._1))
// Always add these two metastore settings at the beginning.
- ("spark.sql.hive.metastore.version" -> "0.12") +:
- ("spark.sql.hive.metastore.jars" -> "maven") +:
+ (HiveUtils.HIVE_METASTORE_VERSION.key -> "0.12") +:
+ (HiveUtils.HIVE_METASTORE_JARS.key -> "maven") +:
filteredSettings
}
@@ -676,10 +678,10 @@ object SPARK_9757 extends QueryTest {
val hiveWarehouseLocation = Utils.createTempDir()
val sparkContext = new SparkContext(
new SparkConf()
- .set("spark.sql.hive.metastore.version", "0.13.1")
- .set("spark.sql.hive.metastore.jars", "maven")
+ .set(HiveUtils.HIVE_METASTORE_VERSION.key, "0.13.1")
+ .set(HiveUtils.HIVE_METASTORE_JARS.key, "maven")
.set(UI_ENABLED, false)
- .set("spark.sql.warehouse.dir", hiveWarehouseLocation.toString))
+ .set(WAREHOUSE_PATH.key, hiveWarehouseLocation.toString))
val hiveContext = new TestHiveContext(sparkContext)
spark = hiveContext.sparkSession
@@ -725,7 +727,7 @@ object SPARK_11009 extends QueryTest {
val sparkContext = new SparkContext(
new SparkConf()
.set(UI_ENABLED, false)
- .set("spark.sql.shuffle.partitions", "100"))
+ .set(SHUFFLE_PARTITIONS.key, "100"))
val hiveContext = new TestHiveContext(sparkContext)
spark = hiveContext.sparkSession
@@ -756,7 +758,7 @@ object SPARK_14244 extends QueryTest {
val sparkContext = new SparkContext(
new SparkConf()
.set(UI_ENABLED, false)
- .set("spark.sql.shuffle.partitions", "100"))
+ .set(SHUFFLE_PARTITIONS.key, "100"))
val hiveContext = new TestHiveContext(sparkContext)
spark = hiveContext.sparkSession
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
index ecd4287..d06cc1c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
@@ -1028,7 +1028,7 @@ class HashAggregationQueryWithControlledFallbackSuite extends AggregationQuerySu
override protected def checkAnswer(actual: => DataFrame, expectedAnswer: Seq[Row]): Unit = {
Seq("true", "false").foreach { enableTwoLevelMaps =>
- withSQLConf("spark.sql.codegen.aggregate.map.twolevel.enabled" ->
+ withSQLConf(SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key ->
enableTwoLevelMaps) {
(1 to 3).foreach { fallbackStartsAt =>
withSQLConf("spark.sql.TungstenAggregate.testFallbackStartsAt" ->
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 8cdb8dd..d68a470 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -45,7 +45,7 @@ import org.apache.spark.sql.execution.command.CacheTableCommand
import org.apache.spark.sql.hive._
import org.apache.spark.sql.hive.client.HiveClient
import org.apache.spark.sql.internal.{SessionState, SharedState, SQLConf, WithTestConf}
-import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
+import org.apache.spark.sql.internal.StaticSQLConf.{CATALOG_IMPLEMENTATION, WAREHOUSE_PATH}
import org.apache.spark.util.{ShutdownHookManager, Utils}
// SPARK-3729: Test key required to check for initialization errors with config.
@@ -57,9 +57,9 @@ object TestHive
new SparkConf()
.set("spark.sql.test", "")
.set(SQLConf.CODEGEN_FALLBACK.key, "false")
- .set("spark.sql.hive.metastore.barrierPrefixes",
+ .set(HiveUtils.HIVE_METASTORE_BARRIER_PREFIXES.key,
"org.apache.spark.sql.hive.execution.PairSerDe")
- .set("spark.sql.warehouse.dir",
TestHiveContext.makeWarehouseDir().toURI.getPath)
+ .set(WAREHOUSE_PATH.key,
TestHiveContext.makeWarehouseDir().toURI.getPath)
// SPARK-8910
.set(UI_ENABLED, false)
.set(config.UNSAFE_EXCEPTION_ON_MEMORY_LEAK, true)
@@ -534,7 +534,7 @@ private[hive] class TestHiveSparkSession(
}
// Clean out the Hive warehouse between each suite
- val warehouseDir = new File(new URI(sparkContext.conf.get("spark.sql.warehouse.dir")).getPath)
+ val warehouseDir = new File(new URI(sparkContext.conf.get(WAREHOUSE_PATH.key)).getPath)
Utils.deleteRecursively(warehouseDir)
warehouseDir.mkdir()
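
For context only (not part of the patch): the change above consistently replaces raw
"spark.sql.*" string literals with a config entry's `.key`, so the key is defined in one
place and typos cannot creep into tests. The snippet below is a minimal, self-contained
Scala sketch of that idea; `ExampleConf`, the simplified `ConfigEntry` case class, and
`Demo` are illustrative stand-ins, not Spark's actual internal ConfigEntry/SQLConf API.

// Hypothetical, simplified illustration of the pattern the patch applies:
// a typed entry owns its key string, so callers write ENTRY.key instead of
// repeating the raw configuration name.
object ExampleConf {
  final case class ConfigEntry[T](key: String, default: T)

  // Stand-in for an entry such as SQLConf.SHUFFLE_PARTITIONS.
  val SHUFFLE_PARTITIONS: ConfigEntry[Int] =
    ConfigEntry("spark.sql.shuffle.partitions", 200)
}

object Demo extends App {
  import ExampleConf._
  // Before the patch: spark.conf.set("spark.sql.shuffle.partitions", "10")
  // After the patch:  spark.conf.set(SQLConf.SHUFFLE_PARTITIONS.key, "10")
  println(SHUFFLE_PARTITIONS.key)     // prints spark.sql.shuffle.partitions
  println(SHUFFLE_PARTITIONS.default) // prints 200
}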
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]