This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 58db6ca22 [KYUUBI #5877][FOLLOWUP] Add spark output mode to better support PySpark notebook
58db6ca22 is described below

commit 58db6ca2256d9d018b3fa58bc894fcd281b2c443
Author: Fei Wang <[email protected]>
AuthorDate: Thu Dec 21 21:26:35 2023 -0800

    [KYUUBI #5877][FOLLOWUP] Add spark output mode to better support PySpark notebook
    
    # :mag: Description
    ## Issue References 🔗
    
    This pull request fixes #
    
    ## Describe Your Solution 🔧
    
    Introduce a `kyuubi.engine.spark.output.mode` option for the Spark engine. The default `AUTO` mode keeps the existing behavior of extracting the `text/plain` field from the PySpark response as the operation output, while the new `NOTEBOOK` mode returns the original Python response as the output so that notebook clients can render it.
    
    ## Types of changes :bookmark:
    
    - [ ] Bugfix (non-breaking change which fixes an issue)
    - [ ] New feature (non-breaking change which adds functionality)
    - [ ] Breaking change (fix or feature that would cause existing functionality to change)
    
    ## Test Plan 🧪
    
    #### Behavior Without This Pull Request :coffin:
    
    #### Behavior With This Pull Request :tada:
    
    #### Related Unit Tests
    
    ---
    
    # Checklists
    ## ๐Ÿ“ Author Self Checklist
    
    - [ ] My code follows the [style guidelines](https://kyuubi.readthedocs.io/en/master/contributing/code/style.html) of this project
    - [ ] I have performed a self-review
    - [ ] I have commented my code, particularly in hard-to-understand areas
    - [ ] I have made corresponding changes to the documentation
    - [ ] My changes generate no new warnings
    - [ ] I have added tests that prove my fix is effective or that my feature works
    - [ ] New and existing unit tests pass locally with my changes
    - [ ] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)
    
    ## ๐Ÿ“ Committer Pre-Merge Checklist
    
    - [ ] Pull request title is okay.
    - [ ] No license issues.
    - [ ] Milestone is correctly set.
    - [ ] Test coverage is okay.
    - [ ] Assignees are selected.
    - [ ] Minimum number of approvals is met.
    - [ ] No changes are requested.
    
    **Be nice. Be informative.**
    
    Closes #5898 from turboFei/notebook_mode.
    
    Closes #5877
    
    7f1c607b9 [Fei Wang] PySpark
    644d036bc [Fei Wang] docs
    7c68b7742 [Fei Wang] add option to support notebook well
    
    Authored-by: Fei Wang <[email protected]>
    Signed-off-by: Fei Wang <[email protected]>
---
 docs/configuration/settings.md                     |  1 +
 .../engine/spark/operation/ExecutePython.scala     | 27 +++++++++++++---------
 .../engine/spark/operation/SparkOperation.scala    |  5 +++-
 .../org/apache/kyuubi/config/KyuubiConf.scala      | 14 +++++++++++
 4 files changed, 35 insertions(+), 12 deletions(-)

diff --git a/docs/configuration/settings.md b/docs/configuration/settings.md
index b42bdcf10..07ae62771 100644
--- a/docs/configuration/settings.md
+++ b/docs/configuration/settings.md
@@ -181,6 +181,7 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co
 | kyuubi.engine.single.spark.session                       | false             | When set to true, this engine is running in a single session mode. All the JDBC/ODBC connections share the temporary views, function registries, SQL configuration and the current database. [...]
 | kyuubi.engine.spark.event.loggers                        | SPARK             | A comma-separated list of engine loggers, where engine/session/operation etc events go.<ul> <li>SPARK: the events will be written to the Spark listener bus.</li> <li>JSON: the events will be written to the location of kyuubi.engine.event.json.log.path</li> <li>JDBC: to be done</li> <li>CUSTOM: to be done.</li></ul> [...]
 | kyuubi.engine.spark.initialize.sql                       | SHOW DATABASES    | The initialize sql for Spark engine. It fallback to `kyuubi.engine.initialize.sql`. [...]
+| kyuubi.engine.spark.output.mode                          | AUTO              | The output mode of Spark engine: <ul> <li>AUTO: For PySpark, the extracted `text/plain` from python response as output.</li> <li>NOTEBOOK: For PySpark, the original python response as output.</li></ul> [...]
 | kyuubi.engine.spark.python.env.archive                   | &lt;undefined&gt; | Portable Python env archive used for Spark engine Python language mode. [...]
 | kyuubi.engine.spark.python.env.archive.exec.path         | bin/python        | The Python exec path under the Python env archive. [...]
 | kyuubi.engine.spark.python.home.archive                  | &lt;undefined&gt; | Spark archive containing $SPARK_HOME/python directory, which is used to init session Python worker for Python language mode. [...]
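
For context on how the new option reaches a session, below is a minimal client sketch. It assumes the server listens on the default `localhost:10009`, that the Hive JDBC driver is on the classpath, and that `kyuubi.*` entries in the conf section of the JDBC URL are forwarded as session configs; the endpoint, credentials, and URL layout are illustrative assumptions, not part of this patch.

```scala
import java.sql.DriverManager

// Sketch only: connects to a hypothetical Kyuubi endpoint and requests
// NOTEBOOK output mode for this session via the JDBC URL's conf section.
object NotebookModeSession {
  def main(args: Array[String]): Unit = {
    val url = "jdbc:hive2://localhost:10009/default;" +
      "?kyuubi.engine.spark.output.mode=NOTEBOOK" // session-level override (assumed URL layout)
    val conn = DriverManager.getConnection(url, "anonymous", "")
    try {
      val stmt = conn.createStatement()
      // Subsequent PySpark statements in this session should return the
      // original Python response instead of only the text/plain field.
      stmt.close()
    } finally conn.close()
  }
}
```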
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala
index 7009bf9e0..f60b1d4c8 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala
@@ -36,6 +36,7 @@ import org.apache.spark.sql.types.StructType
 
 import org.apache.kyuubi.{KyuubiSQLException, Logging, Utils}
 import org.apache.kyuubi.config.KyuubiConf.{ENGINE_SPARK_PYTHON_ENV_ARCHIVE, ENGINE_SPARK_PYTHON_ENV_ARCHIVE_EXEC_PATH, ENGINE_SPARK_PYTHON_HOME_ARCHIVE, ENGINE_SPARK_PYTHON_MAGIC_ENABLED}
+import org.apache.kyuubi.config.KyuubiConf.EngineSparkOutputMode.{AUTO, EngineSparkOutputMode, NOTEBOOK}
 import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_SESSION_USER_KEY, KYUUBI_STATEMENT_ID_KEY}
 import org.apache.kyuubi.engine.spark.KyuubiSparkUtil._
 import org.apache.kyuubi.engine.spark.util.JsonUtils
@@ -86,7 +87,7 @@ class ExecutePython(
         val response = worker.runCode(statement)
         val status = response.map(_.content.status).getOrElse("UNKNOWN_STATUS")
         if (PythonResponse.OK_STATUS.equalsIgnoreCase(status)) {
-          val output = response.map(_.content.getOutput()).getOrElse("")
+          val output = response.map(_.content.getOutput(outputMode)).getOrElse("")
           val ename = response.map(_.content.getEname()).getOrElse("")
           val evalue = response.map(_.content.getEvalue()).getOrElse("")
           val traceback = response.map(_.content.getTraceback()).getOrElse(Seq.empty)
@@ -403,18 +404,22 @@ case class PythonResponseContent(
     evalue: String,
     traceback: Seq[String],
     status: String) {
-  def getOutput(): String = {
+  def getOutput(outputMode: EngineSparkOutputMode): String = {
     if (data == null) return ""
 
-    // If data does not contains field other than `test/plain`, keep backward compatibility,
-    // otherwise, return all the data.
-    if (data.filterNot(_._1 == "text/plain").isEmpty) {
-      data.get("text/plain").map {
-        case str: String => str
-        case obj => JsonUtils.toJson(obj)
-      }.getOrElse("")
-    } else {
-      JsonUtils.toJson(data)
+    outputMode match {
+      case AUTO =>
+        // If data does not contain fields other than `text/plain`, keep backward compatibility;
+        // otherwise, return all the data.
+        if (data.filterNot(_._1 == "text/plain").isEmpty) {
+          data.get("text/plain").map {
+            case str: String => str
+            case obj => JsonUtils.toJson(obj)
+          }.getOrElse("")
+        } else {
+          JsonUtils.toJson(data)
+        }
+      case NOTEBOOK => JsonUtils.toJson(data)
     }
   }
   def getEname(): String = {
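
To make the two branches above concrete, here is a self-contained sketch that mirrors the patched `getOutput` logic. The `Mode` values and the hand-rolled `toJson` are stand-ins for Kyuubi's `EngineSparkOutputMode` enumeration and `JsonUtils`, and non-string values are simplified to `toString`:

```scala
// Mirrors the AUTO/NOTEBOOK dispatch in PythonResponseContent.getOutput (sketch).
object OutputModeSketch {
  sealed trait Mode
  case object Auto extends Mode
  case object Notebook extends Mode

  // Stand-in for Kyuubi's JsonUtils.toJson; good enough for string values.
  private def toJson(data: Map[String, Any]): String =
    data.map { case (k, v) => s""""$k":"$v"""" }.mkString("{", ",", "}")

  def output(data: Map[String, Any], mode: Mode): String = mode match {
    case Auto if data.keys.forall(_ == "text/plain") =>
      // Backward-compatible path: nothing but text/plain, return it bare.
      data.get("text/plain").map(_.toString).getOrElse("")
    case Auto     => toJson(data) // other MIME types present: return everything
    case Notebook => toJson(data) // notebooks always get the original response
  }

  def main(args: Array[String]): Unit = {
    val rich = Map("text/plain" -> "<Figure 640x480>", "image/png" -> "iVBOR...")
    println(output(Map("text/plain" -> "42"), Auto)) // 42
    println(output(rich, Auto))                      // full JSON
    println(output(rich, Notebook))                  // full JSON, regardless of keys
  }
}
```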
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
index 1d271cfce..88ebc306b 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.types.{BinaryType, StructField, StructType}
 
 import org.apache.kyuubi.{KyuubiSQLException, Utils}
 import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.config.KyuubiConf.{ARROW_BASED_ROWSET_TIMESTAMP_AS_STRING, OPERATION_SPARK_LISTENER_ENABLED, SESSION_PROGRESS_ENABLE, SESSION_USER_SIGN_ENABLED}
+import org.apache.kyuubi.config.KyuubiConf.{ARROW_BASED_ROWSET_TIMESTAMP_AS_STRING, ENGINE_SPARK_OUTPUT_MODE, EngineSparkOutputMode, OPERATION_SPARK_LISTENER_ENABLED, SESSION_PROGRESS_ENABLE, SESSION_USER_SIGN_ENABLED}
 import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_SESSION_SIGN_PUBLICKEY, KYUUBI_SESSION_USER_KEY, KYUUBI_SESSION_USER_SIGN, KYUUBI_STATEMENT_ID_KEY}
 import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.{getSessionConf, SPARK_SCHEDULER_POOL_KEY}
 import org.apache.kyuubi.engine.spark.events.SparkOperationEvent
@@ -82,6 +82,9 @@ abstract class SparkOperation(session: Session)
 
   protected def supportProgress: Boolean = false
 
+  protected def outputMode: EngineSparkOutputMode.EngineSparkOutputMode =
+    EngineSparkOutputMode.withName(getSessionConf(ENGINE_SPARK_OUTPUT_MODE, spark))
+
   override def getStatus: OperationStatus = {
     if (progressEnable && supportProgress) {
       val progressMonitor = new SparkProgressMonitor(spark, statementId)
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index fd01e718c..f9bca31ce 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -3258,6 +3258,20 @@ object KyuubiConf {
       .booleanConf
       .createWithDefault(true)
 
+  object EngineSparkOutputMode extends Enumeration {
+    type EngineSparkOutputMode = Value
+    val AUTO, NOTEBOOK = Value
+  }
+
+  val ENGINE_SPARK_OUTPUT_MODE: ConfigEntry[String] =
+    buildConf("kyuubi.engine.spark.output.mode")
+      .doc("The output mode of Spark engine: <ul>" +
+        " <li>AUTO: For PySpark, the extracted `text/plain` from python 
response as output.</li>" +
+        " <li>NOTEBOOK: For PySpark, the original python response as 
output.</li></ul>")
+      .version("1.9.0")
+      .stringConf
+      .createWithDefault(EngineSparkOutputMode.AUTO.toString)
+
   val ENGINE_SPARK_REGISTER_ATTRIBUTES: ConfigEntry[Seq[String]] =
     buildConf("kyuubi.engine.spark.register.attributes")
       .internal

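One behavioral note on the new entry, illustrated below: it is a plain `stringConf`, so an invalid value is not rejected when it is set; it only fails later, when `SparkOperation.outputMode` calls `Enumeration.withName`, which is case-sensitive and throws `NoSuchElementException` for anything other than `AUTO` or `NOTEBOOK`. A minimal sketch of that resolution path, with a simple `Option`-based fallback standing in for Kyuubi's `getSessionConf` wiring:

```scala
import scala.util.Try

object OutputModeResolution {
  // Same shape as the enumeration added to KyuubiConf.
  object EngineSparkOutputMode extends Enumeration {
    type EngineSparkOutputMode = Value
    val AUTO, NOTEBOOK = Value
  }
  import EngineSparkOutputMode._

  // Stand-in for getSessionConf(ENGINE_SPARK_OUTPUT_MODE, spark): a session-level
  // value wins, otherwise the default from createWithDefault(AUTO.toString) applies.
  def resolve(sessionValue: Option[String]): EngineSparkOutputMode =
    EngineSparkOutputMode.withName(sessionValue.getOrElse(AUTO.toString))

  def main(args: Array[String]): Unit = {
    println(resolve(None))                  // AUTO (the default)
    println(resolve(Some("NOTEBOOK")))      // NOTEBOOK
    println(Try(resolve(Some("notebook")))) // Failure(NoSuchElementException): case-sensitive
  }
}
```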