This is an automated email from the ASF dual-hosted git repository.
ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 9d050539bed [SPARK-43249][CONNECT] Fix missing stats for SQL Command
9d050539bed is described below
commit 9d050539bed10e5089c3c125887a9995693733c6
Author: Martin Grund <[email protected]>
AuthorDate: Mon Apr 24 16:33:32 2023 +0800
[SPARK-43249][CONNECT] Fix missing stats for SQL Command
### What changes were proposed in this pull request?
This patch fixes a minor issue in the code where, for SQL commands, the plan
metrics were not sent to the client. In addition, it renames a method to make
clear that the method does not actually send anything but only creates the
response object.
### Why are the changes needed?
Clarity
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing tests.
Closes #40899 from grundprinzip/fix_sql_stats.
Authored-by: Martin Grund <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
---
.../org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala | 2 +-
.../apache/spark/sql/connect/service/SparkConnectStreamHandler.scala | 4 ++--
2 files changed, 3 insertions(+), 3 deletions(-)
diff --git
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index 7bc67f8c398..59c407c8eea 100644
---
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -1829,7 +1829,7 @@ class SparkConnectPlanner(val session: SparkSession) {
.build())
// Send Metrics
-    SparkConnectStreamHandler.sendMetricsToResponse(sessionId, df)
+    responseObserver.onNext(SparkConnectStreamHandler.createMetricsResponse(sessionId, df))
}
private def handleRegisterUserDefinedFunction(
diff --git
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala
index 845eb084598..f08dfba5e28 100644
---
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala
+++
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala
@@ -87,7 +87,7 @@ class SparkConnectStreamHandler(responseObserver:
StreamObserver[ExecutePlanResp
SparkConnectStreamHandler.sendSchemaToResponse(request.getSessionId,
dataframe.schema))
processAsArrowBatches(request.getSessionId, dataframe, responseObserver)
responseObserver.onNext(
-      SparkConnectStreamHandler.sendMetricsToResponse(request.getSessionId, dataframe))
+      SparkConnectStreamHandler.createMetricsResponse(request.getSessionId, dataframe))
if (dataframe.queryExecution.observedMetrics.nonEmpty) {
responseObserver.onNext(
SparkConnectStreamHandler.sendObservedMetricsToResponse(request.getSessionId,
dataframe))
@@ -271,7 +271,7 @@ object SparkConnectStreamHandler {
.build()
}
-  def sendMetricsToResponse(sessionId: String, rows: DataFrame): ExecutePlanResponse = {
+  def createMetricsResponse(sessionId: String, rows: DataFrame): ExecutePlanResponse = {
// Send a last batch with the metrics
ExecutePlanResponse
.newBuilder()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]