This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 4fcc5c72a [KYUUBI #6260] Clean up and improve comments for spark extensions
4fcc5c72a is described below
commit 4fcc5c72a2d9c9300f452935449900cae19cc8ea
Author: Cheng Pan <[email protected]>
AuthorDate: Sun Apr 7 18:20:14 2024 +0800
[KYUUBI #6260] Clean up and improve comments for spark extensions
# :mag: Description
This pull request
- improves comments for SPARK-33832
- removes the unused `spark.sql.analyzer.classification.enabled` (I didn't update the migration rules because this configuration seems to have never worked properly)
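For context on the SPARK-33832 comment: since OptimizeSkewedJoin runs in the stage-preparation phase, `InsertShuffleNodeBeforeJoin` must leave already-optimized skewed joins untouched. A minimal, Spark-free sketch of that guard (the `Plan` ADT and names below are hypothetical stand-ins for Spark's `SparkPlan`/`SortMergeJoinExec`, not the real rule):

```scala
// Hypothetical toy plan tree standing in for Spark's SparkPlan hierarchy.
sealed trait Plan
case class Scan(name: String) extends Plan
case class Shuffle(child: Plan) extends Plan
case class SortMergeJoin(left: Plan, right: Plan, isSkewJoin: Boolean) extends Plan

object InsertShuffleSketch {
  // Mirrors checkAndInsertShuffle: never stack a shuffle on top of an existing one.
  private def withShuffle(p: Plan): Plan = p match {
    case s: Shuffle => s
    case other => Shuffle(other)
  }

  // Mirrors insertShuffleBeforeJoin: only non-skewed sort-merge joins get
  // shuffles injected below them; skewed joins (already handled by
  // OptimizeSkewedJoin in queryStagePreparationRules) pass through unchanged.
  def insertShuffleBeforeJoin(plan: Plan): Plan = plan match {
    case SortMergeJoin(l, r, false) =>
      SortMergeJoin(
        withShuffle(insertShuffleBeforeJoin(l)),
        withShuffle(insertShuffleBeforeJoin(r)),
        isSkewJoin = false)
    case SortMergeJoin(l, r, true) =>
      SortMergeJoin(insertShuffleBeforeJoin(l), insertShuffleBeforeJoin(r), isSkewJoin = true)
    case Shuffle(c) => Shuffle(insertShuffleBeforeJoin(c))
    case s: Scan => s
  }
}
```

The real rule additionally checks `requiredChildDistribution` before deciding whether a child needs a shuffle; the sketch keeps only the skew-join guard that this PR's comment documents.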
## Types of changes :bookmark:
- [ ] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)
## Test Plan 🧪
Review
---
# Checklist 📝
- [x] This patch was not authored or co-authored using [Generative
Tooling](https://www.apache.org/legal/generative-tooling.html)
**Be nice. Be informative.**
Closes #6260 from pan3793/nit.
Closes #6260
d762d30e9 [Cheng Pan] update comment
4ebaa04ea [Cheng Pan] nit
b303f05bb [Cheng Pan] remove spark.sql.analyzer.classification.enabled
b021cbc0a [Cheng Pan] Improve docs for SPARK-33832
Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
docs/extensions/engines/spark/rules.md | 1 -
.../org/apache/kyuubi/sql/InsertShuffleNodeBeforeJoin.scala | 4 +++-
.../org/apache/kyuubi/sql/InsertShuffleNodeBeforeJoin.scala | 4 +++-
.../src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala | 10 ----------
.../org/apache/spark/sql/KyuubiSparkSQLExtensionTest.scala | 3 ---
.../org/apache/kyuubi/sql/InsertShuffleNodeBeforeJoin.scala | 4 +++-
.../src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala | 10 ----------
.../org/apache/spark/sql/KyuubiSparkSQLExtensionTest.scala | 3 ---
.../src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala | 10 ----------
.../org/apache/spark/sql/KyuubiSparkSQLExtensionTest.scala | 3 ---
10 files changed, 9 insertions(+), 43 deletions(-)
diff --git a/docs/extensions/engines/spark/rules.md b/docs/extensions/engines/spark/rules.md
index 986fda14c..5fb8beaec 100644
--- a/docs/extensions/engines/spark/rules.md
+++ b/docs/extensions/engines/spark/rules.md
@@ -70,7 +70,6 @@ Kyuubi provides some configs to make these feature easy to use.
 | spark.sql.optimizer.insertRepartitionBeforeWrite.enabled | true  | Add repartition node at the top of query plan. An approach of merging small files. | 1.2.0 |
 | spark.sql.optimizer.forceShuffleBeforeJoin.enabled       | false | Ensure shuffle node exists before shuffled join (shj and smj) to make AQE `OptimizeSkewedJoin` works (complex scenario join, multi table join). | 1.2.0 |
 | spark.sql.optimizer.finalStageConfigIsolation.enabled    | false | If true, the final stage support use different config with previous stage. The prefix of final stage config key should be `spark.sql.finalStage.`. For example, the raw spark config: `spark.sql.adaptive.advisoryPartitionSizeInBytes`, then the final stage config should be: `spark.sql.finalStage.adaptive.advisoryPartitionSizeInBytes`. | 1.2.0 |
-| spark.sql.analyzer.classification.enabled                | false | When true, allows Kyuubi engine to judge this SQL's classification and set `spark.sql.analyzer.classification` back into sessionConf. Through this configuration item, Spark can optimizing configuration dynamic. | 1.4.0 |
 | spark.sql.optimizer.insertZorderBeforeWriting.enabled    | true  | When true, we will follow target table properties to insert zorder or not. The key properties are: 1) `kyuubi.zorder.enabled`: if this property is true, we will insert zorder before writing data. 2) `kyuubi.zorder.cols`: string split by comma, we will zorder by these cols. | 1.4.0 |
 | spark.sql.optimizer.zorderGlobalSort.enabled             | true  | When true, we do a global sort using zorder. Note that, it can cause data skew issue if the zorder columns have less cardinality. When false, we only do local sort using zorder. | 1.4.0 |
 | spark.sql.watchdog.maxPartitions                         | none  | Set the max partition number when spark scans a data source. Enable maxPartition Strategy by specifying this configuration. Add maxPartitions Strategy to avoid scan excessive partitions on partitioned table, it's optional that works with defined | 1.4.0 |
diff --git a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/InsertShuffleNodeBeforeJoin.scala b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/InsertShuffleNodeBeforeJoin.scala
index 1a02e8c1e..92626f027 100644
--- a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/InsertShuffleNodeBeforeJoin.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/InsertShuffleNodeBeforeJoin.scala
@@ -49,7 +49,9 @@ object InsertShuffleNodeBeforeJoin extends Rule[SparkPlan] {
     }
   }
 
-  // Since spark 3.3, insertShuffleBeforeJoin shouldn't be applied if join is skewed.
+  // SPARK-33832 (Spark 3.3) moves the rule OptimizeSkewedJoin from queryStageOptimizerRules
+  // to queryStagePreparationRules, injecting shuffle after OptimizeSkewedJoin may produce
+  // invalid query plan.
   private def insertShuffleBeforeJoin(plan: SparkPlan): SparkPlan = plan transformUp {
     case smj @ SortMergeJoinExec(_, _, _, _, l, r, isSkewJoin) if !isSkewJoin =>
       smj.withNewChildren(checkAndInsertShuffle(smj.requiredChildDistribution.head, l) ::
diff --git a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/InsertShuffleNodeBeforeJoin.scala b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/InsertShuffleNodeBeforeJoin.scala
index 1a02e8c1e..92626f027 100644
--- a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/InsertShuffleNodeBeforeJoin.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/InsertShuffleNodeBeforeJoin.scala
@@ -49,7 +49,9 @@ object InsertShuffleNodeBeforeJoin extends Rule[SparkPlan] {
     }
   }
 
-  // Since spark 3.3, insertShuffleBeforeJoin shouldn't be applied if join is skewed.
+  // SPARK-33832 (Spark 3.3) moves the rule OptimizeSkewedJoin from queryStageOptimizerRules
+  // to queryStagePreparationRules, injecting shuffle after OptimizeSkewedJoin may produce
+  // invalid query plan.
   private def insertShuffleBeforeJoin(plan: SparkPlan): SparkPlan = plan transformUp {
     case smj @ SortMergeJoinExec(_, _, _, _, l, r, isSkewJoin) if !isSkewJoin =>
       smj.withNewChildren(checkAndInsertShuffle(smj.requiredChildDistribution.head, l) ::
diff --git a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
index 02d592ae5..93c87dfa6 100644
--- a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
@@ -48,16 +48,6 @@ object KyuubiSQLConf {
       .booleanConf
       .createWithDefault(false)
 
-  val SQL_CLASSIFICATION = "spark.sql.analyzer.classification"
-  val SQL_CLASSIFICATION_ENABLED =
-    buildConf("spark.sql.analyzer.classification.enabled")
-      .doc("When true, allows Kyuubi engine to judge this SQL's classification " +
-        s"and set `$SQL_CLASSIFICATION` back into sessionConf. " +
-        "Through this configuration item, Spark can optimizing configuration dynamic")
-      .version("1.4.0")
-      .booleanConf
-      .createWithDefault(false)
-
   val INSERT_ZORDER_BEFORE_WRITING =
     buildConf("spark.sql.optimizer.insertZorderBeforeWriting.enabled")
       .doc("When true, we will follow target table properties to insert zorder or not. " +
diff --git a/extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/KyuubiSparkSQLExtensionTest.scala b/extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/KyuubiSparkSQLExtensionTest.scala
index dd9ffbf16..996bef763 100644
--- a/extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/KyuubiSparkSQLExtensionTest.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/KyuubiSparkSQLExtensionTest.scala
@@ -27,8 +27,6 @@ import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.sql.util.QueryExecutionListener
 import org.apache.spark.util.Utils
 
-import org.apache.kyuubi.sql.KyuubiSQLConf
-
 trait KyuubiSparkSQLExtensionTest extends QueryTest
   with SQLTestUtils
   with AdaptiveSparkPlanHelper {
@@ -87,7 +85,6 @@ trait KyuubiSparkSQLExtensionTest extends QueryTest
       .set(
         StaticSQLConf.SPARK_SESSION_EXTENSIONS.key,
         "org.apache.kyuubi.sql.KyuubiSparkSQLExtension")
-      .set(KyuubiSQLConf.SQL_CLASSIFICATION_ENABLED.key, "true")
       .set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "true")
       .set("spark.hadoop.hive.exec.dynamic.partition.mode", "nonstrict")
       .set("spark.hadoop.hive.metastore.client.capability.check", "false")
diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/InsertShuffleNodeBeforeJoin.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/InsertShuffleNodeBeforeJoin.scala
index 1a02e8c1e..92626f027 100644
--- a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/InsertShuffleNodeBeforeJoin.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/InsertShuffleNodeBeforeJoin.scala
@@ -49,7 +49,9 @@ object InsertShuffleNodeBeforeJoin extends Rule[SparkPlan] {
     }
   }
 
-  // Since spark 3.3, insertShuffleBeforeJoin shouldn't be applied if join is skewed.
+  // SPARK-33832 (Spark 3.3) moves the rule OptimizeSkewedJoin from queryStageOptimizerRules
+  // to queryStagePreparationRules, injecting shuffle after OptimizeSkewedJoin may produce
+  // invalid query plan.
   private def insertShuffleBeforeJoin(plan: SparkPlan): SparkPlan = plan transformUp {
     case smj @ SortMergeJoinExec(_, _, _, _, l, r, isSkewJoin) if !isSkewJoin =>
       smj.withNewChildren(checkAndInsertShuffle(smj.requiredChildDistribution.head, l) ::
diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
index 91e4f14cd..0d4cf368a 100644
--- a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
@@ -48,16 +48,6 @@ object KyuubiSQLConf {
       .booleanConf
       .createWithDefault(false)
 
-  val SQL_CLASSIFICATION = "spark.sql.analyzer.classification"
-  val SQL_CLASSIFICATION_ENABLED =
-    buildConf("spark.sql.analyzer.classification.enabled")
-      .doc("When true, allows Kyuubi engine to judge this SQL's classification " +
-        s"and set `$SQL_CLASSIFICATION` back into sessionConf. " +
-        "Through this configuration item, Spark can optimizing configuration dynamic")
-      .version("1.4.0")
-      .booleanConf
-      .createWithDefault(false)
-
   val INSERT_ZORDER_BEFORE_WRITING =
     buildConf("spark.sql.optimizer.insertZorderBeforeWriting.enabled")
      .doc("When true, we will follow target table properties to insert zorder or not. " +
diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/KyuubiSparkSQLExtensionTest.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/KyuubiSparkSQLExtensionTest.scala
index dd9ffbf16..996bef763 100644
--- a/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/KyuubiSparkSQLExtensionTest.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/KyuubiSparkSQLExtensionTest.scala
@@ -27,8 +27,6 @@ import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.sql.util.QueryExecutionListener
 import org.apache.spark.util.Utils
 
-import org.apache.kyuubi.sql.KyuubiSQLConf
-
 trait KyuubiSparkSQLExtensionTest extends QueryTest
   with SQLTestUtils
   with AdaptiveSparkPlanHelper {
@@ -87,7 +85,6 @@ trait KyuubiSparkSQLExtensionTest extends QueryTest
      .set(
         StaticSQLConf.SPARK_SESSION_EXTENSIONS.key,
         "org.apache.kyuubi.sql.KyuubiSparkSQLExtension")
-      .set(KyuubiSQLConf.SQL_CLASSIFICATION_ENABLED.key, "true")
       .set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "true")
       .set("spark.hadoop.hive.exec.dynamic.partition.mode", "nonstrict")
       .set("spark.hadoop.hive.metastore.client.capability.check", "false")
diff --git a/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala b/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
index 02d592ae5..93c87dfa6 100644
--- a/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
+++ b/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
@@ -48,16 +48,6 @@ object KyuubiSQLConf {
       .booleanConf
       .createWithDefault(false)
 
-  val SQL_CLASSIFICATION = "spark.sql.analyzer.classification"
-  val SQL_CLASSIFICATION_ENABLED =
-    buildConf("spark.sql.analyzer.classification.enabled")
-      .doc("When true, allows Kyuubi engine to judge this SQL's classification " +
-        s"and set `$SQL_CLASSIFICATION` back into sessionConf. " +
-        "Through this configuration item, Spark can optimizing configuration dynamic")
-      .version("1.4.0")
-      .booleanConf
-      .createWithDefault(false)
-
   val INSERT_ZORDER_BEFORE_WRITING =
     buildConf("spark.sql.optimizer.insertZorderBeforeWriting.enabled")
       .doc("When true, we will follow target table properties to insert zorder or not. " +
diff --git a/extensions/spark/kyuubi-extension-spark-common/src/test/scala/org/apache/spark/sql/KyuubiSparkSQLExtensionTest.scala b/extensions/spark/kyuubi-extension-spark-common/src/test/scala/org/apache/spark/sql/KyuubiSparkSQLExtensionTest.scala
index e58ac726c..1fa136e60 100644
--- a/extensions/spark/kyuubi-extension-spark-common/src/test/scala/org/apache/spark/sql/KyuubiSparkSQLExtensionTest.scala
+++ b/extensions/spark/kyuubi-extension-spark-common/src/test/scala/org/apache/spark/sql/KyuubiSparkSQLExtensionTest.scala
@@ -24,8 +24,6 @@ import org.apache.spark.sql.test.SQLTestData.TestData
 import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.util.Utils
 
-import org.apache.kyuubi.sql.KyuubiSQLConf
-
 trait KyuubiSparkSQLExtensionTest extends QueryTest
   with SQLTestUtils
   with AdaptiveSparkPlanHelper {
@@ -84,7 +82,6 @@ trait KyuubiSparkSQLExtensionTest extends QueryTest
       .set(
         StaticSQLConf.SPARK_SESSION_EXTENSIONS.key,
         "org.apache.kyuubi.sql.KyuubiSparkSQLExtension")
-      .set(KyuubiSQLConf.SQL_CLASSIFICATION_ENABLED.key, "true")
       .set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "true")
       .set("spark.hadoop.hive.exec.dynamic.partition.mode", "nonstrict")
       .set("spark.hadoop.hive.metastore.client.capability.check", "false")