This is an automated email from the ASF dual-hosted git repository.

vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 47b0fec  [KYUUBI #1867] Support `PlanOnlyStatement` operation like Spark SQL engine
47b0fec is described below

commit 47b0fecd6add7d49fbcffacf820688695b27e89a
Author: SteNicholas <[email protected]>
AuthorDate: Wed Feb 16 16:38:52 2022 +0800

    [KYUUBI #1867] Support `PlanOnlyStatement` operation like Spark SQL engine
    
    <!--
    Thanks for sending a pull request!
    
    Here are some tips for you:
      1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html
      2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
      3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'.
    -->
    
    ### _Why are the changes needed?_
    Support `PlanOnlyStatement` operation like Spark SQL engine.
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #1891 from SteNicholas/KYUUBI-1867.
    
    Closes #1867
    
    beacaf76 [SteNicholas] [KYUUBI #1867] Bugfix FlinkSQLSessionManager should open Flink session
    f3797076 [SteNicholas] [KYUUBI #1867] Support PlanOnlyStatement operation like Spark SQL engine
    
    Authored-by: SteNicholas <[email protected]>
    Signed-off-by: yanghua <[email protected]>
---
 .../engine/flink/operation/ExecuteStatement.scala  | 72 ++-----------------
 .../flink/operation/FlinkSQLOperationManager.scala | 19 ++++-
 .../engine/flink/operation/PlanOnlyStatement.scala | 75 +++++++++++++++++++
 .../kyuubi/engine/flink/result/ResultSetUtil.scala | 84 ++++++++++++++++++++++
 .../flink/session/FlinkSQLSessionManager.scala     | 14 +++-
 .../engine/flink/session/FlinkSessionImpl.scala    | 17 ++++-
 .../flink/operation/FlinkOperationSuite.scala      |  5 +-
 .../flink/operation/PlanOnlyOperationSuite.scala   | 68 ++++++++++++++++++
 .../spark/operation/SparkSQLOperationManager.scala |  5 +-
 .../kyuubi/operation/PlanOnlyOperationSuite.scala  |  4 +-
 10 files changed, 287 insertions(+), 76 deletions(-)

diff --git 
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
 
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
index 06ae1fa..855f803 100644
--- 
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
+++ 
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
@@ -17,7 +17,6 @@
 
 package org.apache.kyuubi.engine.flink.operation
 
-import java.util
 import java.util.concurrent.{RejectedExecutionException, 
ScheduledExecutorService, TimeUnit}
 
 import scala.collection.JavaConverters._
@@ -25,8 +24,7 @@ import scala.collection.mutable.ArrayBuffer
 
 import com.google.common.annotations.VisibleForTesting
 import org.apache.calcite.rel.metadata.{DefaultRelMetadataProvider, 
JaninoRelMetadataProvider, RelMetadataQueryBase}
-import org.apache.flink.table.api.{DataTypes, ResultKind}
-import org.apache.flink.table.catalog.Column
+import org.apache.flink.table.api.ResultKind
 import org.apache.flink.table.client.gateway.{Executor, TypedResult}
 import org.apache.flink.table.operations.{Operation, QueryOperation}
 import org.apache.flink.table.operations.command.{ResetOperation, SetOperation}
@@ -107,10 +105,13 @@ class ExecuteStatement(
       val operation = executor.parseStatement(sessionId, statement)
       operation match {
         case queryOperation: QueryOperation => 
runQueryOperation(queryOperation)
-        case setOperation: SetOperation => runSetOperation(setOperation)
-        case resetOperation: ResetOperation => 
runResetOperation(resetOperation)
+        case setOperation: SetOperation =>
+          resultSet = ResultSetUtil.runSetOperation(setOperation, executor, 
sessionId)
+        case resetOperation: ResetOperation =>
+          resultSet = ResultSetUtil.runResetOperation(resetOperation, 
executor, sessionId)
         case operation: Operation => runOperation(operation)
       }
+      setState(OperationState.FINISHED)
     } catch {
       onError(cancel = true)
     } finally {
@@ -148,8 +149,6 @@ class ExecuteStatement(
         .columns(resultDescriptor.getResultSchema.getColumns)
         .data(rows.toArray[Row])
         .build
-      setState(OperationState.FINISHED)
-
     } finally {
       if (resultId != null) {
         cleanupQueryResult(resultId)
@@ -157,69 +156,10 @@ class ExecuteStatement(
     }
   }
 
-  private def runSetOperation(setOperation: SetOperation): Unit = {
-    if (setOperation.getKey.isPresent) {
-      val key: String = setOperation.getKey.get.trim
-
-      if (setOperation.getValue.isPresent) {
-        val newValue: String = setOperation.getValue.get.trim
-        executor.setSessionProperty(sessionId, key, newValue)
-      }
-
-      val value = executor.getSessionConfigMap(sessionId).getOrDefault(key, "")
-      resultSet = ResultSet.builder
-        .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
-        .columns(
-          Column.physical("key", DataTypes.STRING()),
-          Column.physical("value", DataTypes.STRING()))
-        .data(Array(Row.of(key, value)))
-        .build
-    } else {
-      // show all properties if set without key
-      val properties: util.Map[String, String] = 
executor.getSessionConfigMap(sessionId)
-
-      val entries = ArrayBuffer.empty[Row]
-      properties.forEach((key, value) => entries.append(Row.of(key, value)))
-
-      if (entries.nonEmpty) {
-        val prettyEntries = entries.sortBy(_.getField(0).asInstanceOf[String])
-        resultSet = ResultSet.builder
-          .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
-          .columns(
-            Column.physical("key", DataTypes.STRING()),
-            Column.physical("value", DataTypes.STRING()))
-          .data(prettyEntries.toArray)
-          .build
-      } else {
-        resultSet = ResultSet.builder
-          .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
-          .columns(
-            Column.physical("key", DataTypes.STRING()),
-            Column.physical("value", DataTypes.STRING()))
-          .data(Array[Row]())
-          .build
-      }
-    }
-    setState(OperationState.FINISHED)
-  }
-
-  private def runResetOperation(resetOperation: ResetOperation): Unit = {
-    if (resetOperation.getKey.isPresent) {
-      // reset the given property
-      executor.resetSessionProperty(sessionId, resetOperation.getKey.get())
-    } else {
-      // reset all properties
-      executor.resetSessionProperties(sessionId)
-    }
-    resultSet = ResultSetUtil.successResultSet
-    setState(OperationState.FINISHED)
-  }
-
   private def runOperation(operation: Operation): Unit = {
     val result = executor.executeOperation(sessionId, operation)
     result.await()
     resultSet = ResultSet.fromTableResult(result)
-    setState(OperationState.FINISHED)
   }
 
   private def cleanupQueryResult(resultId: String): Unit = {
diff --git 
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
 
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
index 37e8ec8..2ec7ed3 100644
--- 
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
+++ 
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
@@ -18,22 +18,37 @@
 package org.apache.kyuubi.engine.flink.operation
 
 import java.util
+import java.util.Locale
 
-import scala.collection.JavaConverters.asScalaBufferConverter
+import scala.collection.JavaConverters._
 
+import org.apache.kyuubi.config.KyuubiConf._
+import org.apache.kyuubi.config.KyuubiConf.OperationModes._
 import org.apache.kyuubi.engine.flink.result.Constants
+import org.apache.kyuubi.engine.flink.session.FlinkSessionImpl
 import org.apache.kyuubi.operation.{Operation, OperationManager}
 import org.apache.kyuubi.session.Session
 
 class FlinkSQLOperationManager extends 
OperationManager("FlinkSQLOperationManager") {
 
+  private lazy val operationModeDefault = getConf.get(OPERATION_PLAN_ONLY)
+
   override def newExecuteStatementOperation(
       session: Session,
       statement: String,
       confOverlay: Map[String, String],
       runAsync: Boolean,
       queryTimeout: Long): Operation = {
-    val op = new ExecuteStatement(session, statement, runAsync, queryTimeout)
+    val flinkSession = session.asInstanceOf[FlinkSessionImpl]
+    val mode = flinkSession.sessionContext.getConfigMap.getOrDefault(
+      OPERATION_PLAN_ONLY.key,
+      operationModeDefault)
+    val op = OperationModes.withName(mode.toUpperCase(Locale.ROOT)) match {
+      case NONE =>
+        new ExecuteStatement(session, statement, runAsync, queryTimeout)
+      case mode =>
+        new PlanOnlyStatement(session, statement, mode)
+    }
     addOperation(op)
   }
 
diff --git 
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/PlanOnlyStatement.scala
 
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/PlanOnlyStatement.scala
new file mode 100644
index 0000000..8d47514
--- /dev/null
+++ 
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/PlanOnlyStatement.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.operation
+
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.operations.command.{ResetOperation, SetOperation}
+
+import org.apache.kyuubi.KyuubiSQLException
+import org.apache.kyuubi.config.KyuubiConf.OperationModes._
+import org.apache.kyuubi.engine.flink.result.ResultSetUtil
+import org.apache.kyuubi.operation.OperationType
+import org.apache.kyuubi.operation.log.OperationLog
+import org.apache.kyuubi.session.Session
+
+/**
+ * Perform the statement parsing, analyzing or optimizing only without 
executing it
+ */
+class PlanOnlyStatement(
+    session: Session,
+    override val statement: String,
+    mode: OperationMode)
+  extends FlinkOperation(OperationType.EXECUTE_STATEMENT, session) {
+
+  private val operationLog: OperationLog = 
OperationLog.createOperationLog(session, getHandle)
+  override def getOperationLog: Option[OperationLog] = Option(operationLog)
+
+  override protected def runInternal(): Unit = {
+    try {
+      val operation = executor.parseStatement(sessionId, statement)
+      operation match {
+        case setOperation: SetOperation =>
+          resultSet = ResultSetUtil.runSetOperation(setOperation, executor, 
sessionId)
+        case resetOperation: ResetOperation =>
+          resultSet = ResultSetUtil.runResetOperation(resetOperation, 
executor, sessionId)
+        case _ => explainOperation(statement)
+      }
+    } catch {
+      onError()
+    }
+  }
+
+  private def explainOperation(statement: String): Unit = {
+    val tableEnv: TableEnvironment = 
sessionContext.getExecutionContext.getTableEnvironment
+    mode match {
+      case PARSE =>
+        val sqlPlan = tableEnv.explainSql(statement)
+        resultSet =
+          ResultSetUtil.stringListToResultSet(
+            List(sqlPlan.split(System.lineSeparator()).apply(1)),
+            "plan")
+      case _ =>
+        throw KyuubiSQLException(
+          s"""
+             |The operation mode ${mode.toString} doesn't support in Flink SQL 
engine.
+             |Flink only supports the AST and the execution plan of the sql 
statement.
+             |Flink engine will support EXECUTION operation plan mode in 
future.
+             |""".stripMargin)
+    }
+  }
+}
diff --git 
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSetUtil.scala
 
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSetUtil.scala
index ded271c..e7c8cea 100644
--- 
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSetUtil.scala
+++ 
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSetUtil.scala
@@ -17,9 +17,15 @@
 
 package org.apache.kyuubi.engine.flink.result;
 
+import java.util
+
+import scala.collection.mutable.ArrayBuffer
+
 import org.apache.flink.table.api.DataTypes
 import org.apache.flink.table.api.ResultKind
 import org.apache.flink.table.catalog.Column
+import org.apache.flink.table.client.gateway.Executor
+import org.apache.flink.table.operations.command.{ResetOperation, SetOperation}
 import org.apache.flink.types.Row
 
 /** Utility object for building ResultSet. */
@@ -54,4 +60,82 @@ object ResultSetUtil {
       .columns(Column.physical("result", DataTypes.STRING))
       .data(Array[Row](Row.of("OK")))
       .build
+
+  /**
+   * Runs a SetOperation with executor. Returns when SetOperation is executed 
successfully.
+   *
+   * @param setOperation Set operation.
+   * @param executor A gateway for communicating with Flink and other external 
systems.
+   * @param sessionId Id of the session.
+   * @return A ResultSet of SetOperation execution.
+   */
+  def runSetOperation(
+      setOperation: SetOperation,
+      executor: Executor,
+      sessionId: String): ResultSet = {
+    if (setOperation.getKey.isPresent) {
+      val key: String = setOperation.getKey.get.trim
+
+      if (setOperation.getValue.isPresent) {
+        val newValue: String = setOperation.getValue.get.trim
+        executor.setSessionProperty(sessionId, key, newValue)
+      }
+
+      val value = executor.getSessionConfigMap(sessionId).getOrDefault(key, "")
+      ResultSet.builder
+        .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
+        .columns(
+          Column.physical("key", DataTypes.STRING()),
+          Column.physical("value", DataTypes.STRING()))
+        .data(Array(Row.of(key, value)))
+        .build
+    } else {
+      // show all properties if set without key
+      val properties: util.Map[String, String] = 
executor.getSessionConfigMap(sessionId)
+
+      val entries = ArrayBuffer.empty[Row]
+      properties.forEach((key, value) => entries.append(Row.of(key, value)))
+
+      if (entries.nonEmpty) {
+        val prettyEntries = entries.sortBy(_.getField(0).asInstanceOf[String])
+        ResultSet.builder
+          .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
+          .columns(
+            Column.physical("key", DataTypes.STRING()),
+            Column.physical("value", DataTypes.STRING()))
+          .data(prettyEntries.toArray)
+          .build
+      } else {
+        ResultSet.builder
+          .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
+          .columns(
+            Column.physical("key", DataTypes.STRING()),
+            Column.physical("value", DataTypes.STRING()))
+          .data(Array[Row]())
+          .build
+      }
+    }
+  }
+
+  /**
+   * Runs a ResetOperation with executor. Returns when ResetOperation is 
executed successfully.
+   *
+   * @param resetOperation Reset operation.
+   * @param executor A gateway for communicating with Flink and other external 
systems.
+   * @param sessionId Id of the session.
+   * @return A ResultSet of ResetOperation execution.
+   */
+  def runResetOperation(
+      resetOperation: ResetOperation,
+      executor: Executor,
+      sessionId: String): ResultSet = {
+    if (resetOperation.getKey.isPresent) {
+      // reset the given property
+      executor.resetSessionProperty(sessionId, resetOperation.getKey.get())
+    } else {
+      // reset all properties
+      executor.resetSessionProperties(sessionId)
+    }
+    successResultSet
+  }
 }
diff --git 
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala
 
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala
index 62a1e5e..a14e4c0 100644
--- 
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala
+++ 
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala
@@ -21,6 +21,7 @@ import 
org.apache.flink.table.client.gateway.context.DefaultContext
 import org.apache.flink.table.client.gateway.local.LocalExecutor
 import org.apache.hive.service.rpc.thrift.TProtocolVersion
 
+import org.apache.kyuubi.KyuubiSQLException
 import org.apache.kyuubi.engine.flink.FlinkEngineUtils
 import org.apache.kyuubi.engine.flink.operation.FlinkSQLOperationManager
 import org.apache.kyuubi.session.{SessionHandle, SessionManager}
@@ -61,8 +62,17 @@ class FlinkSQLSessionManager(engineContext: DefaultContext)
       sessionHandle,
       sessionContext)
 
-    setSession(sessionHandle, sessionImpl)
-    sessionHandle
+    try {
+      sessionImpl.open()
+      setSession(sessionHandle, sessionImpl)
+      info(s"$user's session with $sessionHandle is opened, current opening 
sessions" +
+        s" $getOpenSessionCount")
+      sessionHandle
+    } catch {
+      case e: Exception =>
+        sessionImpl.close()
+        throw KyuubiSQLException(e)
+    }
   }
 
   override def closeSession(sessionHandle: SessionHandle): Unit = {
diff --git 
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala
 
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala
index ae6b9d5..9972642 100644
--- 
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala
+++ 
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala
@@ -17,7 +17,7 @@
 
 package org.apache.kyuubi.engine.flink.session
 
-import org.apache.flink.table.client.gateway.Executor
+import org.apache.flink.table.client.gateway.{Executor, SqlExecutionException}
 import org.apache.flink.table.client.gateway.context.SessionContext
 import org.apache.hive.service.rpc.thrift.TProtocolVersion
 
@@ -37,4 +37,19 @@ class FlinkSessionImpl(
   def executor: Executor = 
sessionManager.asInstanceOf[FlinkSQLSessionManager].executor
 
   def sessionId: String = handle.identifier.toString
+
+  private def setModifiableConfig(key: String, value: String): Unit = {
+    try {
+      sessionContext.set(key, value)
+    } catch {
+      case e: SqlExecutionException => warn(e.getMessage)
+    }
+  }
+
+  override def open(): Unit = {
+    normalizedConf.foreach {
+      case (key, value) => setModifiableConfig(key, value)
+    }
+    super.open()
+  }
 }
diff --git 
a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
 
b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
index 197cbcb..9039c19 100644
--- 
a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
+++ 
b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
@@ -26,6 +26,8 @@ import 
org.apache.hive.service.rpc.thrift.{TExecuteStatementReq, TFetchResultsRe
 import org.scalatest.concurrent.PatienceConfiguration.Timeout
 import org.scalatest.time.SpanSugar._
 
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf.OperationModes.NONE
 import org.apache.kyuubi.engine.flink.WithFlinkSQLEngine
 import org.apache.kyuubi.engine.flink.result.Constants
 import org.apache.kyuubi.operation.HiveJDBCTestHelper
@@ -33,7 +35,8 @@ import 
org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
 import org.apache.kyuubi.service.ServiceState._
 
 class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
-  override def withKyuubiConf: Map[String, String] = Map()
+  override def withKyuubiConf: Map[String, String] =
+    Map(KyuubiConf.OPERATION_PLAN_ONLY.key -> NONE.toString)
 
   override protected def jdbcUrl: String =
     s"jdbc:hive2://${engine.frontendServices.head.connectionUrl}/;"
diff --git 
a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/PlanOnlyOperationSuite.scala
 
b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/PlanOnlyOperationSuite.scala
new file mode 100644
index 0000000..27abce0
--- /dev/null
+++ 
b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/PlanOnlyOperationSuite.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.operation
+
+import java.sql.Statement
+
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf.OperationModes._
+import org.apache.kyuubi.engine.flink.WithFlinkSQLEngine
+import org.apache.kyuubi.operation.HiveJDBCTestHelper
+
+class PlanOnlyOperationSuite extends WithFlinkSQLEngine with 
HiveJDBCTestHelper {
+
+  override def withKyuubiConf: Map[String, String] =
+    Map(
+      KyuubiConf.ENGINE_SHARE_LEVEL.key -> "user",
+      KyuubiConf.OPERATION_PLAN_ONLY.key -> PARSE.toString,
+      KyuubiConf.ENGINE_SHARE_LEVEL_SUBDOMAIN.key -> "plan-only")
+
+  override protected def jdbcUrl: String =
+    s"jdbc:hive2://${engine.frontendServices.head.connectionUrl}/;"
+
+  test("Plan only operation with system defaults") {
+    withJdbcStatement() { statement =>
+      testPlanOnlyStatement(statement)
+    }
+  }
+
+  test("Plan only operation with session conf") {
+    withSessionConf()(Map(KyuubiConf.OPERATION_PLAN_ONLY.key -> 
ANALYZE.toString))(Map.empty) {
+      withJdbcStatement() { statement =>
+        val exceptionMsg = intercept[Exception](statement.executeQuery("select 
1")).getMessage
+        assert(exceptionMsg.contains(
+          s"The operation mode ${ANALYZE.toString} doesn't support in Flink 
SQL engine."))
+      }
+    }
+  }
+
+  test("Plan only operation with set command") {
+    withSessionConf()(Map(KyuubiConf.OPERATION_PLAN_ONLY.key -> 
ANALYZE.toString))(Map.empty) {
+      withJdbcStatement() { statement =>
+        statement.execute(s"set ${KyuubiConf.OPERATION_PLAN_ONLY.key}=parse")
+        testPlanOnlyStatement(statement)
+      }
+    }
+  }
+
+  private def testPlanOnlyStatement(statement: Statement): Unit = {
+    val resultSet = statement.executeQuery("select 1")
+    assert(resultSet.next())
+    assert(resultSet.getString(1) === "LogicalProject(EXPR$0=[1])")
+  }
+}
diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala
index ade5b4e..c3f3b74 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala
@@ -52,8 +52,9 @@ class SparkSQLOperationManager private (name: String) extends 
OperationManager(n
       runAsync: Boolean,
       queryTimeout: Long): Operation = {
     val spark = session.asInstanceOf[SparkSessionImpl].spark
-    val lang = confOverlay.get(OPERATION_LANGUAGE.key)
-      .getOrElse(spark.conf.get(OPERATION_LANGUAGE.key, 
operationLanguageDefault))
+    val lang = confOverlay.getOrElse(
+      OPERATION_LANGUAGE.key,
+      spark.conf.get(OPERATION_LANGUAGE.key, operationLanguageDefault))
     val operation =
       OperationLanguages.withName(lang.toUpperCase(Locale.ROOT)) match {
         case OperationLanguages.SQL =>
diff --git 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/PlanOnlyOperationSuite.scala
 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/PlanOnlyOperationSuite.scala
index 36af242..8204b60 100644
--- 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/PlanOnlyOperationSuite.scala
+++ 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/PlanOnlyOperationSuite.scala
@@ -40,7 +40,7 @@ class PlanOnlyOperationSuite extends WithKyuubiServer with 
HiveJDBCTestHelper {
     }
   }
 
-  test("KYUUBI #1059: Plan only operation with with session conf") {
+  test("KYUUBI #1059: Plan only operation with session conf") {
     withSessionConf()(Map(KyuubiConf.OPERATION_PLAN_ONLY.key -> 
"analyze"))(Map.empty) {
       withJdbcStatement() { statement =>
         val set = statement.executeQuery("select 1 where true")
@@ -51,7 +51,7 @@ class PlanOnlyOperationSuite extends WithKyuubiServer with 
HiveJDBCTestHelper {
     }
   }
 
-  test("KYUUBI #1059: Plan only operation with with set command") {
+  test("KYUUBI #1059: Plan only operation with set command") {
     withSessionConf()(Map(KyuubiConf.OPERATION_PLAN_ONLY.key -> 
"analyze"))(Map.empty) {
       withJdbcStatement() { statement =>
         statement.execute(s"set ${KyuubiConf.OPERATION_PLAN_ONLY.key}=parse")

Reply via email to