This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 29312bc0c971 [SPARK-49824][SS][CONNECT] Improve logging in
SparkConnectStreamingQueryCache
29312bc0c971 is described below
commit 29312bc0c971c7729dc6dd73e641cd9fd369ed0f
Author: Wei Liu <[email protected]>
AuthorDate: Fri Oct 4 08:55:33 2024 +0900
[SPARK-49824][SS][CONNECT] Improve logging in
SparkConnectStreamingQueryCache
### What changes were proposed in this pull request?
The query key in the cache is <id, runId>, but only the id is
logged. A query could have the same id but a different runId; we need to log
both the id and the runId to make the logs less confusing.
### Why are the changes needed?
Debug improvement
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Manual check log
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #48293 from WweiL/listener-cache-improvement.
Authored-by: Wei Liu <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../service/SparkConnectStreamingQueryCache.scala | 18 ++++++++++++------
1 file changed, 12 insertions(+), 6 deletions(-)
diff --git
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala
index 8241672d5107..48492bac6234 100644
---
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala
+++
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala
@@ -29,7 +29,7 @@ import scala.concurrent.duration.{Duration, DurationInt,
FiniteDuration}
import scala.util.control.NonFatal
import org.apache.spark.internal.{Logging, MDC}
-import org.apache.spark.internal.LogKeys.{DURATION, NEW_VALUE, OLD_VALUE,
QUERY_CACHE_VALUE, QUERY_ID, SESSION_ID}
+import org.apache.spark.internal.LogKeys.{DURATION, NEW_VALUE, OLD_VALUE,
QUERY_CACHE_VALUE, QUERY_ID, QUERY_RUN_ID, SESSION_ID}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.util.{Clock, SystemClock, ThreadUtils}
@@ -158,7 +158,8 @@ private[connect] class SparkConnectStreamingQueryCache(
if (v.userId.equals(sessionHolder.userId) &&
v.sessionId.equals(sessionHolder.sessionId)) {
if (v.query.isActive &&
Option(v.session.streams.get(k.queryId)).nonEmpty) {
logInfo(
- log"Stopping the query with id ${MDC(QUERY_ID, k.queryId)} " +
+ log"Stopping the query with id: ${MDC(QUERY_ID, k.queryId)} " +
+ log"runId: ${MDC(QUERY_RUN_ID, k.runId)} " +
log"since the session has timed out")
try {
if (blocking) {
@@ -170,7 +171,8 @@ private[connect] class SparkConnectStreamingQueryCache(
} catch {
case NonFatal(ex) =>
logWarning(
- log"Failed to stop the query ${MDC(QUERY_ID, k.queryId)}. " +
+ log"Failed to stop the query with id: ${MDC(QUERY_ID, k.queryId)} " +
+ log"runId: ${MDC(QUERY_RUN_ID, k.runId)} " +
log"Error is ignored.",
ex)
}
@@ -238,17 +240,20 @@ private[connect] class SparkConnectStreamingQueryCache(
for ((k, v) <- queryCache) {
val id = k.queryId
+ val runId = k.runId
v.expiresAtMs match {
case Some(ts) if nowMs >= ts => // Expired. Drop references.
logInfo(
- log"Removing references for ${MDC(QUERY_ID, id)} in " +
+ log"Removing references for id: ${MDC(QUERY_ID, id)} " +
+ log"runId: ${MDC(QUERY_RUN_ID, runId)} in " +
log"session ${MDC(SESSION_ID, v.sessionId)} after expiry
period")
queryCache.remove(k)
case Some(_) => // Inactive query waiting for expiration. Do nothing.
logInfo(
- log"Waiting for the expiration for ${MDC(QUERY_ID, id)} in " +
+ log"Waiting for the expiration for id: ${MDC(QUERY_ID, id)} " +
+ log"runId: ${MDC(QUERY_RUN_ID, runId)} in " +
log"session ${MDC(SESSION_ID, v.sessionId)}")
case None => // Active query, check if it is stopped. Enable timeout
if it is stopped.
@@ -256,7 +261,8 @@ private[connect] class SparkConnectStreamingQueryCache(
if (!isActive) {
logInfo(
- log"Marking query ${MDC(QUERY_ID, id)} in " +
+ log"Marking query id: ${MDC(QUERY_ID, id)} " +
+ log"runId: ${MDC(QUERY_RUN_ID, runId)} in " +
log"session ${MDC(SESSION_ID, v.sessionId)} inactive.")
val expiresAtMs = nowMs + stoppedQueryInactivityTimeout.toMillis
queryCache.put(k, v.copy(expiresAtMs = Some(expiresAtMs)))
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]