This is an automated email from the ASF dual-hosted git repository.
vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 47b0fec [KYUUBI #1867] Support `PlanOnlyStatement` operation like
Spark SQL engine
47b0fec is described below
commit 47b0fecd6add7d49fbcffacf820688695b27e89a
Author: SteNicholas <[email protected]>
AuthorDate: Wed Feb 16 16:38:52 2022 +0800
[KYUUBI #1867] Support `PlanOnlyStatement` operation like Spark SQL engine
<!--
Thanks for sending a pull request!
Here are some tips for you:
1. If this is your first time, please read our contributor guidelines:
https://kyuubi.readthedocs.io/en/latest/community/contributions.html
2. If the PR is related to an issue in
https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your
PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
3. If the PR is unfinished, add '[WIP]' in your PR title, e.g.,
'[WIP][KYUUBI #XXXX] Your PR title ...'.
-->
### _Why are the changes needed?_
Support `PlanOnlyStatement` operation like Spark SQL engine.
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run
test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests)
locally before making a pull request
Closes #1891 from SteNicholas/KYUUBI-1867.
Closes #1867
beacaf76 [SteNicholas] [KYUUBI #1867] Bugfix FlinkSQLSessionManager should
open Flink session
f3797076 [SteNicholas] [KYUUBI #1867] Support PlanOnlyStatement operation
like Spark SQL engine
Authored-by: SteNicholas <[email protected]>
Signed-off-by: yanghua <[email protected]>
---
.../engine/flink/operation/ExecuteStatement.scala | 72 ++-----------------
.../flink/operation/FlinkSQLOperationManager.scala | 19 ++++-
.../engine/flink/operation/PlanOnlyStatement.scala | 75 +++++++++++++++++++
.../kyuubi/engine/flink/result/ResultSetUtil.scala | 84 ++++++++++++++++++++++
.../flink/session/FlinkSQLSessionManager.scala | 14 +++-
.../engine/flink/session/FlinkSessionImpl.scala | 17 ++++-
.../flink/operation/FlinkOperationSuite.scala | 5 +-
.../flink/operation/PlanOnlyOperationSuite.scala | 68 ++++++++++++++++++
.../spark/operation/SparkSQLOperationManager.scala | 5 +-
.../kyuubi/operation/PlanOnlyOperationSuite.scala | 4 +-
10 files changed, 287 insertions(+), 76 deletions(-)
diff --git
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
index 06ae1fa..855f803 100644
---
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
+++
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
@@ -17,7 +17,6 @@
package org.apache.kyuubi.engine.flink.operation
-import java.util
import java.util.concurrent.{RejectedExecutionException,
ScheduledExecutorService, TimeUnit}
import scala.collection.JavaConverters._
@@ -25,8 +24,7 @@ import scala.collection.mutable.ArrayBuffer
import com.google.common.annotations.VisibleForTesting
import org.apache.calcite.rel.metadata.{DefaultRelMetadataProvider,
JaninoRelMetadataProvider, RelMetadataQueryBase}
-import org.apache.flink.table.api.{DataTypes, ResultKind}
-import org.apache.flink.table.catalog.Column
+import org.apache.flink.table.api.ResultKind
import org.apache.flink.table.client.gateway.{Executor, TypedResult}
import org.apache.flink.table.operations.{Operation, QueryOperation}
import org.apache.flink.table.operations.command.{ResetOperation, SetOperation}
@@ -107,10 +105,13 @@ class ExecuteStatement(
val operation = executor.parseStatement(sessionId, statement)
operation match {
case queryOperation: QueryOperation =>
runQueryOperation(queryOperation)
- case setOperation: SetOperation => runSetOperation(setOperation)
- case resetOperation: ResetOperation =>
runResetOperation(resetOperation)
+ case setOperation: SetOperation =>
+ resultSet = ResultSetUtil.runSetOperation(setOperation, executor,
sessionId)
+ case resetOperation: ResetOperation =>
+ resultSet = ResultSetUtil.runResetOperation(resetOperation,
executor, sessionId)
case operation: Operation => runOperation(operation)
}
+ setState(OperationState.FINISHED)
} catch {
onError(cancel = true)
} finally {
@@ -148,8 +149,6 @@ class ExecuteStatement(
.columns(resultDescriptor.getResultSchema.getColumns)
.data(rows.toArray[Row])
.build
- setState(OperationState.FINISHED)
-
} finally {
if (resultId != null) {
cleanupQueryResult(resultId)
@@ -157,69 +156,10 @@ class ExecuteStatement(
}
}
- private def runSetOperation(setOperation: SetOperation): Unit = {
- if (setOperation.getKey.isPresent) {
- val key: String = setOperation.getKey.get.trim
-
- if (setOperation.getValue.isPresent) {
- val newValue: String = setOperation.getValue.get.trim
- executor.setSessionProperty(sessionId, key, newValue)
- }
-
- val value = executor.getSessionConfigMap(sessionId).getOrDefault(key, "")
- resultSet = ResultSet.builder
- .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
- .columns(
- Column.physical("key", DataTypes.STRING()),
- Column.physical("value", DataTypes.STRING()))
- .data(Array(Row.of(key, value)))
- .build
- } else {
- // show all properties if set without key
- val properties: util.Map[String, String] =
executor.getSessionConfigMap(sessionId)
-
- val entries = ArrayBuffer.empty[Row]
- properties.forEach((key, value) => entries.append(Row.of(key, value)))
-
- if (entries.nonEmpty) {
- val prettyEntries = entries.sortBy(_.getField(0).asInstanceOf[String])
- resultSet = ResultSet.builder
- .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
- .columns(
- Column.physical("key", DataTypes.STRING()),
- Column.physical("value", DataTypes.STRING()))
- .data(prettyEntries.toArray)
- .build
- } else {
- resultSet = ResultSet.builder
- .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
- .columns(
- Column.physical("key", DataTypes.STRING()),
- Column.physical("value", DataTypes.STRING()))
- .data(Array[Row]())
- .build
- }
- }
- setState(OperationState.FINISHED)
- }
-
- private def runResetOperation(resetOperation: ResetOperation): Unit = {
- if (resetOperation.getKey.isPresent) {
- // reset the given property
- executor.resetSessionProperty(sessionId, resetOperation.getKey.get())
- } else {
- // reset all properties
- executor.resetSessionProperties(sessionId)
- }
- resultSet = ResultSetUtil.successResultSet
- setState(OperationState.FINISHED)
- }
-
private def runOperation(operation: Operation): Unit = {
val result = executor.executeOperation(sessionId, operation)
result.await()
resultSet = ResultSet.fromTableResult(result)
- setState(OperationState.FINISHED)
}
private def cleanupQueryResult(resultId: String): Unit = {
diff --git
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
index 37e8ec8..2ec7ed3 100644
---
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
+++
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
@@ -18,22 +18,37 @@
package org.apache.kyuubi.engine.flink.operation
import java.util
+import java.util.Locale
-import scala.collection.JavaConverters.asScalaBufferConverter
+import scala.collection.JavaConverters._
+import org.apache.kyuubi.config.KyuubiConf._
+import org.apache.kyuubi.config.KyuubiConf.OperationModes._
import org.apache.kyuubi.engine.flink.result.Constants
+import org.apache.kyuubi.engine.flink.session.FlinkSessionImpl
import org.apache.kyuubi.operation.{Operation, OperationManager}
import org.apache.kyuubi.session.Session
class FlinkSQLOperationManager extends
OperationManager("FlinkSQLOperationManager") {
+ private lazy val operationModeDefault = getConf.get(OPERATION_PLAN_ONLY)
+
override def newExecuteStatementOperation(
session: Session,
statement: String,
confOverlay: Map[String, String],
runAsync: Boolean,
queryTimeout: Long): Operation = {
- val op = new ExecuteStatement(session, statement, runAsync, queryTimeout)
+ val flinkSession = session.asInstanceOf[FlinkSessionImpl]
+ val mode = flinkSession.sessionContext.getConfigMap.getOrDefault(
+ OPERATION_PLAN_ONLY.key,
+ operationModeDefault)
+ val op = OperationModes.withName(mode.toUpperCase(Locale.ROOT)) match {
+ case NONE =>
+ new ExecuteStatement(session, statement, runAsync, queryTimeout)
+ case mode =>
+ new PlanOnlyStatement(session, statement, mode)
+ }
addOperation(op)
}
diff --git
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/PlanOnlyStatement.scala
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/PlanOnlyStatement.scala
new file mode 100644
index 0000000..8d47514
--- /dev/null
+++
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/PlanOnlyStatement.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.operation
+
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.operations.command.{ResetOperation, SetOperation}
+
+import org.apache.kyuubi.KyuubiSQLException
+import org.apache.kyuubi.config.KyuubiConf.OperationModes._
+import org.apache.kyuubi.engine.flink.result.ResultSetUtil
+import org.apache.kyuubi.operation.OperationType
+import org.apache.kyuubi.operation.log.OperationLog
+import org.apache.kyuubi.session.Session
+
+/**
+ * Perform the statement parsing, analyzing or optimizing only without
executing it
+ */
+class PlanOnlyStatement(
+ session: Session,
+ override val statement: String,
+ mode: OperationMode)
+ extends FlinkOperation(OperationType.EXECUTE_STATEMENT, session) {
+
+ private val operationLog: OperationLog =
OperationLog.createOperationLog(session, getHandle)
+ override def getOperationLog: Option[OperationLog] = Option(operationLog)
+
+ override protected def runInternal(): Unit = {
+ try {
+ val operation = executor.parseStatement(sessionId, statement)
+ operation match {
+ case setOperation: SetOperation =>
+ resultSet = ResultSetUtil.runSetOperation(setOperation, executor,
sessionId)
+ case resetOperation: ResetOperation =>
+ resultSet = ResultSetUtil.runResetOperation(resetOperation,
executor, sessionId)
+ case _ => explainOperation(statement)
+ }
+ } catch {
+ onError()
+ }
+ }
+
+ private def explainOperation(statement: String): Unit = {
+ val tableEnv: TableEnvironment =
sessionContext.getExecutionContext.getTableEnvironment
+ mode match {
+ case PARSE =>
+ val sqlPlan = tableEnv.explainSql(statement)
+ resultSet =
+ ResultSetUtil.stringListToResultSet(
+ List(sqlPlan.split(System.lineSeparator()).apply(1)),
+ "plan")
+ case _ =>
+ throw KyuubiSQLException(
+ s"""
+ |The operation mode ${mode.toString} doesn't support in Flink SQL
engine.
+ |Flink only supports the AST and the execution plan of the sql
statement.
+ |Flink engine will support EXECUTION operation plan mode in
future.
+ |""".stripMargin)
+ }
+ }
+}
diff --git
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSetUtil.scala
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSetUtil.scala
index ded271c..e7c8cea 100644
---
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSetUtil.scala
+++
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSetUtil.scala
@@ -17,9 +17,15 @@
package org.apache.kyuubi.engine.flink.result;
+import java.util
+
+import scala.collection.mutable.ArrayBuffer
+
import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.api.ResultKind
import org.apache.flink.table.catalog.Column
+import org.apache.flink.table.client.gateway.Executor
+import org.apache.flink.table.operations.command.{ResetOperation, SetOperation}
import org.apache.flink.types.Row
/** Utility object for building ResultSet. */
@@ -54,4 +60,82 @@ object ResultSetUtil {
.columns(Column.physical("result", DataTypes.STRING))
.data(Array[Row](Row.of("OK")))
.build
+
+ /**
+ * Runs a SetOperation with executor. Returns when SetOperation is executed
successfully.
+ *
+ * @param setOperation Set operation.
+ * @param executor A gateway for communicating with Flink and other external
systems.
+ * @param sessionId Id of the session.
+ * @return A ResultSet of SetOperation execution.
+ */
+ def runSetOperation(
+ setOperation: SetOperation,
+ executor: Executor,
+ sessionId: String): ResultSet = {
+ if (setOperation.getKey.isPresent) {
+ val key: String = setOperation.getKey.get.trim
+
+ if (setOperation.getValue.isPresent) {
+ val newValue: String = setOperation.getValue.get.trim
+ executor.setSessionProperty(sessionId, key, newValue)
+ }
+
+ val value = executor.getSessionConfigMap(sessionId).getOrDefault(key, "")
+ ResultSet.builder
+ .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
+ .columns(
+ Column.physical("key", DataTypes.STRING()),
+ Column.physical("value", DataTypes.STRING()))
+ .data(Array(Row.of(key, value)))
+ .build
+ } else {
+ // show all properties if set without key
+ val properties: util.Map[String, String] =
executor.getSessionConfigMap(sessionId)
+
+ val entries = ArrayBuffer.empty[Row]
+ properties.forEach((key, value) => entries.append(Row.of(key, value)))
+
+ if (entries.nonEmpty) {
+ val prettyEntries = entries.sortBy(_.getField(0).asInstanceOf[String])
+ ResultSet.builder
+ .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
+ .columns(
+ Column.physical("key", DataTypes.STRING()),
+ Column.physical("value", DataTypes.STRING()))
+ .data(prettyEntries.toArray)
+ .build
+ } else {
+ ResultSet.builder
+ .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
+ .columns(
+ Column.physical("key", DataTypes.STRING()),
+ Column.physical("value", DataTypes.STRING()))
+ .data(Array[Row]())
+ .build
+ }
+ }
+ }
+
+ /**
+ * Runs a ResetOperation with executor. Returns when ResetOperation is
executed successfully.
+ *
+ * @param resetOperation Reset operation.
+ * @param executor A gateway for communicating with Flink and other external
systems.
+ * @param sessionId Id of the session.
+ * @return A ResultSet of ResetOperation execution.
+ */
+ def runResetOperation(
+ resetOperation: ResetOperation,
+ executor: Executor,
+ sessionId: String): ResultSet = {
+ if (resetOperation.getKey.isPresent) {
+ // reset the given property
+ executor.resetSessionProperty(sessionId, resetOperation.getKey.get())
+ } else {
+ // reset all properties
+ executor.resetSessionProperties(sessionId)
+ }
+ successResultSet
+ }
}
diff --git
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala
index 62a1e5e..a14e4c0 100644
---
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala
+++
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala
@@ -21,6 +21,7 @@ import
org.apache.flink.table.client.gateway.context.DefaultContext
import org.apache.flink.table.client.gateway.local.LocalExecutor
import org.apache.hive.service.rpc.thrift.TProtocolVersion
+import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.engine.flink.FlinkEngineUtils
import org.apache.kyuubi.engine.flink.operation.FlinkSQLOperationManager
import org.apache.kyuubi.session.{SessionHandle, SessionManager}
@@ -61,8 +62,17 @@ class FlinkSQLSessionManager(engineContext: DefaultContext)
sessionHandle,
sessionContext)
- setSession(sessionHandle, sessionImpl)
- sessionHandle
+ try {
+ sessionImpl.open()
+ setSession(sessionHandle, sessionImpl)
+ info(s"$user's session with $sessionHandle is opened, current opening
sessions" +
+ s" $getOpenSessionCount")
+ sessionHandle
+ } catch {
+ case e: Exception =>
+ sessionImpl.close()
+ throw KyuubiSQLException(e)
+ }
}
override def closeSession(sessionHandle: SessionHandle): Unit = {
diff --git
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala
index ae6b9d5..9972642 100644
---
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala
+++
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala
@@ -17,7 +17,7 @@
package org.apache.kyuubi.engine.flink.session
-import org.apache.flink.table.client.gateway.Executor
+import org.apache.flink.table.client.gateway.{Executor, SqlExecutionException}
import org.apache.flink.table.client.gateway.context.SessionContext
import org.apache.hive.service.rpc.thrift.TProtocolVersion
@@ -37,4 +37,19 @@ class FlinkSessionImpl(
def executor: Executor =
sessionManager.asInstanceOf[FlinkSQLSessionManager].executor
def sessionId: String = handle.identifier.toString
+
+ private def setModifiableConfig(key: String, value: String): Unit = {
+ try {
+ sessionContext.set(key, value)
+ } catch {
+ case e: SqlExecutionException => warn(e.getMessage)
+ }
+ }
+
+ override def open(): Unit = {
+ normalizedConf.foreach {
+ case (key, value) => setModifiableConfig(key, value)
+ }
+ super.open()
+ }
}
diff --git
a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
index 197cbcb..9039c19 100644
---
a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
+++
b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
@@ -26,6 +26,8 @@ import
org.apache.hive.service.rpc.thrift.{TExecuteStatementReq, TFetchResultsRe
import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.scalatest.time.SpanSugar._
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf.OperationModes.NONE
import org.apache.kyuubi.engine.flink.WithFlinkSQLEngine
import org.apache.kyuubi.engine.flink.result.Constants
import org.apache.kyuubi.operation.HiveJDBCTestHelper
@@ -33,7 +35,8 @@ import
org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
import org.apache.kyuubi.service.ServiceState._
class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
- override def withKyuubiConf: Map[String, String] = Map()
+ override def withKyuubiConf: Map[String, String] =
+ Map(KyuubiConf.OPERATION_PLAN_ONLY.key -> NONE.toString)
override protected def jdbcUrl: String =
s"jdbc:hive2://${engine.frontendServices.head.connectionUrl}/;"
diff --git
a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/PlanOnlyOperationSuite.scala
b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/PlanOnlyOperationSuite.scala
new file mode 100644
index 0000000..27abce0
--- /dev/null
+++
b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/PlanOnlyOperationSuite.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.operation
+
+import java.sql.Statement
+
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf.OperationModes._
+import org.apache.kyuubi.engine.flink.WithFlinkSQLEngine
+import org.apache.kyuubi.operation.HiveJDBCTestHelper
+
+class PlanOnlyOperationSuite extends WithFlinkSQLEngine with
HiveJDBCTestHelper {
+
+ override def withKyuubiConf: Map[String, String] =
+ Map(
+ KyuubiConf.ENGINE_SHARE_LEVEL.key -> "user",
+ KyuubiConf.OPERATION_PLAN_ONLY.key -> PARSE.toString,
+ KyuubiConf.ENGINE_SHARE_LEVEL_SUBDOMAIN.key -> "plan-only")
+
+ override protected def jdbcUrl: String =
+ s"jdbc:hive2://${engine.frontendServices.head.connectionUrl}/;"
+
+ test("Plan only operation with system defaults") {
+ withJdbcStatement() { statement =>
+ testPlanOnlyStatement(statement)
+ }
+ }
+
+ test("Plan only operation with session conf") {
+ withSessionConf()(Map(KyuubiConf.OPERATION_PLAN_ONLY.key ->
ANALYZE.toString))(Map.empty) {
+ withJdbcStatement() { statement =>
+ val exceptionMsg = intercept[Exception](statement.executeQuery("select
1")).getMessage
+ assert(exceptionMsg.contains(
+ s"The operation mode ${ANALYZE.toString} doesn't support in Flink
SQL engine."))
+ }
+ }
+ }
+
+ test("Plan only operation with set command") {
+ withSessionConf()(Map(KyuubiConf.OPERATION_PLAN_ONLY.key ->
ANALYZE.toString))(Map.empty) {
+ withJdbcStatement() { statement =>
+ statement.execute(s"set ${KyuubiConf.OPERATION_PLAN_ONLY.key}=parse")
+ testPlanOnlyStatement(statement)
+ }
+ }
+ }
+
+ private def testPlanOnlyStatement(statement: Statement): Unit = {
+ val resultSet = statement.executeQuery("select 1")
+ assert(resultSet.next())
+ assert(resultSet.getString(1) === "LogicalProject(EXPR$0=[1])")
+ }
+}
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala
index ade5b4e..c3f3b74 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala
@@ -52,8 +52,9 @@ class SparkSQLOperationManager private (name: String) extends
OperationManager(n
runAsync: Boolean,
queryTimeout: Long): Operation = {
val spark = session.asInstanceOf[SparkSessionImpl].spark
- val lang = confOverlay.get(OPERATION_LANGUAGE.key)
- .getOrElse(spark.conf.get(OPERATION_LANGUAGE.key,
operationLanguageDefault))
+ val lang = confOverlay.getOrElse(
+ OPERATION_LANGUAGE.key,
+ spark.conf.get(OPERATION_LANGUAGE.key, operationLanguageDefault))
val operation =
OperationLanguages.withName(lang.toUpperCase(Locale.ROOT)) match {
case OperationLanguages.SQL =>
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/PlanOnlyOperationSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/PlanOnlyOperationSuite.scala
index 36af242..8204b60 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/PlanOnlyOperationSuite.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/PlanOnlyOperationSuite.scala
@@ -40,7 +40,7 @@ class PlanOnlyOperationSuite extends WithKyuubiServer with
HiveJDBCTestHelper {
}
}
- test("KYUUBI #1059: Plan only operation with with session conf") {
+ test("KYUUBI #1059: Plan only operation with session conf") {
withSessionConf()(Map(KyuubiConf.OPERATION_PLAN_ONLY.key ->
"analyze"))(Map.empty) {
withJdbcStatement() { statement =>
val set = statement.executeQuery("select 1 where true")
@@ -51,7 +51,7 @@ class PlanOnlyOperationSuite extends WithKyuubiServer with
HiveJDBCTestHelper {
}
}
- test("KYUUBI #1059: Plan only operation with with set command") {
+ test("KYUUBI #1059: Plan only operation with set command") {
withSessionConf()(Map(KyuubiConf.OPERATION_PLAN_ONLY.key ->
"analyze"))(Map.empty) {
withJdbcStatement() { statement =>
statement.execute(s"set ${KyuubiConf.OPERATION_PLAN_ONLY.key}=parse")