This is an automated email from the ASF dual-hosted git repository.
yangjie01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new d12646ee224 [SPARK-45596][CONNECT] Use java.lang.ref.Cleaner instead
of org.apache.spark.sql.connect.client.util.Cleaner
d12646ee224 is described below
commit d12646ee224f66dfc9bc7be9cbe2a70b85d7439a
Author: zhaomin <[email protected]>
AuthorDate: Thu Oct 26 22:08:02 2023 +0800
[SPARK-45596][CONNECT] Use java.lang.ref.Cleaner instead of
org.apache.spark.sql.connect.client.util.Cleaner
### What changes were proposed in this pull request?
Use java.lang.ref.Cleaner instead of
org.apache.spark.sql.connect.client.util.Cleaner
### Why are the changes needed?
org.apache.spark.sql.connect.client.util.Cleaner was a temporary class,
introduced while waiting for the JDK to be upgraded to version 9 or above.
https://issues.apache.org/jira/browse/SPARK-45596
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Passes existing CI.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #43439 from zhaomin1423/connect_cleaner.
Authored-by: zhaomin <[email protected]>
Signed-off-by: yangjie01 <[email protected]>
---
.../scala/org/apache/spark/sql/SparkSession.scala | 13 +--
.../org/apache/spark/sql/ClientDatasetSuite.scala | 2 +-
.../apache/spark/sql/PlanGenerationTestSuite.scala | 3 +-
.../apache/spark/sql/SQLImplicitsTestSuite.scala | 3 +-
.../spark/sql/connect/client/SparkResult.scala | 22 ++--
.../spark/sql/connect/client/util/Cleaner.scala | 113 ---------------------
6 files changed, 20 insertions(+), 136 deletions(-)
diff --git
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
index e60bda0a838..5e640bea570 100644
---
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -39,7 +39,6 @@ import
org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{BoxedLongEncoder
import org.apache.spark.sql.connect.client.{ClassFinder, SparkConnectClient,
SparkResult}
import org.apache.spark.sql.connect.client.SparkConnectClient.Configuration
import org.apache.spark.sql.connect.client.arrow.ArrowSerializer
-import org.apache.spark.sql.connect.client.util.Cleaner
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.internal.{CatalogImpl, SqlApiConf}
import org.apache.spark.sql.streaming.DataStreamReader
@@ -66,7 +65,6 @@ import org.apache.spark.sql.types.StructType
*/
class SparkSession private[sql] (
private[sql] val client: SparkConnectClient,
- private val cleaner: Cleaner,
private val planIdGenerator: AtomicLong)
extends Serializable
with Closeable
@@ -536,7 +534,6 @@ class SparkSession private[sql] (
private[sql] def execute[T](plan: proto.Plan, encoder: AgnosticEncoder[T]):
SparkResult[T] = {
val value = client.execute(plan)
val result = new SparkResult(value, allocator, encoder, timeZoneId)
- cleaner.register(result)
result
}
@@ -774,7 +771,7 @@ object SparkSession extends Logging {
* Create a new [[SparkSession]] based on the connect client
[[Configuration]].
*/
private[sql] def create(configuration: Configuration): SparkSession = {
- new SparkSession(configuration.toSparkConnectClient, cleaner,
planIdGenerator)
+ new SparkSession(configuration.toSparkConnectClient, planIdGenerator)
}
/**
@@ -795,12 +792,6 @@ object SparkSession extends Logging {
*/
def builder(): Builder = new Builder()
- private[sql] lazy val cleaner = {
- val cleaner = new Cleaner
- cleaner.start()
- cleaner
- }
-
class Builder() extends Logging {
// Initialize the connection string of the Spark Connect client builder
from SPARK_REMOTE
// by default, if it exists. The connection string can be overridden using
@@ -911,7 +902,7 @@ object SparkSession extends Logging {
private def tryCreateSessionFromClient(): Option[SparkSession] = {
if (client != null) {
- Option(new SparkSession(client, cleaner, planIdGenerator))
+ Option(new SparkSession(client, planIdGenerator))
} else {
None
}
diff --git
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientDatasetSuite.scala
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientDatasetSuite.scala
index aab31d97e8c..041b0928365 100644
---
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientDatasetSuite.scala
+++
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientDatasetSuite.scala
@@ -43,7 +43,7 @@ class ClientDatasetSuite extends ConnectFunSuite with
BeforeAndAfterEach {
private def newSparkSession(): SparkSession = {
val client = SparkConnectClient(
InProcessChannelBuilder.forName(getClass.getName).directExecutor().build())
- new SparkSession(client, cleaner = SparkSession.cleaner, planIdGenerator =
new AtomicLong)
+ new SparkSession(client, planIdGenerator = new AtomicLong)
}
private def startDummyServer(): Unit = {
diff --git
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
index e4c5b851b13..cf287088b59 100644
---
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
+++
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
@@ -112,8 +112,7 @@ class PlanGenerationTestSuite
override protected def beforeAll(): Unit = {
super.beforeAll()
val client =
SparkConnectClient(InProcessChannelBuilder.forName("/dev/null").build())
- session =
- new SparkSession(client, cleaner = SparkSession.cleaner, planIdGenerator
= new AtomicLong)
+ session = new SparkSession(client, planIdGenerator = new AtomicLong)
}
override protected def beforeEach(): Unit = {
diff --git
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SQLImplicitsTestSuite.scala
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SQLImplicitsTestSuite.scala
index 53743feb03b..b2c13850a13 100644
---
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SQLImplicitsTestSuite.scala
+++
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SQLImplicitsTestSuite.scala
@@ -39,8 +39,7 @@ class SQLImplicitsTestSuite extends ConnectFunSuite with
BeforeAndAfterAll {
override protected def beforeAll(): Unit = {
super.beforeAll()
val client =
SparkConnectClient(InProcessChannelBuilder.forName("/dev/null").build())
- session =
- new SparkSession(client, cleaner = SparkSession.cleaner, planIdGenerator
= new AtomicLong)
+ session = new SparkSession(client, planIdGenerator = new AtomicLong)
}
test("column resolution") {
diff --git
a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/SparkResult.scala
b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/SparkResult.scala
index d4b40624be7..7a7c6a2d6c9 100644
---
a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/SparkResult.scala
+++
b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/SparkResult.scala
@@ -16,6 +16,7 @@
*/
package org.apache.spark.sql.connect.client
+import java.lang.ref.Cleaner
import java.util.Objects
import scala.collection.mutable
@@ -28,7 +29,6 @@ import org.apache.spark.connect.proto
import org.apache.spark.sql.catalyst.encoders.{AgnosticEncoder, RowEncoder}
import
org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{ProductEncoder,
UnboundRowEncoder}
import org.apache.spark.sql.connect.client.arrow.{AbstractMessageIterator,
ArrowDeserializingIterator, ConcatenatingArrowStreamReader, MessageIterator}
-import org.apache.spark.sql.connect.client.util.Cleanable
import org.apache.spark.sql.connect.common.DataTypeProtoConverter
import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.sql.util.ArrowUtils
@@ -38,8 +38,7 @@ private[sql] class SparkResult[T](
allocator: BufferAllocator,
encoder: AgnosticEncoder[T],
timeZoneId: String)
- extends AutoCloseable
- with Cleanable { self =>
+ extends AutoCloseable { self =>
private[this] var opId: String = _
private[this] var numRecords: Int = 0
@@ -47,6 +46,8 @@ private[sql] class SparkResult[T](
private[this] var arrowSchema: pojo.Schema = _
private[this] var nextResultIndex: Int = 0
private val resultMap = mutable.Map.empty[Int, (Long, Seq[ArrowMessage])]
+ private val cleanable =
+ SparkResult.cleaner.register(this, new SparkResultCloseable(resultMap,
responses))
/**
* Update RowEncoder and recursively update the fields of the ProductEncoder
if found.
@@ -257,9 +258,7 @@ private[sql] class SparkResult[T](
/**
* Close this result, freeing any underlying resources.
*/
- override def close(): Unit = cleaner.close()
-
- override val cleaner: AutoCloseable = new SparkResultCloseable(resultMap,
responses)
+ override def close(): Unit = cleanable.clean()
private class ResultMessageIterator(destructive: Boolean) extends
AbstractMessageIterator {
private[this] var totalBytesRead = 0L
@@ -309,12 +308,21 @@ private[sql] class SparkResult[T](
}
}
+private object SparkResult {
+ private val cleaner: Cleaner = Cleaner.create()
+}
+
private[client] class SparkResultCloseable(
resultMap: mutable.Map[Int, (Long, Seq[ArrowMessage])],
responses: CloseableIterator[proto.ExecutePlanResponse])
- extends AutoCloseable {
+ extends AutoCloseable
+ with Runnable {
override def close(): Unit = {
resultMap.values.foreach(_._2.foreach(_.close()))
responses.close()
}
+
+ override def run(): Unit = {
+ close()
+ }
}
diff --git
a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/util/Cleaner.scala
b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/util/Cleaner.scala
deleted file mode 100644
index 4eecc881356..00000000000
---
a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/util/Cleaner.scala
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.connect.client.util
-
-import java.lang.ref.{ReferenceQueue, WeakReference}
-import java.util.Collections
-import java.util.concurrent.ConcurrentHashMap
-
-import scala.collection.mutable
-import scala.util.control.NonFatal
-
-/**
- * Helper class for cleaning up an object's resources after the object itself
has been garbage
- * collected.
- *
- * When we move to Java 9+ we should replace this class by
[[java.lang.ref.Cleaner]].
- */
-private[sql] class Cleaner {
- class Ref(pin: AnyRef, val resource: AutoCloseable)
- extends WeakReference[AnyRef](pin, referenceQueue)
- with AutoCloseable {
- override def close(): Unit = resource.close()
- }
-
- def register(pin: Cleanable): Unit = {
- register(pin, pin.cleaner)
- }
-
- /**
- * Register an objects' resources for clean-up. Note that it is absolutely
pivotal that resource
- * itself does not contain any reference to the object, if it does the
object will never be
- * garbage collected and the clean-up will never be performed.
- *
- * @param pin
- * who's resources need to be cleaned up after GC.
- * @param resource
- * to clean-up.
- */
- def register(pin: AnyRef, resource: AutoCloseable): Unit = {
- referenceBuffer.add(new Ref(pin, resource))
- }
-
- @volatile private var stopped = false
- private val referenceBuffer = Collections.newSetFromMap[Ref](new
ConcurrentHashMap)
- private val referenceQueue = new ReferenceQueue[AnyRef]
-
- private val cleanerThread = {
- val thread = new Thread(() => cleanUp())
- thread.setName("cleaner")
- thread.setDaemon(true)
- thread
- }
-
- def start(): Unit = {
- require(!stopped)
- cleanerThread.start()
- }
-
- def stop(): Unit = {
- stopped = true
- cleanerThread.interrupt()
- }
-
- private def cleanUp(): Unit = {
- while (!stopped) {
- try {
- val ref = referenceQueue.remove().asInstanceOf[Ref]
- referenceBuffer.remove(ref)
- ref.close()
- } catch {
- case NonFatal(e) =>
- // Perhaps log this?
- e.printStackTrace()
- }
- }
- }
-}
-
-trait Cleanable {
- def cleaner: AutoCloseable
-}
-
-object AutoCloseables {
- def apply(resources: Seq[AutoCloseable]): AutoCloseable = { () =>
- val throwables = mutable.Buffer.empty[Throwable]
- resources.foreach { resource =>
- try {
- resource.close()
- } catch {
- case NonFatal(e) => throwables += e
- }
- }
- if (throwables.nonEmpty) {
- val t = throwables.head
- throwables.tail.foreach(t.addSuppressed)
- throw t
- }
- }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]