This is an automated email from the ASF dual-hosted git repository.
bowenliang pushed a commit to branch branch-1.9
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/branch-1.9 by this push:
new b209e5a41d [KYUUBI #6688] [SPARK] Avoid trigger execution when getting
result schema
b209e5a41d is described below
commit b209e5a41d4f6e057d6d8d1cf9942167f2424a3e
Author: wforget <[email protected]>
AuthorDate: Wed Oct 16 10:36:45 2024 +0800
[KYUUBI #6688] [SPARK] Avoid trigger execution when getting result schema
# :mag: Description
## Issue References 🔍
`DataFrame.isEmpty` may trigger execution again, we should avoid it.
## Describe Your Solution 🔧
## Types of changes :bookmark:
- [X] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing
functionality to change)
## Test Plan 🧪
#### Behavior Without This Pull Request :coffin:
#### Behavior With This Pull Request :tada:
#### Related Unit Tests
---
# Checklist 📝
- [X] This patch was not authored or co-authored using [Generative
Tooling](https://www.apache.org/legal/generative-tooling.html)
**Be nice. Be informative.**
Closes #6688 from wForget/planonly_schema.
Closes #6688
265f0ec26 [wforget] fix style
d71cc4aa9 [wforget] refactor resultSchema for spark operation
0c36b3d25 [wforget] Avoid trigger execution when getting result schema
Authored-by: wforget <[email protected]>
Signed-off-by: Bowen Liang <[email protected]>
(cherry picked from commit da2401c171536fbbc564ae331eef07f972171e8e)
Signed-off-by: Bowen Liang <[email protected]>
---
.../apache/kyuubi/engine/spark/operation/ExecutePython.scala | 4 ++--
.../apache/kyuubi/engine/spark/operation/ExecuteScala.scala | 4 ++--
.../kyuubi/engine/spark/operation/ExecuteStatement.scala | 8 --------
.../kyuubi/engine/spark/operation/PlanOnlyStatement.scala | 6 +++---
.../kyuubi/engine/spark/operation/SetCurrentCatalog.scala | 6 ------
.../kyuubi/engine/spark/operation/SetCurrentDatabase.scala | 6 ------
.../apache/kyuubi/engine/spark/operation/SparkOperation.scala | 10 +++++++++-
7 files changed, 16 insertions(+), 28 deletions(-)
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala
index d58a22e45a..5645e5f0a6 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala
@@ -58,14 +58,14 @@ class ExecutePython(
override protected def supportProgress: Boolean = true
override protected def resultSchema: StructType = {
- if (result == null || result.schema.isEmpty) {
+ if (result == null) {
new StructType().add("output", "string")
.add("status", "string")
.add("ename", "string")
.add("evalue", "string")
.add("traceback", "array<string>")
} else {
- result.schema
+ super.resultSchema
}
}
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteScala.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteScala.scala
index 092e6e8241..e8335e549e 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteScala.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteScala.scala
@@ -61,10 +61,10 @@ class ExecuteScala(
override protected def supportProgress: Boolean = true
override protected def resultSchema: StructType = {
- if (result == null || result.schema.isEmpty) {
+ if (result == null) {
new StructType().add("output", "string")
} else {
- result.schema
+ super.resultSchema
}
}
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
index 4f1eb9b53d..2e66fcff8c 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
@@ -25,7 +25,6 @@ import scala.collection.JavaConverters._
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.kyuubi.SparkDatasetHelper._
-import org.apache.spark.sql.types._
import org.apache.kyuubi.{KyuubiSQLException, Logging}
import
org.apache.kyuubi.config.KyuubiConf.{ENGINE_SPARK_OPERATION_INCREMENTAL_COLLECT_CANCEL_JOB_GROUP,
OPERATION_RESULT_MAX_ROWS, OPERATION_RESULT_SAVE_TO_FILE,
OPERATION_RESULT_SAVE_TO_FILE_MIN_ROWS, OPERATION_RESULT_SAVE_TO_FILE_MINSIZE}
@@ -50,13 +49,6 @@ class ExecuteStatement(
private var fetchOrcStatement: Option[FetchOrcStatement] = None
private var saveFilePath: Option[Path] = None
- override protected def resultSchema: StructType = {
- if (result == null || result.schema.isEmpty) {
- new StructType().add("Result", "string")
- } else {
- result.schema
- }
- }
override protected def beforeRun(): Unit = {
OperationLog.setCurrentOperationLog(operationLog)
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala
index f2a6704719..6f2a5959e3 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala
@@ -58,9 +58,9 @@ class PlanOnlyStatement(
override protected def resultSchema: StructType = {
if (result == null) {
new StructType().add("plan", "string")
- } else if (result.isEmpty) {
- new StructType().add("result", "string")
- } else result.schema
+ } else {
+ super.resultSchema
+ }
}
override protected def beforeRun(): Unit = {
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SetCurrentCatalog.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SetCurrentCatalog.scala
index 88105b086a..123dac66d5 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SetCurrentCatalog.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SetCurrentCatalog.scala
@@ -17,8 +17,6 @@
package org.apache.kyuubi.engine.spark.operation
-import org.apache.spark.sql.types.StructType
-
import org.apache.kyuubi.engine.spark.util.SparkCatalogUtils
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.session.Session
@@ -29,10 +27,6 @@ class SetCurrentCatalog(session: Session, catalog: String)
extends SparkOperatio
override def getOperationLog: Option[OperationLog] = Option(operationLog)
- override protected def resultSchema: StructType = {
- new StructType()
- }
-
override protected def runInternal(): Unit = {
try {
SparkCatalogUtils.setCurrentCatalog(spark, catalog)
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SetCurrentDatabase.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SetCurrentDatabase.scala
index d227f5fd2a..170be4dcb4 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SetCurrentDatabase.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SetCurrentDatabase.scala
@@ -17,8 +17,6 @@
package org.apache.kyuubi.engine.spark.operation
-import org.apache.spark.sql.types.StructType
-
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.session.Session
@@ -29,10 +27,6 @@ class SetCurrentDatabase(session: Session, database: String)
override def getOperationLog: Option[OperationLog] = Option(operationLog)
- override protected def resultSchema: StructType = {
- new StructType()
- }
-
override protected def runInternal(): Unit = {
try {
spark.sessionState.catalogManager.setCurrentNamespace(Array(database))
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
index b72447ae5d..da01e85a6d 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
@@ -60,7 +60,15 @@ abstract class SparkOperation(session: Session)
protected var result: DataFrame = _
- protected def resultSchema: StructType
+ protected def resultSchema: StructType = {
+ if (!hasResultSet) {
+ new StructType()
+ } else if (result == null || result.schema.isEmpty) {
+ new StructType().add("Result", "string")
+ } else {
+ result.schema
+ }
+ }
override def redactedStatement: String =
redact(spark.sessionState.conf.stringRedactionPattern, statement)