Repository: spark
Updated Branches:
  refs/heads/master b056e8cb0 -> 66773eb8a
[SPARK-15012][SQL] Simplify configuration API further

## What changes were proposed in this pull request?

1. Remove all the `spark.setConf` variants and expose only `spark.conf`.
2. Make `spark.conf` also pick up values set in the core `SparkConf`, so that users are not confused when a config set there appears to be missing.

This was done for both the Python and Scala APIs.

## How was this patch tested?

`SQLConfSuite` and the Python tests. This also fixes the tests that failed in #12787.

Closes #12787

Author: Andrew Or <[email protected]>
Author: Yin Huai <[email protected]>

Closes #12798 from yhuai/conf-api.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/66773eb8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/66773eb8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/66773eb8

Branch: refs/heads/master
Commit: 66773eb8a55bfe6437dd4096c2c55685aca29dcd
Parents: b056e8c
Author: Andrew Or <[email protected]>
Authored: Fri Apr 29 20:46:07 2016 -0700
Committer: Yin Huai <[email protected]>
Committed: Fri Apr 29 20:46:07 2016 -0700

----------------------------------------------------------------------
 python/pyspark/sql/context.py                    |  4 +-
 python/pyspark/sql/session.py                    | 29 ------
 python/pyspark/sql/tests.py                      |  4 +-
 .../spark/sql/ContinuousQueryManager.scala       |  2 +-
 .../org/apache/spark/sql/DataFrameWriter.scala   |  4 +-
 .../spark/sql/RelationalGroupedDataset.scala     |  2 +-
 .../org/apache/spark/sql/RuntimeConfig.scala     | 26 ++++++
 .../scala/org/apache/spark/sql/SQLContext.scala  | 28 ++++--
 .../org/apache/spark/sql/SparkSession.scala      | 95 +++-----------------
 .../sql/execution/command/SetCommand.scala       | 11 +--
 .../InsertIntoHadoopFsRelation.scala             |  2 +-
 .../datasources/parquet/ParquetRelation.scala    | 30 +++----
 .../execution/streaming/FileStreamSinkLog.scala  |  6 +-
 .../org/apache/spark/sql/internal/SQLConf.scala  |  3 -
 .../spark/sql/internal/SessionState.scala        | 19 +---
 .../org/apache/spark/sql/SQLContextSuite.scala   |  6 --
 .../spark/sql/internal/SQLConfSuite.scala        | 16 +++-
 .../spark/sql/hive/HiveSessionState.scala        |  8 +-
 18 files changed, 108 insertions(+), 187 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/66773eb8/python/pyspark/sql/context.py ---------------------------------------------------------------------- diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py index 417d719..2096236 100644 --- a/python/pyspark/sql/context.py +++ b/python/pyspark/sql/context.py @@ -114,7 +114,7 @@ class SQLContext(object): def setConf(self, key, value): """Sets the given Spark SQL configuration property.
""" - self.sparkSession.setConf(key, value) + self.sparkSession.conf.set(key, value) @ignore_unicode_prefix @since(1.3) @@ -133,7 +133,7 @@ class SQLContext(object): >>> sqlContext.getConf("spark.sql.shuffle.partitions", u"10") u'50' """ - return self.sparkSession.getConf(key, defaultValue) + return self.sparkSession.conf.get(key, defaultValue) @property @since("1.3.1") http://git-wip-us.apache.org/repos/asf/spark/blob/66773eb8/python/pyspark/sql/session.py ---------------------------------------------------------------------- diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py index c245261..35c36b4 100644 --- a/python/pyspark/sql/session.py +++ b/python/pyspark/sql/session.py @@ -134,35 +134,6 @@ class SparkSession(object): self._conf = RuntimeConfig(self._jsparkSession.conf()) return self._conf - @since(2.0) - def setConf(self, key, value): - """ - Sets the given Spark SQL configuration property. - """ - self._jsparkSession.setConf(key, value) - - @ignore_unicode_prefix - @since(2.0) - def getConf(self, key, defaultValue=None): - """Returns the value of Spark SQL configuration property for the given key. - - If the key is not set and defaultValue is not None, return - defaultValue. If the key is not set and defaultValue is None, return - the system default value. - - >>> spark.getConf("spark.sql.shuffle.partitions") - u'200' - >>> spark.getConf("spark.sql.shuffle.partitions", "10") - u'10' - >>> spark.setConf("spark.sql.shuffle.partitions", "50") - >>> spark.getConf("spark.sql.shuffle.partitions", "10") - u'50' - """ - if defaultValue is not None: - return self._jsparkSession.getConf(key, defaultValue) - else: - return self._jsparkSession.getConf(key) - @property @since(2.0) def catalog(self): http://git-wip-us.apache.org/repos/asf/spark/blob/66773eb8/python/pyspark/sql/tests.py ---------------------------------------------------------------------- diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index ea98206..4995b26 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -1397,9 +1397,9 @@ class SQLTests(ReusedPySparkTestCase): def test_conf(self): spark = self.sparkSession - spark.setConf("bogo", "sipeo") + spark.conf.set("bogo", "sipeo") self.assertEqual(self.sparkSession.conf.get("bogo"), "sipeo") - spark.setConf("bogo", "ta") + spark.conf.set("bogo", "ta") self.assertEqual(spark.conf.get("bogo"), "ta") self.assertEqual(spark.conf.get("bogo", "not.read"), "ta") self.assertEqual(spark.conf.get("not.set", "ta"), "ta") http://git-wip-us.apache.org/repos/asf/spark/blob/66773eb8/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala index 9e2e2d0..f82130c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala @@ -184,7 +184,7 @@ class ContinuousQueryManager(sparkSession: SparkSession) { val analyzedPlan = df.queryExecution.analyzed df.queryExecution.assertAnalyzed() - if (sparkSession.getConf(SQLConf.UNSUPPORTED_OPERATION_CHECK_ENABLED)) { + if (sparkSession.conf.get(SQLConf.UNSUPPORTED_OPERATION_CHECK_ENABLED)) { UnsupportedOperationChecker.checkForStreaming(analyzedPlan, outputMode) } 
http://git-wip-us.apache.org/repos/asf/spark/blob/66773eb8/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 6a600c1..28f5ccd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -284,9 +284,7 @@ final class DataFrameWriter private[sql](df: DataFrame) { new Path(userSpecified).toUri.toString }.orElse { val checkpointConfig: Option[String] = - df.sparkSession.getConf( - SQLConf.CHECKPOINT_LOCATION, - None) + df.sparkSession.conf.get(SQLConf.CHECKPOINT_LOCATION, None) checkpointConfig.map { location => new Path(location, queryName).toUri.toString http://git-wip-us.apache.org/repos/asf/spark/blob/66773eb8/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala index 7ee9732..4f5bf63 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala @@ -302,7 +302,7 @@ class RelationalGroupedDataset protected[sql]( */ def pivot(pivotColumn: String): RelationalGroupedDataset = { // This is to prevent unintended OOM errors when the number of distinct values is large - val maxValues = df.sparkSession.getConf(SQLConf.DATAFRAME_PIVOT_MAX_VALUES) + val maxValues = df.sparkSession.conf.get(SQLConf.DATAFRAME_PIVOT_MAX_VALUES) // Get the distinct values of the column and sort them so its consistent val values = df.select(pivotColumn) .distinct() http://git-wip-us.apache.org/repos/asf/spark/blob/66773eb8/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala b/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala index f2e8515..670288b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala @@ -17,8 +17,10 @@ package org.apache.spark.sql +import org.apache.spark.internal.config.ConfigEntry import org.apache.spark.sql.internal.SQLConf + /** * Runtime configuration interface for Spark. To access this, use [[SparkSession.conf]]. * @@ -78,6 +80,30 @@ class RuntimeConfig private[sql](sqlConf: SQLConf = new SQLConf) { /** * Returns the value of Spark runtime configuration property for the given key. + */ + @throws[NoSuchElementException]("if the key is not set") + protected[sql] def get[T](entry: ConfigEntry[T]): T = { + sqlConf.getConf(entry) + } + + /** + * Returns the value of Spark runtime configuration property for the given key. + */ + protected[sql] def get[T](entry: ConfigEntry[T], default: T): T = { + sqlConf.getConf(entry, default) + } + + /** + * Returns all properties set in this conf. + * + * @since 2.0.0 + */ + def getAll: Map[String, String] = { + sqlConf.getAllConfs + } + + /** + * Returns the value of Spark runtime configuration property for the given key. 
* * @since 2.0.0 */ http://git-wip-us.apache.org/repos/asf/spark/blob/66773eb8/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 6dfac3d..ff633cf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -134,13 +134,15 @@ class SQLContext private[sql]( * @group config * @since 1.0.0 */ - def setConf(props: Properties): Unit = sparkSession.setConf(props) + def setConf(props: Properties): Unit = { + sessionState.conf.setConf(props) + } /** * Set the given Spark SQL configuration property. */ private[sql] def setConf[T](entry: ConfigEntry[T], value: T): Unit = { - sparkSession.setConf(entry, value) + sessionState.conf.setConf(entry, value) } /** @@ -149,7 +151,9 @@ class SQLContext private[sql]( * @group config * @since 1.0.0 */ - def setConf(key: String, value: String): Unit = sparkSession.setConf(key, value) + def setConf(key: String, value: String): Unit = { + sparkSession.conf.set(key, value) + } /** * Return the value of Spark SQL configuration property for the given key. @@ -157,13 +161,17 @@ class SQLContext private[sql]( * @group config * @since 1.0.0 */ - def getConf(key: String): String = sparkSession.getConf(key) + def getConf(key: String): String = { + sparkSession.conf.get(key) + } /** * Return the value of Spark SQL configuration property for the given key. If the key is not set * yet, return `defaultValue` in [[ConfigEntry]]. */ - private[sql] def getConf[T](entry: ConfigEntry[T]): T = sparkSession.getConf(entry) + private[sql] def getConf[T](entry: ConfigEntry[T]): T = { + sparkSession.conf.get(entry) + } /** * Return the value of Spark SQL configuration property for the given key. If the key is not set @@ -171,7 +179,7 @@ class SQLContext private[sql]( * desired one. */ private[sql] def getConf[T](entry: ConfigEntry[T], defaultValue: T): T = { - sparkSession.getConf(entry, defaultValue) + sparkSession.conf.get(entry, defaultValue) } /** @@ -181,7 +189,9 @@ class SQLContext private[sql]( * @group config * @since 1.0.0 */ - def getConf(key: String, defaultValue: String): String = sparkSession.getConf(key, defaultValue) + def getConf(key: String, defaultValue: String): String = { + sparkSession.conf.get(key, defaultValue) + } /** * Return all the configuration properties that have been set (i.e. not the default). 
@@ -190,7 +200,9 @@ class SQLContext private[sql]( * @group config * @since 1.0.0 */ - def getAllConfs: immutable.Map[String, String] = sparkSession.getAllConfs + def getAllConfs: immutable.Map[String, String] = { + sparkSession.conf.getAll + } protected[sql] def parseSql(sql: String): LogicalPlan = sparkSession.parseSql(sql) http://git-wip-us.apache.org/repos/asf/spark/blob/66773eb8/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index 11c0aaa..7d3ff9e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -109,6 +109,18 @@ class SparkSession private( protected[sql] def externalCatalog: ExternalCatalog = sharedState.externalCatalog /** + * Runtime configuration interface for Spark. + * + * This is the interface through which the user can get and set all Spark and Hadoop + * configurations that are relevant to Spark SQL. When getting the value of a config, + * this defaults to the value set in the underlying [[SparkContext]], if any. + * + * @group config + * @since 2.0.0 + */ + @transient lazy val conf: RuntimeConfig = new RuntimeConfig(sessionState.conf) + + /** * :: Experimental :: * An interface to register custom [[org.apache.spark.sql.util.QueryExecutionListener]]s * that listen for execution metrics. @@ -187,89 +199,6 @@ class SparkSession private( } - /* -------------------------------------------------- * - | Methods for accessing or mutating configurations | - * -------------------------------------------------- */ - - /** - * Runtime configuration interface for Spark. - * - * This is the interface through which the user can get and set all Spark and Hadoop - * configurations that are relevant to Spark SQL. When getting the value of a config, - * this defaults to the value set in the underlying [[SparkContext]], if any. - * - * @group config - * @since 2.0.0 - */ - @transient lazy val conf: RuntimeConfig = new RuntimeConfig(sessionState.conf) - - /** - * Set Spark SQL configuration properties. - * - * @group config - * @since 2.0.0 - */ - def setConf(props: Properties): Unit = sessionState.setConf(props) - - /** - * Set the given Spark SQL configuration property. - * - * @group config - * @since 2.0.0 - */ - def setConf(key: String, value: String): Unit = sessionState.setConf(key, value) - - /** - * Return the value of Spark SQL configuration property for the given key. - * - * @group config - * @since 2.0.0 - */ - def getConf(key: String): String = sessionState.conf.getConfString(key) - - /** - * Return the value of Spark SQL configuration property for the given key. If the key is not set - * yet, return `defaultValue`. - * - * @group config - * @since 2.0.0 - */ - def getConf(key: String, defaultValue: String): String = { - sessionState.conf.getConfString(key, defaultValue) - } - - /** - * Return all the configuration properties that have been set (i.e. not the default). - * This creates a new copy of the config properties in the form of a Map. - * - * @group config - * @since 2.0.0 - */ - def getAllConfs: immutable.Map[String, String] = sessionState.conf.getAllConfs - - /** - * Set the given Spark SQL configuration property. 
- */ - protected[sql] def setConf[T](entry: ConfigEntry[T], value: T): Unit = { - sessionState.setConf(entry, value) - } - - /** - * Return the value of Spark SQL configuration property for the given key. If the key is not set - * yet, return `defaultValue` in [[ConfigEntry]]. - */ - protected[sql] def getConf[T](entry: ConfigEntry[T]): T = sessionState.conf.getConf(entry) - - /** - * Return the value of Spark SQL configuration property for the given key. If the key is not set - * yet, return `defaultValue`. This is useful when `defaultValue` in ConfigEntry is not the - * desired one. - */ - protected[sql] def getConf[T](entry: ConfigEntry[T], defaultValue: T): T = { - sessionState.conf.getConf(entry, defaultValue) - } - - /* --------------------------------- * | Methods for creating DataFrames | * --------------------------------- */ http://git-wip-us.apache.org/repos/asf/spark/blob/66773eb8/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala index bbb2a22..2409b5d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala @@ -56,7 +56,7 @@ case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableComm "determining the number of reducers is not supported." throw new IllegalArgumentException(msg) } else { - sparkSession.setConf(SQLConf.SHUFFLE_PARTITIONS.key, value) + sparkSession.conf.set(SQLConf.SHUFFLE_PARTITIONS.key, value) Seq(Row(SQLConf.SHUFFLE_PARTITIONS.key, value)) } } @@ -65,7 +65,7 @@ case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableComm // Configures a single property. case Some((key, Some(value))) => val runFunc = (sparkSession: SparkSession) => { - sparkSession.setConf(key, value) + sparkSession.conf.set(key, value) Seq(Row(key, value)) } (keyValueOutput, runFunc) @@ -74,7 +74,7 @@ case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableComm // Queries all key-value pairs that are set in the SQLConf of the sparkSession. case None => val runFunc = (sparkSession: SparkSession) => { - sparkSession.getAllConfs.map { case (k, v) => Row(k, v) }.toSeq + sparkSession.conf.getAll.map { case (k, v) => Row(k, v) }.toSeq } (keyValueOutput, runFunc) @@ -107,10 +107,7 @@ case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableComm // Queries a single property. 
case Some((key, None)) => val runFunc = (sparkSession: SparkSession) => { - val value = - try sparkSession.getConf(key) catch { - case _: NoSuchElementException => "<undefined>" - } + val value = sparkSession.conf.getOption(key).getOrElse("<undefined>") Seq(Row(key, value)) } (keyValueOutput, runFunc) http://git-wip-us.apache.org/repos/asf/spark/blob/66773eb8/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala index 4df7d0c..4921e4c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala @@ -131,7 +131,7 @@ private[sql] case class InsertIntoHadoopFsRelation( dataColumns = dataColumns, inputSchema = query.output, PartitioningUtils.DEFAULT_PARTITION_NAME, - sparkSession.getConf(SQLConf.PARTITION_MAX_FILES), + sparkSession.conf.get(SQLConf.PARTITION_MAX_FILES), isAppend) } http://git-wip-us.apache.org/repos/asf/spark/blob/66773eb8/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala index c689ad0..b1513bb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala @@ -143,10 +143,9 @@ private[sql] class DefaultSource parameters .get(ParquetRelation.MERGE_SCHEMA) .map(_.toBoolean) - .getOrElse(sparkSession.getConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED)) + .getOrElse(sparkSession.conf.get(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED)) - val mergeRespectSummaries = - sparkSession.getConf(SQLConf.PARQUET_SCHEMA_RESPECT_SUMMARIES) + val mergeRespectSummaries = sparkSession.conf.get(SQLConf.PARQUET_SCHEMA_RESPECT_SUMMARIES) val filesByType = splitFiles(files) @@ -281,22 +280,23 @@ private[sql] class DefaultSource // Sets flags for `CatalystSchemaConverter` hadoopConf.setBoolean( SQLConf.PARQUET_BINARY_AS_STRING.key, - sparkSession.getConf(SQLConf.PARQUET_BINARY_AS_STRING)) + sparkSession.conf.get(SQLConf.PARQUET_BINARY_AS_STRING)) hadoopConf.setBoolean( SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, - sparkSession.getConf(SQLConf.PARQUET_INT96_AS_TIMESTAMP)) + sparkSession.conf.get(SQLConf.PARQUET_INT96_AS_TIMESTAMP)) // Try to push down filters when filter push-down is enabled. - val pushed = if (sparkSession.getConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key).toBoolean) { - filters - // Collects all converted Parquet filter predicates. Notice that not all predicates can be - // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap` - // is used here. 
- .flatMap(ParquetFilters.createFilter(requiredSchema, _)) - .reduceOption(FilterApi.and) - } else { - None - } + val pushed = + if (sparkSession.conf.get(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key).toBoolean) { + filters + // Collects all converted Parquet filter predicates. Notice that not all predicates can be + // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap` + // is used here. + .flatMap(ParquetFilters.createFilter(requiredSchema, _)) + .reduceOption(FilterApi.and) + } else { + None + } val broadcastedHadoopConf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) http://git-wip-us.apache.org/repos/asf/spark/blob/66773eb8/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala index c548fbd..b694b61 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala @@ -80,11 +80,11 @@ class FileStreamSinkLog(sparkSession: SparkSession, path: String) * a live lock may happen if the compaction happens too frequently: one processing keeps deleting * old files while another one keeps retrying. Setting a reasonable cleanup delay could avoid it. */ - private val fileCleanupDelayMs = sparkSession.getConf(SQLConf.FILE_SINK_LOG_CLEANUP_DELAY) + private val fileCleanupDelayMs = sparkSession.conf.get(SQLConf.FILE_SINK_LOG_CLEANUP_DELAY) - private val isDeletingExpiredLog = sparkSession.getConf(SQLConf.FILE_SINK_LOG_DELETION) + private val isDeletingExpiredLog = sparkSession.conf.get(SQLConf.FILE_SINK_LOG_DELETION) - private val compactInterval = sparkSession.getConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL) + private val compactInterval = sparkSession.conf.get(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL) require(compactInterval > 0, s"Please set ${SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key} (was $compactInterval) " + "to a positive value.") http://git-wip-us.apache.org/repos/asf/spark/blob/66773eb8/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 7de7748..0bcf0f8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -763,9 +763,6 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging { } private def setConfWithCheck(key: String, value: String): Unit = { - if (key.startsWith("spark.") && !key.startsWith("spark.sql.")) { - logWarning(s"Attempt to set non-Spark SQL config in SQLConf: key = $key, value = $value") - } settings.put(key, value) } http://git-wip-us.apache.org/repos/asf/spark/blob/66773eb8/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala index cacf50e..6fa044a 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala @@ -152,9 +152,11 @@ private[sql] class SessionState(sparkSession: SparkSession) { private val jarClassLoader: NonClosableMutableURLClassLoader = sparkSession.sharedState.jarClassLoader - // Automatically extract `spark.sql.*` entries and put it in our SQLConf + // Automatically extract all entries and put it in our SQLConf // We need to call it after all of vals have been initialized. - setConf(SQLContext.getSQLProperties(sparkSession.sparkContext.getConf)) + sparkSession.sparkContext.getConf.getAll.foreach { case (k, v) => + conf.setConfString(k, v) + } // ------------------------------------------------------ // Helper methods, partially leftover from pre-2.0 days @@ -170,19 +172,6 @@ private[sql] class SessionState(sparkSession: SparkSession) { catalog.invalidateTable(sqlParser.parseTableIdentifier(tableName)) } - final def setConf(properties: Properties): Unit = { - properties.asScala.foreach { case (k, v) => setConf(k, v) } - } - - final def setConf[T](entry: ConfigEntry[T], value: T): Unit = { - conf.setConf(entry, value) - setConf(entry.key, entry.stringConverter(value)) - } - - def setConf(key: String, value: String): Unit = { - conf.setConfString(key, value) - } - def addJar(path: String): Unit = { sparkSession.sparkContext.addJar(path) http://git-wip-us.apache.org/repos/asf/spark/blob/66773eb8/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala index 2f62ad4..1d5fc57 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala @@ -79,10 +79,4 @@ class SQLContextSuite extends SparkFunSuite with SharedSparkContext { assert(sqlContext.sessionState.optimizer.batches.flatMap(_.rules).contains(DummyRule)) } - test("SQLContext can access `spark.sql.*` configs") { - sc.conf.set("spark.sql.with.or.without.you", "my love") - val sqlContext = new SQLContext(sc) - assert(sqlContext.getConf("spark.sql.with.or.without.you") == "my love") - } - } http://git-wip-us.apache.org/repos/asf/spark/blob/66773eb8/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala index e687e6a..b87f482 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.internal -import org.apache.spark.sql.{QueryTest, SQLContext} +import org.apache.spark.sql.{QueryTest, SparkSession, SQLContext} import org.apache.spark.sql.test.{SharedSQLContext, TestSQLContext} class SQLConfSuite extends QueryTest with SharedSQLContext { @@ -125,4 +125,18 @@ class SQLConfSuite extends QueryTest with SharedSQLContext { sqlContext.conf.clear() } + + test("SparkSession can access configs set in SparkConf") { + try { + sparkContext.conf.set("spark.to.be.or.not.to.be", "my love") + sparkContext.conf.set("spark.sql.with.or.without.you", "my love") + val spark = new 
SparkSession(sparkContext) + assert(spark.conf.get("spark.to.be.or.not.to.be") == "my love") + assert(spark.conf.get("spark.sql.with.or.without.you") == "my love") + } finally { + sparkContext.conf.remove("spark.to.be.or.not.to.be") + sparkContext.conf.remove("spark.sql.with.or.without.you") + } + } + } http://git-wip-us.apache.org/repos/asf/spark/blob/66773eb8/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala index b17a88b..f307691 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala @@ -17,7 +17,6 @@ package org.apache.spark.sql.hive -import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.conf.HiveConf.ConfVars import org.apache.spark.sql._ @@ -114,12 +113,7 @@ private[hive] class HiveSessionState(sparkSession: SparkSession) * - allow SQL11 keywords to be used as identifiers */ def setDefaultOverrideConfs(): Unit = { - setConf(ConfVars.HIVE_SUPPORT_SQL11_RESERVED_KEYWORDS.varname, "false") - } - - override def setConf(key: String, value: String): Unit = { - super.setConf(key, value) - metadataHive.runSqlHive(s"SET $key=$value") + conf.setConfString(ConfVars.HIVE_SUPPORT_SQL11_RESERVED_KEYWORDS.varname, "false") } override def addJar(path: String): Unit = {
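The new SQLConfSuite test above exercises the second point of the pull request: once a session is created, entries from the core SparkConf are visible through `spark.conf`, because SessionState now copies every SparkConf entry into the session's SQLConf rather than only the `spark.sql.*` keys. A minimal PySpark sketch of that behaviour, again illustrative only and using the made-up key `spark.my.app.setting`:

```python
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

# Sketch only: "spark.my.app.setting" is a made-up key.
conf = (SparkConf()
        .setMaster("local[2]")
        .setAppName("conf-propagation-sketch")
        .set("spark.my.app.setting", "hello"))
sc = SparkContext(conf=conf)
spark = SparkSession(sc)

# The SparkConf entry is picked up when the session is created and is read
# back through the same runtime config interface as any SQL setting.
print(spark.conf.get("spark.my.app.setting"))  # prints: hello

sc.stop()
```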
