This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 6c6238c [KYUUBI #1919] Add more enum values for `OperationModes`
6c6238c is described below
commit 6c6238c2ae857ed96bf507c4be87da12441a1a84
Author: SteNicholas <[email protected]>
AuthorDate: Fri Feb 18 15:48:15 2022 +0800
[KYUUBI #1919] Add more enum values for `OperationModes`
### _Why are the changes needed?_
Add more enum values for `OperationModes`.
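The change itself is small; the enumeration in `KyuubiConf.scala` gains two values. A minimal sketch mirroring the definition after this commit:

```scala
// Mirrors OperationModes in KyuubiConf.scala after this change.
object OperationModes extends Enumeration {
  type OperationMode = Value
  // PHYSICAL and EXECUTION are the values added by this commit
  val PARSE, ANALYZE, OPTIMIZE, PHYSICAL, EXECUTION, NONE = Value
}
```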
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run
test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests)
locally before making a pull request
Closes #1933 from SteNicholas/KYUUBI-1919.
Closes #1919
adcc7f5e [SteNicholas] [KYUUBI #1919] Add more enum values for
OperationModes
Authored-by: SteNicholas <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
docs/deployment/settings.md | 2 +-
.../engine/flink/operation/PlanOnlyStatement.scala | 27 +++++------
.../flink/operation/PlanOnlyOperationSuite.scala | 36 +++++++++++++--
.../engine/spark/operation/PlanOnlyStatement.scala | 16 +++++--
.../org/apache/kyuubi/config/KyuubiConf.scala | 7 +--
.../kyuubi/operation/PlanOnlyOperationSuite.scala | 54 +++++++++++++++-------
6 files changed, 99 insertions(+), 43 deletions(-)
diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md
index 41a1f0e..27eb352 100644
--- a/docs/deployment/settings.md
+++ b/docs/deployment/settings.md
@@ -294,7 +294,7 @@ Key | Default | Meaning | Type | Since
<code>kyuubi.operation.interrupt.on.cancel</code>|<div style='width:
65pt;word-wrap: break-word;white-space: normal'>true</div>|<div style='width:
170pt;word-wrap: break-word;white-space: normal'>When true, all running tasks
will be interrupted if one cancels a query. When false, all running tasks will
remain until finished.</div>|<div style='width: 30pt'>boolean</div>|<div
style='width: 20pt'>1.2.0</div>
<code>kyuubi.operation.language</code>|<div style='width: 65pt;word-wrap:
break-word;white-space: normal'>SQL</div>|<div style='width: 170pt;word-wrap:
break-word;white-space: normal'>Choose a programming language for the following
inputs <ul><li>SQL: (Default) Run all following statements as SQL queries.</li>
<li>SCALA: Run all following statements as Scala code</li></ul></div>|<div
style='width: 30pt'>string</div>|<div style='width: 20pt'>1.5.0</div>
<code>kyuubi.operation.log.dir.root</code>|<div style='width: 65pt;word-wrap:
break-word;white-space: normal'>server_operation_logs</div>|<div style='width:
170pt;word-wrap: break-word;white-space: normal'>Root directory for query
operation log at server-side.</div>|<div style='width: 30pt'>string</div>|<div
style='width: 20pt'>1.4.0</div>
-<code>kyuubi.operation.plan.only.mode</code>|<div style='width:
65pt;word-wrap: break-word;white-space: normal'>NONE</div>|<div style='width:
170pt;word-wrap: break-word;white-space: normal'>Whether to perform the
statement in a PARSE, ANALYZE, OPTIMIZE only way without executing the query.
When it is NONE, the statement will be fully executed</div>|<div style='width:
30pt'>string</div>|<div style='width: 20pt'>1.4.0</div>
+<code>kyuubi.operation.plan.only.mode</code>|<div style='width:
65pt;word-wrap: break-word;white-space: normal'>NONE</div>|<div style='width:
170pt;word-wrap: break-word;white-space: normal'>Whether to perform the
statement in a PARSE, ANALYZE, OPTIMIZE, PHYSICAL, EXECUTION only way without
executing the query. When it is NONE, the statement will be fully
executed</div>|<div style='width: 30pt'>string</div>|<div style='width:
20pt'>1.4.0</div>
<code>kyuubi.operation.query.timeout</code>|<div style='width: 65pt;word-wrap:
break-word;white-space: normal'><undefined></div>|<div style='width:
170pt;word-wrap: break-word;white-space: normal'>Timeout for query executions
at server-side, taking effect together with the client-side
timeout(`java.sql.Statement.setQueryTimeout`); a running query will be
cancelled automatically on timeout. It's off by default, which means only the
client-side takes full control of whether the query should timeo [...]
<code>kyuubi.operation.scheduler.pool</code>|<div style='width:
65pt;word-wrap: break-word;white-space: normal'><undefined></div>|<div
style='width: 170pt;word-wrap: break-word;white-space: normal'>The scheduler
pool of a job. Note that this config should be used after setting the Spark
config spark.scheduler.mode=FAIR.</div>|<div style='width: 30pt'>string</div>|<div
style='width: 20pt'>1.1.1</div>
<code>kyuubi.operation.status.polling.max.attempts</code>|<div style='width:
65pt;word-wrap: break-word;white-space: normal'>5</div>|<div style='width:
170pt;word-wrap: break-word;white-space: normal'>Max attempts for long polling
an asynchronously running SQL query's status on raw transport failures, e.g.
TTransportException</div>|<div style='width: 30pt'>int</div>|<div style='width:
20pt'>1.4.0</div>
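With the widened value list, the plan-only mode can also be pinned for a whole deployment rather than per session. A hypothetical fragment for `$KYUUBI_HOME/conf/kyuubi-defaults.conf` (file location assumed from a standard Kyuubi setup):

```properties
# Return only the requested plan for each statement instead of running it.
# Valid values after this change: PARSE, ANALYZE, OPTIMIZE, PHYSICAL,
# EXECUTION, NONE. NONE (the default) executes statements normally;
# values are case-insensitive because the conf entry upper-cases input.
kyuubi.operation.plan.only.mode=PHYSICAL
```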
diff --git
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/PlanOnlyStatement.scala
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/PlanOnlyStatement.scala
index 8d47514..d8b6a42 100644
---
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/PlanOnlyStatement.scala
+++
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/PlanOnlyStatement.scala
@@ -37,6 +37,7 @@ class PlanOnlyStatement(
extends FlinkOperation(OperationType.EXECUTE_STATEMENT, session) {
private val operationLog: OperationLog =
OperationLog.createOperationLog(session, getHandle)
+ private val lineSeparator: String = System.lineSeparator()
override def getOperationLog: Option[OperationLog] = Option(operationLog)
override protected def runInternal(): Unit = {
@@ -56,20 +57,20 @@ class PlanOnlyStatement(
private def explainOperation(statement: String): Unit = {
val tableEnv: TableEnvironment =
sessionContext.getExecutionContext.getTableEnvironment
- mode match {
- case PARSE =>
- val sqlPlan = tableEnv.explainSql(statement)
- resultSet =
- ResultSetUtil.stringListToResultSet(
- List(sqlPlan.split(System.lineSeparator()).apply(1)),
- "plan")
+ val explainPlans =
+ tableEnv.explainSql(statement).split(s"$lineSeparator$lineSeparator")
+ val operationPlan = mode match {
+ case PARSE => explainPlans(0).split(s"== Abstract Syntax Tree
==$lineSeparator")(1)
+ case PHYSICAL =>
+ explainPlans(1).split(s"== Optimized Physical Plan
==$lineSeparator")(1)
+ case EXECUTION =>
+ explainPlans(2).split(s"== Optimized Execution Plan
==$lineSeparator")(1)
case _ =>
- throw KyuubiSQLException(
- s"""
- |The operation mode ${mode.toString} doesn't support in Flink SQL
engine.
- |Flink only supports the AST and the execution plan of the sql
statement.
- |Flink engine will support EXECUTION operation plan mode in
future.
- |""".stripMargin)
+ throw KyuubiSQLException(s"The operation mode $mode doesn't support in
Flink SQL engine.")
}
+ resultSet =
+ ResultSetUtil.stringListToResultSet(
+ List(operationPlan),
+ "plan")
}
}
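The rewritten Flink code splits the full `explainSql` output into sections separated by blank lines, then strips the requested section's `== ... ==` header. A self-contained sketch of that parsing, using sample text that imitates the layout of Flink's explain output (the plan strings are illustrative, not real Flink output):

```scala
val sep = System.lineSeparator()
// Sample text shaped like Flink's explainSql output: three sections,
// each introduced by a "== ... ==" header, separated by blank lines.
val explained =
  s"== Abstract Syntax Tree ==${sep}LogicalProject(EXPR$$0=[1])$sep$sep" +
  s"== Optimized Physical Plan ==${sep}Calc(select=[1 AS EXPR$$0])$sep$sep" +
  s"== Optimized Execution Plan ==${sep}Calc(select=[1 AS EXPR$$0])"
// Blank lines delimit sections, as in the patched explainOperation
val sections = explained.split(s"$sep$sep")
// For PARSE, drop the header line and keep only the AST body
val ast = sections(0).split(s"== Abstract Syntax Tree ==$sep")(1)
```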
diff --git
a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/PlanOnlyOperationSuite.scala
b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/PlanOnlyOperationSuite.scala
index 27abce0..1ed9651 100644
---
a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/PlanOnlyOperationSuite.scala
+++
b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/PlanOnlyOperationSuite.scala
@@ -37,7 +37,7 @@ class PlanOnlyOperationSuite extends WithFlinkSQLEngine with
HiveJDBCTestHelper
test("Plan only operation with system defaults") {
withJdbcStatement() { statement =>
- testPlanOnlyStatement(statement)
+ testPlanOnlyStatementWithParseMode(statement)
}
}
@@ -46,7 +46,7 @@ class PlanOnlyOperationSuite extends WithFlinkSQLEngine with
HiveJDBCTestHelper
withJdbcStatement() { statement =>
val exceptionMsg = intercept[Exception](statement.executeQuery("select
1")).getMessage
assert(exceptionMsg.contains(
- s"The operation mode ${ANALYZE.toString} doesn't support in Flink
SQL engine."))
+ s"The operation mode $ANALYZE doesn't support in Flink SQL engine."))
}
}
}
@@ -55,14 +55,40 @@ class PlanOnlyOperationSuite extends WithFlinkSQLEngine
with HiveJDBCTestHelper
withSessionConf()(Map(KyuubiConf.OPERATION_PLAN_ONLY.key ->
ANALYZE.toString))(Map.empty) {
withJdbcStatement() { statement =>
statement.execute(s"set ${KyuubiConf.OPERATION_PLAN_ONLY.key}=parse")
- testPlanOnlyStatement(statement)
+ testPlanOnlyStatementWithParseMode(statement)
}
}
}
- private def testPlanOnlyStatement(statement: Statement): Unit = {
+ test("Plan only operation with PHYSICAL mode") {
+ withSessionConf()(Map(KyuubiConf.OPERATION_PLAN_ONLY.key ->
PHYSICAL.toString))(Map.empty) {
+ withJdbcStatement() { statement =>
+ val operationPlan = getOperationPlanWithStatement(statement)
+ assert(operationPlan.startsWith("Calc(select=[1 AS EXPR$0])") &&
+ operationPlan.contains("Values(type=[RecordType(INTEGER ZERO)],
tuples=[[{ 0 }]])"))
+ }
+ }
+ }
+
+ test("Plan only operation with EXECUTION mode") {
+ withSessionConf()(Map(KyuubiConf.OPERATION_PLAN_ONLY.key ->
EXECUTION.toString))(Map.empty) {
+ withJdbcStatement() { statement =>
+ val operationPlan = getOperationPlanWithStatement(statement)
+ assert(operationPlan.startsWith("Calc(select=[1 AS EXPR$0])") &&
+ operationPlan.contains("Values(tuples=[[{ 0 }]])"))
+ }
+ }
+ }
+
+ private def testPlanOnlyStatementWithParseMode(statement: Statement): Unit =
{
+ val operationPlan = getOperationPlanWithStatement(statement)
+ assert(operationPlan.startsWith("LogicalProject(EXPR$0=[1])") &&
+ operationPlan.contains("LogicalValues(tuples=[[{ 0 }]])"))
+ }
+
+ private def getOperationPlanWithStatement(statement: Statement): String = {
val resultSet = statement.executeQuery("select 1")
assert(resultSet.next())
- assert(resultSet.getString(1) === "LogicalProject(EXPR$0=[1])")
+ resultSet.getString(1)
}
}
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala
index a454164..304b031 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala
@@ -21,7 +21,7 @@ import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.types.StructType
-import org.apache.kyuubi.config.KyuubiConf.OperationModes.{ANALYZE,
OperationMode, OPTIMIZE, PARSE}
+import org.apache.kyuubi.config.KyuubiConf.OperationModes._
import org.apache.kyuubi.operation.{ArrayFetchIterator, IterableFetchIterator,
OperationType}
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.session.Session
@@ -57,18 +57,24 @@ class PlanOnlyStatement(
case cmd if isSetOrReset(cmd) =>
result = spark.sql(statement)
iter = new ArrayFetchIterator(result.collect())
- case otherPlan => mode match {
+ case plan => mode match {
case PARSE =>
- iter = new IterableFetchIterator(Seq(Row(otherPlan.toString())))
+ iter = new IterableFetchIterator(Seq(Row(plan.toString())))
case ANALYZE =>
- val analyzed = spark.sessionState.analyzer.execute(otherPlan)
+ val analyzed = spark.sessionState.analyzer.execute(plan)
spark.sessionState.analyzer.checkAnalysis(analyzed)
iter = new IterableFetchIterator(Seq(Row(analyzed.toString())))
case OPTIMIZE =>
- val analyzed = spark.sessionState.analyzer.execute(otherPlan)
+ val analyzed = spark.sessionState.analyzer.execute(plan)
spark.sessionState.analyzer.checkAnalysis(analyzed)
val optimized = spark.sessionState.optimizer.execute(analyzed)
iter = new IterableFetchIterator(Seq(Row(optimized.toString())))
+ case PHYSICAL =>
+ val physical = spark.sql(statement).queryExecution.sparkPlan
+ iter = new IterableFetchIterator(Seq(Row(physical.toString())))
+ case EXECUTION =>
+ val executed = spark.sql(statement).queryExecution.executedPlan
+ iter = new IterableFetchIterator(Seq(Row(executed.toString())))
}
}
} catch {
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index f26effa..086d8f1 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -1143,13 +1143,14 @@ object KyuubiConf {
object OperationModes extends Enumeration {
type OperationMode = Value
- val PARSE, ANALYZE, OPTIMIZE, NONE = Value
+ val PARSE, ANALYZE, OPTIMIZE, PHYSICAL, EXECUTION, NONE = Value
}
val OPERATION_PLAN_ONLY: ConfigEntry[String] =
buildConf("operation.plan.only.mode")
- .doc("Whether to perform the statement in a PARSE, ANALYZE, OPTIMIZE
only way without " +
- "executing the query. When it is NONE, the statement will be fully
executed")
+ .doc("Whether to perform the statement in a PARSE, ANALYZE, OPTIMIZE,
PHYSICAL, EXECUTION " +
+ "only way without executing the query. When it is NONE, the statement
will be fully " +
+ "executed")
.version("1.4.0")
.stringConf
.transform(_.toUpperCase(Locale.ROOT))
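Because the conf entry upper-cases input via `transform(_.toUpperCase(Locale.ROOT))` before the value reaches the enumeration, the tests can write lowercase values such as `parse`. A minimal sketch of that resolution step (the `resolve` helper is a name of my own for illustration, not a Kyuubi API):

```scala
import java.util.Locale

object OperationModes extends Enumeration {
  type OperationMode = Value
  val PARSE, ANALYZE, OPTIMIZE, PHYSICAL, EXECUTION, NONE = Value
}

// Mimics the conf entry's transform(_.toUpperCase(Locale.ROOT)) followed
// by an enum lookup; `resolve` is a hypothetical helper for illustration.
def resolve(mode: String): OperationModes.OperationMode =
  OperationModes.withName(mode.toUpperCase(Locale.ROOT))
```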
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/PlanOnlyOperationSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/PlanOnlyOperationSuite.scala
index 8204b60..22d1f4e 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/PlanOnlyOperationSuite.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/PlanOnlyOperationSuite.scala
@@ -17,15 +17,18 @@
package org.apache.kyuubi.operation
+import java.sql.Statement
+
import org.apache.kyuubi.WithKyuubiServer
import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf.OperationModes._
class PlanOnlyOperationSuite extends WithKyuubiServer with HiveJDBCTestHelper {
override protected val conf: KyuubiConf = {
KyuubiConf()
.set(KyuubiConf.ENGINE_SHARE_LEVEL, "user")
- .set(KyuubiConf.OPERATION_PLAN_ONLY, "optimize")
+ .set(KyuubiConf.OPERATION_PLAN_ONLY, OPTIMIZE.toString)
.set(KyuubiConf.ENGINE_SHARE_LEVEL_SUBDOMAIN.key, "plan-only")
}
@@ -33,33 +36,52 @@ class PlanOnlyOperationSuite extends WithKyuubiServer with
HiveJDBCTestHelper {
test("KYUUBI #1059: Plan only operation with system defaults") {
withJdbcStatement() { statement =>
- val set = statement.executeQuery("select 1 where true")
- assert(set.next())
- val res = set.getString(1)
- assert(res.startsWith("Project") && !res.contains("Filter"))
+ val operationPlan = getOperationPlanWithStatement(statement)
+ assert(operationPlan.startsWith("Project") &&
!operationPlan.contains("Filter"))
}
}
test("KYUUBI #1059: Plan only operation with session conf") {
- withSessionConf()(Map(KyuubiConf.OPERATION_PLAN_ONLY.key ->
"analyze"))(Map.empty) {
+ withSessionConf()(Map(KyuubiConf.OPERATION_PLAN_ONLY.key ->
ANALYZE.toString))(Map.empty) {
withJdbcStatement() { statement =>
- val set = statement.executeQuery("select 1 where true")
- assert(set.next())
- val res = set.getString(1)
- assert(res.startsWith("Project") && res.contains("Filter"))
+ val operationPlan = getOperationPlanWithStatement(statement)
+ assert(operationPlan.startsWith("Project") &&
operationPlan.contains("Filter"))
}
}
}
test("KYUUBI #1059: Plan only operation with set command") {
- withSessionConf()(Map(KyuubiConf.OPERATION_PLAN_ONLY.key ->
"analyze"))(Map.empty) {
+ withSessionConf()(Map(KyuubiConf.OPERATION_PLAN_ONLY.key ->
ANALYZE.toString))(Map.empty) {
withJdbcStatement() { statement =>
- statement.execute(s"set ${KyuubiConf.OPERATION_PLAN_ONLY.key}=parse")
- val set = statement.executeQuery("select 1 where true")
- assert(set.next())
- val res = set.getString(1)
- assert(res.startsWith("'Project"))
+ statement.execute(s"set ${KyuubiConf.OPERATION_PLAN_ONLY.key}=$PARSE")
+ val operationPlan = getOperationPlanWithStatement(statement)
+ assert(operationPlan.startsWith("'Project"))
}
}
}
+
+ test("KYUUBI #1919: Plan only operation with PHYSICAL mode") {
+ withSessionConf()(Map(KyuubiConf.OPERATION_PLAN_ONLY.key ->
PHYSICAL.toString))(Map.empty) {
+ withJdbcStatement() { statement =>
+ val operationPlan = getOperationPlanWithStatement(statement)
+ assert(operationPlan.startsWith("Project") &&
operationPlan.contains("Scan OneRowRelation"))
+ }
+ }
+ }
+
+ test("KYUUBI #1919: Plan only operation with EXECUTION mode") {
+ withSessionConf()(Map(KyuubiConf.OPERATION_PLAN_ONLY.key ->
EXECUTION.toString))(Map.empty) {
+ withJdbcStatement() { statement =>
+ val operationPlan = getOperationPlanWithStatement(statement)
+ assert(operationPlan.startsWith("*(1) Project") &&
+ operationPlan.contains("*(1) Scan OneRowRelation"))
+ }
+ }
+ }
+
+ private def getOperationPlanWithStatement(statement: Statement): String = {
+ val resultSet = statement.executeQuery("select 1 where true")
+ assert(resultSet.next())
+ resultSet.getString(1)
+ }
}