This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 58db6ca22 [KYUUBI #5877][FOLLOWUP] Add spark output mode to better
support PySpark notebook
58db6ca22 is described below
commit 58db6ca2256d9d018b3fa58bc894fcd281b2c443
Author: Fei Wang <[email protected]>
AuthorDate: Thu Dec 21 21:26:35 2023 -0800
[KYUUBI #5877][FOLLOWUP] Add spark output mode to better support PySpark
notebook
# :mag: Description
## Issue References ๐
This pull request fixes #
## Describe Your Solution ๐ง
Please include a summary of the change and which issue is fixed. Please
also include relevant motivation and context. List any dependencies that are
required for this change.
## Types of changes :bookmark:
- [ ] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing
functionality to change)
## Test Plan ๐งช
#### Behavior Without This Pull Request :coffin:
#### Behavior With This Pull Request :tada:
#### Related Unit Tests
---
# Checklists
## ๐ Author Self Checklist
- [ ] My code follows the [style
guidelines](https://kyuubi.readthedocs.io/en/master/contributing/code/style.html)
of this project
- [ ] I have performed a self-review
- [ ] I have commented my code, particularly in hard-to-understand areas
- [ ] I have made corresponding changes to the documentation
- [ ] My changes generate no new warnings
- [ ] I have added tests that prove my fix is effective or that my feature
works
- [ ] New and existing unit tests pass locally with my changes
- [ ] This patch was not authored or co-authored using [Generative
Tooling](https://www.apache.org/legal/generative-tooling.html)
## ๐ Committer Pre-Merge Checklist
- [ ] Pull request title is okay.
- [ ] No license issues.
- [ ] Milestone correctly set?
- [ ] Test coverage is ok
- [ ] Assignees are selected.
- [ ] Minimum number of approvals
- [ ] No changes are requested
**Be nice. Be informative.**
Closes #5898 from turboFei/notebook_mode.
Closes #5877
7f1c607b9 [Fei Wang] PySpark
644d036bc [Fei Wang] docs
7c68b7742 [Fei Wang] add option to support notebook well
Authored-by: Fei Wang <[email protected]>
Signed-off-by: Fei Wang <[email protected]>
---
docs/configuration/settings.md | 1 +
.../engine/spark/operation/ExecutePython.scala | 27 +++++++++++++---------
.../engine/spark/operation/SparkOperation.scala | 5 +++-
.../org/apache/kyuubi/config/KyuubiConf.scala | 14 +++++++++++
4 files changed, 35 insertions(+), 12 deletions(-)
diff --git a/docs/configuration/settings.md b/docs/configuration/settings.md
index b42bdcf10..07ae62771 100644
--- a/docs/configuration/settings.md
+++ b/docs/configuration/settings.md
@@ -181,6 +181,7 @@ You can configure the Kyuubi properties in
`$KYUUBI_HOME/conf/kyuubi-defaults.co
| kyuubi.engine.single.spark.session | false
| When set to true, this engine is running in a single session mode.
All the JDBC/ODBC connections share the temporary views, function registries,
SQL configuration and the current database.
[...]
| kyuubi.engine.spark.event.loggers | SPARK
| A comma-separated list of engine loggers, where
engine/session/operation etc events go.<ul> <li>SPARK: the events will be
written to the Spark listener bus.</li> <li>JSON: the events will be written to
the location of kyuubi.engine.event.json.log.path</li> <li>JDBC: to be
done</li> <li>CUSTOM: to be done.</li></ul>
[...]
| kyuubi.engine.spark.initialize.sql | SHOW DATABASES
| The initialize sql for Spark engine. It fallback to
`kyuubi.engine.initialize.sql`.
[...]
+| kyuubi.engine.spark.output.mode | AUTO
| The output mode of Spark engine: <ul> <li>AUTO: For PySpark, the
extracted `text/plain` from python response as output.</li> <li>NOTEBOOK: For
PySpark, the original python response as output.</li></ul>
[...]
| kyuubi.engine.spark.python.env.archive | <undefined>
| Portable Python env archive used for Spark engine Python language
mode.
[...]
| kyuubi.engine.spark.python.env.archive.exec.path | bin/python
| The Python exec path under the Python env archive.
[...]
| kyuubi.engine.spark.python.home.archive | <undefined>
| Spark archive containing $SPARK_HOME/python directory, which is used
to init session Python worker for Python language mode.
[...]
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala
index 7009bf9e0..f60b1d4c8 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala
@@ -36,6 +36,7 @@ import org.apache.spark.sql.types.StructType
import org.apache.kyuubi.{KyuubiSQLException, Logging, Utils}
import org.apache.kyuubi.config.KyuubiConf.{ENGINE_SPARK_PYTHON_ENV_ARCHIVE,
ENGINE_SPARK_PYTHON_ENV_ARCHIVE_EXEC_PATH, ENGINE_SPARK_PYTHON_HOME_ARCHIVE,
ENGINE_SPARK_PYTHON_MAGIC_ENABLED}
+import org.apache.kyuubi.config.KyuubiConf.EngineSparkOutputMode.{AUTO,
EngineSparkOutputMode, NOTEBOOK}
import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_SESSION_USER_KEY,
KYUUBI_STATEMENT_ID_KEY}
import org.apache.kyuubi.engine.spark.KyuubiSparkUtil._
import org.apache.kyuubi.engine.spark.util.JsonUtils
@@ -86,7 +87,7 @@ class ExecutePython(
val response = worker.runCode(statement)
val status = response.map(_.content.status).getOrElse("UNKNOWN_STATUS")
if (PythonResponse.OK_STATUS.equalsIgnoreCase(status)) {
- val output = response.map(_.content.getOutput()).getOrElse("")
+ val output =
response.map(_.content.getOutput(outputMode)).getOrElse("")
val ename = response.map(_.content.getEname()).getOrElse("")
val evalue = response.map(_.content.getEvalue()).getOrElse("")
val traceback =
response.map(_.content.getTraceback()).getOrElse(Seq.empty)
@@ -403,18 +404,22 @@ case class PythonResponseContent(
evalue: String,
traceback: Seq[String],
status: String) {
- def getOutput(): String = {
+ def getOutput(outputMode: EngineSparkOutputMode): String = {
if (data == null) return ""
- // If data does not contains field other than `test/plain`, keep backward
compatibility,
- // otherwise, return all the data.
- if (data.filterNot(_._1 == "text/plain").isEmpty) {
- data.get("text/plain").map {
- case str: String => str
- case obj => JsonUtils.toJson(obj)
- }.getOrElse("")
- } else {
- JsonUtils.toJson(data)
+ outputMode match {
+ case AUTO =>
+ // If data does not contains field other than `test/plain`, keep
backward compatibility,
+ // otherwise, return all the data.
+ if (data.filterNot(_._1 == "text/plain").isEmpty) {
+ data.get("text/plain").map {
+ case str: String => str
+ case obj => JsonUtils.toJson(obj)
+ }.getOrElse("")
+ } else {
+ JsonUtils.toJson(data)
+ }
+ case NOTEBOOK => JsonUtils.toJson(data)
}
}
def getEname(): String = {
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
index 1d271cfce..88ebc306b 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.types.{BinaryType, StructField,
StructType}
import org.apache.kyuubi.{KyuubiSQLException, Utils}
import org.apache.kyuubi.config.KyuubiConf
-import
org.apache.kyuubi.config.KyuubiConf.{ARROW_BASED_ROWSET_TIMESTAMP_AS_STRING,
OPERATION_SPARK_LISTENER_ENABLED, SESSION_PROGRESS_ENABLE,
SESSION_USER_SIGN_ENABLED}
+import
org.apache.kyuubi.config.KyuubiConf.{ARROW_BASED_ROWSET_TIMESTAMP_AS_STRING,
ENGINE_SPARK_OUTPUT_MODE, EngineSparkOutputMode,
OPERATION_SPARK_LISTENER_ENABLED, SESSION_PROGRESS_ENABLE,
SESSION_USER_SIGN_ENABLED}
import
org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_SESSION_SIGN_PUBLICKEY,
KYUUBI_SESSION_USER_KEY, KYUUBI_SESSION_USER_SIGN, KYUUBI_STATEMENT_ID_KEY}
import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.{getSessionConf,
SPARK_SCHEDULER_POOL_KEY}
import org.apache.kyuubi.engine.spark.events.SparkOperationEvent
@@ -82,6 +82,9 @@ abstract class SparkOperation(session: Session)
protected def supportProgress: Boolean = false
+ protected def outputMode: EngineSparkOutputMode.EngineSparkOutputMode =
+ EngineSparkOutputMode.withName(getSessionConf(ENGINE_SPARK_OUTPUT_MODE,
spark))
+
override def getStatus: OperationStatus = {
if (progressEnable && supportProgress) {
val progressMonitor = new SparkProgressMonitor(spark, statementId)
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index fd01e718c..f9bca31ce 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -3258,6 +3258,20 @@ object KyuubiConf {
.booleanConf
.createWithDefault(true)
+ object EngineSparkOutputMode extends Enumeration {
+ type EngineSparkOutputMode = Value
+ val AUTO, NOTEBOOK = Value
+ }
+
+ val ENGINE_SPARK_OUTPUT_MODE: ConfigEntry[String] =
+ buildConf("kyuubi.engine.spark.output.mode")
+ .doc("The output mode of Spark engine: <ul>" +
+ " <li>AUTO: For PySpark, the extracted `text/plain` from python
response as output.</li>" +
+ " <li>NOTEBOOK: For PySpark, the original python response as
output.</li></ul>")
+ .version("1.9.0")
+ .stringConf
+ .createWithDefault(EngineSparkOutputMode.AUTO.toString)
+
val ENGINE_SPARK_REGISTER_ATTRIBUTES: ConfigEntry[Seq[String]] =
buildConf("kyuubi.engine.spark.register.attributes")
.internal