This is an automated email from the ASF dual-hosted git repository.
hvanhovell pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
new d79d10291c6 [SPARK-42667][CONNECT][FOLLOW-UP] SparkSession created by
newSession should not share the channel
d79d10291c6 is described below
commit d79d10291c686377468d7f1bf46f866a243d5551
Author: Rui Wang <[email protected]>
AuthorDate: Fri Mar 10 19:38:19 2023 -0400
[SPARK-42667][CONNECT][FOLLOW-UP] SparkSession created by newSession should
not share the channel
### What changes were proposed in this pull request?
SparkSession created by newSession should not share the channel. This is
because `stop` may be called on a SparkSession, which shuts down the channel it
uses. If the channel is shared, other active SparkSessions sharing that channel
will run into trouble.
### Why are the changes needed?
This fixes the issue where stopping one SparkSession causes other active
SparkSessions to stop working in Spark Connect.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing UT
Closes #40346 from amaliujia/rw-session-do-not-share-channel.
Authored-by: Rui Wang <[email protected]>
Signed-off-by: Herman van Hovell <[email protected]>
(cherry picked from commit e5f56e51dcbffb1f79dc00e8493e946ce1209cdc)
Signed-off-by: Herman van Hovell <[email protected]>
---
.../spark/sql/connect/client/SparkConnectClient.scala | 16 +++++++++-------
.../test/scala/org/apache/spark/sql/DatasetSuite.scala | 2 +-
.../org/apache/spark/sql/PlanGenerationTestSuite.scala | 2 +-
.../org/apache/spark/sql/SQLImplicitsTestSuite.scala | 2 +-
4 files changed, 12 insertions(+), 10 deletions(-)
diff --git
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
index 348fc94bb89..736a8af8e38 100644
---
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
+++
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
@@ -17,10 +17,11 @@
package org.apache.spark.sql.connect.client
-import io.grpc.{CallCredentials, CallOptions, Channel, ClientCall,
ClientInterceptor, CompositeChannelCredentials, ForwardingClientCall, Grpc,
InsecureChannelCredentials, ManagedChannel, Metadata, MethodDescriptor, Status,
TlsChannelCredentials}
+import io.grpc.{CallCredentials, CallOptions, Channel, ClientCall,
ClientInterceptor, CompositeChannelCredentials, ForwardingClientCall, Grpc,
InsecureChannelCredentials, ManagedChannel, ManagedChannelBuilder, Metadata,
MethodDescriptor, Status, TlsChannelCredentials}
import java.net.URI
import java.util.UUID
import java.util.concurrent.Executor
+import scala.language.existentials
import org.apache.spark.connect.proto
import org.apache.spark.connect.proto.UserContext
@@ -31,9 +32,11 @@ import
org.apache.spark.sql.connect.common.config.ConnectCommon
*/
private[sql] class SparkConnectClient(
private val userContext: proto.UserContext,
- private val channel: ManagedChannel,
+ private val channelBuilder: ManagedChannelBuilder[_],
private[client] val userAgent: String) {
+ private[this] lazy val channel: ManagedChannel = channelBuilder.build()
+
private[this] val stub =
proto.SparkConnectServiceGrpc.newBlockingStub(channel)
private[client] val artifactManager: ArtifactManager = new
ArtifactManager(userContext, channel)
@@ -164,7 +167,7 @@ private[sql] class SparkConnectClient(
}
def copy(): SparkConnectClient = {
- new SparkConnectClient(userContext, channel, userAgent)
+ new SparkConnectClient(userContext, channelBuilder, userAgent)
}
/**
@@ -208,8 +211,8 @@ private[sql] object SparkConnectClient {
"Either remove 'token' or set 'use_ssl=true'"
// for internal tests
- def apply(userContext: UserContext, channel: ManagedChannel):
SparkConnectClient =
- new SparkConnectClient(userContext, channel, DEFAULT_USER_AGENT)
+ def apply(userContext: UserContext, builder: ManagedChannelBuilder[_]):
SparkConnectClient =
+ new SparkConnectClient(userContext, builder, DEFAULT_USER_AGENT)
def builder(): Builder = new Builder()
@@ -394,10 +397,9 @@ private[sql] object SparkConnectClient {
if (metadata.nonEmpty) {
channelBuilder.intercept(new MetadataHeaderClientInterceptor(metadata))
}
- val channel: ManagedChannel = channelBuilder.build()
new SparkConnectClient(
userContextBuilder.build(),
- channel,
+ channelBuilder,
userAgent.getOrElse(DEFAULT_USER_AGENT))
}
}
diff --git
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 43b0cd2674c..42376db880b 100644
---
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -41,7 +41,7 @@ class DatasetSuite extends ConnectFunSuite with
BeforeAndAfterEach {
private def newSparkSession(): SparkSession = {
val client = new SparkConnectClient(
proto.UserContext.newBuilder().build(),
-
InProcessChannelBuilder.forName(getClass.getName).directExecutor().build(),
+ InProcessChannelBuilder.forName(getClass.getName).directExecutor(),
"test")
new SparkSession(client, cleaner = SparkSession.cleaner, planIdGenerator =
new AtomicLong)
}
diff --git
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
index 0d295d17296..3c7e1fdeee6 100644
---
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
+++
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
@@ -97,7 +97,7 @@ class PlanGenerationTestSuite
super.beforeAll()
val client = SparkConnectClient(
proto.UserContext.newBuilder().build(),
- InProcessChannelBuilder.forName("/dev/null").build())
+ InProcessChannelBuilder.forName("/dev/null"))
session =
new SparkSession(client, cleaner = SparkSession.cleaner, planIdGenerator
= new AtomicLong)
}
diff --git
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SQLImplicitsTestSuite.scala
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SQLImplicitsTestSuite.scala
index 3fcc135a22e..f3261ac4850 100644
---
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SQLImplicitsTestSuite.scala
+++
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SQLImplicitsTestSuite.scala
@@ -38,7 +38,7 @@ class SQLImplicitsTestSuite extends ConnectFunSuite with
BeforeAndAfterAll {
super.beforeAll()
val client = SparkConnectClient(
proto.UserContext.newBuilder().build(),
- InProcessChannelBuilder.forName("/dev/null").build())
+ InProcessChannelBuilder.forName("/dev/null"))
session =
new SparkSession(client, cleaner = SparkSession.cleaner, planIdGenerator
= new AtomicLong)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]