This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 7ba279a3ff76 [SPARK-50118][CONNECT] Reset isolated state cache when 
tasks are running
7ba279a3ff76 is described below

commit 7ba279a3ff76bd17f25035ced90ea882812dfac8
Author: Hyukjin Kwon <[email protected]>
AuthorDate: Tue Nov 12 08:32:59 2024 +0900

    [SPARK-50118][CONNECT] Reset isolated state cache when tasks are running
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to reset the expiry timeout of the isolated session 
while its tasks are running.
    
    ### Why are the changes needed?
    
    To prevent removal of artifacts for long running tasks.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. It fixes a bug. When users are running Python UDFs (or Scala UDF) for 
more than the specific timeout (30 minutes), and other tasks are submitted by 
other sessions - so the cache removal happens by Guava cache, it removes the 
artifact directory dedicated for the session.
    
    ### How was this patch tested?
    
    Manually tested after taking the logic out. It's difficult to write a test.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #48665 from HyukjinKwon/SPARK-50118.
    
    Authored-by: Hyukjin Kwon <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 .../scala/org/apache/spark/executor/Executor.scala  | 21 ++++++++++++++++++---
 1 file changed, 18 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala 
b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 72d97bd78700..c299f38526ae 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -22,7 +22,7 @@ import java.lang.Thread.UncaughtExceptionHandler
 import java.lang.management.ManagementFactory
 import java.net.{URI, URL}
 import java.nio.ByteBuffer
-import java.util.{Locale, Properties}
+import java.util.{Locale, Properties, Timer, TimerTask}
 import java.util.concurrent._
 import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent.locks.ReentrantLock
@@ -209,9 +209,10 @@ private[spark] class Executor(
   // The default isolation group
   val defaultSessionState: IsolatedSessionState = 
newSessionState(JobArtifactState("default", None))
 
+  private val cacheExpiryTime = 30 * 60 * 1000
   val isolatedSessionCache: Cache[String, IsolatedSessionState] = 
CacheBuilder.newBuilder()
     .maximumSize(100)
-    .expireAfterAccess(30, TimeUnit.MINUTES)
+    .expireAfterAccess(cacheExpiryTime, TimeUnit.MILLISECONDS)
     .removalListener(new RemovalListener[String, IsolatedSessionState]() {
       override def onRemoval(
           notification: RemovalNotification[String, IsolatedSessionState]): 
Unit = {
@@ -295,6 +296,8 @@ private[spark] class Executor(
 
   private val pollOnHeartbeat = if (METRICS_POLLING_INTERVAL_MS > 0) false 
else true
 
+  private val timer = new Timer("executor-state-timer", true)
+
   // Poller for the memory metrics. Visible for testing.
   private[executor] val metricsPoller = new ExecutorMetricsPoller(
     env.memoryManager,
@@ -445,6 +448,9 @@ private[spark] class Executor(
         case NonFatal(e) =>
           logWarning("Unable to stop heartbeater", e)
       }
+      if (timer != null) {
+        timer.cancel()
+      }
       ShuffleBlockPusher.stop()
       if (threadPool != null) {
         threadPool.shutdown()
@@ -559,9 +565,17 @@ private[spark] class Executor(
     override def run(): Unit = {
 
       // Classloader isolation
+      var maybeTimerTask: Option[TimerTask] = None
       val isolatedSession = taskDescription.artifacts.state match {
         case Some(jobArtifactState) =>
-          isolatedSessionCache.get(jobArtifactState.uuid, () => 
newSessionState(jobArtifactState))
+          val state = isolatedSessionCache.get(
+            jobArtifactState.uuid, () => newSessionState(jobArtifactState))
+          maybeTimerTask = Some(new TimerTask {
+            // Resets the expire time till the task ends.
+            def run(): Unit = 
isolatedSessionCache.getIfPresent(jobArtifactState.uuid)
+          })
+          maybeTimerTask.foreach(timer.schedule(_, cacheExpiryTime / 10, 
cacheExpiryTime / 10))
+          state
         case _ => defaultSessionState
       }
 
@@ -862,6 +876,7 @@ private[spark] class Executor(
             uncaughtExceptionHandler.uncaughtException(Thread.currentThread(), 
t)
           }
       } finally {
+        maybeTimerTask.foreach(_.cancel)
         cleanMDCForTask(taskName, mdcProperties)
         runningTasks.remove(taskId)
         if (taskStarted) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to