This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 8323c0c48de [SPARK-45133][CONNECT][TESTS][FOLLOWUP] Add test that queries transition to FINISHED 8323c0c48de is described below commit 8323c0c48de7f498ef2452059f737a167586b98d Author: Juliusz Sompolski <ju...@databricks.com> AuthorDate: Tue Sep 19 08:31:15 2023 +0900 [SPARK-45133][CONNECT][TESTS][FOLLOWUP] Add test that queries transition to FINISHED ### What changes were proposed in this pull request? Add test checking that queries (also special case: local relations) transition to FINISHED state, even if the client does not consume the results. ### Why are the changes needed? Add test for SPARK-45133. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? This adds tests. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #42910 from juliuszsompolski/SPARK-45133-followup. Authored-by: Juliusz Sompolski <ju...@databricks.com> Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> --- .../spark/sql/connect/SparkConnectServerTest.scala | 29 ++++++++++++- .../service/SparkConnectServiceE2ESuite.scala | 48 ++++++++++++++++++++++ 2 files changed, 76 insertions(+), 1 deletion(-) diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/SparkConnectServerTest.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/SparkConnectServerTest.scala index eddd1c6be72..7b02377f484 100644 --- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/SparkConnectServerTest.scala +++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/SparkConnectServerTest.scala @@ -16,14 +16,19 @@ */ package org.apache.spark.sql.connect -import java.util.UUID +import java.util.{TimeZone, UUID} +import scala.reflect.runtime.universe.TypeTag + +import org.apache.arrow.memory.RootAllocator import org.scalatest.concurrent.{Eventually, TimeLimits} import org.scalatest.time.Span import 
org.scalatest.time.SpanSugar._ import org.apache.spark.connect.proto +import org.apache.spark.sql.catalyst.ScalaReflection import org.apache.spark.sql.connect.client.{CloseableIterator, CustomSparkConnectBlockingStub, ExecutePlanResponseReattachableIterator, GrpcRetryHandler, SparkConnectClient, WrappedCloseableIterator} +import org.apache.spark.sql.connect.client.arrow.ArrowSerializer import org.apache.spark.sql.connect.common.config.ConnectCommon import org.apache.spark.sql.connect.config.Connect import org.apache.spark.sql.connect.dsl.MockRemoteSession @@ -43,6 +48,8 @@ trait SparkConnectServerTest extends SharedSparkSession { val eventuallyTimeout = 30.seconds + val allocator = new RootAllocator() + override def beforeAll(): Unit = { super.beforeAll() // Other suites using mocks leave a mess in the global executionManager, @@ -60,6 +67,7 @@ trait SparkConnectServerTest extends SharedSparkSession { override def afterAll(): Unit = { SparkConnectService.stop() + allocator.close() super.afterAll() } @@ -127,6 +135,19 @@ trait SparkConnectServerTest extends SharedSparkSession { proto.Plan.newBuilder().setRoot(dsl.sql(query)).build() } + protected def buildLocalRelation[A <: Product: TypeTag](data: Seq[A]) = { + val encoder = ScalaReflection.encoderFor[A] + val arrowData = + ArrowSerializer.serialize(data.iterator, encoder, allocator, TimeZone.getDefault.getID) + val localRelation = proto.LocalRelation + .newBuilder() + .setData(arrowData) + .setSchema(encoder.schema.json) + .build() + val relation = proto.Relation.newBuilder().setLocalRelation(localRelation).build() + proto.Plan.newBuilder().setRoot(relation).build() + } + protected def getReattachableIterator( stubIterator: CloseableIterator[proto.ExecutePlanResponse]) = { // This depends on the wrapping in CustomSparkConnectBlockingStub.executePlanReattachable: @@ -188,6 +209,12 @@ trait SparkConnectServerTest extends SharedSparkSession { executions.head } + protected def eventuallyGetExecutionHolder: 
ExecuteHolder = { + Eventually.eventually(timeout(eventuallyTimeout)) { + getExecutionHolder + } + } + protected def withClient(f: SparkConnectClient => Unit): Unit = { val client = SparkConnectClient .builder() diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectServiceE2ESuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectServiceE2ESuite.scala new file mode 100644 index 00000000000..14ecc9a2e95 --- /dev/null +++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectServiceE2ESuite.scala @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.connect.service + +import org.scalatest.concurrent.Eventually +import org.scalatest.time.SpanSugar._ + +import org.apache.spark.sql.connect.SparkConnectServerTest + +class SparkConnectServiceE2ESuite extends SparkConnectServerTest { + + test("SPARK-45133 query should reach FINISHED state when results are not consumed") { + withRawBlockingStub { stub => + val iter = + stub.executePlan(buildExecutePlanRequest(buildPlan("select * from range(1000000)"))) + iter.hasNext + val execution = eventuallyGetExecutionHolder + Eventually.eventually(timeout(30.seconds)) { + assert(execution.eventsManager.status == ExecuteStatus.Finished) + } + } + } + + test("SPARK-45133 local relation should reach FINISHED state when results are not consumed") { + withClient { client => + val iter = client.execute(buildLocalRelation((1 to 1000000).map(i => (i, i + 1)))) + iter.hasNext + val execution = eventuallyGetExecutionHolder + Eventually.eventually(timeout(30.seconds)) { + assert(execution.eventsManager.status == ExecuteStatus.Finished) + } + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org