This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new a1e27a38868b [SPARK-48341][CONNECT] Allow plugins to use QueryTest in their tests a1e27a38868b is described below commit a1e27a38868b80bef93b7d7bb43e6e15b79b14aa Author: Tom van Bussel <tom.vanbus...@databricks.com> AuthorDate: Wed May 22 09:09:22 2024 +0900 [SPARK-48341][CONNECT] Allow plugins to use QueryTest in their tests ### What changes were proposed in this pull request? This PR changes `QueryTest` to no longer depend on `RemoteSparkSession`. ### Why are the changes needed? This allows the tests for Spark Connect plugins to provide their own version of `RemoteSparkSession` (which depends on some idiosyncrasies of how Spark is built). ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing tests to ensure that nothing breaks. Manually tested that this allows a plugin to use `QueryTest`. ### Was this patch authored or co-authored using generative AI tooling? No Closes #46667 from tomvanbussel/SPARK-48341. 
Lead-authored-by: Tom van Bussel <tom.vanbus...@databricks.com> Co-authored-by: Tom van Bussel <tom.vanbus...@databricks.com> Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> --- .../jvm/src/test/scala/org/apache/spark/sql/CatalogSuite.scala | 4 ++-- .../scala/org/apache/spark/sql/ClientDataFrameStatSuite.scala | 4 ++-- .../src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala | 8 ++++++-- .../scala/org/apache/spark/sql/DataFrameNaFunctionSuite.scala | 4 ++-- .../org/apache/spark/sql/KeyValueGroupedDatasetE2ETestSuite.scala | 4 ++-- .../test/scala/org/apache/spark/sql/SparkSessionE2ESuite.scala | 4 ++-- .../src/test/scala/org/apache/spark/sql/StubbingTestSuite.scala | 4 ++-- .../test/scala/org/apache/spark/sql/UDFClassLoadingE2ESuite.scala | 4 ++-- .../org/apache/spark/sql/UserDefinedFunctionE2ETestSuite.scala | 4 ++-- .../scala/org/apache/spark/sql/application/ReplE2ESuite.scala | 4 ++-- .../apache/spark/sql/streaming/ClientStreamingQuerySuite.scala | 4 ++-- .../sql/streaming/FlatMapGroupsWithStateStreamingSuite.scala | 4 ++-- .../test/scala/org/apache/spark/sql/test/ConnectFunSuite.scala | 4 ++-- .../jvm/src/test/scala/org/apache/spark/sql/test/QueryTest.scala | 6 ++++-- .../test/scala/org/apache/spark/sql/test/RemoteSparkSession.scala | 4 ++-- 15 files changed, 36 insertions(+), 30 deletions(-) diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/CatalogSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/CatalogSuite.scala index d646fad00c07..0e3a683d2701 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/CatalogSuite.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/CatalogSuite.scala @@ -22,11 +22,11 @@ import java.io.{File, FilenameFilter} import org.apache.commons.io.FileUtils import org.apache.spark.SparkException -import org.apache.spark.sql.test.{RemoteSparkSession, SQLHelper} +import org.apache.spark.sql.test.{ConnectFunSuite, 
RemoteSparkSession, SQLHelper} import org.apache.spark.sql.types.{DoubleType, LongType, StructType} import org.apache.spark.storage.StorageLevel -class CatalogSuite extends RemoteSparkSession with SQLHelper { +class CatalogSuite extends ConnectFunSuite with RemoteSparkSession with SQLHelper { test("Database APIs") { val currentDb = spark.catalog.currentDatabase diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientDataFrameStatSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientDataFrameStatSuite.scala index 299ff7ff4fe3..88281352f247 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientDataFrameStatSuite.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientDataFrameStatSuite.scala @@ -22,9 +22,9 @@ import java.util.Random import org.scalatest.matchers.must.Matchers._ import org.apache.spark.SparkIllegalArgumentException -import org.apache.spark.sql.test.RemoteSparkSession +import org.apache.spark.sql.test.{ConnectFunSuite, RemoteSparkSession} -class ClientDataFrameStatSuite extends RemoteSparkSession { +class ClientDataFrameStatSuite extends ConnectFunSuite with RemoteSparkSession { private def toLetter(i: Int): String = (i + 97).toChar.toString test("approxQuantile") { diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala index 73a2f6d4f88e..255dd7669798 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala @@ -40,12 +40,16 @@ import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.connect.client.{SparkConnectClient, SparkResult} import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SqlApiConf -import 
org.apache.spark.sql.test.{IntegrationTestUtils, RemoteSparkSession, SQLHelper} +import org.apache.spark.sql.test.{ConnectFunSuite, IntegrationTestUtils, RemoteSparkSession, SQLHelper} import org.apache.spark.sql.test.SparkConnectServerUtils.port import org.apache.spark.sql.types._ import org.apache.spark.util.SparkThreadUtils -class ClientE2ETestSuite extends RemoteSparkSession with SQLHelper with PrivateMethodTester { +class ClientE2ETestSuite + extends ConnectFunSuite + with RemoteSparkSession + with SQLHelper + with PrivateMethodTester { test("throw SparkException with null filename in stack trace elements") { withSQLConf("spark.sql.connect.enrichError.enabled" -> "true") { diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/DataFrameNaFunctionSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/DataFrameNaFunctionSuite.scala index b77e92995624..8a783d880560 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/DataFrameNaFunctionSuite.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/DataFrameNaFunctionSuite.scala @@ -20,10 +20,10 @@ package org.apache.spark.sql import scala.jdk.CollectionConverters._ import org.apache.spark.sql.internal.SqlApiConf -import org.apache.spark.sql.test.{QueryTest, SQLHelper} +import org.apache.spark.sql.test.{QueryTest, RemoteSparkSession} import org.apache.spark.sql.types.{StringType, StructType} -class DataFrameNaFunctionSuite extends QueryTest with SQLHelper { +class DataFrameNaFunctionSuite extends QueryTest with RemoteSparkSession { private def createDF(): DataFrame = { val sparkSession = spark import sparkSession.implicits._ diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/KeyValueGroupedDatasetE2ETestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/KeyValueGroupedDatasetE2ETestSuite.scala index 91516b0069b2..988774d5eec9 100644 --- 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/KeyValueGroupedDatasetE2ETestSuite.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/KeyValueGroupedDatasetE2ETestSuite.scala @@ -22,7 +22,7 @@ import java.util.Arrays import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.Append import org.apache.spark.sql.functions._ import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout} -import org.apache.spark.sql.test.{QueryTest, SQLHelper} +import org.apache.spark.sql.test.{QueryTest, RemoteSparkSession} import org.apache.spark.sql.types._ import org.apache.spark.util.SparkSerDeUtils @@ -33,7 +33,7 @@ case class ClickState(id: String, count: Int) /** * All tests in this class requires client UDF artifacts synced with the server. */ -class KeyValueGroupedDatasetE2ETestSuite extends QueryTest with SQLHelper { +class KeyValueGroupedDatasetE2ETestSuite extends QueryTest with RemoteSparkSession { lazy val session: SparkSession = spark import session.implicits._ diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SparkSessionE2ESuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SparkSessionE2ESuite.scala index f56085191f87..8110bd9f46a8 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SparkSessionE2ESuite.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SparkSessionE2ESuite.scala @@ -26,7 +26,7 @@ import scala.util.{Failure, Success} import org.scalatest.concurrent.Eventually._ import org.apache.spark.SparkException -import org.apache.spark.sql.test.RemoteSparkSession +import org.apache.spark.sql.test.{ConnectFunSuite, RemoteSparkSession} import org.apache.spark.util.SparkThreadUtils.awaitResult /** @@ -34,7 +34,7 @@ import org.apache.spark.util.SparkThreadUtils.awaitResult * class, whether explicit or implicit, as it will trigger a UDF deserialization error during * Maven build/test. 
*/ -class SparkSessionE2ESuite extends RemoteSparkSession { +class SparkSessionE2ESuite extends ConnectFunSuite with RemoteSparkSession { test("interrupt all - background queries, foreground interrupt") { val session = spark diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/StubbingTestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/StubbingTestSuite.scala index b9c5888e5cb7..5bcb17672d6a 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/StubbingTestSuite.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/StubbingTestSuite.scala @@ -17,9 +17,9 @@ package org.apache.spark.sql import org.apache.spark.sql.connect.client.ToStub -import org.apache.spark.sql.test.RemoteSparkSession +import org.apache.spark.sql.test.{ConnectFunSuite, RemoteSparkSession} -class StubbingTestSuite extends RemoteSparkSession { +class StubbingTestSuite extends ConnectFunSuite with RemoteSparkSession { private def eval[T](f: => T): T = f test("capture of to-be stubbed class") { diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/UDFClassLoadingE2ESuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/UDFClassLoadingE2ESuite.scala index a76e046db2e3..2578f2c5e18e 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/UDFClassLoadingE2ESuite.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/UDFClassLoadingE2ESuite.scala @@ -23,9 +23,9 @@ import scala.util.Properties import org.apache.spark.sql.connect.common.ProtoDataTypes import org.apache.spark.sql.expressions.ScalarUserDefinedFunction -import org.apache.spark.sql.test.RemoteSparkSession +import org.apache.spark.sql.test.{ConnectFunSuite, RemoteSparkSession} -class UDFClassLoadingE2ESuite extends RemoteSparkSession { +class UDFClassLoadingE2ESuite extends ConnectFunSuite with RemoteSparkSession { private val scalaVersion = 
Properties.versionNumberString .split("\\.") diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/UserDefinedFunctionE2ETestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/UserDefinedFunctionE2ETestSuite.scala index f7ffe7aa1271..02759918bd40 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/UserDefinedFunctionE2ETestSuite.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/UserDefinedFunctionE2ETestSuite.scala @@ -27,13 +27,13 @@ import org.apache.spark.api.java.function._ import org.apache.spark.sql.api.java.UDF2 import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{PrimitiveIntEncoder, PrimitiveLongEncoder} import org.apache.spark.sql.functions.{col, struct, udf} -import org.apache.spark.sql.test.QueryTest +import org.apache.spark.sql.test.{QueryTest, RemoteSparkSession} import org.apache.spark.sql.types.IntegerType /** * All tests in this class requires client UDF defined in this test class synced with the server. 
*/ -class UserDefinedFunctionE2ETestSuite extends QueryTest { +class UserDefinedFunctionE2ETestSuite extends QueryTest with RemoteSparkSession { test("Dataset typed filter") { val rows = spark.range(10).filter(n => n % 2 == 0).collectAsList() assert(rows == Arrays.asList[Long](0, 2, 4, 6, 8)) diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/ReplE2ESuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/ReplE2ESuite.scala index 76958f055f2e..d7977fbeb108 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/ReplE2ESuite.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/ReplE2ESuite.scala @@ -25,13 +25,13 @@ import scala.util.Properties import org.apache.commons.io.output.ByteArrayOutputStream import org.scalatest.BeforeAndAfterEach -import org.apache.spark.sql.test.{IntegrationTestUtils, RemoteSparkSession} +import org.apache.spark.sql.test.{ConnectFunSuite, IntegrationTestUtils, RemoteSparkSession} import org.apache.spark.tags.AmmoniteTest import org.apache.spark.util.IvyTestUtils import org.apache.spark.util.MavenUtils.MavenCoordinate @AmmoniteTest -class ReplE2ESuite extends RemoteSparkSession with BeforeAndAfterEach { +class ReplE2ESuite extends ConnectFunSuite with RemoteSparkSession with BeforeAndAfterEach { private val executorService = Executors.newSingleThreadExecutor() private val TIMEOUT_SECONDS = 30 diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/ClientStreamingQuerySuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/ClientStreamingQuerySuite.scala index 5ea06cb9634a..e6009a967d15 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/ClientStreamingQuerySuite.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/ClientStreamingQuerySuite.scala @@ -33,11 +33,11 @@ 
import org.apache.spark.internal.Logging import org.apache.spark.sql.{DataFrame, ForeachWriter, Row, SparkSession} import org.apache.spark.sql.functions.{col, lit, udf, window} import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryIdleEvent, QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent} -import org.apache.spark.sql.test.{IntegrationTestUtils, QueryTest, SQLHelper} +import org.apache.spark.sql.test.{IntegrationTestUtils, QueryTest, RemoteSparkSession} import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} import org.apache.spark.util.SparkFileUtils -class ClientStreamingQuerySuite extends QueryTest with SQLHelper with Logging { +class ClientStreamingQuerySuite extends QueryTest with RemoteSparkSession with Logging { private val testDataPath = Paths .get( diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateStreamingSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateStreamingSuite.scala index 2fab6e8e3c84..dc74463f1a25 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateStreamingSuite.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateStreamingSuite.scala @@ -25,14 +25,14 @@ import org.scalatest.time.SpanSugar._ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.Append -import org.apache.spark.sql.test.{QueryTest, SQLHelper} +import org.apache.spark.sql.test.{QueryTest, RemoteSparkSession} import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType} case class ClickEvent(id: String, timestamp: Timestamp) case class ClickState(id: String, count: Int) -class FlatMapGroupsWithStateStreamingSuite extends QueryTest with SQLHelper { +class FlatMapGroupsWithStateStreamingSuite extends QueryTest with 
RemoteSparkSession { val flatMapGroupsWithStateSchema: StructType = StructType( Array(StructField("id", StringType), StructField("timestamp", TimestampType))) diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/test/ConnectFunSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/test/ConnectFunSuite.scala index 8d69d91a34f7..f40738b983b3 100755 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/test/ConnectFunSuite.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/test/ConnectFunSuite.scala @@ -34,7 +34,7 @@ trait ConnectFunSuite extends AnyFunSuite { // scalastyle:ignore funsuite java.nio.file.Paths.get(sparkHome, first +: more: _*) } - protected val baseResourcePath: Path = { + protected def baseResourcePath: Path = { getWorkspaceFilePath( "connector", "connect", @@ -45,7 +45,7 @@ trait ConnectFunSuite extends AnyFunSuite { // scalastyle:ignore funsuite "resources").toAbsolutePath } - protected val commonResourcePath: Path = { + protected def commonResourcePath: Path = { getWorkspaceFilePath( "connector", "connect", diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/test/QueryTest.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/test/QueryTest.scala index 54fc97c50b3e..8837c76b76ae 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/test/QueryTest.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/test/QueryTest.scala @@ -21,11 +21,13 @@ import java.util.TimeZone import org.scalatest.Assertions -import org.apache.spark.sql.{DataFrame, Dataset, Row} +import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} import org.apache.spark.sql.catalyst.util.SparkStringUtils.sideBySide import org.apache.spark.util.ArrayImplicits._ -abstract class QueryTest extends RemoteSparkSession { +abstract class QueryTest extends ConnectFunSuite with SQLHelper { + + def spark: 
SparkSession /** * Runs the plan and makes sure the answer matches the expected result. diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/test/RemoteSparkSession.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/test/RemoteSparkSession.scala index 300de6e9b081..ecc84e841801 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/test/RemoteSparkSession.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/test/RemoteSparkSession.scala @@ -23,7 +23,7 @@ import java.util.concurrent.TimeUnit import scala.concurrent.duration.FiniteDuration -import org.scalatest.BeforeAndAfterAll +import org.scalatest.{BeforeAndAfterAll, Suite} import org.apache.spark.SparkBuildInfo import org.apache.spark.sql.SparkSession @@ -204,7 +204,7 @@ object SparkConnectServerUtils { } } -trait RemoteSparkSession extends ConnectFunSuite with BeforeAndAfterAll { +trait RemoteSparkSession extends BeforeAndAfterAll { self: Suite => import SparkConnectServerUtils._ var spark: SparkSession = _ protected lazy val serverPort: Int = port --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org