This is an automated email from the ASF dual-hosted git repository.

yangjie01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d12646ee224 [SPARK-45596][CONNECT] Use java.lang.ref.Cleaner instead 
of org.apache.spark.sql.connect.client.util.Cleaner
d12646ee224 is described below

commit d12646ee224f66dfc9bc7be9cbe2a70b85d7439a
Author: zhaomin <[email protected]>
AuthorDate: Thu Oct 26 22:08:02 2023 +0800

    [SPARK-45596][CONNECT] Use java.lang.ref.Cleaner instead of 
org.apache.spark.sql.connect.client.util.Cleaner
    
    ### What changes were proposed in this pull request?
    
    Use java.lang.ref.Cleaner instead of 
org.apache.spark.sql.connect.client.util.Cleaner
    
    ### Why are the changes needed?
    
    org.apache.spark.sql.connect.client.util.Cleaner was a temporary class, 
kept only until the minimum supported JDK was upgraded to 9 or above.
    https://issues.apache.org/jira/browse/SPARK-45596
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Passes existing CI tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #43439 from zhaomin1423/connect_cleaner.
    
    Authored-by: zhaomin <[email protected]>
    Signed-off-by: yangjie01 <[email protected]>
---
 .../scala/org/apache/spark/sql/SparkSession.scala  |  13 +--
 .../org/apache/spark/sql/ClientDatasetSuite.scala  |   2 +-
 .../apache/spark/sql/PlanGenerationTestSuite.scala |   3 +-
 .../apache/spark/sql/SQLImplicitsTestSuite.scala   |   3 +-
 .../spark/sql/connect/client/SparkResult.scala     |  22 ++--
 .../spark/sql/connect/client/util/Cleaner.scala    | 113 ---------------------
 6 files changed, 20 insertions(+), 136 deletions(-)

diff --git 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
index e60bda0a838..5e640bea570 100644
--- 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -39,7 +39,6 @@ import 
org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{BoxedLongEncoder
 import org.apache.spark.sql.connect.client.{ClassFinder, SparkConnectClient, 
SparkResult}
 import org.apache.spark.sql.connect.client.SparkConnectClient.Configuration
 import org.apache.spark.sql.connect.client.arrow.ArrowSerializer
-import org.apache.spark.sql.connect.client.util.Cleaner
 import org.apache.spark.sql.functions.lit
 import org.apache.spark.sql.internal.{CatalogImpl, SqlApiConf}
 import org.apache.spark.sql.streaming.DataStreamReader
@@ -66,7 +65,6 @@ import org.apache.spark.sql.types.StructType
  */
 class SparkSession private[sql] (
     private[sql] val client: SparkConnectClient,
-    private val cleaner: Cleaner,
     private val planIdGenerator: AtomicLong)
     extends Serializable
     with Closeable
@@ -536,7 +534,6 @@ class SparkSession private[sql] (
   private[sql] def execute[T](plan: proto.Plan, encoder: AgnosticEncoder[T]): 
SparkResult[T] = {
     val value = client.execute(plan)
     val result = new SparkResult(value, allocator, encoder, timeZoneId)
-    cleaner.register(result)
     result
   }
 
@@ -774,7 +771,7 @@ object SparkSession extends Logging {
    * Create a new [[SparkSession]] based on the connect client 
[[Configuration]].
    */
   private[sql] def create(configuration: Configuration): SparkSession = {
-    new SparkSession(configuration.toSparkConnectClient, cleaner, 
planIdGenerator)
+    new SparkSession(configuration.toSparkConnectClient, planIdGenerator)
   }
 
   /**
@@ -795,12 +792,6 @@ object SparkSession extends Logging {
    */
   def builder(): Builder = new Builder()
 
-  private[sql] lazy val cleaner = {
-    val cleaner = new Cleaner
-    cleaner.start()
-    cleaner
-  }
-
   class Builder() extends Logging {
     // Initialize the connection string of the Spark Connect client builder 
from SPARK_REMOTE
     // by default, if it exists. The connection string can be overridden using
@@ -911,7 +902,7 @@ object SparkSession extends Logging {
 
     private def tryCreateSessionFromClient(): Option[SparkSession] = {
       if (client != null) {
-        Option(new SparkSession(client, cleaner, planIdGenerator))
+        Option(new SparkSession(client, planIdGenerator))
       } else {
         None
       }
diff --git 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientDatasetSuite.scala
 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientDatasetSuite.scala
index aab31d97e8c..041b0928365 100644
--- 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientDatasetSuite.scala
+++ 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientDatasetSuite.scala
@@ -43,7 +43,7 @@ class ClientDatasetSuite extends ConnectFunSuite with 
BeforeAndAfterEach {
   private def newSparkSession(): SparkSession = {
     val client = SparkConnectClient(
       
InProcessChannelBuilder.forName(getClass.getName).directExecutor().build())
-    new SparkSession(client, cleaner = SparkSession.cleaner, planIdGenerator = 
new AtomicLong)
+    new SparkSession(client, planIdGenerator = new AtomicLong)
   }
 
   private def startDummyServer(): Unit = {
diff --git 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
index e4c5b851b13..cf287088b59 100644
--- 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
+++ 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
@@ -112,8 +112,7 @@ class PlanGenerationTestSuite
   override protected def beforeAll(): Unit = {
     super.beforeAll()
     val client = 
SparkConnectClient(InProcessChannelBuilder.forName("/dev/null").build())
-    session =
-      new SparkSession(client, cleaner = SparkSession.cleaner, planIdGenerator 
= new AtomicLong)
+    session = new SparkSession(client, planIdGenerator = new AtomicLong)
   }
 
   override protected def beforeEach(): Unit = {
diff --git 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SQLImplicitsTestSuite.scala
 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SQLImplicitsTestSuite.scala
index 53743feb03b..b2c13850a13 100644
--- 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SQLImplicitsTestSuite.scala
+++ 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SQLImplicitsTestSuite.scala
@@ -39,8 +39,7 @@ class SQLImplicitsTestSuite extends ConnectFunSuite with 
BeforeAndAfterAll {
   override protected def beforeAll(): Unit = {
     super.beforeAll()
     val client = 
SparkConnectClient(InProcessChannelBuilder.forName("/dev/null").build())
-    session =
-      new SparkSession(client, cleaner = SparkSession.cleaner, planIdGenerator 
= new AtomicLong)
+    session = new SparkSession(client, planIdGenerator = new AtomicLong)
   }
 
   test("column resolution") {
diff --git 
a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/SparkResult.scala
 
b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/SparkResult.scala
index d4b40624be7..7a7c6a2d6c9 100644
--- 
a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/SparkResult.scala
+++ 
b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/SparkResult.scala
@@ -16,6 +16,7 @@
  */
 package org.apache.spark.sql.connect.client
 
+import java.lang.ref.Cleaner
 import java.util.Objects
 
 import scala.collection.mutable
@@ -28,7 +29,6 @@ import org.apache.spark.connect.proto
 import org.apache.spark.sql.catalyst.encoders.{AgnosticEncoder, RowEncoder}
 import 
org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{ProductEncoder, 
UnboundRowEncoder}
 import org.apache.spark.sql.connect.client.arrow.{AbstractMessageIterator, 
ArrowDeserializingIterator, ConcatenatingArrowStreamReader, MessageIterator}
-import org.apache.spark.sql.connect.client.util.Cleanable
 import org.apache.spark.sql.connect.common.DataTypeProtoConverter
 import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.sql.util.ArrowUtils
@@ -38,8 +38,7 @@ private[sql] class SparkResult[T](
     allocator: BufferAllocator,
     encoder: AgnosticEncoder[T],
     timeZoneId: String)
-    extends AutoCloseable
-    with Cleanable { self =>
+    extends AutoCloseable { self =>
 
   private[this] var opId: String = _
   private[this] var numRecords: Int = 0
@@ -47,6 +46,8 @@ private[sql] class SparkResult[T](
   private[this] var arrowSchema: pojo.Schema = _
   private[this] var nextResultIndex: Int = 0
   private val resultMap = mutable.Map.empty[Int, (Long, Seq[ArrowMessage])]
+  private val cleanable =
+    SparkResult.cleaner.register(this, new SparkResultCloseable(resultMap, 
responses))
 
   /**
    * Update RowEncoder and recursively update the fields of the ProductEncoder 
if found.
@@ -257,9 +258,7 @@ private[sql] class SparkResult[T](
   /**
    * Close this result, freeing any underlying resources.
    */
-  override def close(): Unit = cleaner.close()
-
-  override val cleaner: AutoCloseable = new SparkResultCloseable(resultMap, 
responses)
+  override def close(): Unit = cleanable.clean()
 
   private class ResultMessageIterator(destructive: Boolean) extends 
AbstractMessageIterator {
     private[this] var totalBytesRead = 0L
@@ -309,12 +308,21 @@ private[sql] class SparkResult[T](
   }
 }
 
+private object SparkResult {
+  private val cleaner: Cleaner = Cleaner.create()
+}
+
 private[client] class SparkResultCloseable(
     resultMap: mutable.Map[Int, (Long, Seq[ArrowMessage])],
     responses: CloseableIterator[proto.ExecutePlanResponse])
-    extends AutoCloseable {
+    extends AutoCloseable
+    with Runnable {
   override def close(): Unit = {
     resultMap.values.foreach(_._2.foreach(_.close()))
     responses.close()
   }
+
+  override def run(): Unit = {
+    close()
+  }
 }
diff --git 
a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/util/Cleaner.scala
 
b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/util/Cleaner.scala
deleted file mode 100644
index 4eecc881356..00000000000
--- 
a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/util/Cleaner.scala
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.connect.client.util
-
-import java.lang.ref.{ReferenceQueue, WeakReference}
-import java.util.Collections
-import java.util.concurrent.ConcurrentHashMap
-
-import scala.collection.mutable
-import scala.util.control.NonFatal
-
-/**
- * Helper class for cleaning up an object's resources after the object itself 
has been garbage
- * collected.
- *
- * When we move to Java 9+ we should replace this class by 
[[java.lang.ref.Cleaner]].
- */
-private[sql] class Cleaner {
-  class Ref(pin: AnyRef, val resource: AutoCloseable)
-      extends WeakReference[AnyRef](pin, referenceQueue)
-      with AutoCloseable {
-    override def close(): Unit = resource.close()
-  }
-
-  def register(pin: Cleanable): Unit = {
-    register(pin, pin.cleaner)
-  }
-
-  /**
-   * Register an objects' resources for clean-up. Note that it is absolutely 
pivotal that resource
-   * itself does not contain any reference to the object, if it does the 
object will never be
-   * garbage collected and the clean-up will never be performed.
-   *
-   * @param pin
-   *   who's resources need to be cleaned up after GC.
-   * @param resource
-   *   to clean-up.
-   */
-  def register(pin: AnyRef, resource: AutoCloseable): Unit = {
-    referenceBuffer.add(new Ref(pin, resource))
-  }
-
-  @volatile private var stopped = false
-  private val referenceBuffer = Collections.newSetFromMap[Ref](new 
ConcurrentHashMap)
-  private val referenceQueue = new ReferenceQueue[AnyRef]
-
-  private val cleanerThread = {
-    val thread = new Thread(() => cleanUp())
-    thread.setName("cleaner")
-    thread.setDaemon(true)
-    thread
-  }
-
-  def start(): Unit = {
-    require(!stopped)
-    cleanerThread.start()
-  }
-
-  def stop(): Unit = {
-    stopped = true
-    cleanerThread.interrupt()
-  }
-
-  private def cleanUp(): Unit = {
-    while (!stopped) {
-      try {
-        val ref = referenceQueue.remove().asInstanceOf[Ref]
-        referenceBuffer.remove(ref)
-        ref.close()
-      } catch {
-        case NonFatal(e) =>
-          // Perhaps log this?
-          e.printStackTrace()
-      }
-    }
-  }
-}
-
-trait Cleanable {
-  def cleaner: AutoCloseable
-}
-
-object AutoCloseables {
-  def apply(resources: Seq[AutoCloseable]): AutoCloseable = { () =>
-    val throwables = mutable.Buffer.empty[Throwable]
-    resources.foreach { resource =>
-      try {
-        resource.close()
-      } catch {
-        case NonFatal(e) => throwables += e
-      }
-    }
-    if (throwables.nonEmpty) {
-      val t = throwables.head
-      throwables.tail.foreach(t.addSuppressed)
-      throw t
-    }
-  }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to