This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new de8d96892f92 [SPARK-48370][CONNECT][FOLLOW-UP] Use JDK's Cleaner
instead
de8d96892f92 is described below
commit de8d96892f9212a1bd7cd1b4dfad172d0cb8cd35
Author: Hyukjin Kwon <[email protected]>
AuthorDate: Tue May 28 09:33:20 2024 +0900
[SPARK-48370][CONNECT][FOLLOW-UP] Use JDK's Cleaner instead
### What changes were proposed in this pull request?
This PR is a followup of https://github.com/apache/spark/pull/46683 that
replaces our custom cleaner with the JDK's `java.lang.ref.Cleaner`.
### Why are the changes needed?
To reuse the JDK's standard built-in library instead of maintaining a custom cleaner implementation.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Manually tested by re-enabling the `CheckpointSuite.checkpoint gc derived DataFrame` test.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #46726 from HyukjinKwon/SPARK-48370-followup.
Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../main/scala/org/apache/spark/sql/Dataset.scala | 2 +-
.../scala/org/apache/spark/sql/SparkSession.scala | 9 +-
.../apache/spark/sql/internal/SessionCleaner.scala | 125 +++------------------
3 files changed, 16 insertions(+), 120 deletions(-)
diff --git
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
index 5ac07270b22b..204d3985cf4b 100644
---
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
+++
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -3492,7 +3492,7 @@ class Dataset[T] private[sql] (
.getOrElse(throw new RuntimeException("CheckpointCommandResult must
be present"))
val cachedRemoteRelation =
response.getCheckpointCommandResult.getRelation
-
sparkSession.cleaner.registerCachedRemoteRelationForCleanup(cachedRemoteRelation)
+ sparkSession.cleaner.register(cachedRemoteRelation)
// Update the builder with the values from the result.
builder.setCachedRemoteRelation(cachedRemoteRelation)
diff --git
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 91ee0f52e8bd..19c5a3f14c64 100644
---
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -73,11 +73,7 @@ class SparkSession private[sql] (
with Logging {
private[this] val allocator = new RootAllocator()
- private var shouldStopCleaner = false
- private[sql] lazy val cleaner = {
- shouldStopCleaner = true
- new SessionCleaner(this)
- }
+ private[sql] lazy val cleaner = new SessionCleaner(this)
// a unique session ID for this session from client.
private[sql] def sessionId: String = client.sessionId
@@ -719,9 +715,6 @@ class SparkSession private[sql] (
if (releaseSessionOnClose) {
client.releaseSession()
}
- if (shouldStopCleaner) {
- cleaner.stop()
- }
client.shutdown()
allocator.close()
SparkSession.onSessionClose(this)
diff --git
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/internal/SessionCleaner.scala
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/internal/SessionCleaner.scala
index 036ea4a84fa9..21e4f4d141a8 100644
---
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/internal/SessionCleaner.scala
+++
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/internal/SessionCleaner.scala
@@ -17,130 +17,33 @@
package org.apache.spark.sql.internal
-import java.lang.ref.{ReferenceQueue, WeakReference}
-import java.util.Collections
-import java.util.concurrent.ConcurrentHashMap
+import java.lang.ref.Cleaner
import org.apache.spark.connect.proto
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
-/**
- * Classes that represent cleaning tasks.
- */
-private sealed trait CleanupTask
-private case class CleanupCachedRemoteRelation(dfID: String) extends
CleanupTask
-
-/**
- * A WeakReference associated with a CleanupTask.
- *
- * When the referent object becomes only weakly reachable, the corresponding
- * CleanupTaskWeakReference is automatically added to the given reference
queue.
- */
-private class CleanupTaskWeakReference(
- val task: CleanupTask,
- referent: AnyRef,
- referenceQueue: ReferenceQueue[AnyRef])
- extends WeakReference(referent, referenceQueue)
-
-/**
- * An asynchronous cleaner for objects.
- *
- * This maintains a weak reference for each CashRemoteRelation, etc. of
interest, to be processed
- * when the associated object goes out of scope of the application. Actual
cleanup is performed in
- * a separate daemon thread.
- */
private[sql] class SessionCleaner(session: SparkSession) extends Logging {
-
- /**
- * How often (seconds) to trigger a garbage collection in this JVM. This
context cleaner
- * triggers cleanups only when weak references are garbage collected. In
long-running
- * applications with large driver JVMs, where there is little memory
pressure on the driver,
- * this may happen very occasionally or not at all. Not cleaning at all may
lead to executors
- * running out of disk space after a while.
- */
- private val refQueuePollTimeout: Long = 100
-
- /**
- * A buffer to ensure that `CleanupTaskWeakReference`s are not garbage
collected as long as they
- * have not been handled by the reference queue.
- */
- private val referenceBuffer =
- Collections.newSetFromMap[CleanupTaskWeakReference](new ConcurrentHashMap)
-
- private val referenceQueue = new ReferenceQueue[AnyRef]
-
- private val cleaningThread = new Thread() { override def run(): Unit =
keepCleaning() }
-
- @volatile private var started = false
- @volatile private var stopped = false
-
- /** Start the cleaner. */
- def start(): Unit = {
- cleaningThread.setDaemon(true)
- cleaningThread.setName("Spark Connect Context Cleaner")
- cleaningThread.start()
- }
-
- /**
- * Stop the cleaning thread and wait until the thread has finished running
its current task.
- */
- def stop(): Unit = {
- stopped = true
- // Interrupt the cleaning thread, but wait until the current task has
finished before
- // doing so. This guards against the race condition where a cleaning
thread may
- // potentially clean similarly named variables created by a different
SparkSession.
- synchronized {
- cleaningThread.interrupt()
- }
- cleaningThread.join()
- }
+ private val cleaner = Cleaner.create()
/** Register a CachedRemoteRelation for cleanup when it is garbage
collected. */
- def registerCachedRemoteRelationForCleanup(relation:
proto.CachedRemoteRelation): Unit = {
- registerForCleanup(relation,
CleanupCachedRemoteRelation(relation.getRelationId))
- }
-
- /** Register an object for cleanup. */
- private def registerForCleanup(objectForCleanup: AnyRef, task: CleanupTask):
Unit = {
- if (!started) {
- // Lazily starts when the first cleanup is registered.
- start()
- started = true
- }
- referenceBuffer.add(new CleanupTaskWeakReference(task, objectForCleanup,
referenceQueue))
+ def register(relation: proto.CachedRemoteRelation): Unit = {
+ val dfID = relation.getRelationId
+ cleaner.register(relation, () => doCleanupCachedRemoteRelation(dfID))
}
- /** Keep cleaning objects. */
- private def keepCleaning(): Unit = {
- while (!stopped && !session.client.channel.isShutdown) {
- try {
- val reference = Option(referenceQueue.remove(refQueuePollTimeout))
- .map(_.asInstanceOf[CleanupTaskWeakReference])
- // Synchronize here to avoid being interrupted on stop()
- synchronized {
- reference.foreach { ref =>
- logDebug("Got cleaning task " + ref.task)
- referenceBuffer.remove(ref)
- ref.task match {
- case CleanupCachedRemoteRelation(dfID) =>
- doCleanupCachedRemoteRelation(dfID)
- }
+ private[sql] def doCleanupCachedRemoteRelation(dfID: String): Unit = {
+ try {
+ if (!session.client.channel.isShutdown) {
+ session.execute {
+ session.newCommand { builder =>
+ builder.getRemoveCachedRemoteRelationCommandBuilder
+
.setRelation(proto.CachedRemoteRelation.newBuilder().setRelationId(dfID).build())
}
}
- } catch {
- case e: Throwable => logError("Error in cleaning thread", e)
- }
- }
- }
-
- /** Perform CleanupCachedRemoteRelation cleanup. */
- private[spark] def doCleanupCachedRemoteRelation(dfID: String): Unit = {
- session.execute {
- session.newCommand { builder =>
- builder.getRemoveCachedRemoteRelationCommandBuilder
-
.setRelation(proto.CachedRemoteRelation.newBuilder().setRelationId(dfID).build())
}
+ } catch {
+ case e: Throwable => logError("Error in cleaning thread", e)
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]