This is an automated email from the ASF dual-hosted git repository.
ruifengz pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
new 1c28ec7d9b5 [SPARK-43249][CONNECT] Fix missing stats for SQL Command
1c28ec7d9b5 is described below
commit 1c28ec7d9b50933107b2d2f56dd57aeeb9ec4e53
Author: Martin Grund <[email protected]>
AuthorDate: Mon Apr 24 16:33:32 2023 +0800
[SPARK-43249][CONNECT] Fix missing stats for SQL Command
### What changes were proposed in this pull request?
This patch fixes a minor issue in the code where, for SQL commands, the plan
metrics were not sent to the client. In addition, it renames a method to make
clear that it does not actually send anything but only creates the response
object.
### Why are the changes needed?
Clarity
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing tests.
Closes #40899 from grundprinzip/fix_sql_stats.
Authored-by: Martin Grund <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
(cherry picked from commit 9d050539bed10e5089c3c125887a9995693733c6)
Signed-off-by: Ruifeng Zheng <[email protected]>
---
.../org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala | 2 +-
.../apache/spark/sql/connect/service/SparkConnectStreamHandler.scala | 4 ++--
2 files changed, 3 insertions(+), 3 deletions(-)
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index 7650532fcf9..0f3189e6013 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -1667,7 +1667,7 @@ class SparkConnectPlanner(val session: SparkSession) {
         .build())

     // Send Metrics
-    SparkConnectStreamHandler.sendMetricsToResponse(sessionId, df)
+    responseObserver.onNext(SparkConnectStreamHandler.createMetricsResponse(sessionId, df))
   }

   private def handleRegisterUserDefinedFunction(
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala
index 335b871d499..760ff8a64b4 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala
@@ -65,7 +65,7 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[ExecutePlanResp
       SparkConnectStreamHandler.sendSchemaToResponse(request.getSessionId, dataframe.schema))
     processAsArrowBatches(request.getSessionId, dataframe, responseObserver)
     responseObserver.onNext(
-      SparkConnectStreamHandler.sendMetricsToResponse(request.getSessionId, dataframe))
+      SparkConnectStreamHandler.createMetricsResponse(request.getSessionId, dataframe))
     if (dataframe.queryExecution.observedMetrics.nonEmpty) {
       responseObserver.onNext(
         SparkConnectStreamHandler.sendObservedMetricsToResponse(request.getSessionId, dataframe))
@@ -215,7 +215,7 @@ object SparkConnectStreamHandler {
       .build()
   }

-  def sendMetricsToResponse(sessionId: String, rows: DataFrame): ExecutePlanResponse = {
+  def createMetricsResponse(sessionId: String, rows: DataFrame): ExecutePlanResponse = {
     // Send a last batch with the metrics
     ExecutePlanResponse
       .newBuilder()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]