This is an automated email from the ASF dual-hosted git repository.
yangjie01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new f9c8c7a3312 [SPARK-45871][CONNECT] Optimizations for collection conversions
related to `.toBuffer` in the `connect` modules
f9c8c7a3312 is described below
commit f9c8c7a3312e533afb95e527ca0c451148bad6a4
Author: yangjie01 <[email protected]>
AuthorDate: Sun Nov 12 13:45:09 2023 +0800
[SPARK-45871][CONNECT] Optimizations for collection conversions related to
`.toBuffer` in the `connect` modules
### What changes were proposed in this pull request?
This PR includes the following optimizations related to `.toBuffer` in the
`connect` modules:
1. For the two functions `sql(String, java.util.Map[String, Any]):
DataFrame` and `sql(String, Array[_]): DataFrame` in `SparkSession`, the
approach of using `.find` directly on the `CloseableIterator` to locate the
target and utilizing `.foreach` to consume the remaining elements replaced the
previous method of converting to a collection using `toBuffer.toSeq` and then
searching for the target using `.find`. This approach avoids the need for an
unnecessary collection creation.
2. For function `execute(proto.Relation.Builder => Unit): Unit` in
`SparkSession`, as no elements are returned, `.foreach` is used instead of
`.toBuffer` to avoid an unnecessary collection creation.
3. For function `execute(command: proto.Command): Seq[ExecutePlanResponse]`
in `SparkSession`, in Scala 2.12, `s.c.TraversableOnce#toSeq` returns an
`immutable.Stream`, which is a tail-lazy structure that may not consume all
elements. Therefore, it is necessary to use `s.c.TraversableOnce#toBuffer` for
materialization. However, in Scala 2.13, `s.c.IterableOnceOps#toSeq` constructs
an `immutable.Seq`, which is not a lazy data structure and ensures consumption
of all elements. Therefore [...]
4. The optimizations for the two functions `listAbandonedExecutions:
Seq[ExecuteInfo]` and `listExecuteHolders` in `SparkConnectExecutionManager`
are consistent with item 3 above.
Additionally, to prevent the helper function used for testing from creating
copies, a `private[connect]` scope helper function was added to the companion
object of `ExecutePlanResponseReattachableIterator`.
### Why are the changes needed?
Avoid unnecessary collection copies
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
- Pass GitHub Action
- Added new test cases to demonstrate that both `.toSeq` and `.foreach` can
consume all elements in `ResponseReattachableIterator` in Scala 2.13.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #43745 from LuciferYang/SPARK-45871.
Authored-by: yangjie01 <[email protected]>
Signed-off-by: yangjie01 <[email protected]>
---
.../scala/org/apache/spark/sql/SparkSession.scala | 48 ++++++++++++----------
.../connect/client/SparkConnectClientSuite.scala | 48 ++++++++++++++++++++++
.../ExecutePlanResponseReattachableIterator.scala | 10 +++++
.../service/SparkConnectExecutionManager.scala | 6 +--
.../spark/sql/connect/SparkConnectServerTest.scala | 12 +-----
5 files changed, 90 insertions(+), 34 deletions(-)
diff --git
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 34756f9a440..ca692d2d4f8 100644
---
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -250,15 +250,18 @@ class SparkSession private[sql] (
.setSql(sqlText)
.addAllPosArguments(args.map(lit(_).expr).toImmutableArraySeq.asJava)))
val plan = proto.Plan.newBuilder().setCommand(cmd)
- // .toBuffer forces that the iterator is consumed and closed
- val responseSeq = client.execute(plan.build()).toBuffer.toSeq
+ val responseIter = client.execute(plan.build())
- val response = responseSeq
- .find(_.hasSqlCommandResult)
- .getOrElse(throw new RuntimeException("SQLCommandResult must be
present"))
-
- // Update the builder with the values from the result.
- builder.mergeFrom(response.getSqlCommandResult.getRelation)
+ try {
+ val response = responseIter
+ .find(_.hasSqlCommandResult)
+ .getOrElse(throw new RuntimeException("SQLCommandResult must be
present"))
+ // Update the builder with the values from the result.
+ builder.mergeFrom(response.getSqlCommandResult.getRelation)
+ } finally {
+ // consume the rest of the iterator
+ responseIter.foreach(_ => ())
+ }
}
/**
@@ -309,15 +312,18 @@ class SparkSession private[sql] (
.setSql(sqlText)
.putAllNamedArguments(args.asScala.view.mapValues(lit(_).expr).toMap.asJava)))
val plan = proto.Plan.newBuilder().setCommand(cmd)
- // .toBuffer forces that the iterator is consumed and closed
- val responseSeq = client.execute(plan.build()).toBuffer.toSeq
-
- val response = responseSeq
- .find(_.hasSqlCommandResult)
- .getOrElse(throw new RuntimeException("SQLCommandResult must be
present"))
-
- // Update the builder with the values from the result.
- builder.mergeFrom(response.getSqlCommandResult.getRelation)
+ val responseIter = client.execute(plan.build())
+
+ try {
+ val response = responseIter
+ .find(_.hasSqlCommandResult)
+ .getOrElse(throw new RuntimeException("SQLCommandResult must be
present"))
+ // Update the builder with the values from the result.
+ builder.mergeFrom(response.getSqlCommandResult.getRelation)
+ } finally {
+ // consume the rest of the iterator
+ responseIter.foreach(_ => ())
+ }
}
/**
@@ -543,14 +549,14 @@ class SparkSession private[sql] (
f(builder)
builder.getCommonBuilder.setPlanId(planIdGenerator.getAndIncrement())
val plan = proto.Plan.newBuilder().setRoot(builder).build()
- // .toBuffer forces that the iterator is consumed and closed
- client.execute(plan).toBuffer
+ // .foreach forces that the iterator is consumed and closed
+ client.execute(plan).foreach(_ => ())
}
private[sql] def execute(command: proto.Command): Seq[ExecutePlanResponse] =
{
val plan = proto.Plan.newBuilder().setCommand(command).build()
- // .toBuffer forces that the iterator is consumed and closed
- client.execute(plan).toBuffer.toSeq
+ // .toSeq forces that the iterator is consumed and closed
+ client.execute(plan).toSeq
}
private[sql] def registerUdf(udf: proto.CommonInlineUserDefinedFunction):
Unit = {
diff --git
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
index d0c85da5f21..b93713383b2 100644
---
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
+++
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
@@ -387,6 +387,54 @@ class SparkConnectClientSuite extends ConnectFunSuite with
BeforeAndAfterEach {
}
assert(dummyFn.counter == 2)
}
+
+ test("SPARK-45871: Client execute iterator.toSeq consumes the reattachable
iterator") {
+ startDummyServer(0)
+ client = SparkConnectClient
+ .builder()
+ .connectionString(s"sc://localhost:${server.getPort}")
+ .enableReattachableExecute()
+ .build()
+ val session = SparkSession.builder().client(client).create()
+ val cmd = session.newCommand(b =>
+ b.setSqlCommand(
+ proto.SqlCommand
+ .newBuilder()
+ .setSql("select * from range(10000000)")))
+ val plan = proto.Plan.newBuilder().setCommand(cmd)
+ val iter = client.execute(plan.build())
+ val reattachableIter =
+ ExecutePlanResponseReattachableIterator.fromIterator(iter)
+ iter.toSeq
+ // In several places in SparkSession, we depend on `.toSeq` to consume and
close the iterator.
+ // If this assertion fails, we need to double check the correctness of
that.
+ // In scala 2.12 `s.c.TraversableOnce#toSeq` builds an `immutable.Stream`,
+ // which is a tail lazy structure and this would fail.
+ // In scala 2.13 `s.c.IterableOnceOps#toSeq` builds an `immutable.Seq`
which is not
+ // lazy and will consume and close the iterator.
+ assert(reattachableIter.resultComplete)
+ }
+
+ test("SPARK-45871: Client execute iterator.foreach consumes the reattachable
iterator") {
+ startDummyServer(0)
+ client = SparkConnectClient
+ .builder()
+ .connectionString(s"sc://localhost:${server.getPort}")
+ .enableReattachableExecute()
+ .build()
+ val session = SparkSession.builder().client(client).create()
+ val cmd = session.newCommand(b =>
+ b.setSqlCommand(
+ proto.SqlCommand
+ .newBuilder()
+ .setSql("select * from range(10000000)")))
+ val plan = proto.Plan.newBuilder().setCommand(cmd)
+ val iter = client.execute(plan.build())
+ val reattachableIter =
+ ExecutePlanResponseReattachableIterator.fromIterator(iter)
+ iter.foreach(_ => ())
+ assert(reattachableIter.resultComplete)
+ }
}
class DummySparkConnectService() extends
SparkConnectServiceGrpc.SparkConnectServiceImplBase {
diff --git
a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala
b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala
index 2b61463c343..cfa492ef063 100644
---
a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala
+++
b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala
@@ -326,3 +326,13 @@ class ExecutePlanResponseReattachableIterator(
private def retry[T](fn: => T): T =
GrpcRetryHandler.retry(retryPolicy)(fn)
}
+
+private[connect] object ExecutePlanResponseReattachableIterator {
+ @scala.annotation.tailrec
+ private[connect] def fromIterator(
+ iter: Iterator[proto.ExecutePlanResponse]):
ExecutePlanResponseReattachableIterator =
+ iter match {
+ case e: ExecutePlanResponseReattachableIterator => e
+ case w: WrappedCloseableIterator[proto.ExecutePlanResponse] =>
fromIterator(w.innerIterator)
+ }
+}
diff --git
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala
index c004358e1cf..36c6f73329b 100644
---
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala
+++
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala
@@ -153,7 +153,7 @@ private[connect] class SparkConnectExecutionManager()
extends Logging {
* cache, and the tombstones will be eventually removed.
*/
def listAbandonedExecutions: Seq[ExecuteInfo] = {
- abandonedTombstones.asMap.asScala.values.toBuffer.toSeq
+ abandonedTombstones.asMap.asScala.values.toSeq
}
private[connect] def shutdown(): Unit = executionsLock.synchronized {
@@ -236,7 +236,7 @@ private[connect] class SparkConnectExecutionManager()
extends Logging {
executions.values.foreach(_.interruptGrpcResponseSenders())
}
- private[connect] def listExecuteHolders = executionsLock.synchronized {
- executions.values.toBuffer.toSeq
+ private[connect] def listExecuteHolders: Seq[ExecuteHolder] =
executionsLock.synchronized {
+ executions.values.toSeq
}
}
diff --git
a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/SparkConnectServerTest.scala
b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/SparkConnectServerTest.scala
index c4a5539ce0b..1c0d9a68ab6 100644
---
a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/SparkConnectServerTest.scala
+++
b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/SparkConnectServerTest.scala
@@ -27,7 +27,7 @@ import org.scalatest.time.SpanSugar._
import org.apache.spark.connect.proto
import org.apache.spark.sql.catalyst.ScalaReflection
-import org.apache.spark.sql.connect.client.{CloseableIterator,
CustomSparkConnectBlockingStub, ExecutePlanResponseReattachableIterator,
GrpcRetryHandler, SparkConnectClient, SparkConnectStubState,
WrappedCloseableIterator}
+import org.apache.spark.sql.connect.client.{CloseableIterator,
CustomSparkConnectBlockingStub, ExecutePlanResponseReattachableIterator,
GrpcRetryHandler, SparkConnectClient, SparkConnectStubState}
import org.apache.spark.sql.connect.client.arrow.ArrowSerializer
import org.apache.spark.sql.connect.common.config.ConnectCommon
import org.apache.spark.sql.connect.config.Connect
@@ -147,15 +147,7 @@ trait SparkConnectServerTest extends SharedSparkSession {
protected def getReattachableIterator(
stubIterator: CloseableIterator[proto.ExecutePlanResponse]) = {
- // This depends on the wrapping in
CustomSparkConnectBlockingStub.executePlanReattachable:
- // GrpcExceptionConverter.convertIterator
- stubIterator
- .asInstanceOf[WrappedCloseableIterator[proto.ExecutePlanResponse]]
- .innerIterator
- .asInstanceOf[WrappedCloseableIterator[proto.ExecutePlanResponse]]
- // ExecutePlanResponseReattachableIterator
- .innerIterator
- .asInstanceOf[ExecutePlanResponseReattachableIterator]
+ ExecutePlanResponseReattachableIterator.fromIterator(stubIterator)
}
protected def assertNoActiveRpcs(): Unit = {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]