This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 7ba279a3ff76 [SPARK-50118][CONNECT] Reset isolated state cache when
tasks are running
7ba279a3ff76 is described below
commit 7ba279a3ff76bd17f25035ced90ea882812dfac8
Author: Hyukjin Kwon <[email protected]>
AuthorDate: Tue Nov 12 08:32:59 2024 +0900
[SPARK-50118][CONNECT] Reset isolated state cache when tasks are running
### What changes were proposed in this pull request?
This PR proposes to reset the expiry timeout of the isolated session
while its tasks are running.
### Why are the changes needed?
To prevent removal of artifacts for long running tasks.
### Does this PR introduce _any_ user-facing change?
Yes. It fixes a bug. When a user runs Python UDFs (or Scala UDFs) for
longer than the cache timeout (30 minutes), and other sessions submit tasks
in the meantime, the Guava cache evicts the session's entry and removes the
artifact directory dedicated to that session while its tasks are still running.
### How was this patch tested?
Manually tested after removing the fix logic to reproduce the issue. It is difficult to write an automated test for this.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #48665 from HyukjinKwon/SPARK-50118.
Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../scala/org/apache/spark/executor/Executor.scala | 21 ++++++++++++++++++---
1 file changed, 18 insertions(+), 3 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala
b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 72d97bd78700..c299f38526ae 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -22,7 +22,7 @@ import java.lang.Thread.UncaughtExceptionHandler
import java.lang.management.ManagementFactory
import java.net.{URI, URL}
import java.nio.ByteBuffer
-import java.util.{Locale, Properties}
+import java.util.{Locale, Properties, Timer, TimerTask}
import java.util.concurrent._
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.ReentrantLock
@@ -209,9 +209,10 @@ private[spark] class Executor(
// The default isolation group
val defaultSessionState: IsolatedSessionState =
newSessionState(JobArtifactState("default", None))
+ private val cacheExpiryTime = 30 * 60 * 1000
val isolatedSessionCache: Cache[String, IsolatedSessionState] =
CacheBuilder.newBuilder()
.maximumSize(100)
- .expireAfterAccess(30, TimeUnit.MINUTES)
+ .expireAfterAccess(cacheExpiryTime, TimeUnit.MILLISECONDS)
.removalListener(new RemovalListener[String, IsolatedSessionState]() {
override def onRemoval(
notification: RemovalNotification[String, IsolatedSessionState]):
Unit = {
@@ -295,6 +296,8 @@ private[spark] class Executor(
private val pollOnHeartbeat = if (METRICS_POLLING_INTERVAL_MS > 0) false
else true
+ private val timer = new Timer("executor-state-timer", true)
+
// Poller for the memory metrics. Visible for testing.
private[executor] val metricsPoller = new ExecutorMetricsPoller(
env.memoryManager,
@@ -445,6 +448,9 @@ private[spark] class Executor(
case NonFatal(e) =>
logWarning("Unable to stop heartbeater", e)
}
+ if (timer != null) {
+ timer.cancel()
+ }
ShuffleBlockPusher.stop()
if (threadPool != null) {
threadPool.shutdown()
@@ -559,9 +565,17 @@ private[spark] class Executor(
override def run(): Unit = {
// Classloader isolation
+ var maybeTimerTask: Option[TimerTask] = None
val isolatedSession = taskDescription.artifacts.state match {
case Some(jobArtifactState) =>
- isolatedSessionCache.get(jobArtifactState.uuid, () =>
newSessionState(jobArtifactState))
+ val state = isolatedSessionCache.get(
+ jobArtifactState.uuid, () => newSessionState(jobArtifactState))
+ maybeTimerTask = Some(new TimerTask {
+ // Resets the expire time till the task ends.
+ def run(): Unit =
isolatedSessionCache.getIfPresent(jobArtifactState.uuid)
+ })
+ maybeTimerTask.foreach(timer.schedule(_, cacheExpiryTime / 10,
cacheExpiryTime / 10))
+ state
case _ => defaultSessionState
}
@@ -862,6 +876,7 @@ private[spark] class Executor(
uncaughtExceptionHandler.uncaughtException(Thread.currentThread(),
t)
}
} finally {
+ maybeTimerTask.foreach(_.cancel)
cleanMDCForTask(taskName, mdcProperties)
runningTasks.remove(taskId)
if (taskStarted) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]