This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 29312bc0c971 [SPARK-49824][SS][CONNECT] Improve logging in 
SparkConnectStreamingQueryCache
29312bc0c971 is described below

commit 29312bc0c971c7729dc6dd73e641cd9fd369ed0f
Author: Wei Liu <[email protected]>
AuthorDate: Fri Oct 4 08:55:33 2024 +0900

    [SPARK-49824][SS][CONNECT] Improve logging in 
SparkConnectStreamingQueryCache
    
    ### What changes were proposed in this pull request?
    
    The query key in the cache is <id, runId>, but only the id is logged. A
    query could have the same id but a different runId, so we need to log both
    the id and the runId to make the logs less confusing.
    
    ### Why are the changes needed?
    
    Debug improvement
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Manual check log
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #48293 from WweiL/listener-cache-improvement.
    
    Authored-by: Wei Liu <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 .../service/SparkConnectStreamingQueryCache.scala      | 18 ++++++++++++------
 1 file changed, 12 insertions(+), 6 deletions(-)

diff --git 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala
 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala
index 8241672d5107..48492bac6234 100644
--- 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala
+++ 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala
@@ -29,7 +29,7 @@ import scala.concurrent.duration.{Duration, DurationInt, 
FiniteDuration}
 import scala.util.control.NonFatal
 
 import org.apache.spark.internal.{Logging, MDC}
-import org.apache.spark.internal.LogKeys.{DURATION, NEW_VALUE, OLD_VALUE, 
QUERY_CACHE_VALUE, QUERY_ID, SESSION_ID}
+import org.apache.spark.internal.LogKeys.{DURATION, NEW_VALUE, OLD_VALUE, 
QUERY_CACHE_VALUE, QUERY_ID, QUERY_RUN_ID, SESSION_ID}
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.streaming.StreamingQuery
 import org.apache.spark.util.{Clock, SystemClock, ThreadUtils}
@@ -158,7 +158,8 @@ private[connect] class SparkConnectStreamingQueryCache(
       if (v.userId.equals(sessionHolder.userId) && 
v.sessionId.equals(sessionHolder.sessionId)) {
         if (v.query.isActive && 
Option(v.session.streams.get(k.queryId)).nonEmpty) {
           logInfo(
-            log"Stopping the query with id ${MDC(QUERY_ID, k.queryId)} " +
+            log"Stopping the query with id: ${MDC(QUERY_ID, k.queryId)} " +
+              log"runId: ${MDC(QUERY_RUN_ID, k.runId)} " +
               log"since the session has timed out")
           try {
             if (blocking) {
@@ -170,7 +171,8 @@ private[connect] class SparkConnectStreamingQueryCache(
           } catch {
             case NonFatal(ex) =>
               logWarning(
-                log"Failed to stop the query ${MDC(QUERY_ID, k.queryId)}. " +
+                log"Failed to stop the query with id: ${MDC(QUERY_ID, k.queryId)} " +
+                  log"runId: ${MDC(QUERY_RUN_ID, k.runId)} " +
                   log"Error is ignored.",
                 ex)
           }
@@ -238,17 +240,20 @@ private[connect] class SparkConnectStreamingQueryCache(
 
       for ((k, v) <- queryCache) {
         val id = k.queryId
+        val runId = k.runId
         v.expiresAtMs match {
 
           case Some(ts) if nowMs >= ts => // Expired. Drop references.
             logInfo(
-              log"Removing references for ${MDC(QUERY_ID, id)} in " +
+              log"Removing references for id: ${MDC(QUERY_ID, id)} " +
+                log"runId: ${MDC(QUERY_RUN_ID, runId)} in " +
                 log"session ${MDC(SESSION_ID, v.sessionId)} after expiry 
period")
             queryCache.remove(k)
 
           case Some(_) => // Inactive query waiting for expiration. Do nothing.
             logInfo(
-              log"Waiting for the expiration for ${MDC(QUERY_ID, id)} in " +
+              log"Waiting for the expiration for id: ${MDC(QUERY_ID, id)} " +
+                log"runId: ${MDC(QUERY_RUN_ID, runId)} in " +
                 log"session ${MDC(SESSION_ID, v.sessionId)}")
 
           case None => // Active query, check if it is stopped. Enable timeout 
if it is stopped.
@@ -256,7 +261,8 @@ private[connect] class SparkConnectStreamingQueryCache(
 
             if (!isActive) {
               logInfo(
-                log"Marking query ${MDC(QUERY_ID, id)} in " +
+                log"Marking query id: ${MDC(QUERY_ID, id)} " +
+                  log"runId: ${MDC(QUERY_RUN_ID, runId)} in " +
                   log"session ${MDC(SESSION_ID, v.sessionId)} inactive.")
               val expiresAtMs = nowMs + stoppedQueryInactivityTimeout.toMillis
               queryCache.put(k, v.copy(expiresAtMs = Some(expiresAtMs)))


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to