This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 015f066 [SPARK-30840][CORE][SQL] Add version property for ConfigEntry
and ConfigBuilder
015f066 is described below
commit 015f066acbed0a98000f3ccf9c20ddc49fdfb2f6
Author: beliefer <[email protected]>
AuthorDate: Sat Feb 22 09:46:42 2020 +0900
[SPARK-30840][CORE][SQL] Add version property for ConfigEntry and
ConfigBuilder
### What changes were proposed in this pull request?
Spark's `ConfigEntry` and `ConfigBuilder` are missing information about the
Spark version in which each configuration was released. This is unhelpful for
Spark users when they visit the Spark configuration page:
http://spark.apache.org/docs/latest/configuration.html
The new Spark SQL config docs look like:

```
> SET -v
spark.sql.adaptive.enabled false When true, enable adaptive query
execution.
spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin 0.2 The
relation with a non-empty partition ratio lower than this config will not be
considered as the build side of a broadcast-hash join in adaptive execution
regardless of its size.This configuration only has an effect when
'spark.sql.adaptive.enabled' is enabled.
spark.sql.adaptive.optimizeSkewedJoin.enabled true When true and
adaptive execution is enabled, a skewed join is automatically handled at
runtime.
spark.sql.adaptive.optimizeSkewedJoin.skewedPartitionFactor 10 A
partition is considered as a skewed partition if its size is larger than this
factor multiple the median partition size and also larger than
spark.sql.adaptive.optimizeSkewedJoin.skewedPartitionSizeThreshold
spark.sql.adaptive.optimizeSkewedJoin.skewedPartitionMaxSplits 5
Configures the maximum number of task to handle a skewed partition in adaptive
skewedjoin.
spark.sql.adaptive.optimizeSkewedJoin.skewedPartitionSizeThreshold
64MB Configures the minimum size in bytes for a partition that is considered
as a skewed partition in adaptive skewed join.
spark.sql.adaptive.shuffle.fetchShuffleBlocksInBatch.enabled true
Whether to fetch the continuous shuffle blocks in batch. Instead of fetching
blocks one by one, fetching continuous shuffle blocks for the same map task in
batch can reduce IO and improve performance. Note, multiple continuous blocks
exist in single fetch request only happen when 'spark.sql.adaptive.enabled' and
'spark.sql.adaptive.shuffle.reducePostShufflePartitions.enabled' is enabled,
this feature also depends [...]
spark.sql.adaptive.shuffle.localShuffleReader.enabled true When true
and 'spark.sql.adaptive.enabled' is enabled, this enables the optimization of
converting the shuffle reader to local shuffle reader for the shuffle exchange
of the broadcast hash join in probe side.
spark.sql.adaptive.shuffle.maxNumPostShufflePartitions <undefined> The
advisory maximum number of post-shuffle partitions used in adaptive execution.
This is used as the initial number of pre-shuffle partitions. By default it
equals to spark.sql.shuffle.partitions. This configuration only has an effect
when 'spark.sql.adaptive.enabled' and
'spark.sql.adaptive.shuffle.reducePostShufflePartitions.enabled' is enabled.
```
**Note**: Because so many configuration items are exposed and filling in
their versions requires a lot of work, I will add the version numbers of these
configuration items in another PR.
### Why are the changes needed?
To supplement each configuration with its version information.
### Does this PR introduce any user-facing change?
Yes
### How was this patch tested?
Existing UTs.
Closes #27592 from beliefer/add-version-to-config.
Authored-by: beliefer <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
---
.../spark/internal/config/ConfigBuilder.scala | 16 ++++++----
.../apache/spark/internal/config/ConfigEntry.scala | 35 +++++++++++++++-------
.../org/apache/spark/sql/internal/SQLConf.scala | 4 +--
.../spark/sql/api/python/PythonSQLUtils.scala | 2 +-
.../spark/sql/execution/command/SetCommand.scala | 11 +++++--
.../apache/spark/sql/internal/SQLConfSuite.scala | 8 ++---
sql/gen-sql-config-docs.py | 10 +++++--
7 files changed, 57 insertions(+), 29 deletions(-)
diff --git
a/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala
b/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala
index 68e1994..8d5959a 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala
@@ -129,7 +129,7 @@ private[spark] class TypedConfigBuilder[T](
def createOptional: OptionalConfigEntry[T] = {
val entry = new OptionalConfigEntry[T](parent.key, parent._prependedKey,
parent._prependSeparator, parent._alternatives, converter,
stringConverter, parent._doc,
- parent._public)
+ parent._public, parent._version)
parent._onCreate.foreach(_(entry))
entry
}
@@ -144,7 +144,7 @@ private[spark] class TypedConfigBuilder[T](
val transformedDefault = converter(stringConverter(default))
val entry = new ConfigEntryWithDefault[T](parent.key,
parent._prependedKey,
parent._prependSeparator, parent._alternatives, transformedDefault,
converter,
- stringConverter, parent._doc, parent._public)
+ stringConverter, parent._doc, parent._public, parent._version)
parent._onCreate.foreach(_(entry))
entry
}
@@ -154,7 +154,7 @@ private[spark] class TypedConfigBuilder[T](
def createWithDefaultFunction(defaultFunc: () => T): ConfigEntry[T] = {
val entry = new ConfigEntryWithDefaultFunction[T](parent.key,
parent._prependedKey,
parent._prependSeparator, parent._alternatives, defaultFunc, converter,
stringConverter,
- parent._doc, parent._public)
+ parent._doc, parent._public, parent._version)
parent._onCreate.foreach(_ (entry))
entry
}
@@ -166,7 +166,7 @@ private[spark] class TypedConfigBuilder[T](
def createWithDefaultString(default: String): ConfigEntry[T] = {
val entry = new ConfigEntryWithDefaultString[T](parent.key,
parent._prependedKey,
parent._prependSeparator, parent._alternatives, default, converter,
stringConverter,
- parent._doc, parent._public)
+ parent._doc, parent._public, parent._version)
parent._onCreate.foreach(_(entry))
entry
}
@@ -186,6 +186,7 @@ private[spark] case class ConfigBuilder(key: String) {
private[config] var _prependSeparator: String = ""
private[config] var _public = true
private[config] var _doc = ""
+ private[config] var _version = ""
private[config] var _onCreate: Option[ConfigEntry[_] => Unit] = None
private[config] var _alternatives = List.empty[String]
@@ -199,6 +200,11 @@ private[spark] case class ConfigBuilder(key: String) {
this
}
+ def version(v: String): ConfigBuilder = {
+ _version = v
+ this
+ }
+
/**
* Registers a callback for when the config entry is finally instantiated.
Currently used by
* SQLConf to keep track of SQL configuration entries.
@@ -255,7 +261,7 @@ private[spark] case class ConfigBuilder(key: String) {
def fallbackConf[T](fallback: ConfigEntry[T]): ConfigEntry[T] = {
val entry = new FallbackConfigEntry(key, _prependedKey, _prependSeparator,
_alternatives, _doc,
- _public, fallback)
+ _public, _version, fallback)
_onCreate.foreach(_(entry))
entry
}
diff --git
a/core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala
b/core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala
index c5df4c8..8c0b11d 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala
@@ -39,6 +39,7 @@ package org.apache.spark.internal.config
* @param doc the documentation for the configuration
* @param isPublic if this configuration is public to the user. If it's
`false`, this
* configuration is only used internally and we should not
expose it to users.
+ * @param version the spark version when the configuration was released.
* @tparam T the value type
*/
private[spark] abstract class ConfigEntry[T] (
@@ -49,7 +50,8 @@ private[spark] abstract class ConfigEntry[T] (
val valueConverter: String => T,
val stringConverter: T => String,
val doc: String,
- val isPublic: Boolean) {
+ val isPublic: Boolean,
+ val version: String) {
import ConfigEntry._
@@ -74,7 +76,8 @@ private[spark] abstract class ConfigEntry[T] (
def defaultValue: Option[T] = None
override def toString: String = {
- s"ConfigEntry(key=$key, defaultValue=$defaultValueString, doc=$doc,
public=$isPublic)"
+ s"ConfigEntry(key=$key, defaultValue=$defaultValueString, doc=$doc, " +
+ s"public=$isPublic, version=$version)"
}
}
@@ -87,7 +90,8 @@ private class ConfigEntryWithDefault[T] (
valueConverter: String => T,
stringConverter: T => String,
doc: String,
- isPublic: Boolean)
+ isPublic: Boolean,
+ version: String)
extends ConfigEntry(
key,
prependedKey,
@@ -96,7 +100,8 @@ private class ConfigEntryWithDefault[T] (
valueConverter,
stringConverter,
doc,
- isPublic
+ isPublic,
+ version
) {
override def defaultValue: Option[T] = Some(_defaultValue)
@@ -117,7 +122,8 @@ private class ConfigEntryWithDefaultFunction[T] (
valueConverter: String => T,
stringConverter: T => String,
doc: String,
- isPublic: Boolean)
+ isPublic: Boolean,
+ version: String)
extends ConfigEntry(
key,
prependedKey,
@@ -126,7 +132,8 @@ private class ConfigEntryWithDefaultFunction[T] (
valueConverter,
stringConverter,
doc,
- isPublic
+ isPublic,
+ version
) {
override def defaultValue: Option[T] = Some(_defaultFunction())
@@ -147,7 +154,8 @@ private class ConfigEntryWithDefaultString[T] (
valueConverter: String => T,
stringConverter: T => String,
doc: String,
- isPublic: Boolean)
+ isPublic: Boolean,
+ version: String)
extends ConfigEntry(
key,
prependedKey,
@@ -156,7 +164,8 @@ private class ConfigEntryWithDefaultString[T] (
valueConverter,
stringConverter,
doc,
- isPublic
+ isPublic,
+ version
) {
override def defaultValue: Option[T] = Some(valueConverter(_defaultValue))
@@ -181,7 +190,8 @@ private[spark] class OptionalConfigEntry[T](
val rawValueConverter: String => T,
val rawStringConverter: T => String,
doc: String,
- isPublic: Boolean)
+ isPublic: Boolean,
+ version: String)
extends ConfigEntry[Option[T]](
key,
prependedKey,
@@ -190,7 +200,8 @@ private[spark] class OptionalConfigEntry[T](
s => Some(rawValueConverter(s)),
v => v.map(rawStringConverter).orNull,
doc,
- isPublic
+ isPublic,
+ version
) {
override def defaultValueString: String = ConfigEntry.UNDEFINED
@@ -210,6 +221,7 @@ private[spark] class FallbackConfigEntry[T] (
alternatives: List[String],
doc: String,
isPublic: Boolean,
+ version: String,
val fallback: ConfigEntry[T])
extends ConfigEntry[T](
key,
@@ -219,7 +231,8 @@ private[spark] class FallbackConfigEntry[T] (
fallback.valueConverter,
fallback.stringConverter,
doc,
- isPublic
+ isPublic,
+ version
) {
override def defaultValueString: String = s"<value of ${fallback.key}>"
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 44f3037..9f9e556 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2982,10 +2982,10 @@ class SQLConf extends Serializable with Logging {
* Return all the configuration definitions that have been defined in
[[SQLConf]]. Each
* definition contains key, defaultValue and doc.
*/
- def getAllDefinedConfs: Seq[(String, String, String)] =
sqlConfEntries.synchronized {
+ def getAllDefinedConfs: Seq[(String, String, String, String)] =
sqlConfEntries.synchronized {
sqlConfEntries.values.asScala.filter(_.isPublic).map { entry =>
val displayValue = Option(getConfString(entry.key,
null)).getOrElse(entry.defaultValueString)
- (entry.key, displayValue, entry.doc)
+ (entry.key, displayValue, entry.doc, entry.version)
}.toSeq
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala
b/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala
index bf3055d..03f5a60 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala
@@ -40,7 +40,7 @@ private[sql] object PythonSQLUtils {
FunctionRegistry.functionSet.flatMap(f =>
FunctionRegistry.builtin.lookupFunction(f)).toArray
}
- def listSQLConfigs(): Array[(String, String, String)] = {
+ def listSQLConfigs(): Array[(String, String, String, String)] = {
val conf = new SQLConf()
// Py4J doesn't seem to translate Seq well, so we convert to an Array.
conf.getAllDefinedConfs.toArray
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
index a12b261..3dc1d52 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
@@ -115,14 +115,19 @@ case class SetCommand(kv: Option[(String,
Option[String])]) extends RunnableComm
case Some(("-v", None)) =>
val runFunc = (sparkSession: SparkSession) => {
sparkSession.sessionState.conf.getAllDefinedConfs.sorted.map {
- case (key, defaultValue, doc) =>
- Row(key, Option(defaultValue).getOrElse("<undefined>"), doc)
+ case (key, defaultValue, doc, version) =>
+ Row(
+ key,
+ Option(defaultValue).getOrElse("<undefined>"),
+ doc,
+ Option(version).getOrElse("<unknown>"))
}
}
val schema = StructType(
StructField("key", StringType, nullable = false) ::
StructField("value", StringType, nullable = false) ::
- StructField("meaning", StringType, nullable = false) :: Nil)
+ StructField("meaning", StringType, nullable = false) ::
+ StructField("Since version", StringType, nullable = false) :: Nil)
(schema.toAttributes, runFunc)
// Queries the deprecated "mapred.reduce.tasks" property.
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
index c2d8493d..f389465 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
@@ -301,8 +301,8 @@ class SQLConfSuite extends QueryTest with
SharedSparkSession {
assert(spark.sessionState.conf.getConfString(fallback.key, "lzo") ===
"lzo")
val displayValue = spark.sessionState.conf.getAllDefinedConfs
- .find { case (key, _, _) => key == fallback.key }
- .map { case (_, v, _) => v }
+ .find { case (key, _, _, _) => key == fallback.key }
+ .map { case (_, v, _, _) => v }
.get
assert(displayValue === fallback.defaultValueString)
@@ -313,8 +313,8 @@ class SQLConfSuite extends QueryTest with
SharedSparkSession {
assert(spark.sessionState.conf.getConfString(fallback.key) === "lzo")
val newDisplayValue = spark.sessionState.conf.getAllDefinedConfs
- .find { case (key, _, _) => key == fallback.key }
- .map { case (_, v, _) => v }
+ .find { case (key, _, _, _) => key == fallback.key }
+ .map { case (_, v, _, _) => v }
.get
assert(newDisplayValue === "lzo")
diff --git a/sql/gen-sql-config-docs.py b/sql/gen-sql-config-docs.py
index 04f5a85..98212ad 100644
--- a/sql/gen-sql-config-docs.py
+++ b/sql/gen-sql-config-docs.py
@@ -25,7 +25,7 @@ from mkdocs.structure.pages import markdown
from pyspark.java_gateway import launch_gateway
SQLConfEntry = namedtuple(
- "SQLConfEntry", ["name", "default", "description"])
+ "SQLConfEntry", ["name", "default", "description", "version"])
def get_public_sql_configs(jvm):
@@ -34,6 +34,7 @@ def get_public_sql_configs(jvm):
name=_sql_config._1(),
default=_sql_config._2(),
description=_sql_config._3(),
+ version=_sql_config._4()
)
for _sql_config in
jvm.org.apache.spark.sql.api.python.PythonSQLUtils.listSQLConfigs()
]
@@ -49,12 +50,13 @@ def generate_sql_configs_table(sql_configs, path):
```html
<table class="table">
- <tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
+ <tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since
Version</th></tr>
<tr>
<td><code>spark.sql.adaptive.enabled</code></td>
<td>false</td>
<td><p>When true, enable adaptive query execution.</p></td>
+ <td>2.1.0</td>
</tr>
...
@@ -68,7 +70,7 @@ def generate_sql_configs_table(sql_configs, path):
f.write(dedent(
"""
<table class="table">
- <tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
+ <tr><th>Property
Name</th><th>Default</th><th>Meaning</th><th>Since Version</th></tr>
"""
))
for config in sorted(sql_configs, key=lambda x: x.name):
@@ -96,12 +98,14 @@ def generate_sql_configs_table(sql_configs, path):
<td><code>{name}</code></td>
<td>{default}</td>
<td>{description}</td>
+ <td>{version}</td>
</tr>
"""
.format(
name=config.name,
default=default,
description=markdown.markdown(config.description),
+ version=config.version
)
))
f.write("</table>\n")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]