This is an automated email from the ASF dual-hosted git repository.
hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 6fc176f4f34d [SPARK-49413][CONNECT][SQL] Create a shared RuntimeConfig interface
6fc176f4f34d is described below
commit 6fc176f4f34d73d6f6975836951562243343ba9a
Author: Herman van Hovell <[email protected]>
AuthorDate: Tue Sep 17 17:09:09 2024 -0400
[SPARK-49413][CONNECT][SQL] Create a shared RuntimeConfig interface
### What changes were proposed in this pull request?
This PR introduces a shared RuntimeConfig interface in sql/api. The Connect client now implements it with ConnectRuntimeConfig, and Classic implements it with RuntimeConfigImpl.
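As a reading aid, a minimal usage sketch of the shared surface follows. The session setup is illustrative; the `conf` methods shown are the ones declared by the new `sql/api` `RuntimeConfig`, and the intent is that the same calls compile against both the Classic and Connect sessions:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
val conf = spark.conf // org.apache.spark.sql.RuntimeConfig

conf.set("spark.sql.shuffle.partitions", 10L)  // Long overload, forwards to set(String, String)
conf.set("spark.sql.ansi.enabled", true)       // Boolean overload
val partitions: String = conf.get("spark.sql.shuffle.partitions")
val withDefault: String = conf.get("some.unset.key", "fallback")
val maybeValue: Option[String] = conf.getOption("some.unset.key")
if (conf.isModifiable("spark.sql.shuffle.partitions")) {
  conf.unset("spark.sql.shuffle.partitions")
}
```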
### Why are the changes needed?
We are creating a shared Scala Spark SQL interface for Classic and Connect.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #47980 from hvanhovell/SPARK-49413.
Authored-by: Herman van Hovell <[email protected]>
Signed-off-by: Herman van Hovell <[email protected]>
---
.../scala/org/apache/spark/sql/SparkSession.scala | 14 +--
.../ConnectRuntimeConfig.scala} | 70 +++-----------
.../sql/kafka010/KafkaMicroBatchSourceSuite.scala | 4 +-
project/MimaExcludes.scala | 4 +
.../scala/org/apache/spark/sql/RuntimeConfig.scala | 105 +++++++++++++++++++++
.../org/apache/spark/sql/api/SparkSession.scala | 13 ++-
.../execution/ExecuteGrpcResponseSender.scala | 8 +-
.../spark/sql/connect/service/SessionHolder.scala | 4 +-
.../spark/sql/connect/utils/ErrorUtils.scala | 9 +-
.../service/SparkConnectSessionHolderSuite.scala | 2 +-
.../main/scala/org/apache/spark/sql/Dataset.scala | 4 +-
.../scala/org/apache/spark/sql/SparkSession.scala | 19 ++--
.../spark/sql/artifact/ArtifactManager.scala | 2 +-
.../apache/spark/sql/execution/CacheManager.scala | 2 +-
.../apache/spark/sql/execution/SQLExecution.scala | 2 +-
.../apache/spark/sql/execution/command/ddl.scala | 2 +-
.../apache/spark/sql/execution/command/views.scala | 2 +-
.../sql/execution/datasources/DataSource.scala | 3 +-
.../datasources/binaryfile/BinaryFileFormat.scala | 2 +-
.../spark/sql/execution/datasources/ddl.scala | 2 +-
.../datasources/v2/state/StateDataSource.scala | 8 +-
.../sql/execution/streaming/AsyncLogPurge.scala | 3 +-
.../execution/streaming/MicroBatchExecution.scala | 2 +-
.../spark/sql/execution/streaming/OffsetSeq.scala | 23 +++--
.../sql/execution/streaming/StreamExecution.scala | 2 +-
.../sql/execution/streaming/WatermarkTracker.scala | 3 +-
.../RuntimeConfigImpl.scala} | 101 +++-----------------
.../sql/streaming/StreamingQueryManager.scala | 2 +-
.../spark/sql/FileBasedDataSourceSuite.scala | 2 +-
.../scala/org/apache/spark/sql/GenTPCDSData.scala | 2 +-
.../scala/org/apache/spark/sql/JoinSuite.scala | 4 +-
.../org/apache/spark/sql/RuntimeConfigSuite.scala | 3 +-
.../scala/org/apache/spark/sql/SQLQuerySuite.scala | 7 +-
.../spark/sql/SparkSessionBuilderSuite.scala | 20 ++--
.../spark/sql/SparkSessionExtensionSuite.scala | 6 +-
.../spark/sql/StatisticsCollectionTestBase.scala | 4 +-
.../spark/sql/connector/DataSourceV2SQLSuite.scala | 8 +-
.../execution/CoalesceShufflePartitionsSuite.scala | 4 +-
.../columnar/InMemoryColumnarQuerySuite.scala | 2 +-
.../columnar/PartitionBatchPruningSuite.scala | 4 +-
.../execution/datasources/ReadSchemaSuite.scala | 10 +-
.../binaryfile/BinaryFileFormatSuite.scala | 2 +-
.../sql/execution/datasources/orc/OrcTest.scala | 2 +-
.../datasources/parquet/ParquetIOSuite.scala | 4 +-
.../state/StateDataSourceChangeDataReadSuite.scala | 2 +-
.../v2/state/StateDataSourceReadSuite.scala | 4 +-
.../state/RocksDBStateStoreIntegrationSuite.scala | 2 +-
.../execution/streaming/state/RocksDBSuite.scala | 11 ++-
.../sql/expressions/ExpressionInfoSuite.scala | 2 +-
.../apache/spark/sql/internal/SQLConfSuite.scala | 88 ++++++++---------
.../spark/sql/sources/BucketedReadSuite.scala | 2 +-
.../spark/sql/streaming/FileStreamSinkSuite.scala | 2 +-
.../sql/streaming/FileStreamSourceSuite.scala | 4 +-
.../streaming/FlatMapGroupsWithStateSuite.scala | 2 +-
.../apache/spark/sql/streaming/StreamTest.scala | 5 +-
.../sql/streaming/TriggerAvailableNowSuite.scala | 2 +-
.../apache/spark/sql/test/SharedSparkSession.scala | 2 +
.../spark/sql/hive/HiveSharedStateSuite.scala | 2 +-
.../spark/sql/hive/HiveSparkSubmitSuite.scala | 2 +-
.../hive/execution/HiveSerDeReadWriteSuite.scala | 2 +-
.../spark/sql/hive/execution/SQLQuerySuite.scala | 4 +-
61 files changed, 317 insertions(+), 322 deletions(-)
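Before the diff itself, a simplified sketch of the resulting structure may help: the abstract contract lives in sql/api, and each backend supplies an implementation whose docs are inherited via `@inheritdoc`. This is not the actual code; `InMemoryRuntimeConfig` and its map-backed store are hypothetical stand-ins for the real `ConnectRuntimeConfig` (gRPC-backed) and `RuntimeConfigImpl` (SQLConf-backed):

```scala
import scala.collection.mutable

// Shared contract (trimmed): the real abstract class also declares the
// Boolean/Long set overloads, getAll, get-with-default, and isModifiable.
abstract class RuntimeConfig {
  def set(key: String, value: String): Unit
  def get(key: String): String
  def getOption(key: String): Option[String]
  def unset(key: String): Unit
}

// Hypothetical toy backend, for illustration only.
class InMemoryRuntimeConfig extends RuntimeConfig {
  private val store = mutable.Map.empty[String, String]

  /** @inheritdoc */
  override def set(key: String, value: String): Unit = store(key) = value

  /** @inheritdoc */
  override def get(key: String): String =
    getOption(key).getOrElse(throw new NoSuchElementException(key))

  /** @inheritdoc */
  override def getOption(key: String): Option[String] = store.get(key)

  /** @inheritdoc */
  override def unset(key: String): Unit = store.remove(key)
}
```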
diff --git
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 209ec88618c4..989a7e0c174c 100644
---
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -41,7 +41,7 @@ import org.apache.spark.sql.connect.client.{ClassFinder,
CloseableIterator, Spar
import org.apache.spark.sql.connect.client.SparkConnectClient.Configuration
import org.apache.spark.sql.connect.client.arrow.ArrowSerializer
import org.apache.spark.sql.functions.lit
-import org.apache.spark.sql.internal.{CatalogImpl, SessionCleaner, SqlApiConf}
+import org.apache.spark.sql.internal.{CatalogImpl, ConnectRuntimeConfig,
SessionCleaner, SqlApiConf}
import org.apache.spark.sql.internal.ColumnNodeToProtoConverter.{toExpr,
toTypedExpr}
import org.apache.spark.sql.streaming.DataStreamReader
import org.apache.spark.sql.streaming.StreamingQueryManager
@@ -88,16 +88,8 @@ class SparkSession private[sql] (
client.hijackServerSideSessionIdForTesting(suffix)
}
- /**
- * Runtime configuration interface for Spark.
- *
- * This is the interface through which the user can get and set all Spark
configurations that
- * are relevant to Spark SQL. When getting the value of a config, his
defaults to the value set
- * in server, if any.
- *
- * @since 3.4.0
- */
- val conf: RuntimeConfig = new RuntimeConfig(client)
+ /** @inheritdoc */
+ val conf: RuntimeConfig = new ConnectRuntimeConfig(client)
/** @inheritdoc */
@transient
diff --git
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/internal/ConnectRuntimeConfig.scala
similarity index 68%
rename from
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala
rename to
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/internal/ConnectRuntimeConfig.scala
index f77dd512ef25..7578e2424fb4 100644
---
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala
+++
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/internal/ConnectRuntimeConfig.scala
@@ -14,10 +14,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.spark.sql
+package org.apache.spark.sql.internal
import org.apache.spark.connect.proto.{ConfigRequest, ConfigResponse, KeyValue}
import org.apache.spark.internal.Logging
+import org.apache.spark.sql.RuntimeConfig
import org.apache.spark.sql.connect.client.SparkConnectClient
/**
@@ -25,61 +26,31 @@ import
org.apache.spark.sql.connect.client.SparkConnectClient
*
* @since 3.4.0
*/
-class RuntimeConfig private[sql] (client: SparkConnectClient) extends Logging {
+class ConnectRuntimeConfig private[sql] (client: SparkConnectClient)
+ extends RuntimeConfig
+ with Logging {
- /**
- * Sets the given Spark runtime configuration property.
- *
- * @since 3.4.0
- */
+ /** @inheritdoc */
def set(key: String, value: String): Unit = {
executeConfigRequest { builder =>
builder.getSetBuilder.addPairsBuilder().setKey(key).setValue(value)
}
}
- /**
- * Sets the given Spark runtime configuration property.
- *
- * @since 3.4.0
- */
- def set(key: String, value: Boolean): Unit = set(key, String.valueOf(value))
-
- /**
- * Sets the given Spark runtime configuration property.
- *
- * @since 3.4.0
- */
- def set(key: String, value: Long): Unit = set(key, String.valueOf(value))
-
- /**
- * Returns the value of Spark runtime configuration property for the given
key.
- *
- * @throws java.util.NoSuchElementException
- * if the key is not set and does not have a default value
- * @since 3.4.0
- */
+ /** @inheritdoc */
@throws[NoSuchElementException]("if the key is not set")
def get(key: String): String = getOption(key).getOrElse {
throw new NoSuchElementException(key)
}
- /**
- * Returns the value of Spark runtime configuration property for the given
key.
- *
- * @since 3.4.0
- */
+ /** @inheritdoc */
def get(key: String, default: String): String = {
executeConfigRequestSingleValue { builder =>
builder.getGetWithDefaultBuilder.addPairsBuilder().setKey(key).setValue(default)
}
}
- /**
- * Returns all properties set in this conf.
- *
- * @since 3.4.0
- */
+ /** @inheritdoc */
def getAll: Map[String, String] = {
val response = executeConfigRequest { builder =>
builder.getGetAllBuilder
@@ -92,11 +63,7 @@ class RuntimeConfig private[sql] (client:
SparkConnectClient) extends Logging {
builder.result()
}
- /**
- * Returns the value of Spark runtime configuration property for the given
key.
- *
- * @since 3.4.0
- */
+ /** @inheritdoc */
def getOption(key: String): Option[String] = {
val pair = executeConfigRequestSinglePair { builder =>
builder.getGetOptionBuilder.addKeys(key)
@@ -108,27 +75,14 @@ class RuntimeConfig private[sql] (client:
SparkConnectClient) extends Logging {
}
}
- /**
- * Resets the configuration property for the given key.
- *
- * @since 3.4.0
- */
+ /** @inheritdoc */
def unset(key: String): Unit = {
executeConfigRequest { builder =>
builder.getUnsetBuilder.addKeys(key)
}
}
- /**
- * Indicates whether the configuration property with the given key is
modifiable in the current
- * session.
- *
- * @return
- * `true` if the configuration property is modifiable. For static SQL,
Spark Core, invalid
- * (not existing) and other non-modifiable configuration properties, the
returned value is
- * `false`.
- * @since 3.4.0
- */
+ /** @inheritdoc */
def isModifiable(key: String): Boolean = {
val modifiable = executeConfigRequestSingleValue { builder =>
builder.getIsModifiableBuilder.addKeys(key)
diff --git
a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index 9ae6a9290f80..1d119de43970 100644
---
a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++
b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -1156,7 +1156,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends
KafkaSourceSuiteBase with
test("allow group.id prefix") {
// Group ID prefix is only supported by consumer based offset reader
- if (spark.conf.get(SQLConf.USE_DEPRECATED_KAFKA_OFFSET_FETCHING)) {
+ if (sqlConf.getConf(SQLConf.USE_DEPRECATED_KAFKA_OFFSET_FETCHING)) {
testGroupId("groupIdPrefix", (expected, actual) => {
assert(actual.exists(_.startsWith(expected)) && !actual.exists(_ ===
expected),
"Valid consumer groups don't contain the expected group id - " +
@@ -1167,7 +1167,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends
KafkaSourceSuiteBase with
test("allow group.id override") {
// Group ID override is only supported by consumer based offset reader
- if (spark.conf.get(SQLConf.USE_DEPRECATED_KAFKA_OFFSET_FETCHING)) {
+ if (sqlConf.getConf(SQLConf.USE_DEPRECATED_KAFKA_OFFSET_FETCHING)) {
testGroupId("kafka.group.id", (expected, actual) => {
assert(actual.exists(_ === expected), "Valid consumer groups don't " +
s"contain the expected group id - Valid consumer groups: $actual / "
+
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 6eee1e759e5e..68433b501bcc 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -160,6 +160,10 @@ object MimaExcludes {
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.DataFrameWriterV2"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.WriteConfigMethods"),
+ // SPARK-49413: Create a shared RuntimeConfig interface.
+
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.RuntimeConfig"),
+
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.RuntimeConfig$"),
+
// SPARK-49287: Shared Streaming interfaces
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.scheduler.SparkListenerEvent"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.ForeachWriter"),
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala
b/sql/api/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala
new file mode 100644
index 000000000000..23a2774ebc3a
--- /dev/null
+++ b/sql/api/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql
+
+import org.apache.spark.annotation.Stable
+
+/**
+ * Runtime configuration interface for Spark. To access this, use
`SparkSession.conf`.
+ *
+ * Options set here are automatically propagated to the Hadoop configuration
during I/O.
+ *
+ * @since 2.0.0
+ */
+@Stable
+abstract class RuntimeConfig {
+
+ /**
+ * Sets the given Spark runtime configuration property.
+ *
+ * @since 2.0.0
+ */
+ def set(key: String, value: String): Unit
+
+ /**
+ * Sets the given Spark runtime configuration property.
+ *
+ * @since 2.0.0
+ */
+ def set(key: String, value: Boolean): Unit = {
+ set(key, value.toString)
+ }
+
+ /**
+ * Sets the given Spark runtime configuration property.
+ *
+ * @since 2.0.0
+ */
+ def set(key: String, value: Long): Unit = {
+ set(key, value.toString)
+ }
+
+ /**
+ * Returns the value of Spark runtime configuration property for the given
key.
+ *
+ * @throws java.util.NoSuchElementException
+ * if the key is not set and does not have a default value
+ * @since 2.0.0
+ */
+ @throws[NoSuchElementException]("if the key is not set")
+ def get(key: String): String
+
+ /**
+ * Returns the value of Spark runtime configuration property for the given
key.
+ *
+ * @since 2.0.0
+ */
+ def get(key: String, default: String): String
+
+ /**
+ * Returns all properties set in this conf.
+ *
+ * @since 2.0.0
+ */
+ def getAll: Map[String, String]
+
+ /**
+ * Returns the value of Spark runtime configuration property for the given
key.
+ *
+ * @since 2.0.0
+ */
+ def getOption(key: String): Option[String]
+
+ /**
+ * Resets the configuration property for the given key.
+ *
+ * @since 2.0.0
+ */
+ def unset(key: String): Unit
+
+ /**
+ * Indicates whether the configuration property with the given key is
modifiable in the current
+ * session.
+ *
+ * @return
+ * `true` if the configuration property is modifiable. For static SQL,
Spark Core, invalid
+ * (not existing) and other non-modifiable configuration properties, the
returned value is
+ * `false`.
+ * @since 2.4.0
+ */
+ def isModifiable(key: String): Boolean
+}
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/api/SparkSession.scala
b/sql/api/src/main/scala/org/apache/spark/sql/api/SparkSession.scala
index cf502c746d24..0580931620aa 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/api/SparkSession.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/api/SparkSession.scala
@@ -26,7 +26,7 @@ import _root_.java.net.URI
import _root_.java.util
import org.apache.spark.annotation.{DeveloperApi, Experimental}
-import org.apache.spark.sql.{Encoder, Row}
+import org.apache.spark.sql.{Encoder, Row, RuntimeConfig}
import org.apache.spark.sql.types.StructType
/**
@@ -58,6 +58,17 @@ abstract class SparkSession[DS[U] <: Dataset[U, DS]] extends
Serializable with C
*/
def version: String
+ /**
+ * Runtime configuration interface for Spark.
+ *
+ * This is the interface through which the user can get and set all Spark
and Hadoop
+ * configurations that are relevant to Spark SQL. When getting the value of
a config, this
+ * defaults to the value set in the underlying `SparkContext`, if any.
+ *
+ * @since 2.0.0
+ */
+ def conf: RuntimeConfig
+
/**
* A collection of methods for registering user-defined functions (UDF).
*
diff --git
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala
index 3e360372d560..051093fcad27 100644
---
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala
+++
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala
@@ -142,7 +142,9 @@ private[connect] class ExecuteGrpcResponseSender[T <:
Message](
* client, but rather enqueued to in the response observer.
*/
private def enqueueProgressMessage(force: Boolean = false): Unit = {
- if
(executeHolder.sessionHolder.session.conf.get(CONNECT_PROGRESS_REPORT_INTERVAL)
> 0) {
+ val progressReportInterval =
executeHolder.sessionHolder.session.sessionState.conf
+ .getConf(CONNECT_PROGRESS_REPORT_INTERVAL)
+ if (progressReportInterval > 0) {
SparkConnectService.executionListener.foreach { listener =>
// It is possible, that the tracker is no longer available and in this
// case we simply ignore it and do not send any progress message. This
avoids
@@ -240,8 +242,8 @@ private[connect] class ExecuteGrpcResponseSender[T <:
Message](
// monitor, and will notify upon state change.
if (response.isEmpty) {
// Wake up more frequently to send the progress updates.
- val progressTimeout =
-
executeHolder.sessionHolder.session.conf.get(CONNECT_PROGRESS_REPORT_INTERVAL)
+ val progressTimeout =
executeHolder.sessionHolder.session.sessionState.conf
+ .getConf(CONNECT_PROGRESS_REPORT_INTERVAL)
// If the progress feature is disabled, wait for the deadline.
val timeout = if (progressTimeout > 0) {
progressTimeout
diff --git
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala
index 0cb820b39e87..e56d66da3050 100644
---
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala
+++
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala
@@ -444,8 +444,8 @@ case class SessionHolder(userId: String, sessionId: String,
session: SparkSessio
*/
private[connect] def usePlanCache(rel: proto.Relation, cachePlan: Boolean)(
transform: proto.Relation => LogicalPlan): LogicalPlan = {
- val planCacheEnabled =
-
Option(session).forall(_.conf.get(Connect.CONNECT_SESSION_PLAN_CACHE_ENABLED,
true))
+ val planCacheEnabled = Option(session)
+
.forall(_.sessionState.conf.getConf(Connect.CONNECT_SESSION_PLAN_CACHE_ENABLED,
true))
// We only cache plans that have a plan ID.
val hasPlanId = rel.hasCommon && rel.getCommon.hasPlanId
diff --git
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala
index 355048cf3036..f1636ed1ef09 100644
---
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala
+++
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala
@@ -205,7 +205,9 @@ private[connect] object ErrorUtils extends Logging {
case _ =>
}
- if
(sessionHolderOpt.exists(_.session.conf.get(Connect.CONNECT_ENRICH_ERROR_ENABLED)))
{
+ val enrichErrorEnabled = sessionHolderOpt.exists(
+
_.session.sessionState.conf.getConf(Connect.CONNECT_ENRICH_ERROR_ENABLED))
+ if (enrichErrorEnabled) {
// Generate a new unique key for this exception.
val errorId = UUID.randomUUID().toString
@@ -216,9 +218,10 @@ private[connect] object ErrorUtils extends Logging {
}
lazy val stackTrace = Option(ExceptionUtils.getStackTrace(st))
+ val stackTraceEnabled = sessionHolderOpt.exists(
+
_.session.sessionState.conf.getConf(SQLConf.PYSPARK_JVM_STACKTRACE_ENABLED))
val withStackTrace =
- if (sessionHolderOpt.exists(
- _.session.conf.get(SQLConf.PYSPARK_JVM_STACKTRACE_ENABLED) &&
stackTrace.nonEmpty)) {
+ if (stackTraceEnabled && stackTrace.nonEmpty) {
val maxSize = Math.min(
SparkEnv.get.conf.get(Connect.CONNECT_JVM_STACK_TRACE_MAX_SIZE),
maxMetadataSize)
diff --git
a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionHolderSuite.scala
b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionHolderSuite.scala
index beebe5d2e2dc..ed2f60afb009 100644
---
a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionHolderSuite.scala
+++
b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionHolderSuite.scala
@@ -399,7 +399,7 @@ class SparkConnectSessionHolderSuite extends
SharedSparkSession {
test("Test session plan cache - disabled") {
val sessionHolder = SparkConnectTestUtils.createDummySessionHolder(spark)
// Disable plan cache of the session
- sessionHolder.session.conf.set(Connect.CONNECT_SESSION_PLAN_CACHE_ENABLED,
false)
+
sessionHolder.session.conf.set(Connect.CONNECT_SESSION_PLAN_CACHE_ENABLED.key,
false)
val planner = new SparkConnectPlanner(sessionHolder)
val query = buildRelation("select 1")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 0fab60a94842..6e5dcc24e29d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -243,7 +243,7 @@ class Dataset[T] private[sql](
@transient private[sql] val logicalPlan: LogicalPlan = {
val plan = queryExecution.commandExecuted
- if (sparkSession.conf.get(SQLConf.FAIL_AMBIGUOUS_SELF_JOIN_ENABLED)) {
+ if
(sparkSession.sessionState.conf.getConf(SQLConf.FAIL_AMBIGUOUS_SELF_JOIN_ENABLED))
{
val dsIds = plan.getTagValue(Dataset.DATASET_ID_TAG).getOrElse(new
HashSet[Long])
dsIds.add(id)
plan.setTagValue(Dataset.DATASET_ID_TAG, dsIds)
@@ -772,7 +772,7 @@ class Dataset[T] private[sql](
private def addDataFrameIdToCol(expr: NamedExpression): NamedExpression = {
val newExpr = expr transform {
case a: AttributeReference
- if sparkSession.conf.get(SQLConf.FAIL_AMBIGUOUS_SELF_JOIN_ENABLED) =>
+ if
sparkSession.sessionState.conf.getConf(SQLConf.FAIL_AMBIGUOUS_SELF_JOIN_ENABLED)
=>
val metadata = new MetadataBuilder()
.withMetadata(a.metadata)
.putLong(Dataset.DATASET_ID_KEY, id)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index a7fb71d95d14..5746b942341f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -173,16 +173,8 @@ class SparkSession private(
@transient
val sqlContext: SQLContext = new SQLContext(this)
- /**
- * Runtime configuration interface for Spark.
- *
- * This is the interface through which the user can get and set all Spark
and Hadoop
- * configurations that are relevant to Spark SQL. When getting the value of
a config,
- * this defaults to the value set in the underlying `SparkContext`, if any.
- *
- * @since 2.0.0
- */
- @transient lazy val conf: RuntimeConfig = new
RuntimeConfig(sessionState.conf)
+ /** @inheritdoc */
+ @transient lazy val conf: RuntimeConfig = new
RuntimeConfigImpl(sessionState.conf)
/**
* An interface to register custom
[[org.apache.spark.sql.util.QueryExecutionListener]]s
@@ -745,7 +737,8 @@ class SparkSession private(
}
private[sql] def leafNodeDefaultParallelism: Int = {
-
conf.get(SQLConf.LEAF_NODE_DEFAULT_PARALLELISM).getOrElse(sparkContext.defaultParallelism)
+ sessionState.conf.getConf(SQLConf.LEAF_NODE_DEFAULT_PARALLELISM)
+ .getOrElse(sparkContext.defaultParallelism)
}
private[sql] object Converter extends ColumnNodeToExpressionConverter with
Serializable {
@@ -1110,13 +1103,13 @@ object SparkSession extends Logging {
private[sql] def getOrCloneSessionWithConfigsOff(
session: SparkSession,
configurations: Seq[ConfigEntry[Boolean]]): SparkSession = {
- val configsEnabled = configurations.filter(session.conf.get[Boolean])
+ val configsEnabled =
configurations.filter(session.sessionState.conf.getConf[Boolean])
if (configsEnabled.isEmpty) {
session
} else {
val newSession = session.cloneSession()
configsEnabled.foreach(conf => {
- newSession.conf.set(conf, false)
+ newSession.sessionState.conf.setConf(conf, false)
})
newSession
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/artifact/ArtifactManager.scala
b/sql/core/src/main/scala/org/apache/spark/sql/artifact/ArtifactManager.scala
index 4eb7d4fa17ee..1ee960622fc2 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/artifact/ArtifactManager.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/artifact/ArtifactManager.scala
@@ -324,7 +324,7 @@ class ArtifactManager(session: SparkSession) extends
Logging {
val fs = destFSPath.getFileSystem(hadoopConf)
if (fs.isInstanceOf[LocalFileSystem]) {
val allowDestLocalConf =
-
session.conf.get(SQLConf.ARTIFACT_COPY_FROM_LOCAL_TO_FS_ALLOW_DEST_LOCAL)
+
session.sessionState.conf.getConf(SQLConf.ARTIFACT_COPY_FROM_LOCAL_TO_FS_ALLOW_DEST_LOCAL)
.getOrElse(
session.conf.get("spark.connect.copyFromLocalToFs.allowDestLocal").contains("true"))
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index aae424afcb8a..1bf6f4e4d7d9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -474,7 +474,7 @@ class CacheManager extends Logging with
AdaptiveSparkPlanHelper {
// Bucketed scan only has one time overhead but can have multi-times
benefits in cache,
// so we always do bucketed scan in a cached plan.
var disableConfigs = Seq(SQLConf.AUTO_BUCKETED_SCAN_ENABLED)
- if (!session.conf.get(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING))
{
+ if
(!session.sessionState.conf.getConf(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING))
{
// Allowing changing cached plan output partitioning might lead to
regression as it introduces
// extra shuffle
disableConfigs =
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
index 58fff2d4a1a2..12ff649b621e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -87,7 +87,7 @@ object SQLExecution extends Logging {
executionIdToQueryExecution.put(executionId, queryExecution)
val originalInterruptOnCancel =
sc.getLocalProperty(SPARK_JOB_INTERRUPT_ON_CANCEL)
if (originalInterruptOnCancel == null) {
- val interruptOnCancel =
sparkSession.conf.get(SQLConf.INTERRUPT_ON_CANCEL)
+ val interruptOnCancel =
sparkSession.sessionState.conf.getConf(SQLConf.INTERRUPT_ON_CANCEL)
sc.setInterruptOnCancel(interruptOnCancel)
}
try {
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 3f221bfa5305..814e56b204f9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -861,7 +861,7 @@ case class RepairTableCommand(
// Hive metastore may not have enough memory to handle millions of
partitions in single RPC,
// we should split them into smaller batches. Since Hive client is not
thread safe, we cannot
// do this in parallel.
- val batchSize = spark.conf.get(SQLConf.ADD_PARTITION_BATCH_SIZE)
+ val batchSize =
spark.sessionState.conf.getConf(SQLConf.ADD_PARTITION_BATCH_SIZE)
partitionSpecsAndLocs.iterator.grouped(batchSize).foreach { batch =>
val now = MILLISECONDS.toSeconds(System.currentTimeMillis())
val parts = batch.map { case (spec, location) =>
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index e1061a46db7b..071e3826b20a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -135,7 +135,7 @@ case class CreateViewCommand(
referredTempFunctions)
catalog.createTempView(name.table, tableDefinition, overrideIfExists =
replace)
} else if (viewType == GlobalTempView) {
- val db = sparkSession.conf.get(StaticSQLConf.GLOBAL_TEMP_DATABASE)
+ val db =
sparkSession.sessionState.conf.getConf(StaticSQLConf.GLOBAL_TEMP_DATABASE)
val viewIdent = TableIdentifier(name.table, Option(db))
val aliasedPlan = aliasPlan(sparkSession, analyzedPlan)
val tableDefinition = createTemporaryViewRelation(
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index d88b5ee8877d..968c204841e4 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -267,7 +267,8 @@ case class DataSource(
checkAndGlobPathIfNecessary(checkEmptyGlobPath = false,
checkFilesExist = false)
createInMemoryFileIndex(globbedPaths)
})
- val forceNullable =
sparkSession.conf.get(SQLConf.FILE_SOURCE_SCHEMA_FORCE_NULLABLE)
+ val forceNullable = sparkSession.sessionState.conf
+ .getConf(SQLConf.FILE_SOURCE_SCHEMA_FORCE_NULLABLE)
val sourceDataSchema = if (forceNullable) dataSchema.asNullable else
dataSchema
SourceInfo(
s"FileSource[$path]",
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala
index cbff526592f9..54c100282e2d 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala
@@ -98,7 +98,7 @@ class BinaryFileFormat extends FileFormat with
DataSourceRegister {
val broadcastedHadoopConf =
sparkSession.sparkContext.broadcast(new
SerializableConfiguration(hadoopConf))
val filterFuncs = filters.flatMap(filter => createFilterFunction(filter))
- val maxLength = sparkSession.conf.get(SOURCES_BINARY_FILE_MAX_LENGTH)
+ val maxLength =
sparkSession.sessionState.conf.getConf(SOURCES_BINARY_FILE_MAX_LENGTH)
file: PartitionedFile => {
val path = file.toPath
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
index fc6cba786c4e..d9367d92d462 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
@@ -115,7 +115,7 @@ case class CreateTempViewUsing(
}.logicalPlan
if (global) {
- val db = sparkSession.conf.get(StaticSQLConf.GLOBAL_TEMP_DATABASE)
+ val db =
sparkSession.sessionState.conf.getConf(StaticSQLConf.GLOBAL_TEMP_DATABASE)
val viewIdent = TableIdentifier(tableIdent.table, Option(db))
val viewDefinition = createTemporaryViewRelation(
viewIdent,
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala
index 83399e2cac01..50b90641d309 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala
@@ -25,7 +25,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{RuntimeConfig, SparkSession}
+import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.DataSourceOptions
import org.apache.spark.sql.connector.catalog.{Table, TableProvider}
import org.apache.spark.sql.connector.expressions.Transform
@@ -119,9 +119,9 @@ class StateDataSource extends TableProvider with
DataSourceRegister with Logging
throw StateDataSourceErrors.offsetMetadataLogUnavailable(batchId,
checkpointLocation)
)
- val clonedRuntimeConf = new
RuntimeConfig(session.sessionState.conf.clone())
- OffsetSeqMetadata.setSessionConf(metadata, clonedRuntimeConf)
- StateStoreConf(clonedRuntimeConf.sqlConf)
+ val clonedSqlConf = session.sessionState.conf.clone()
+ OffsetSeqMetadata.setSessionConf(metadata, clonedSqlConf)
+ StateStoreConf(clonedSqlConf)
case _ =>
throw StateDataSourceErrors.offsetLogUnavailable(batchId,
checkpointLocation)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncLogPurge.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncLogPurge.scala
index 06fdc6c53bc4..cb7e71bda84d 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncLogPurge.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncLogPurge.scala
@@ -49,7 +49,8 @@ trait AsyncLogPurge extends Logging {
// which are written per run.
protected def purgeStatefulMetadata(plan: SparkPlan): Unit
- protected lazy val useAsyncPurge: Boolean =
sparkSession.conf.get(SQLConf.ASYNC_LOG_PURGE)
+ protected lazy val useAsyncPurge: Boolean = sparkSession.sessionState.conf
+ .getConf(SQLConf.ASYNC_LOG_PURGE)
protected def purgeAsync(batchId: Long): Unit = {
if (purgeRunning.compareAndSet(false, true)) {
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index 285494543533..053aef6ced3a 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -477,7 +477,7 @@ class MicroBatchExecution(
// update offset metadata
nextOffsets.metadata.foreach { metadata =>
- OffsetSeqMetadata.setSessionConf(metadata,
sparkSessionToRunBatches.conf)
+ OffsetSeqMetadata.setSessionConf(metadata,
sparkSessionToRunBatches.sessionState.conf)
execCtx.offsetSeqMetadata = OffsetSeqMetadata(
metadata.batchWatermarkMs, metadata.batchTimestampMs,
sparkSessionToRunBatches.conf)
watermarkTracker = WatermarkTracker(sparkSessionToRunBatches.conf)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
index f0be33ad9a9d..d5facc245e72 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
@@ -26,6 +26,7 @@ import org.apache.spark.io.CompressionCodec
import org.apache.spark.sql.RuntimeConfig
import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2,
SparkDataStream}
import
org.apache.spark.sql.execution.streaming.state.{FlatMapGroupsWithStateExecHelper,
StreamingAggregationStateManager, SymmetricHashJoinStateManager}
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf._
@@ -135,20 +136,21 @@ object OffsetSeqMetadata extends Logging {
}
/** Set the SparkSession configuration with the values in the metadata */
- def setSessionConf(metadata: OffsetSeqMetadata, sessionConf: RuntimeConfig):
Unit = {
+ def setSessionConf(metadata: OffsetSeqMetadata, sessionConf: SQLConf): Unit
= {
+ val configs = sessionConf.getAllConfs
OffsetSeqMetadata.relevantSQLConfs.map(_.key).foreach { confKey =>
metadata.conf.get(confKey) match {
case Some(valueInMetadata) =>
// Config value exists in the metadata, update the session config
with this value
- val optionalValueInSession = sessionConf.getOption(confKey)
- if (optionalValueInSession.isDefined && optionalValueInSession.get
!= valueInMetadata) {
+ val optionalValueInSession = sessionConf.getConfString(confKey, null)
+ if (optionalValueInSession != null && optionalValueInSession !=
valueInMetadata) {
logWarning(log"Updating the value of conf '${MDC(CONFIG,
confKey)}' in current " +
- log"session from '${MDC(OLD_VALUE, optionalValueInSession.get)}'
" +
+ log"session from '${MDC(OLD_VALUE, optionalValueInSession)}' " +
log"to '${MDC(NEW_VALUE, valueInMetadata)}'.")
}
- sessionConf.set(confKey, valueInMetadata)
+ sessionConf.setConfString(confKey, valueInMetadata)
case None =>
// For backward compatibility, if a config was not recorded in the
offset log,
@@ -157,14 +159,17 @@ object OffsetSeqMetadata extends Logging {
relevantSQLConfDefaultValues.get(confKey) match {
case Some(defaultValue) =>
- sessionConf.set(confKey, defaultValue)
+ sessionConf.setConfString(confKey, defaultValue)
logWarning(log"Conf '${MDC(CONFIG, confKey)}' was not found in
the offset log, " +
log"using default value '${MDC(DEFAULT_VALUE, defaultValue)}'")
case None =>
- val valueStr = sessionConf.getOption(confKey).map { v =>
- s" Using existing session conf value '$v'."
- }.getOrElse { " No value set in session conf." }
+ val value = sessionConf.getConfString(confKey, null)
+ val valueStr = if (value != null) {
+ s" Using existing session conf value '$value'."
+ } else {
+ " No value set in session conf."
+ }
logWarning(log"Conf '${MDC(CONFIG, confKey)}' was not found in
the offset log. " +
log"${MDC(TIP, valueStr)}")
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 4b1b9e02a242..8f030884ad33 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -483,7 +483,7 @@ abstract class StreamExecution(
@throws[TimeoutException]
protected def interruptAndAwaitExecutionThreadTermination(): Unit = {
val timeout = math.max(
- sparkSession.conf.get(SQLConf.STREAMING_STOP_TIMEOUT), 0)
+ sparkSession.sessionState.conf.getConf(SQLConf.STREAMING_STOP_TIMEOUT),
0)
queryExecutionThread.interrupt()
queryExecutionThread.join(timeout)
if (queryExecutionThread.isAlive) {
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/WatermarkTracker.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/WatermarkTracker.scala
index 54c47ec4e6ed..3e6f122f463d 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/WatermarkTracker.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/WatermarkTracker.scala
@@ -135,7 +135,8 @@ object WatermarkTracker {
// saved in the checkpoint (e.g., old checkpoints), then the default `min`
policy is enforced
// through defaults specified in OffsetSeqMetadata.setSessionConf().
val policyName = conf.get(
- SQLConf.STREAMING_MULTIPLE_WATERMARK_POLICY,
MultipleWatermarkPolicy.DEFAULT_POLICY_NAME)
+ SQLConf.STREAMING_MULTIPLE_WATERMARK_POLICY.key,
+ MultipleWatermarkPolicy.DEFAULT_POLICY_NAME)
new WatermarkTracker(MultipleWatermarkPolicy(policyName))
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala
b/sql/core/src/main/scala/org/apache/spark/sql/internal/RuntimeConfigImpl.scala
similarity index 51%
rename from sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala
rename to
sql/core/src/main/scala/org/apache/spark/sql/internal/RuntimeConfigImpl.scala
index ed8cf4f121f0..ca439cdb8995 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/internal/RuntimeConfigImpl.scala
@@ -15,15 +15,15 @@
* limitations under the License.
*/
-package org.apache.spark.sql
+package org.apache.spark.sql.internal
import scala.jdk.CollectionConverters._
import org.apache.spark.SPARK_DOC_ROOT
import org.apache.spark.annotation.Stable
-import org.apache.spark.internal.config.{ConfigEntry, OptionalConfigEntry}
+import org.apache.spark.internal.config.ConfigEntry
+import org.apache.spark.sql.RuntimeConfig
import org.apache.spark.sql.errors.QueryCompilationErrors
-import org.apache.spark.sql.internal.SQLConf
/**
* Runtime configuration interface for Spark. To access this, use
`SparkSession.conf`.
@@ -33,89 +33,26 @@ import org.apache.spark.sql.internal.SQLConf
* @since 2.0.0
*/
@Stable
-class RuntimeConfig private[sql](val sqlConf: SQLConf = new SQLConf) {
+class RuntimeConfigImpl private[sql](val sqlConf: SQLConf = new SQLConf)
extends RuntimeConfig {
- /**
- * Sets the given Spark runtime configuration property.
- *
- * @since 2.0.0
- */
+ /** @inheritdoc */
def set(key: String, value: String): Unit = {
requireNonStaticConf(key)
sqlConf.setConfString(key, value)
}
- /**
- * Sets the given Spark runtime configuration property.
- *
- * @since 2.0.0
- */
- def set(key: String, value: Boolean): Unit = {
- set(key, value.toString)
- }
-
- /**
- * Sets the given Spark runtime configuration property.
- *
- * @since 2.0.0
- */
- def set(key: String, value: Long): Unit = {
- set(key, value.toString)
- }
-
- /**
- * Sets the given Spark runtime configuration property.
- */
- private[sql] def set[T](entry: ConfigEntry[T], value: T): Unit = {
- requireNonStaticConf(entry.key)
- sqlConf.setConf(entry, value)
- }
-
- /**
- * Returns the value of Spark runtime configuration property for the given
key.
- *
- * @throws java.util.NoSuchElementException if the key is not set and does
not have a default
- * value
- * @since 2.0.0
- */
+ /** @inheritdoc */
@throws[NoSuchElementException]("if the key is not set")
def get(key: String): String = {
sqlConf.getConfString(key)
}
- /**
- * Returns the value of Spark runtime configuration property for the given
key.
- *
- * @since 2.0.0
- */
+ /** @inheritdoc */
def get(key: String, default: String): String = {
sqlConf.getConfString(key, default)
}
- /**
- * Returns the value of Spark runtime configuration property for the given
key.
- */
- @throws[NoSuchElementException]("if the key is not set")
- private[sql] def get[T](entry: ConfigEntry[T]): T = {
- sqlConf.getConf(entry)
- }
-
- private[sql] def get[T](entry: OptionalConfigEntry[T]): Option[T] = {
- sqlConf.getConf(entry)
- }
-
- /**
- * Returns the value of Spark runtime configuration property for the given
key.
- */
- private[sql] def get[T](entry: ConfigEntry[T], default: T): T = {
- sqlConf.getConf(entry, default)
- }
-
- /**
- * Returns all properties set in this conf.
- *
- * @since 2.0.0
- */
+ /** @inheritdoc */
def getAll: Map[String, String] = {
sqlConf.getAllConfs
}
@@ -124,36 +61,20 @@ class RuntimeConfig private[sql](val sqlConf: SQLConf =
new SQLConf) {
getAll.asJava
}
- /**
- * Returns the value of Spark runtime configuration property for the given
key.
- *
- * @since 2.0.0
- */
+ /** @inheritdoc */
def getOption(key: String): Option[String] = {
try Option(get(key)) catch {
case _: NoSuchElementException => None
}
}
- /**
- * Resets the configuration property for the given key.
- *
- * @since 2.0.0
- */
+ /** @inheritdoc */
def unset(key: String): Unit = {
requireNonStaticConf(key)
sqlConf.unsetConf(key)
}
- /**
- * Indicates whether the configuration property with the given key
- * is modifiable in the current session.
- *
- * @return `true` if the configuration property is modifiable. For static
SQL, Spark Core,
- * invalid (not existing) and other non-modifiable configuration
properties,
- * the returned value is `false`.
- * @since 2.4.0
- */
+ /** @inheritdoc */
def isModifiable(key: String): Boolean = sqlConf.isModifiable(key)
/**
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index 55d2e639a56b..3ab6d02f6b51 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
@@ -364,7 +364,7 @@ class StreamingQueryManager private[sql] (
.orElse(activeQueries.get(query.id)) // shouldn't be needed but
paranoia ...
val shouldStopActiveRun =
- sparkSession.conf.get(SQLConf.STREAMING_STOP_ACTIVE_RUN_ON_RESTART)
+
sparkSession.sessionState.conf.getConf(SQLConf.STREAMING_STOP_ACTIVE_RUN_ON_RESTART)
if (activeOption.isDefined) {
if (shouldStopActiveRun) {
val oldQuery = activeOption.get
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index 2fe6a83427bc..e44bd5de4f4c 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -52,7 +52,7 @@ class FileBasedDataSourceSuite extends QueryTest
override def beforeAll(): Unit = {
super.beforeAll()
- spark.conf.set(SQLConf.ORC_IMPLEMENTATION, "native")
+ spark.conf.set(SQLConf.ORC_IMPLEMENTATION.key, "native")
}
override def afterAll(): Unit = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/GenTPCDSData.scala
b/sql/core/src/test/scala/org/apache/spark/sql/GenTPCDSData.scala
index 48a16f01d574..6cd8ade41da1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/GenTPCDSData.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/GenTPCDSData.scala
@@ -225,7 +225,7 @@ class TPCDSTables(spark: SparkSession, dsdgenDir: String,
scaleFactor: Int)
// datagen speed files will be truncated to maxRecordsPerFile value,
so the final
// result will be the same.
val numRows = data.count()
- val maxRecordPerFile = spark.conf.get(SQLConf.MAX_RECORDS_PER_FILE)
+ val maxRecordPerFile =
spark.sessionState.conf.getConf(SQLConf.MAX_RECORDS_PER_FILE)
if (maxRecordPerFile > 0 && numRows > maxRecordPerFile) {
val numFiles = (numRows.toDouble/maxRecordPerFile).ceil.toInt
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index fcb937d82ba4..0f5582def82d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -597,10 +597,10 @@ class JoinSuite extends QueryTest with SharedSparkSession
with AdaptiveSparkPlan
SQLConf.CROSS_JOINS_ENABLED.key -> "true") {
assert(statisticSizeInByte(spark.table("testData2")) >
- spark.conf.get[Long](SQLConf.AUTO_BROADCASTJOIN_THRESHOLD))
+ sqlConf.getConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD))
assert(statisticSizeInByte(spark.table("testData")) <
- spark.conf.get[Long](SQLConf.AUTO_BROADCASTJOIN_THRESHOLD))
+ sqlConf.getConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD))
Seq(
("SELECT * FROM testData LEFT SEMI JOIN testData2 ON key = a",
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/RuntimeConfigSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/RuntimeConfigSuite.scala
index 405213072081..352197f96acb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/RuntimeConfigSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/RuntimeConfigSuite.scala
@@ -19,12 +19,13 @@ package org.apache.spark.sql
import org.apache.spark.SparkFunSuite
import org.apache.spark.internal.config
+import org.apache.spark.sql.internal.RuntimeConfigImpl
import org.apache.spark.sql.internal.SQLConf.CHECKPOINT_LOCATION
import org.apache.spark.sql.internal.StaticSQLConf.GLOBAL_TEMP_DATABASE
class RuntimeConfigSuite extends SparkFunSuite {
- private def newConf(): RuntimeConfig = new RuntimeConfig
+ private def newConf(): RuntimeConfig = new RuntimeConfigImpl()
test("set and get") {
val conf = newConf()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 9beceda26379..ce88f7dc475d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -2568,20 +2568,21 @@ class SQLQuerySuite extends QueryTest with
SharedSparkSession with AdaptiveSpark
Seq(1 -> "a").toDF("i", "j").write.parquet(path.getCanonicalPath)
val newSession = spark.newSession()
+ val newSqlConf = newSession.sessionState.conf
val originalValue = newSession.sessionState.conf.runSQLonFile
try {
- newSession.conf.set(SQLConf.RUN_SQL_ON_FILES, false)
+ newSqlConf.setConf(SQLConf.RUN_SQL_ON_FILES, false)
intercept[AnalysisException] {
newSession.sql(s"SELECT i, j FROM
parquet.`${path.getCanonicalPath}`")
}
- newSession.conf.set(SQLConf.RUN_SQL_ON_FILES, true)
+ newSqlConf.setConf(SQLConf.RUN_SQL_ON_FILES, true)
checkAnswer(
newSession.sql(s"SELECT i, j FROM
parquet.`${path.getCanonicalPath}`"),
Row(1, "a"))
} finally {
- newSession.conf.set(SQLConf.RUN_SQL_ON_FILES, originalValue)
+ newSqlConf.setConf(SQLConf.RUN_SQL_ON_FILES, originalValue)
}
}
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala
index 4ac05373e5a3..d3117ec411fe 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala
@@ -201,10 +201,10 @@ class SparkSessionBuilderSuite extends SparkFunSuite with
Eventually {
.getOrCreate()
assert(session.conf.get("spark.app.name") === "test-app-SPARK-31234")
- assert(session.conf.get(GLOBAL_TEMP_DATABASE) ===
"globaltempdb-spark-31234")
+ assert(session.conf.get(GLOBAL_TEMP_DATABASE.key) ===
"globalTempDB-SPARK-31234")
session.sql("RESET")
assert(session.conf.get("spark.app.name") === "test-app-SPARK-31234")
- assert(session.conf.get(GLOBAL_TEMP_DATABASE) ===
"globaltempdb-spark-31234")
+ assert(session.conf.get(GLOBAL_TEMP_DATABASE.key) ===
"globalTempDB-SPARK-31234")
}
test("SPARK-31354: SparkContext only register one SparkSession
ApplicationEnd listener") {
@@ -244,8 +244,8 @@ class SparkSessionBuilderSuite extends SparkFunSuite with
Eventually {
.builder()
.config(GLOBAL_TEMP_DATABASE.key, "globalTempDB-SPARK-31532-1")
.getOrCreate()
- assert(session.conf.get(GLOBAL_TEMP_DATABASE) ===
"globaltempdb-spark-31532")
- assert(session1.conf.get(GLOBAL_TEMP_DATABASE) ===
"globaltempdb-spark-31532")
+ assert(session.conf.get(GLOBAL_TEMP_DATABASE.key) ===
"globalTempDB-SPARK-31532")
+ assert(session1.conf.get(GLOBAL_TEMP_DATABASE.key) ===
"globalTempDB-SPARK-31532")
// do not propagate static sql configs to the existing default session
SparkSession.clearActiveSession()
@@ -255,9 +255,9 @@ class SparkSessionBuilderSuite extends SparkFunSuite with
Eventually {
.config(GLOBAL_TEMP_DATABASE.key, value = "globalTempDB-SPARK-31532-2")
.getOrCreate()
- assert(!session.conf.get(WAREHOUSE_PATH).contains("SPARK-31532-db"))
- assert(session.conf.get(WAREHOUSE_PATH) ===
session2.conf.get(WAREHOUSE_PATH))
- assert(session2.conf.get(GLOBAL_TEMP_DATABASE) ===
"globaltempdb-spark-31532")
+ assert(!session.conf.get(WAREHOUSE_PATH.key).contains("SPARK-31532-db"))
+ assert(session.conf.get(WAREHOUSE_PATH.key) ===
session2.conf.get(WAREHOUSE_PATH.key))
+ assert(session2.conf.get(GLOBAL_TEMP_DATABASE.key) ===
"globalTempDB-SPARK-31532")
}
test("SPARK-31532: propagate static sql configs if no existing
SparkSession") {
@@ -275,8 +275,8 @@ class SparkSessionBuilderSuite extends SparkFunSuite with
Eventually {
.config(WAREHOUSE_PATH.key, "SPARK-31532-db-2")
.getOrCreate()
assert(session.conf.get("spark.app.name") === "test-app-SPARK-31532-2")
- assert(session.conf.get(GLOBAL_TEMP_DATABASE) ===
"globaltempdb-spark-31532-2")
- assert(session.conf.get(WAREHOUSE_PATH) contains "SPARK-31532-db-2")
+ assert(session.conf.get(GLOBAL_TEMP_DATABASE.key) ===
"globalTempDB-SPARK-31532-2")
+ assert(session.conf.get(WAREHOUSE_PATH.key) contains "SPARK-31532-db-2")
}
test("SPARK-32062: reset listenerRegistered in SparkSession") {
@@ -461,7 +461,7 @@ class SparkSessionBuilderSuite extends SparkFunSuite with
Eventually {
val expected =
path.getFileSystem(hadoopConf).makeQualified(path).toString
// session related configs
assert(hadoopConf.get("hive.metastore.warehouse.dir") === expected)
- assert(session.conf.get(WAREHOUSE_PATH) === expected)
+ assert(session.conf.get(WAREHOUSE_PATH.key) === expected)
assert(session.sessionState.conf.warehousePath === expected)
// shared configs
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
index 322210bf5b59..ba87028a7147 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
@@ -178,7 +178,7 @@ class SparkSessionExtensionSuite extends SparkFunSuite with
SQLHelper with Adapt
MyColumnarRule(MyNewQueryStageRule(), MyNewQueryStageRule()))
}
withSession(extensions) { session =>
- session.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED, true)
+ session.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, true)
assert(session.sessionState.adaptiveRulesHolder.queryStagePrepRules
.contains(MyQueryStagePrepRule()))
assert(session.sessionState.columnarRules.contains(
@@ -221,7 +221,7 @@ class SparkSessionExtensionSuite extends SparkFunSuite with
SQLHelper with Adapt
MyColumnarRule(PreRuleReplaceAddWithBrokenVersion(), MyPostRule()))
}
withSession(extensions) { session =>
- session.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED, true)
+ session.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, true)
session.conf.set(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key, "-1")
assert(session.sessionState.columnarRules.contains(
MyColumnarRule(PreRuleReplaceAddWithBrokenVersion(), MyPostRule())))
@@ -280,7 +280,7 @@ class SparkSessionExtensionSuite extends SparkFunSuite with
SQLHelper with Adapt
MyColumnarRule(PreRuleReplaceAddWithBrokenVersion(), MyPostRule()))
}
withSession(extensions) { session =>
- session.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED, enableAQE)
+ session.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, enableAQE)
assert(session.sessionState.columnarRules.contains(
MyColumnarRule(PreRuleReplaceAddWithBrokenVersion(), MyPostRule())))
import session.implicits._
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
index ef8b66566f24..7fa29dd38fd9 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
@@ -366,7 +366,7 @@ abstract class StatisticsCollectionTestBase extends
QueryTest with SQLTestUtils
val stats = spark.table("ds_tbl").queryExecution.optimizedPlan.stats
assert(stats.sizeInBytes > 0, "non-empty partitioned table should not
report zero size.")
- if (spark.conf.get(StaticSQLConf.CATALOG_IMPLEMENTATION) == "hive") {
+ if (spark.conf.get(StaticSQLConf.CATALOG_IMPLEMENTATION.key) == "hive") {
sql("CREATE TABLE hive_tbl(i int) PARTITIONED BY (j int)")
sql("INSERT INTO hive_tbl PARTITION(j=1) SELECT 1")
val stats2 = spark.table("hive_tbl").queryExecution.optimizedPlan.stats
@@ -381,7 +381,7 @@ abstract class StatisticsCollectionTestBase extends
QueryTest with SQLTestUtils
// Test data source table
checkStatsConversion(tableName = "ds_tbl", isDatasourceTable = true)
// Test hive serde table
- if (spark.conf.get(StaticSQLConf.CATALOG_IMPLEMENTATION) == "hive") {
+ if (spark.conf.get(StaticSQLConf.CATALOG_IMPLEMENTATION.key) == "hive") {
checkStatsConversion(tableName = "hive_tbl", isDatasourceTable = false)
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 5df7b62cfb28..7aaec6d500ba 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -2481,7 +2481,7 @@ class DataSourceV2SQLSuiteV1Filter
}
test("global temp view should not be masked by v2 catalog") {
- val globalTempDB = spark.conf.get(StaticSQLConf.GLOBAL_TEMP_DATABASE)
+ val globalTempDB = spark.conf.get(StaticSQLConf.GLOBAL_TEMP_DATABASE.key)
registerCatalog(globalTempDB, classOf[InMemoryTableCatalog])
try {
@@ -2495,7 +2495,7 @@ class DataSourceV2SQLSuiteV1Filter
}
test("SPARK-30104: global temp db is used as a table name under v2 catalog") {
- val globalTempDB = spark.conf.get(StaticSQLConf.GLOBAL_TEMP_DATABASE)
+ val globalTempDB = spark.conf.get(StaticSQLConf.GLOBAL_TEMP_DATABASE.key)
val t = s"testcat.$globalTempDB"
withTable(t) {
sql(s"CREATE TABLE $t (id bigint, data string) USING foo")
@@ -2506,7 +2506,7 @@ class DataSourceV2SQLSuiteV1Filter
}
test("SPARK-30104: v2 catalog named global_temp will be masked") {
- val globalTempDB = spark.conf.get(StaticSQLConf.GLOBAL_TEMP_DATABASE)
+ val globalTempDB = spark.conf.get(StaticSQLConf.GLOBAL_TEMP_DATABASE.key)
registerCatalog(globalTempDB, classOf[InMemoryTableCatalog])
checkError(
exception = intercept[AnalysisException] {
@@ -2712,7 +2712,7 @@ class DataSourceV2SQLSuiteV1Filter
parameters = Map("relationName" -> "`testcat`.`abc`"),
context = ExpectedContext(fragment = "testcat.abc", start = 17, stop = 27))
- val globalTempDB = spark.conf.get(StaticSQLConf.GLOBAL_TEMP_DATABASE)
+ val globalTempDB = spark.conf.get(StaticSQLConf.GLOBAL_TEMP_DATABASE.key)
registerCatalog(globalTempDB, classOf[InMemoryTableCatalog])
withTempView("v") {
sql("create global temp view v as select 1")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala
index dc72b4a092ae..9ed4f1a006b2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala
@@ -317,7 +317,7 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite with SQLConfHelper {
import spark.implicits._
spark.conf.set(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key, "1KB")
spark.conf.set(SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key, "10KB")
- spark.conf.set(SQLConf.SKEW_JOIN_SKEWED_PARTITION_FACTOR, 2.0)
+ spark.conf.set(SQLConf.SKEW_JOIN_SKEWED_PARTITION_FACTOR.key, "2.0")
val df00 = spark.range(0, 1000, 2)
.selectExpr("id as key", "id as value")
.union(Seq.fill(100000)((600, 600)).toDF("key", "value"))
@@ -345,7 +345,7 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite with SQLConfHelper {
import spark.implicits._
spark.conf.set(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key, "-1")
spark.conf.set(SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key, "100B")
- spark.conf.set(SQLConf.SKEW_JOIN_SKEWED_PARTITION_FACTOR, 2.0)
+ spark.conf.set(SQLConf.SKEW_JOIN_SKEWED_PARTITION_FACTOR.key, "2.0")
val df00 = spark.range(0, 10, 2)
.selectExpr("id as key", "id as value")
.union(Seq.fill(1000)((600, 600)).toDF("key", "value"))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
index ad755bf22ab0..0ba55382cd9a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
@@ -150,7 +150,7 @@ class InMemoryColumnarQuerySuite extends QueryTest
spark.catalog.cacheTable("sizeTst")
assert(
spark.table("sizeTst").queryExecution.analyzed.stats.sizeInBytes >
- spark.conf.get(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD))
+ sqlConf.getConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD))
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala
index 885286843a14..88ff51d0ff4c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala
@@ -27,9 +27,9 @@ class PartitionBatchPruningSuite extends SharedSparkSession with AdaptiveSparkPl
import testImplicits._
- private lazy val originalColumnBatchSize = spark.conf.get(SQLConf.COLUMN_BATCH_SIZE)
+ private lazy val originalColumnBatchSize = spark.conf.get(SQLConf.COLUMN_BATCH_SIZE.key)
private lazy val originalInMemoryPartitionPruning =
- spark.conf.get(SQLConf.IN_MEMORY_PARTITION_PRUNING)
+ spark.conf.get(SQLConf.IN_MEMORY_PARTITION_PRUNING.key)
private val testArrayData = (1 to 100).map { key =>
Tuple1(Array.fill(key)(key))
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ReadSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ReadSchemaSuite.scala
index fefb16a351fd..c798196c4f0e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ReadSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ReadSchemaSuite.scala
@@ -101,7 +101,7 @@ class OrcReadSchemaSuite
override def beforeAll(): Unit = {
super.beforeAll()
- originalConf = spark.conf.get(SQLConf.ORC_VECTORIZED_READER_ENABLED)
+ originalConf = sqlConf.getConf(SQLConf.ORC_VECTORIZED_READER_ENABLED)
spark.conf.set(SQLConf.ORC_VECTORIZED_READER_ENABLED.key, "false")
}
@@ -126,7 +126,7 @@ class VectorizedOrcReadSchemaSuite
override def beforeAll(): Unit = {
super.beforeAll()
- originalConf = spark.conf.get(SQLConf.ORC_VECTORIZED_READER_ENABLED)
+ originalConf = sqlConf.getConf(SQLConf.ORC_VECTORIZED_READER_ENABLED)
spark.conf.set(SQLConf.ORC_VECTORIZED_READER_ENABLED.key, "true")
}
@@ -169,7 +169,7 @@ class ParquetReadSchemaSuite
override def beforeAll(): Unit = {
super.beforeAll()
- originalConf = spark.conf.get(SQLConf.PARQUET_VECTORIZED_READER_ENABLED)
+ originalConf = sqlConf.getConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED)
spark.conf.set(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key, "false")
}
@@ -193,7 +193,7 @@ class VectorizedParquetReadSchemaSuite
override def beforeAll(): Unit = {
super.beforeAll()
- originalConf = spark.conf.get(SQLConf.PARQUET_VECTORIZED_READER_ENABLED)
+ originalConf = sqlConf.getConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED)
spark.conf.set(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key, "true")
}
@@ -217,7 +217,7 @@ class MergedParquetReadSchemaSuite
override def beforeAll(): Unit = {
super.beforeAll()
- originalConf = spark.conf.get(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED)
+ originalConf = sqlConf.getConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED)
spark.conf.set(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key, "true")
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala
index 3dec1b9ff5cf..deb62eb3ac23 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala
@@ -346,7 +346,7 @@ class BinaryFileFormatSuite extends QueryTest with SharedSparkSession {
}
test("fail fast and do not attempt to read if a file is too big") {
- assert(spark.conf.get(SOURCES_BINARY_FILE_MAX_LENGTH) === Int.MaxValue)
+ assert(sqlConf.getConf(SOURCES_BINARY_FILE_MAX_LENGTH) === Int.MaxValue)
withTempPath { file =>
val path = file.getPath
val content = "123".getBytes
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala
index 48b4f8d4bc01..b8669ee4d1ef 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala
@@ -63,7 +63,7 @@ trait OrcTest extends QueryTest with FileBasedDataSourceTest with BeforeAndAfter
protected override def beforeAll(): Unit = {
super.beforeAll()
- originalConfORCImplementation = spark.conf.get(ORC_IMPLEMENTATION)
+ originalConfORCImplementation = spark.sessionState.conf.getConf(ORC_IMPLEMENTATION)
spark.conf.set(ORC_IMPLEMENTATION.key, orcImp)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 6c2f5a2d134d..0afa545595c7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -846,7 +846,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
def checkCompressionCodec(codec: ParquetCompressionCodec): Unit = {
withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> codec.name()) {
withParquetFile(data) { path =>
- assertResult(spark.conf.get(SQLConf.PARQUET_COMPRESSION).toUpperCase(Locale.ROOT)) {
+ assertResult(spark.conf.get(SQLConf.PARQUET_COMPRESSION.key).toUpperCase(Locale.ROOT)) {
compressionCodecFor(path, codec.name())
}
}
@@ -855,7 +855,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
// Checks default compression codec
checkCompressionCodec(
- ParquetCompressionCodec.fromString(spark.conf.get(SQLConf.PARQUET_COMPRESSION)))
+ ParquetCompressionCodec.fromString(spark.conf.get(SQLConf.PARQUET_COMPRESSION.key)))
ParquetCompressionCodec.availableCodecs.asScala.foreach(checkCompressionCodec(_))
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceChangeDataReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceChangeDataReadSuite.scala
index 2858d356d4c9..4833b8630134 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceChangeDataReadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceChangeDataReadSuite.scala
@@ -58,7 +58,7 @@ abstract class StateDataSourceChangeDataReaderSuite extends StateDataSourceTestB
override def beforeAll(): Unit = {
super.beforeAll()
- spark.conf.set(SQLConf.STREAMING_NO_DATA_MICRO_BATCHES_ENABLED, false)
+ spark.conf.set(SQLConf.STREAMING_NO_DATA_MICRO_BATCHES_ENABLED.key, false)
spark.conf.set(SQLConf.STATE_STORE_PROVIDER_CLASS.key,
newStateStoreProvider().getClass.getName)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala
index 97c88037a717..af0770756950 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala
@@ -942,7 +942,7 @@ abstract class StateDataSourceReadSuite extends StateDataSourceTestBase with Ass
// skip version and operator ID to test out functionalities
.load()
- val numShufflePartitions = spark.conf.get(SQLConf.SHUFFLE_PARTITIONS)
+ val numShufflePartitions = sqlConf.getConf(SQLConf.SHUFFLE_PARTITIONS)
val resultDf = stateReadDf
.selectExpr("key.value AS key_value", "value.count AS value_count",
"partition_id")
@@ -966,7 +966,7 @@ abstract class StateDataSourceReadSuite extends StateDataSourceTestBase with Ass
}
test("partition_id column with stream-stream join") {
- val numShufflePartitions = spark.conf.get(SQLConf.SHUFFLE_PARTITIONS)
+ val numShufflePartitions = sqlConf.getConf(SQLConf.SHUFFLE_PARTITIONS)
withTempDir { tempDir =>
runStreamStreamJoinQueryWithOneThousandInputs(tempDir.getAbsolutePath)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala
index 8fcd6edf1abb..d20cfb04f8e8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala
@@ -119,7 +119,7 @@ class RocksDBStateStoreIntegrationSuite extends StreamTest
private def getFormatVersion(query: StreamingQuery): Int = {
query.asInstanceOf[StreamingQueryWrapper].streamingQuery.lastExecution.sparkSession
- .conf.get(SQLConf.STATE_STORE_ROCKSDB_FORMAT_VERSION)
+ .sessionState.conf.getConf(SQLConf.STATE_STORE_ROCKSDB_FORMAT_VERSION)
}
testWithColumnFamilies("SPARK-36519: store RocksDB format version in the checkpoint",
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index 7ac574db98d4..691f18451af2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -34,7 +34,7 @@ import org.rocksdb.CompressionType
import org.scalactic.source.Position
import org.scalatest.Tag
-import org.apache.spark.SparkException
+import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.sql.catalyst.util.quietly
import org.apache.spark.sql.execution.streaming.{CreateAtomicTestManager, FileSystemBasedCheckpointFileManager}
import org.apache.spark.sql.execution.streaming.CheckpointFileManager.{CancellableFSDataOutputStream, RenameBasedFSDataOutputStream}
@@ -167,7 +167,10 @@ trait AlsoTestWithChangelogCheckpointingEnabled
@SlowSQLTest
class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with SharedSparkSession {
- sqlConf.setConf(SQLConf.STATE_STORE_PROVIDER_CLASS, classOf[RocksDBStateStoreProvider].getName)
+ override protected def sparkConf: SparkConf = {
+ super.sparkConf
+ .set(SQLConf.STATE_STORE_PROVIDER_CLASS, classOf[RocksDBStateStoreProvider].getName)
+ }
testWithColumnFamilies(
"RocksDB: check changelog and snapshot version",
@@ -2157,9 +2160,7 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
}
}
- private def sqlConf = SQLConf.get.clone()
-
- private def dbConf = RocksDBConf(StateStoreConf(sqlConf))
+ private def dbConf = RocksDBConf(StateStoreConf(SQLConf.get.clone()))
def withDB[T](
remoteDir: String,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala
index 898aeec22ad1..6eff610433c9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala
@@ -243,7 +243,7 @@ class ExpressionInfoSuite extends SparkFunSuite with SharedSparkSession {
// Examples can change settings. We clone the session to prevent tests clashing.
val clonedSpark = spark.cloneSession()
// Coalescing partitions can change result order, so disable it.
- clonedSpark.conf.set(SQLConf.COALESCE_PARTITIONS_ENABLED, false)
+ clonedSpark.conf.set(SQLConf.COALESCE_PARTITIONS_ENABLED.key, false)
val info = clonedSpark.sessionState.catalog.lookupFunctionInfo(funcId)
val className = info.getClassName
if (!ignoreSet.contains(className)) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
index d0d4dc6b344f..82795e551b6b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
@@ -47,7 +47,7 @@ class SQLConfSuite extends QueryTest with SharedSparkSession {
// Set a conf first.
spark.conf.set(testKey, testVal)
// Clear the conf.
- spark.sessionState.conf.clear()
+ sqlConf.clear()
// After clear, only overrideConfs used by unit test should be in the SQLConf.
assert(spark.conf.getAll === TestSQLContext.overrideConfs)
@@ -62,11 +62,11 @@ class SQLConfSuite extends QueryTest with SharedSparkSession {
assert(spark.conf.get(testKey, testVal + "_") === testVal)
assert(spark.conf.getAll.contains(testKey))
- spark.sessionState.conf.clear()
+ sqlConf.clear()
}
test("parse SQL set commands") {
- spark.sessionState.conf.clear()
+ sqlConf.clear()
sql(s"set $testKey=$testVal")
assert(spark.conf.get(testKey, testVal + "_") === testVal)
assert(spark.conf.get(testKey, testVal + "_") === testVal)
@@ -84,11 +84,11 @@ class SQLConfSuite extends QueryTest with SharedSparkSession {
sql(s"set $key=")
assert(spark.conf.get(key, "0") === "")
- spark.sessionState.conf.clear()
+ sqlConf.clear()
}
test("set command for display") {
- spark.sessionState.conf.clear()
+ sqlConf.clear()
checkAnswer(
sql("SET").where("key = 'spark.sql.groupByOrdinal'").select("key", "value"),
Nil)
@@ -109,11 +109,11 @@ class SQLConfSuite extends QueryTest with SharedSparkSession {
}
test("deprecated property") {
- spark.sessionState.conf.clear()
- val original = spark.conf.get(SQLConf.SHUFFLE_PARTITIONS)
+ sqlConf.clear()
+ val original = sqlConf.getConf(SQLConf.SHUFFLE_PARTITIONS)
try {
sql(s"set ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS}=10")
- assert(spark.conf.get(SQLConf.SHUFFLE_PARTITIONS) === 10)
+ assert(sqlConf.getConf(SQLConf.SHUFFLE_PARTITIONS) === 10)
} finally {
sql(s"set ${SQLConf.SHUFFLE_PARTITIONS.key}=$original")
}
@@ -146,18 +146,18 @@ class SQLConfSuite extends QueryTest with SharedSparkSession {
}
test("reset - public conf") {
- spark.sessionState.conf.clear()
- val original = spark.conf.get(SQLConf.GROUP_BY_ORDINAL)
+ sqlConf.clear()
+ val original = sqlConf.getConf(SQLConf.GROUP_BY_ORDINAL)
try {
- assert(spark.conf.get(SQLConf.GROUP_BY_ORDINAL))
+ assert(sqlConf.getConf(SQLConf.GROUP_BY_ORDINAL))
sql(s"set ${SQLConf.GROUP_BY_ORDINAL.key}=false")
- assert(spark.conf.get(SQLConf.GROUP_BY_ORDINAL) === false)
+ assert(sqlConf.getConf(SQLConf.GROUP_BY_ORDINAL) === false)
assert(sql(s"set").where(s"key = '${SQLConf.GROUP_BY_ORDINAL.key}'").count() == 1)
- assert(spark.conf.get(SQLConf.OPTIMIZER_EXCLUDED_RULES).isEmpty)
+ assert(sqlConf.getConf(SQLConf.OPTIMIZER_EXCLUDED_RULES).isEmpty)
sql(s"reset")
- assert(spark.conf.get(SQLConf.GROUP_BY_ORDINAL))
+ assert(sqlConf.getConf(SQLConf.GROUP_BY_ORDINAL))
assert(sql(s"set").where(s"key = '${SQLConf.GROUP_BY_ORDINAL.key}'").count() == 0)
- assert(spark.conf.get(SQLConf.OPTIMIZER_EXCLUDED_RULES) ===
+ assert(sqlConf.getConf(SQLConf.OPTIMIZER_EXCLUDED_RULES) ===
Some("org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation"))
} finally {
sql(s"set ${SQLConf.GROUP_BY_ORDINAL.key}=$original")
@@ -165,15 +165,15 @@ class SQLConfSuite extends QueryTest with SharedSparkSession {
}
test("reset - internal conf") {
- spark.sessionState.conf.clear()
- val original = spark.conf.get(SQLConf.OPTIMIZER_MAX_ITERATIONS)
+ sqlConf.clear()
+ val original = sqlConf.getConf(SQLConf.OPTIMIZER_MAX_ITERATIONS)
try {
- assert(spark.conf.get(SQLConf.OPTIMIZER_MAX_ITERATIONS) === 100)
+ assert(sqlConf.getConf(SQLConf.OPTIMIZER_MAX_ITERATIONS) === 100)
sql(s"set ${SQLConf.OPTIMIZER_MAX_ITERATIONS.key}=10")
- assert(spark.conf.get(SQLConf.OPTIMIZER_MAX_ITERATIONS) === 10)
+ assert(sqlConf.getConf(SQLConf.OPTIMIZER_MAX_ITERATIONS) === 10)
assert(sql(s"set").where(s"key = '${SQLConf.OPTIMIZER_MAX_ITERATIONS.key}'").count() == 1)
sql(s"reset")
- assert(spark.conf.get(SQLConf.OPTIMIZER_MAX_ITERATIONS) === 100)
+ assert(sqlConf.getConf(SQLConf.OPTIMIZER_MAX_ITERATIONS) === 100)
assert(sql(s"set").where(s"key = '${SQLConf.OPTIMIZER_MAX_ITERATIONS.key}'").count() == 0)
} finally {
sql(s"set ${SQLConf.OPTIMIZER_MAX_ITERATIONS.key}=$original")
@@ -181,7 +181,7 @@ class SQLConfSuite extends QueryTest with SharedSparkSession {
}
test("reset - user-defined conf") {
- spark.sessionState.conf.clear()
+ sqlConf.clear()
val userDefinedConf = "x.y.z.reset"
try {
assert(spark.conf.getOption(userDefinedConf).isEmpty)
@@ -196,7 +196,7 @@ class SQLConfSuite extends QueryTest with SharedSparkSession {
}
test("SPARK-32406: reset - single configuration") {
- spark.sessionState.conf.clear()
+ sqlConf.clear()
// spark core conf w/o entry registered
val appId = spark.sparkContext.getConf.getAppId
sql("RESET spark.app.id")
@@ -216,19 +216,19 @@ class SQLConfSuite extends QueryTest with SharedSparkSession {
sql("RESET spark.abc") // ignore nonexistent keys
// runtime sql configs
- val original = spark.conf.get(SQLConf.GROUP_BY_ORDINAL)
+ val original = sqlConf.getConf(SQLConf.GROUP_BY_ORDINAL)
sql(s"SET ${SQLConf.GROUP_BY_ORDINAL.key}=false")
sql(s"RESET ${SQLConf.GROUP_BY_ORDINAL.key}")
- assert(spark.conf.get(SQLConf.GROUP_BY_ORDINAL) === original)
+ assert(sqlConf.getConf(SQLConf.GROUP_BY_ORDINAL) === original)
// runtime sql configs with optional defaults
- assert(spark.conf.get(SQLConf.OPTIMIZER_EXCLUDED_RULES).isEmpty)
+ assert(sqlConf.getConf(SQLConf.OPTIMIZER_EXCLUDED_RULES).isEmpty)
sql(s"RESET ${SQLConf.OPTIMIZER_EXCLUDED_RULES.key}")
- assert(spark.conf.get(SQLConf.OPTIMIZER_EXCLUDED_RULES) ===
+ assert(sqlConf.getConf(SQLConf.OPTIMIZER_EXCLUDED_RULES) ===
Some("org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation"))
sql(s"SET ${SQLConf.PLAN_CHANGE_LOG_RULES.key}=abc")
sql(s"RESET ${SQLConf.PLAN_CHANGE_LOG_RULES.key}")
- assert(spark.conf.get(SQLConf.PLAN_CHANGE_LOG_RULES).isEmpty)
+ assert(sqlConf.getConf(SQLConf.PLAN_CHANGE_LOG_RULES).isEmpty)
// static sql configs
checkError(
@@ -247,19 +247,19 @@ class SQLConfSuite extends QueryTest with SharedSparkSession {
}
test("Test ADVISORY_PARTITION_SIZE_IN_BYTES's method") {
- spark.sessionState.conf.clear()
+ sqlConf.clear()
spark.conf.set(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key, "100")
- assert(spark.conf.get(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES) === 100)
+ assert(sqlConf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES) === 100)
spark.conf.set(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key, "1k")
- assert(spark.conf.get(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES) === 1024)
+ assert(sqlConf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES) === 1024)
spark.conf.set(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key, "1M")
- assert(spark.conf.get(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES) === 1048576)
+ assert(sqlConf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES) === 1048576)
spark.conf.set(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key, "1g")
- assert(spark.conf.get(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES) === 1073741824)
+ assert(sqlConf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES) === 1073741824)
// test negative value
intercept[IllegalArgumentException] {
@@ -277,7 +277,7 @@ class SQLConfSuite extends QueryTest with SharedSparkSession {
spark.conf.set(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key, "-90000000000g")
}
- spark.sessionState.conf.clear()
+ sqlConf.clear()
}
test("SparkSession can access configs set in SparkConf") {
@@ -305,7 +305,7 @@ class SQLConfSuite extends QueryTest with SharedSparkSession {
try {
sparkContext.conf.set(GLOBAL_TEMP_DATABASE, "a")
val newSession = new SparkSession(sparkContext)
- assert(newSession.conf.get(GLOBAL_TEMP_DATABASE) == "a")
+ assert(newSession.sessionState.conf.getConf(GLOBAL_TEMP_DATABASE) == "a")
checkAnswer(
newSession.sql(s"SET ${GLOBAL_TEMP_DATABASE.key}"),
Row(GLOBAL_TEMP_DATABASE.key, "a"))
@@ -338,16 +338,16 @@ class SQLConfSuite extends QueryTest with SharedSparkSession {
}
test("SPARK-10365: PARQUET_OUTPUT_TIMESTAMP_TYPE") {
- spark.sessionState.conf.clear()
+ sqlConf.clear()
// check default value
assert(spark.sessionState.conf.parquetOutputTimestampType ==
SQLConf.ParquetOutputTimestampType.INT96)
- spark.conf.set(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE, "timestamp_micros")
+ sqlConf.setConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE, "timestamp_micros")
assert(spark.sessionState.conf.parquetOutputTimestampType ==
SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS)
- spark.conf.set(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE, "int96")
+ sqlConf.setConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE, "int96")
assert(spark.sessionState.conf.parquetOutputTimestampType ==
SQLConf.ParquetOutputTimestampType.INT96)
@@ -356,7 +356,7 @@ class SQLConfSuite extends QueryTest with SharedSparkSession {
spark.conf.set(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key, "invalid")
}
- spark.sessionState.conf.clear()
+ sqlConf.clear()
}
test("SPARK-22779: correctly compute default value for fallback configs") {
@@ -373,10 +373,10 @@ class SQLConfSuite extends QueryTest with SharedSparkSession {
.get
assert(displayValue === fallback.defaultValueString)
- spark.conf.set(SQLConf.PARQUET_COMPRESSION, GZIP.lowerCaseName())
+ sqlConf.setConf(SQLConf.PARQUET_COMPRESSION, GZIP.lowerCaseName())
assert(spark.conf.get(fallback.key) === GZIP.lowerCaseName())
- spark.conf.set(fallback, LZO.lowerCaseName())
+ sqlConf.setConf(fallback, LZO.lowerCaseName())
assert(spark.conf.get(fallback.key) === LZO.lowerCaseName())
val newDisplayValue = spark.sessionState.conf.getAllDefinedConfs
@@ -459,10 +459,10 @@ class SQLConfSuite extends QueryTest with SharedSparkSession {
test("set time zone") {
TimeZone.getAvailableIDs().foreach { zid =>
sql(s"set time zone '$zid'")
- assert(spark.conf.get(SQLConf.SESSION_LOCAL_TIMEZONE) === zid)
+ assert(sqlConf.getConf(SQLConf.SESSION_LOCAL_TIMEZONE) === zid)
}
sql("set time zone local")
- assert(spark.conf.get(SQLConf.SESSION_LOCAL_TIMEZONE) === TimeZone.getDefault.getID)
+ assert(sqlConf.getConf(SQLConf.SESSION_LOCAL_TIMEZONE) === TimeZone.getDefault.getID)
val tz = "Invalid TZ"
checkError(
@@ -476,7 +476,7 @@ class SQLConfSuite extends QueryTest with SharedSparkSession {
(-18 to 18).map(v => (v, s"interval '$v' hours")).foreach { case (i, interval) =>
sql(s"set time zone $interval")
- val zone = spark.conf.get(SQLConf.SESSION_LOCAL_TIMEZONE)
+ val zone = sqlConf.getConf(SQLConf.SESSION_LOCAL_TIMEZONE)
if (i == 0) {
assert(zone === "Z")
} else {
@@ -504,7 +504,7 @@ class SQLConfSuite extends QueryTest with SharedSparkSession {
test("SPARK-47765: set collation") {
Seq("UNICODE", "UNICODE_CI", "utf8_lcase", "utf8_binary").foreach { collation =>
sql(s"set collation $collation")
- assert(spark.conf.get(SQLConf.DEFAULT_COLLATION) === collation.toUpperCase(Locale.ROOT))
+ assert(sqlConf.getConf(SQLConf.DEFAULT_COLLATION) === collation.toUpperCase(Locale.ROOT))
}
checkError(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index 8b11e0c69fa7..24732223c669 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -54,7 +54,7 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
protected override def beforeAll(): Unit = {
super.beforeAll()
- spark.conf.set(SQLConf.LEGACY_BUCKETED_TABLE_SCAN_OUTPUT_ORDERING, true)
+ spark.conf.set(SQLConf.LEGACY_BUCKETED_TABLE_SCAN_OUTPUT_ORDERING.key, true)
}
protected override def afterAll(): Unit = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index 168b6b862992..e27ec32e287e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -50,7 +50,7 @@ abstract class FileStreamSinkSuite extends StreamTest {
override def beforeAll(): Unit = {
super.beforeAll()
- spark.conf.set(SQLConf.ORC_IMPLEMENTATION, "native")
+ spark.conf.set(SQLConf.ORC_IMPLEMENTATION.key, "native")
}
override def afterAll(): Unit = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 56c4aecb2377..773be0cc08e3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -262,7 +262,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
override def beforeAll(): Unit = {
super.beforeAll()
- spark.conf.set(SQLConf.ORC_IMPLEMENTATION, "native")
+ spark.conf.set(SQLConf.ORC_IMPLEMENTATION.key, "native")
}
override def afterAll(): Unit = {
@@ -1504,7 +1504,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
// This is to avoid running a spark job to list of files in parallel
// by the InMemoryFileIndex.
- spark.conf.set(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD, numFiles * 2)
+ spark.conf.set(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD.key, numFiles * 2)
withTempDirs { case (root, tmp) =>
val src = new File(root, "a=1")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
index f3ef73c6af5f..f7ff39622ed4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
@@ -1163,7 +1163,7 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest {
func: (Int, Iterator[Int], GroupState[Int]) => Iterator[Int],
timeoutType: GroupStateTimeout = GroupStateTimeout.NoTimeout,
batchTimestampMs: Long = NO_TIMESTAMP): FlatMapGroupsWithStateExec = {
- val stateFormatVersion = spark.conf.get(SQLConf.FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION)
+ val stateFormatVersion = sqlConf.getConf(SQLConf.FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION)
val emptyRdd = spark.sparkContext.emptyRDD[InternalRow]
MemoryStream[Int]
.toDS()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index 7ab45e25799b..68436c4e355b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -542,10 +542,7 @@ trait StreamTest extends QueryTest with SharedSparkSession with TimeLimits with
val metadataRoot =
Option(checkpointLocation).getOrElse(defaultCheckpointLocation)
additionalConfs.foreach(pair => {
- val value =
- if (sparkSession.conf.contains(pair._1)) {
- Some(sparkSession.conf.get(pair._1))
- } else None
+ val value = sparkSession.conf.getOption(pair._1)
resetConfValues(pair._1) = value
sparkSession.conf.set(pair._1, pair._2)
})
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TriggerAvailableNowSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TriggerAvailableNowSuite.scala
index defd5fd110de..a47c2f839692 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TriggerAvailableNowSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TriggerAvailableNowSuite.scala
@@ -265,7 +265,7 @@ class TriggerAvailableNowSuite extends FileStreamSourceTest {
private def assertQueryUsingRightBatchExecutor(
testSource: TestDataFrameProvider,
query: StreamingQuery): Unit = {
- val useWrapper = query.sparkSession.conf.get(
+ val useWrapper = query.sparkSession.sessionState.conf.getConf(
SQLConf.STREAMING_TRIGGER_AVAILABLE_NOW_WRAPPER_ENABLED)
if (useWrapper) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
index ff1473fea369..4d4cc44eb3e7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
@@ -103,6 +103,8 @@ trait SharedSparkSessionBase
new TestSparkSession(sparkConf)
}
+ protected def sqlConf: SQLConf = _spark.sessionState.conf
+
/**
* Initialize the [[TestSparkSession]]. Generally, this is just called from
* beforeAll; however, in test using styles other than FunSuite, there is
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala
index d84b9f796023..8c6113fb5569 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala
@@ -86,7 +86,7 @@ class HiveSharedStateSuite extends SparkFunSuite {
assert(ss2.sparkContext.hadoopConfiguration.get("hive.metastore.warehouse.dir") !==
invalidPath, "warehouse conf in session options can't affect application wide hadoop conf")
assert(ss.conf.get("spark.foo") === "bar2222", "session level conf should
be passed to catalog")
- assert(!ss.conf.get(WAREHOUSE_PATH).contains(invalidPath),
+ assert(!ss.conf.get(WAREHOUSE_PATH.key).contains(invalidPath),
"session level conf should be passed to catalog")
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
index 69abb1d1673e..865ce81e151c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
@@ -828,7 +828,7 @@ object SPARK_18360 {
.enableHiveSupport().getOrCreate()
val defaultDbLocation = spark.catalog.getDatabase("default").locationUri
- assert(new Path(defaultDbLocation) == new Path(spark.conf.get(WAREHOUSE_PATH)))
+ assert(new Path(defaultDbLocation) == new Path(spark.conf.get(WAREHOUSE_PATH.key)))
val hiveClient =
spark.sharedState.externalCatalog.unwrapped.asInstanceOf[HiveExternalCatalog].client
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeReadWriteSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeReadWriteSuite.scala
index aafc4764d246..1922144a92ef 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeReadWriteSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeReadWriteSuite.scala
@@ -44,7 +44,7 @@ class HiveSerDeReadWriteSuite extends QueryTest with SQLTestUtils with TestHiveS
super.beforeAll()
originalConvertMetastoreParquet = spark.conf.get(CONVERT_METASTORE_PARQUET.key)
originalConvertMetastoreORC = spark.conf.get(CONVERT_METASTORE_ORC.key)
- originalORCImplementation = spark.conf.get(ORC_IMPLEMENTATION)
+ originalORCImplementation = spark.conf.get(ORC_IMPLEMENTATION.key)
spark.conf.set(CONVERT_METASTORE_PARQUET.key, "false")
spark.conf.set(CONVERT_METASTORE_ORC.key, "false")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 3deb355e0e4a..594c097de2c7 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -79,13 +79,13 @@ abstract class SQLQuerySuiteBase extends QueryTest with SQLTestUtils with TestHi
test("query global temp view") {
val df = Seq(1).toDF("i1")
df.createGlobalTempView("tbl1")
- val global_temp_db = spark.conf.get(GLOBAL_TEMP_DATABASE)
+ val global_temp_db = spark.conf.get(GLOBAL_TEMP_DATABASE.key)
checkAnswer(spark.sql(s"select * from ${global_temp_db}.tbl1"), Row(1))
spark.sql(s"drop view ${global_temp_db}.tbl1")
}
test("non-existent global temp view") {
- val global_temp_db = spark.conf.get(GLOBAL_TEMP_DATABASE)
+ val global_temp_db = spark.conf.get(GLOBAL_TEMP_DATABASE.key)
val e = intercept[AnalysisException] {
spark.sql(s"select * from ${global_temp_db}.nonexistentview")
}
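
For readers following the test changes above: they all exercise one pattern, namely that the shared RuntimeConfig interface is keyed by plain strings, so call sites pass ConfigEntry.key to spark.conf (or read the typed value through the session-local SQLConf inside Spark's own packages). The following is a minimal standalone sketch of the string-keyed style, not part of the patch; the config entry and RuntimeConfig methods are existing Spark APIs, while the wrapping object and local master URL are illustrative assumptions only.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.internal.SQLConf

object RuntimeConfigKeySketch {
  def main(args: Array[String]): Unit = {
    // Local session purely for illustration.
    val spark = SparkSession.builder().master("local[1]").getOrCreate()

    // RuntimeConfig works on string keys and values, so pass ConfigEntry.key.
    spark.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "true")
    val aqe = spark.conf.get(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key)
    println(s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} = $aqe")

    spark.stop()
  }
}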
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]