This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 9269a0bfed56 [SPARK-49525][SS][CONNECT] Minor log improvement to
Server Side Streaming Query ListenerBus Listener
9269a0bfed56 is described below
commit 9269a0bfed56429e999269dfdfd89aefcb1b7261
Author: Wei Liu <[email protected]>
AuthorDate: Fri Sep 6 15:10:41 2024 +0900
[SPARK-49525][SS][CONNECT] Minor log improvement to Server Side Streaming
Query ListenerBus Listener
### What changes were proposed in this pull request?
Change the log of onQueryStarted and onQueryTerminated from `logDebug` to
`logInfo`. They would be useful for debugging as they indicate the events are
indeed being fired from the server. It won't add much logging burden, since
query start / termination events are far less frequent than `onQueryProgress` events.
### Why are the changes needed?
Debug improvement
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
No need
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #48002 from WweiL/SPARK-49525-listener-bus-improvement.
Authored-by: Wei Liu <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../spark/sql/connect/planner/SparkConnectPlanner.scala | 13 ++++++++-----
.../connect/service/SparkConnectListenerBusListener.scala | 8 +++++---
2 files changed, 13 insertions(+), 8 deletions(-)
diff --git
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index b6abab6ef766..bb6d52308c19 100644
---
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -40,7 +40,7 @@ import
org.apache.spark.connect.proto.ExecutePlanResponse.SqlCommandResult
import org.apache.spark.connect.proto.Parse.ParseFormat
import
org.apache.spark.connect.proto.StreamingQueryManagerCommandResult.StreamingQueryInstance
import org.apache.spark.connect.proto.WriteStreamOperationStart.TriggerCase
-import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.internal.LogKeys.{DATAFRAME_ID, SESSION_ID}
import org.apache.spark.resource.{ExecutorResourceRequest, ResourceProfile,
TaskResourceProfile, TaskResourceRequest}
import org.apache.spark.sql.{Dataset, Encoders, ForeachWriter, Observation,
RelationalGroupedDataset, Row, SparkSession}
@@ -3052,10 +3052,13 @@ class SparkConnectPlanner(
sessionHolder.streamingServersideListenerHolder.streamingQueryStartedEventCache.remove(
query.runId.toString))
queryStartedEvent.foreach {
- logDebug(
- s"[SessionId: $sessionId][UserId: $userId][operationId: " +
- s"${executeHolder.operationId}][query id: ${query.id}][query runId:
${query.runId}] " +
- s"Adding QueryStartedEvent to response")
+ logInfo(
+ log"[SessionId: ${MDC(LogKeys.SESSION_ID, sessionId)}]" +
+ log"[UserId: ${MDC(LogKeys.USER_ID, userId)}] " +
+ log"[operationId: ${MDC(LogKeys.OPERATION_ID,
executeHolder.operationId)}] " +
+ log"[query id: ${MDC(LogKeys.QUERY_ID, query.id)}]" +
+ log"[query runId: ${MDC(LogKeys.QUERY_RUN_ID, query.runId)}] " +
+ log"Adding QueryStartedEvent to response")
e => resultBuilder.setQueryStartedEventJson(e.json)
}
diff --git
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectListenerBusListener.scala
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectListenerBusListener.scala
index 5b2205757648..7a0c067ab430 100644
---
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectListenerBusListener.scala
+++
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectListenerBusListener.scala
@@ -160,9 +160,11 @@ private[sql] class SparkConnectListenerBusListener(
}
override def onQueryTerminated(event:
StreamingQueryListener.QueryTerminatedEvent): Unit = {
- logDebug(
- s"[SessionId: ${sessionHolder.sessionId}][UserId:
${sessionHolder.userId}] " +
- s"Sending QueryTerminatedEvent to client, id: ${event.id} runId:
${event.runId}.")
+ logInfo(
+ log"[SessionId: ${MDC(LogKeys.SESSION_ID, sessionHolder.sessionId)}]" +
+ log"[UserId: ${MDC(LogKeys.USER_ID, sessionHolder.userId)}] " +
+ log"Sending QueryTerminatedEvent to client, id:
${MDC(LogKeys.QUERY_ID, event.id)} " +
+ log"runId: ${MDC(LogKeys.QUERY_RUN_ID, event.runId)}.")
send(event.json, StreamingQueryEventType.QUERY_TERMINATED_EVENT)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]