This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 514a6a961 [KYUUBI #4415][FOLLOWUP] Align the operation handle in
server/engine for ExecuteScala, ExecutePython and PlanOnlyStatement
514a6a961 is described below
commit 514a6a961ade407c96059e1eec8eeb19253a3d5c
Author: fwang12 <[email protected]>
AuthorDate: Tue May 16 20:22:26 2023 +0800
[KYUUBI #4415][FOLLOWUP] Align the operation handle in server/engine for
ExecuteScala, ExecutePython and PlanOnlyStatement
### _Why are the changes needed?_
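As the title says: ExecuteScala, ExecutePython and PlanOnlyStatement now accept the operation handle passed in by SparkSQLOperationManager (`opHandle`), so that the server-side and engine-side operation handles match.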
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run
test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests)
locally before making a pull request
Closes #4832 from turboFei/scala_python_handle.
Closes #4415
a5a44dfa0 [fwang12] ut
eaf7f004f [fwang12] ut
c8d7a5c82 [fwang12] save
Authored-by: fwang12 <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
.../engine/spark/operation/ExecutePython.scala | 5 +++--
.../engine/spark/operation/ExecuteScala.scala | 5 +++--
.../engine/spark/operation/PlanOnlyStatement.scala | 5 +++--
.../spark/operation/SparkSQLOperationManager.scala | 6 ++---
.../operation/KyuubiOperationPerUserSuite.scala | 26 +++++++++++++++++-----
5 files changed, 33 insertions(+), 14 deletions(-)
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala
index 0d01348a7..badd83530 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala
@@ -40,7 +40,7 @@ import org.apache.kyuubi.{KyuubiSQLException, Logging, Utils}
import org.apache.kyuubi.config.KyuubiConf.{ENGINE_SPARK_PYTHON_ENV_ARCHIVE,
ENGINE_SPARK_PYTHON_ENV_ARCHIVE_EXEC_PATH, ENGINE_SPARK_PYTHON_HOME_ARCHIVE}
import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_SESSION_USER_KEY,
KYUUBI_STATEMENT_ID_KEY}
import org.apache.kyuubi.engine.spark.KyuubiSparkUtil._
-import org.apache.kyuubi.operation.{ArrayFetchIterator, OperationState}
+import org.apache.kyuubi.operation.{ArrayFetchIterator, OperationHandle,
OperationState}
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.session.Session
@@ -49,7 +49,8 @@ class ExecutePython(
override val statement: String,
override val shouldRunAsync: Boolean,
queryTimeout: Long,
- worker: SessionPythonWorker) extends SparkOperation(session) {
+ worker: SessionPythonWorker,
+ override protected val handle: OperationHandle) extends
SparkOperation(session) {
private val operationLog: OperationLog =
OperationLog.createOperationLog(session, getHandle)
override def getOperationLog: Option[OperationLog] = Option(operationLog)
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteScala.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteScala.scala
index 24e17d281..691c4fb32 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteScala.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteScala.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.types.StructType
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.engine.spark.KyuubiSparkUtil._
import org.apache.kyuubi.engine.spark.repl.KyuubiSparkILoop
-import org.apache.kyuubi.operation.{ArrayFetchIterator, OperationState}
+import org.apache.kyuubi.operation.{ArrayFetchIterator, OperationHandle,
OperationState}
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.session.Session
@@ -51,7 +51,8 @@ class ExecuteScala(
repl: KyuubiSparkILoop,
override val statement: String,
override val shouldRunAsync: Boolean,
- queryTimeout: Long)
+ queryTimeout: Long,
+ override protected val handle: OperationHandle)
extends SparkOperation(session) {
private val operationLog: OperationLog =
OperationLog.createOperationLog(session, getHandle)
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala
index c5a7679d6..726b7b0c2 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.types.StructType
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.config.KyuubiConf.{OPERATION_PLAN_ONLY_EXCLUDES,
OPERATION_PLAN_ONLY_OUT_STYLE}
-import org.apache.kyuubi.operation.{AnalyzeMode, ArrayFetchIterator,
ExecutionMode, IterableFetchIterator, JsonStyle, OptimizeMode,
OptimizeWithStatsMode, ParseMode, PhysicalMode, PlainStyle, PlanOnlyMode,
PlanOnlyStyle, UnknownMode, UnknownStyle}
+import org.apache.kyuubi.operation.{AnalyzeMode, ArrayFetchIterator,
ExecutionMode, IterableFetchIterator, JsonStyle, OperationHandle, OptimizeMode,
OptimizeWithStatsMode, ParseMode, PhysicalMode, PlainStyle, PlanOnlyMode,
PlanOnlyStyle, UnknownMode, UnknownStyle}
import org.apache.kyuubi.operation.PlanOnlyMode.{notSupportedModeError,
unknownModeError}
import org.apache.kyuubi.operation.PlanOnlyStyle.{notSupportedStyleError,
unknownStyleError}
import org.apache.kyuubi.operation.log.OperationLog
@@ -36,7 +36,8 @@ import org.apache.kyuubi.session.Session
class PlanOnlyStatement(
session: Session,
override val statement: String,
- mode: PlanOnlyMode)
+ mode: PlanOnlyMode,
+ override protected val handle: OperationHandle)
extends SparkOperation(session) {
private val operationLog: OperationLog =
OperationLog.createOperationLog(session, getHandle)
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala
index 8fd58b338..cb444aa77 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala
@@ -106,18 +106,18 @@ class SparkSQLOperationManager private (name: String)
extends OperationManager(n
opHandle)
}
case mode =>
- new PlanOnlyStatement(session, statement, mode)
+ new PlanOnlyStatement(session, statement, mode, opHandle)
}
case OperationLanguages.SCALA =>
val repl = sessionToRepl.getOrElseUpdate(session.handle,
KyuubiSparkILoop(spark))
- new ExecuteScala(session, repl, statement, runAsync, queryTimeout)
+ new ExecuteScala(session, repl, statement, runAsync, queryTimeout,
opHandle)
case OperationLanguages.PYTHON =>
try {
ExecutePython.init()
val worker = sessionToPythonProcess.getOrElseUpdate(
session.handle,
ExecutePython.createSessionPythonWorker(spark, session))
- new ExecutePython(session, statement, runAsync, queryTimeout,
worker)
+ new ExecutePython(session, statement, runAsync, queryTimeout,
worker, opHandle)
} catch {
case e: Throwable =>
spark.conf.set(OPERATION_LANGUAGE.key,
OperationLanguages.SQL.toString)
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerUserSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerUserSuite.scala
index f86d75247..2ae2340b2 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerUserSuite.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerUserSuite.scala
@@ -330,11 +330,27 @@ class KyuubiOperationPerUserSuite
eventually(timeout(10.seconds)) {
assert(session.handle ===
SessionHandle.apply(session.client.remoteSessionHandle))
}
- val opHandle = session.executeStatement("SELECT engine_id()",
Map.empty, true, 0L)
- eventually(timeout(10.seconds)) {
- val operation = session.sessionManager.operationManager.getOperation(
- opHandle).asInstanceOf[KyuubiOperation]
- assert(opHandle == OperationHandle.apply(operation.remoteOpHandle()))
+
+ def checkOpHandleAlign(statement: String, confOverlay: Map[String,
String]): Unit = {
+ val opHandle = session.executeStatement(statement, confOverlay,
true, 0L)
+ eventually(timeout(10.seconds)) {
+ val operation =
session.sessionManager.operationManager.getOperation(
+ opHandle).asInstanceOf[KyuubiOperation]
+ assert(opHandle ==
OperationHandle.apply(operation.remoteOpHandle()))
+ }
+ }
+
+ val statement = "SELECT engine_id()"
+
+ val confOverlay = Map(KyuubiConf.OPERATION_PLAN_ONLY_MODE.key ->
"PARSE")
+ checkOpHandleAlign(statement, confOverlay)
+
+ Map(
+ statement -> "SQL",
+ s"""spark.sql("$statement")""" -> "SCALA",
+ s"spark.sql('$statement')" -> "PYTHON").foreach { case (statement,
lang) =>
+ val confOverlay = Map(KyuubiConf.OPERATION_LANGUAGE.key -> lang)
+ checkOpHandleAlign(statement, confOverlay)
}
}
}