This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 7a3545e [KYUUBI #1002][FOLLOWUP] Refine sql classification rule
7a3545e is described below
commit 7a3545e7484f9dc96dfb5f2e43a8981b1aeb707b
Author: ulysses-you <[email protected]>
AuthorDate: Fri Sep 24 12:02:30 2021 +0800
[KYUUBI #1002][FOLLOWUP] Refine sql classification rule
### _Why are the changes needed?_
Follow-up to [KYUUBI #1002] [KYUUBI #1035] [KYUUBI #1037]:
- correct the config key; it should use `spark.` as the prefix
- change the config default to false
- correct the class loader; the thread context class loader is not safe to rely on, since Spark and Hive often change it (see the sketch after the stack trace below)
- update the log and exception messages

The following is the error message from the failure:
```
Caused by: java.io.FileNotFoundException: file:/Users/cathy/Desktop/tmp/spark-3.1.2-bin-hadoop2.7/jars/kyuubi-extension-spark-3-1_2.12-1.4.0-SNAPSHOT.jar!/sql-classification-default.json (No such file or directory)
    at java.io.FileInputStream.open0(Native Method)
    at java.io.FileInputStream.open(FileInputStream.java:195)
    at java.io.FileInputStream.<init>(FileInputStream.java:138)
    at com.fasterxml.jackson.core.JsonFactory.createParser(JsonFactory.java:916)
    at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2745)
    at org.apache.kyuubi.sql.sqlclassification.KyuubiGetSqlClassification$.<init>(KyuubiGetSqlClassification.scala:51)
    at org.apache.kyuubi.sql.sqlclassification.KyuubiGetSqlClassification$.<clinit>(KyuubiGetSqlClassification.scala)
    ... 59 more
```
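
The root cause is visible in the path above: for a resource packaged inside a jar, the class loader returns a `jar:file:...!/...` style URL, and converting that URL to a filesystem path (the old `new File(url.getPath)` approach) yields a path that `FileInputStream` cannot open. Below is a minimal sketch of the difference, assuming only the bundled `sql-classification-default.json` resource; everything else is illustrative:

```scala
import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}

object JarResourceSketch {
  def main(args: Array[String]): Unit = {
    val mapper = new ObjectMapper
    val url = getClass.getClassLoader.getResource("sql-classification-default.json")

    // Broken when the resource is packaged in a jar: url.getPath yields
    // "file:/.../kyuubi-extension-...jar!/sql-classification-default.json",
    // which is not a real filesystem path, so FileInputStream throws
    // java.io.FileNotFoundException, exactly as in the stack trace above.
    // val node = mapper.readTree(new java.io.File(url.getPath))

    // Works for both file: and jar: URLs: Jackson opens the URL directly,
    // with no path conversion involved. This is what the patch switches to.
    val node: JsonNode = mapper.readTree(url)
    println(node)
  }
}
```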
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #1140 from ulysses-you/refine.
Closes #1002
f39ef2cf [ulysses-you] address comment
5cb94459 [ulysses-you] nit
de9fd932 [ulysses-you] test
78e8c147 [ulysses-you] default
6f23c63e [ulysses-you] refine
Authored-by: ulysses-you <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
.../org/apache/kyuubi/sql/KyuubiSQLConf.scala | 7 +++--
.../KyuubiGetSqlClassification.scala | 34 ++++++++++------------
.../KyuubiSqlClassification.scala | 2 +-
.../spark/sql/KyuubiSparkSQLExtensionTest.scala | 3 ++
.../apache/spark/sql/SqlClassificationSuite.scala | 13 +++++----
docs/sql/rules.md | 2 +-
6 files changed, 31 insertions(+), 30 deletions(-)
diff --git a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
index 57adc3f..89b8f2d 100644
--- a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
+++ b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
@@ -68,14 +68,15 @@ object KyuubiSQLConf {
.booleanConf
.createWithDefault(false)
+ val SQL_CLASSIFICATION = "spark.sql.analyzer.classification"
val SQL_CLASSIFICATION_ENABLED =
- buildConf("kyuubi.spark.sql.classification.enabled")
+ buildConf("spark.sql.analyzer.classification.enabled")
.doc("When true, allows Kyuubi engine to judge this SQL's classification
" +
- "and set it into sessionConf. " +
+ s"and set `$SQL_CLASSIFICATION` back into sessionConf. " +
"Through this configuration item, Spark can optimizing configuration
dynamic")
.version("1.4.0")
.booleanConf
- .createWithDefault(true)
+ .createWithDefault(false)
val INSERT_ZORDER_BEFORE_WRITING =
buildConf("spark.sql.optimizer.insertZorderBeforeWriting.enabled")
diff --git a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/sqlclassification/KyuubiGetSqlClassification.scala b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/sqlclassification/KyuubiGetSqlClassification.scala
index fd17c5a..8bc0fdf 100644
--- a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/sqlclassification/KyuubiGetSqlClassification.scala
+++ b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/sqlclassification/KyuubiGetSqlClassification.scala
@@ -17,10 +17,10 @@
package org.apache.kyuubi.sql.sqlclassification
-import java.io.File
import java.net.URL
import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
+import org.apache.spark.internal.Logging
import org.apache.spark.sql.internal.SQLConf
import org.apache.kyuubi.sql.KyuubiSQLConf._
@@ -35,29 +35,25 @@ import org.apache.kyuubi.sql.KyuubiSQLConf._
* If there is no JSON file with that name,
* the service will load the default matching rule: sql-classification-default.json.
*/
-object KyuubiGetSqlClassification {
-
+object KyuubiGetSqlClassification extends Logging {
private val jsonNode: Option[JsonNode] = {
- SQLConf.get.getConf(SQL_CLASSIFICATION_ENABLED) match {
- case true =>
- val objectMapper = new ObjectMapper
- var url: URL = Thread.currentThread().getContextClassLoader
- .getResource("sql-classification.json")
- if (url == null) {
- url = Thread.currentThread().getContextClassLoader
- .getResource("sql-classification-default.json")
- }
- val defaultSqlClassificationFile = url.getPath
- Some(objectMapper.readTree(new File(defaultSqlClassificationFile)))
- case false =>
- None
+ if (SQLConf.get.getConf(SQL_CLASSIFICATION_ENABLED)) {
+ val objectMapper = new ObjectMapper
+      var url: URL = getClass.getClassLoader.getResource("sql-classification.json")
+      if (url == null) {
+        logInfo("sql-classification.json is not found, using the default config instead")
+        url = getClass.getClassLoader.getResource("sql-classification-default.json")
+ }
+ Some(objectMapper.readTree(url))
+ } else {
+ None
}
}
/**
* Notice:
- * You need to make sure that the configuration item: kyuubi.spark.sql.classification.enabled
- * is true
+ * You need to make sure that the configuration item: SQL_CLASSIFICATION_ENABLED
+ * is true
* @param simpleName: the analyzed logical plan's getSimpleName
* @return: this SQL's classification
*/
@@ -71,7 +67,7 @@ object KyuubiGetSqlClassification {
}
}.getOrElse(
throw new IllegalArgumentException(
- "The configuration item: kyuubi.spark.sql.classification.enabled is
false")
+ s"You should restart engine with: ${SQL_CLASSIFICATION_ENABLED.key}
true")
)
}
}
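
The rewritten exception message asks for an engine restart because `KyuubiGetSqlClassification` is a Scala `object` and `jsonNode` is part of its one-time initializer (note the `<clinit>` frame in the stack trace above). A tiny illustrative sketch of that behavior, with hypothetical names:

```scala
// Illustrative only: fields of a Scala object are evaluated exactly once,
// when the object is first accessed. Flipping SQL_CLASSIFICATION_ENABLED in
// a live session afterwards cannot re-run this initializer, hence the
// "restart engine" wording in the new exception message.
object OnceOnly {
  val initializedAt: Long = System.nanoTime() // computed once, at first access
}
```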
diff --git a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/sqlclassification/KyuubiSqlClassification.scala b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/sqlclassification/KyuubiSqlClassification.scala
index 56c63d3..c308ba0 100644
--- a/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/sqlclassification/KyuubiSqlClassification.scala
+++ b/dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/sqlclassification/KyuubiSqlClassification.scala
@@ -29,7 +29,7 @@ case class KyuubiSqlClassification(session: SparkSession) extends Rule[LogicalPl
if (conf.getConf(SQL_CLASSIFICATION_ENABLED) && plan.resolved) {
val simpleName = plan.getClass.getSimpleName
val sqlClassification = KyuubiGetSqlClassification.getSqlClassification(simpleName)
- session.conf.set("kyuubi.spark.sql.classification", sqlClassification)
+ session.conf.set(SQL_CLASSIFICATION, sqlClassification)
}
plan
}
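
After the rename, downstream code reads the classification back under the new `spark.`-prefixed key, which is what the updated tests below assert. An illustrative round trip, assuming a session built as in the earlier sketch:

```scala
// Illustrative only: once a resolved plan passes through the rule, the
// classification is published in the session conf under the new key.
spark.sql("CREATE DATABASE inventory")
assert(spark.conf.get("spark.sql.analyzer.classification") == "ddl")

spark.sql("select timestamp'2021-06-01'")
assert(spark.conf.get("spark.sql.analyzer.classification") == "dql")
```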
diff --git a/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/KyuubiSparkSQLExtensionTest.scala b/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/KyuubiSparkSQLExtensionTest.scala
index d24e3fe..8ee0176 100644
--- a/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/KyuubiSparkSQLExtensionTest.scala
+++ b/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/KyuubiSparkSQLExtensionTest.scala
@@ -24,6 +24,8 @@ import org.apache.spark.sql.test.SQLTestData.TestData
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.util.Utils
+import org.apache.kyuubi.sql.KyuubiSQLConf
+
trait KyuubiSparkSQLExtensionTest extends QueryTest
with SQLTestUtils
with AdaptiveSparkPlanHelper {
@@ -77,6 +79,7 @@ trait KyuubiSparkSQLExtensionTest extends QueryTest
new SparkConf()
.set(StaticSQLConf.SPARK_SESSION_EXTENSIONS.key,
"org.apache.kyuubi.sql.KyuubiSparkSQLExtension")
+ .set(KyuubiSQLConf.SQL_CLASSIFICATION_ENABLED.key, "true")
.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "true")
.set("spark.hadoop.hive.exec.dynamic.partition.mode", "nonstrict")
.set("spark.hadoop.hive.metastore.client.capability.check", "false")
diff --git a/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/SqlClassificationSuite.scala b/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/SqlClassificationSuite.scala
index 7ff0024..ba0b2f4 100644
--- a/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/SqlClassificationSuite.scala
+++ b/dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/SqlClassificationSuite.scala
@@ -20,16 +20,17 @@ package org.apache.spark.sql
import scala.collection.mutable.Set
import org.apache.kyuubi.sql.KyuubiSQLConf
+import org.apache.kyuubi.sql.KyuubiSQLConf._
class SqlClassificationSuite extends KyuubiSparkSQLExtensionTest {
test("Sql classification for ddl") {
withSQLConf(KyuubiSQLConf.SQL_CLASSIFICATION_ENABLED.key -> "true") {
withDatabase("inventory") {
val df = sql("CREATE DATABASE inventory;")
- assert(df.sparkSession.conf.get("kyuubi.spark.sql.classification") === "ddl")
+ assert(df.sparkSession.conf.get(SQL_CLASSIFICATION) === "ddl")
}
val df = sql("select timestamp'2021-06-01'")
- assert(df.sparkSession.conf.get("kyuubi.spark.sql.classification") !== "ddl")
+ assert(df.sparkSession.conf.get(SQL_CLASSIFICATION) !== "ddl")
}
}
@@ -38,7 +39,7 @@ class SqlClassificationSuite extends KyuubiSparkSQLExtensionTest {
val df01 = sql("CREATE TABLE IF NOT EXISTS students " +
"(name VARCHAR(64), address VARCHAR(64)) " +
"USING PARQUET PARTITIONED BY (student_id INT);")
- assert(df01.sparkSession.conf.get("kyuubi.spark.sql.classification") === "ddl")
+ assert(df01.sparkSession.conf.get(SQL_CLASSIFICATION) === "ddl")
val sql02 = "INSERT INTO students VALUES ('Amy Smith', '123 Park Ave,
San Jose', 111111);"
val df02 = sql(sql02)
@@ -48,19 +49,19 @@ class SqlClassificationSuite extends KyuubiSparkSQLExtensionTest {
spark.sessionState.sqlParser.parsePlan(sql02)).toString())
// scalastyle:on println
- assert(df02.sparkSession.conf.get("kyuubi.spark.sql.classification") === "dml")
+ assert(df02.sparkSession.conf.get(SQL_CLASSIFICATION) === "dml")
}
}
test("Sql classification for other and dql") {
withSQLConf(KyuubiSQLConf.SQL_CLASSIFICATION_ENABLED.key -> "true") {
val df01 = sql("SET spark.sql.variable.substitute=false")
- assert(df01.sparkSession.conf.get("kyuubi.spark.sql.classification") === "other")
+ assert(df01.sparkSession.conf.get(SQL_CLASSIFICATION) === "other")
val sql02 = "select timestamp'2021-06-01'"
val df02 = sql(sql02)
- assert(df02.sparkSession.conf.get("kyuubi.spark.sql.classification") === "dql")
+ assert(df02.sparkSession.conf.get(SQL_CLASSIFICATION) === "dql")
}
}
diff --git a/docs/sql/rules.md b/docs/sql/rules.md
index d8b358e..a1d810b 100644
--- a/docs/sql/rules.md
+++ b/docs/sql/rules.md
@@ -67,6 +67,6 @@ spark.sql.optimizer.insertRepartitionNum | none | The partition number if `spark
spark.sql.optimizer.dynamicPartitionInsertionRepartitionNum | 100 | The partition number of each dynamic partition if `spark.sql.optimizer.insertRepartitionBeforeWrite.enabled` is enabled. We will repartition by dynamic partition columns to reduce the small file but that can cause data skew. This config is to extend the partition of dynamic partition column to avoid skew but may generate some small files. | 1.2.0
spark.sql.optimizer.forceShuffleBeforeJoin.enabled | false | Ensure shuffle node exists before shuffled join (shj and smj) to make AQE `OptimizeSkewedJoin` works (complex scenario join, multi table join). | 1.2.0
spark.sql.optimizer.finalStageConfigIsolation.enabled | false | If true, the final stage support use different config with previous stage. The prefix of final stage config key should be `spark.sql.finalStage.`. For example, the raw spark config: `spark.sql.adaptive.advisoryPartitionSizeInBytes`, then the final stage config should be: `spark.sql.finalStage.adaptive.advisoryPartitionSizeInBytes`. | 1.2.0
-kyuubi.spark.sql.classification.enabled | true | When true, allows Kyuubi engine to judge this SQL's classification and set it into sessionConf. Through this configuration item, Spark can optimizing configuration dynamic. | 1.4.0
+spark.sql.analyzer.classification.enabled | false | When true, allows Kyuubi engine to judge this SQL's classification and set `spark.sql.analyzer.classification` back into sessionConf. Through this configuration item, Spark can optimize its configuration dynamically. | 1.4.0
spark.sql.optimizer.insertZorderBeforeWriting.enabled | true | When true, we will follow target table properties to insert zorder or not. The key properties are: 1) `kyuubi.zorder.enabled`: if this property is true, we will insert zorder before writing data. 2) `kyuubi.zorder.cols`: string split by comma, we will zorder by these cols. | 1.4.0
spark.sql.watchdog.maxHivePartitions | none | Add maxHivePartitions Strategy to avoid scan excessive hive partitions on partitioned table, it's optional that works with defined. | 1.4.0