This is an automated email from the ASF dual-hosted git repository.
hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 655061aaf728 [SPARK-50102][SQL][CONNECT] Add shims needed for missing
public SQL methods
655061aaf728 is described below
commit 655061aaf728231bdd881edbb721202c4a618fb8
Author: Herman van Hovell <[email protected]>
AuthorDate: Sun Nov 10 17:59:08 2024 -0400
[SPARK-50102][SQL][CONNECT] Add shims needed for missing public SQL methods
### What changes were proposed in this pull request?
This PR makes the following changes:
- It adds shims for a number of classes exposed in the
(classic) SQL interface: `BaseRelation` , `ExperimentalMethods`,
`ExecutionListenerManager`, `SharedState`, `SessionState`, `SparkConf`,
`SparkSessionExtensions`, `QueryExecution`, and `SQLContext`.
- It adds all public methods involving these classes. For classic they will
just work like before. For connect they will throw
`SparkUnsupportedOperationExceptions` when used.
- It reduces the visibility of a couple of classes added recently:
`DataSourceRegistration`, and `UDTFRegistration`.
- I have also reorganized all the shims into a single class.
Please note that this by no means reflects the final state:
- We intend to support `SQLContext`.
- We intend to support `SparkSession.executeCommand`.
- We are thinking about removing `ExperimentalMethods`, `SharedState`,
`SessionState` from the public interface.
- For `QueryExecution` and `ExecutionListenerManager` we are considering
adding a plan representation that is not tied to Catalyst.
### Why are the changes needed?
We are creating a shared Scala (JVM) SQL interface for both Classic and
Connect.
### Does this PR introduce _any_ user-facing change?
It adds unusable public methods to the connect interface.
### How was this patch tested?
I have added tests that check whether the connect client throws the proper
exceptions.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #48687 from hvanhovell/SPARK-50102.
Authored-by: Herman van Hovell <[email protected]>
Signed-off-by: Herman van Hovell <[email protected]>
---
.../src/main/resources/error/error-conditions.json | 58 ++++++++++
.../org/apache/spark/sql/DataFrameReader.scala | 5 +-
.../main/scala/org/apache/spark/sql/Dataset.scala | 9 +-
.../scala/org/apache/spark/sql/SparkSession.scala | 59 ++++++++--
.../connect/ConnectClientUnsupportedErrors.scala | 59 ++++++++++
.../main/scala/org/apache/spark/sql/package.scala | 3 -
.../spark/sql/UnsupportedFeaturesSuite.scala | 119 ++++++++++++++++++++
.../CheckConnectJvmClientCompatibility.scala | 80 +------------
project/SparkBuild.scala | 1 +
.../scala/org/apache/spark/sql/api/Dataset.scala | 5 +-
.../org/apache/spark/sql/api/SparkSession.scala | 124 ++++++++++++++++++++-
.../scala/org/apache/spark/api/java/shims.scala | 19 ----
.../main/scala/org/apache/spark/rdd/shims.scala | 19 ----
.../src/main/scala/org/apache/spark/shims.scala | 31 ++++++
.../apache/spark/sql/DataSourceRegistration.scala | 2 +-
.../main/scala/org/apache/spark/sql/Dataset.scala | 1 -
.../apache/spark/sql/KeyValueGroupedDataset.scala | 2 +-
.../scala/org/apache/spark/sql/SparkSession.scala | 82 ++------------
.../org/apache/spark/sql/UDTFRegistration.scala | 2 +-
19 files changed, 473 insertions(+), 207 deletions(-)
diff --git a/common/utils/src/main/resources/error/error-conditions.json
b/common/utils/src/main/resources/error/error-conditions.json
index 31c10f9b9aac..7ef6feae0845 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -4930,6 +4930,64 @@
},
"sqlState" : "0A000"
},
+ "UNSUPPORTED_CONNECT_FEATURE" : {
+ "message" : [
+ "Feature is not supported in Spark Connect:"
+ ],
+ "subClass" : {
+ "DATASET_QUERY_EXECUTION" : {
+ "message" : [
+ "Access to the Dataset Query Execution. This is server side
developer API."
+ ]
+ },
+ "RDD" : {
+ "message" : [
+ "Resilient Distributed Datasets (RDDs)."
+ ]
+ },
+ "SESSION_BASE_RELATION_TO_DATAFRAME" : {
+ "message" : [
+ "Invoking SparkSession 'baseRelationToDataFrame'. This is server
side developer API"
+ ]
+ },
+ "SESSION_EXECUTE_COMMAND" : {
+ "message" : [
+ "Invoking SparkSession 'executeCommand'."
+ ]
+ },
+ "SESSION_EXPERIMENTAL_METHODS" : {
+ "message" : [
+ "Access to SparkSession Experimental (methods). This is server side
developer API"
+ ]
+ },
+ "SESSION_LISTENER_MANAGER" : {
+ "message" : [
+ "Access to the SparkSession Listener Manager. This is server side
developer API"
+ ]
+ },
+ "SESSION_SESSION_STATE" : {
+ "message" : [
+ "Access to the SparkSession Session State. This is server side
developer API."
+ ]
+ },
+ "SESSION_SHARED_STATE" : {
+ "message" : [
+ "Access to the SparkSession Shared State. This is server side
developer API."
+ ]
+ },
+ "SESSION_SPARK_CONTEXT" : {
+ "message" : [
+ "Access to the SparkContext."
+ ]
+ },
+ "SESSION_SQL_CONTEXT" : {
+ "message" : [
+ "Access to the SparkSession SQL Context."
+ ]
+ }
+ },
+ "sqlState" : "0A000"
+ },
"UNSUPPORTED_DATASOURCE_FOR_DIRECT_QUERY" : {
"message" : [
"Unsupported data source type for direct query on files:
<dataSourceType>"
diff --git
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 051d382c4977..1fbc887901ec 100644
---
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -25,6 +25,7 @@ import org.apache.spark.annotation.Stable
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.connect.proto.Parse.ParseFormat
import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.connect.ConnectClientUnsupportedErrors
import org.apache.spark.sql.connect.ConnectConversions._
import org.apache.spark.sql.connect.common.DataTypeProtoConverter
import org.apache.spark.sql.types.StructType
@@ -144,11 +145,11 @@ class DataFrameReader private[sql] (sparkSession:
SparkSession) extends api.Data
/** @inheritdoc */
override def json(jsonRDD: JavaRDD[String]): Dataset[Row] =
- throwRddNotSupportedException()
+ throw ConnectClientUnsupportedErrors.rdd()
/** @inheritdoc */
override def json(jsonRDD: RDD[String]): Dataset[Row] =
- throwRddNotSupportedException()
+ throw ConnectClientUnsupportedErrors.rdd()
/** @inheritdoc */
override def csv(path: String): DataFrame = super.csv(path)
diff --git
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
index adbfda969150..5e50e34e8c35 100644
---
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
+++
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -34,10 +34,12 @@ import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder
import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders._
import org.apache.spark.sql.catalyst.expressions.OrderUtils
+import org.apache.spark.sql.connect.ConnectClientUnsupportedErrors
import org.apache.spark.sql.connect.ConnectConversions._
import org.apache.spark.sql.connect.client.SparkResult
import org.apache.spark.sql.connect.common.{DataTypeProtoConverter,
StorageLevelProtoConverter}
import org.apache.spark.sql.errors.DataTypeErrors.toSQLId
+import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.expressions.SparkUserDefinedFunction
import org.apache.spark.sql.functions.{struct, to_json}
import org.apache.spark.sql.internal.{ColumnNodeToProtoConverter,
DataFrameWriterImpl, DataFrameWriterV2Impl, MergeIntoWriterImpl, ToScalaUDF,
UDFAdaptors, UnresolvedAttribute, UnresolvedRegex}
@@ -1478,8 +1480,11 @@ class Dataset[T] private[sql] (
super.groupByKey(func, encoder).asInstanceOf[KeyValueGroupedDataset[K, T]]
/** @inheritdoc */
- override def rdd: RDD[T] = throwRddNotSupportedException()
+ override def rdd: RDD[T] = throw ConnectClientUnsupportedErrors.rdd()
/** @inheritdoc */
- override def toJavaRDD: JavaRDD[T] = throwRddNotSupportedException()
+ override def toJavaRDD: JavaRDD[T] = throw
ConnectClientUnsupportedErrors.rdd()
+
+ override def queryExecution: QueryExecution =
+ throw ConnectClientUnsupportedErrors.queryExecution()
}
diff --git
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 366a9bc3b559..7edb1f51f11b 100644
---
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -29,7 +29,7 @@ import com.google.common.cache.{CacheBuilder, CacheLoader}
import io.grpc.ClientInterceptor
import org.apache.arrow.memory.RootAllocator
-import org.apache.spark.SparkContext
+import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.annotation.{DeveloperApi, Experimental, Since}
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.connect.proto
@@ -40,15 +40,18 @@ import org.apache.spark.sql.catalog.Catalog
import org.apache.spark.sql.catalyst.{JavaTypeInference, ScalaReflection}
import org.apache.spark.sql.catalyst.encoders.{AgnosticEncoder, RowEncoder}
import
org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{agnosticEncoderFor,
BoxedLongEncoder, UnboundRowEncoder}
+import org.apache.spark.sql.connect.ConnectClientUnsupportedErrors
import org.apache.spark.sql.connect.client.{ClassFinder, CloseableIterator,
SparkConnectClient, SparkResult}
import org.apache.spark.sql.connect.client.SparkConnectClient.Configuration
import org.apache.spark.sql.connect.client.arrow.ArrowSerializer
import org.apache.spark.sql.functions.lit
-import org.apache.spark.sql.internal.{CatalogImpl, ConnectRuntimeConfig,
SessionCleaner, SqlApiConf}
+import org.apache.spark.sql.internal.{CatalogImpl, ConnectRuntimeConfig,
SessionCleaner, SessionState, SharedState, SqlApiConf}
import org.apache.spark.sql.internal.ColumnNodeToProtoConverter.{toExpr,
toTypedExpr}
+import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.streaming.DataStreamReader
import org.apache.spark.sql.streaming.StreamingQueryManager
import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.ExecutionListenerManager
import org.apache.spark.util.ArrayImplicits._
/**
@@ -93,7 +96,7 @@ class SparkSession private[sql] (
/** @inheritdoc */
override def sparkContext: SparkContext =
- throw new UnsupportedOperationException("sparkContext is not supported in
Spark Connect.")
+ throw ConnectClientUnsupportedErrors.sparkContext()
/** @inheritdoc */
val conf: RuntimeConfig = new ConnectRuntimeConfig(client)
@@ -153,27 +156,58 @@ class SparkSession private[sql] (
/** @inheritdoc */
override def createDataFrame[A <: Product: TypeTag](rdd: RDD[A]): DataFrame =
- throwRddNotSupportedException()
+ throw ConnectClientUnsupportedErrors.rdd()
/** @inheritdoc */
override def createDataFrame(rowRDD: RDD[Row], schema: StructType):
DataFrame =
- throwRddNotSupportedException()
+ throw ConnectClientUnsupportedErrors.rdd()
/** @inheritdoc */
override def createDataFrame(rowRDD: JavaRDD[Row], schema: StructType):
DataFrame =
- throwRddNotSupportedException()
+ throw ConnectClientUnsupportedErrors.rdd()
/** @inheritdoc */
override def createDataFrame(rdd: RDD[_], beanClass: Class[_]): DataFrame =
- throwRddNotSupportedException()
+ throw ConnectClientUnsupportedErrors.rdd()
/** @inheritdoc */
override def createDataFrame(rdd: JavaRDD[_], beanClass: Class[_]):
DataFrame =
- throwRddNotSupportedException()
+ throw ConnectClientUnsupportedErrors.rdd()
/** @inheritdoc */
override def createDataset[T: Encoder](data: RDD[T]): Dataset[T] =
- throwRddNotSupportedException()
+ throw ConnectClientUnsupportedErrors.rdd()
+
+ /** @inheritdoc */
+ override def sharedState: SharedState =
+ throw ConnectClientUnsupportedErrors.sharedState()
+
+ /** @inheritdoc */
+ override def sessionState: SessionState =
+ throw ConnectClientUnsupportedErrors.sessionState()
+
+ /** @inheritdoc */
+ override def sqlContext: SQLContext =
+ throw ConnectClientUnsupportedErrors.sqlContext()
+
+ /** @inheritdoc */
+ override def listenerManager: ExecutionListenerManager =
+ throw ConnectClientUnsupportedErrors.listenerManager()
+
+ /** @inheritdoc */
+ override def experimental: ExperimentalMethods =
+ throw ConnectClientUnsupportedErrors.experimental()
+
+ /** @inheritdoc */
+ override def baseRelationToDataFrame(baseRelation: BaseRelation):
api.Dataset[Row] =
+ throw ConnectClientUnsupportedErrors.baseRelationToDataFrame()
+
+ /** @inheritdoc */
+ override def executeCommand(
+ runner: String,
+ command: String,
+ options: Map[String, String]): DataFrame =
+ throw ConnectClientUnsupportedErrors.executeCommand()
/** @inheritdoc */
@Experimental
@@ -663,6 +697,9 @@ object SparkSession extends api.BaseSparkSessionCompanion
with Logging {
/** @inheritdoc */
override def config(map: java.util.Map[String, Any]): this.type =
super.config(map)
+ /** @inheritdoc */
+ override def config(conf: SparkConf): Builder.this.type =
super.config(conf)
+
/** @inheritdoc */
@deprecated("enableHiveSupport does not work in Spark Connect")
override def enableHiveSupport(): this.type = this
@@ -675,6 +712,10 @@ object SparkSession extends api.BaseSparkSessionCompanion
with Logging {
@deprecated("appName does not work in Spark Connect")
override def appName(name: String): this.type = this
+ /** @inheritdoc */
+ @deprecated("withExtensions does not work in Spark Connect")
+ override def withExtensions(f: SparkSessionExtensions => Unit): this.type
= this
+
private def tryCreateSessionFromClient(): Option[SparkSession] = {
if (client != null && client.isSessionValid) {
Option(new SparkSession(client, planIdGenerator))
diff --git
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/ConnectClientUnsupportedErrors.scala
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/ConnectClientUnsupportedErrors.scala
new file mode 100644
index 000000000000..e73bcb8a0059
--- /dev/null
+++
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/ConnectClientUnsupportedErrors.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect
+
+import org.apache.spark.SparkUnsupportedOperationException
+
+private[sql] object ConnectClientUnsupportedErrors {
+
+ private def unsupportedFeatureException(
+ subclass: String): SparkUnsupportedOperationException = {
+ new SparkUnsupportedOperationException(
+ "UNSUPPORTED_CONNECT_FEATURE." + subclass,
+ Map.empty[String, String])
+ }
+
+ def rdd(): SparkUnsupportedOperationException =
+ unsupportedFeatureException("RDD")
+
+ def queryExecution(): SparkUnsupportedOperationException =
+ unsupportedFeatureException("DATASET_QUERY_EXECUTION")
+
+ def executeCommand(): SparkUnsupportedOperationException =
+ unsupportedFeatureException("SESSION_EXECUTE_COMMAND")
+
+ def baseRelationToDataFrame(): SparkUnsupportedOperationException =
+ unsupportedFeatureException("SESSION_BASE_RELATION_TO_DATAFRAME")
+
+ def experimental(): SparkUnsupportedOperationException =
+ unsupportedFeatureException("SESSION_EXPERIMENTAL_METHODS")
+
+ def listenerManager(): SparkUnsupportedOperationException =
+ unsupportedFeatureException("SESSION_LISTENER_MANAGER")
+
+ def sessionState(): SparkUnsupportedOperationException =
+ unsupportedFeatureException("SESSION_SESSION_STATE")
+
+ def sharedState(): SparkUnsupportedOperationException =
+ unsupportedFeatureException("SESSION_SHARED_STATE")
+
+ def sparkContext(): SparkUnsupportedOperationException =
+ unsupportedFeatureException("SESSION_SPARK_CONTEXT")
+
+ def sqlContext(): SparkUnsupportedOperationException =
+ unsupportedFeatureException("SESSION_SQL_CONTEXT")
+}
diff --git
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/package.scala
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/package.scala
index 5c61b9371f37..ada94b76fcbc 100644
---
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/package.scala
+++
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/package.scala
@@ -19,7 +19,4 @@ package org.apache.spark
package object sql {
type DataFrame = Dataset[Row]
-
- private[sql] def throwRddNotSupportedException(): Nothing =
- throw new UnsupportedOperationException("RDDs are not supported in Spark
Connect.")
}
diff --git
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/UnsupportedFeaturesSuite.scala
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/UnsupportedFeaturesSuite.scala
new file mode 100644
index 000000000000..6a26cf581751
--- /dev/null
+++
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/UnsupportedFeaturesSuite.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql
+
+import org.apache.spark.SparkUnsupportedOperationException
+import org.apache.spark.api.java.JavaRDD
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.sources.BaseRelation
+import org.apache.spark.sql.test.ConnectFunSuite
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Test suite that test the errors thrown when using unsupported features.
+ */
+class UnsupportedFeaturesSuite extends ConnectFunSuite {
+ private def session = SparkSession.builder().getOrCreate()
+
+ private def testUnsupportedFeature(name: String, errorCode: String)(
+ f: SparkSession => Any): Unit = {
+ test(name) {
+ val e = intercept[SparkUnsupportedOperationException](f(session))
+ assert(e.getCondition == "UNSUPPORTED_CONNECT_FEATURE." + errorCode)
+ }
+ }
+
+ testUnsupportedFeature("SparkSession.createDataFrame(RDD)", "RDD") { session
=>
+ session.createDataFrame(new RDD[(Int, Int)])
+ }
+
+ testUnsupportedFeature("SparkSession.createDataFrame(RDD, StructType)",
"RDD") { session =>
+ val schema = new StructType().add("_1", "int").add("_2", "int")
+ session.createDataFrame(new RDD[Row], schema)
+ }
+
+ testUnsupportedFeature("SparkSession.createDataFrame(JavaRDD, StructType)",
"RDD") { session =>
+ val schema = new StructType().add("_1", "int").add("_2", "int")
+ session.createDataFrame(new JavaRDD[Row], schema)
+ }
+
+ testUnsupportedFeature("SparkSession.createDataFrame(RDD, Class)", "RDD") {
session =>
+ session.createDataFrame(new RDD[Int], classOf[Int])
+ }
+
+ testUnsupportedFeature("SparkSession.createDataFrame(JavaRDD, Class)",
"RDD") { session =>
+ session.createDataFrame(new JavaRDD[Int], classOf[Int])
+ }
+
+ testUnsupportedFeature("SparkSession.createDataset(RDD)", "RDD") { session =>
+ session.createDataset(new RDD[Int])(Encoders.scalaInt)
+ }
+
+ testUnsupportedFeature("SparkSession.experimental",
"SESSION_EXPERIMENTAL_METHODS") {
+ _.experimental
+ }
+
+ testUnsupportedFeature("SparkSession.sessionState", "SESSION_SESSION_STATE")
{
+ _.sessionState
+ }
+
+ testUnsupportedFeature("SparkSession.sharedState", "SESSION_SHARED_STATE") {
+ _.sharedState
+ }
+
+ testUnsupportedFeature("SparkSession.listenerManager",
"SESSION_LISTENER_MANAGER") {
+ _.listenerManager
+ }
+
+ testUnsupportedFeature("SparkSession.sqlContext", "SESSION_SQL_CONTEXT") {
+ _.sqlContext
+ }
+
+ testUnsupportedFeature(
+ "SparkSession.baseRelationToDataFrame",
+ "SESSION_BASE_RELATION_TO_DATAFRAME") {
+ _.baseRelationToDataFrame(new BaseRelation)
+ }
+
+ testUnsupportedFeature("SparkSession.executeCommand",
"SESSION_EXECUTE_COMMAND") {
+ _.executeCommand("ds", "exec", Map.empty)
+ }
+
+ testUnsupportedFeature("Dataset.queryExecution", "DATASET_QUERY_EXECUTION") {
+ _.range(1).queryExecution
+ }
+
+ testUnsupportedFeature("Dataset.rdd", "RDD") {
+ _.range(1).rdd
+ }
+
+ testUnsupportedFeature("Dataset.javaRDD", "RDD") {
+ _.range(1).javaRDD
+ }
+
+ testUnsupportedFeature("Dataset.toJavaRDD", "RDD") {
+ _.range(1).toJavaRDD
+ }
+
+ testUnsupportedFeature("DataFrameReader.json(RDD)", "RDD") {
+ _.read.json(new RDD[String])
+ }
+
+ testUnsupportedFeature("DataFrameReader.json(JavaRDD)", "RDD") {
+ _.read.json(new JavaRDD[String])
+ }
+}
diff --git
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
index ff3e674137a5..d9ff8d9122ea 100644
---
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
+++
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
@@ -186,9 +186,6 @@ object CheckConnectJvmClientCompatibility {
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.UDTFRegistration"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.DataSourceRegistration"),
- // DataFrame Reader & Writer
-
ProblemFilters.exclude[Problem]("org.apache.spark.sql.DataFrameReader.json"),
// rdd
-
// DataFrameNaFunctions
ProblemFilters.exclude[Problem]("org.apache.spark.sql.DataFrameNaFunctions.fillValue"),
@@ -201,16 +198,7 @@ object CheckConnectJvmClientCompatibility {
// This is due to a potential bug in Mima that all methods in `class
Dataset` are not being
// checked for problems due to the presence of a private[sql] companion
object.
// Further investigation is needed.
-
ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.queryExecution"),
-
ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.sqlContext"),
ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.selectUntyped"),
// protected
- ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.rdd"),
-
ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.toJavaRDD"),
- ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.javaRDD"),
-
- // KeyValueGroupedDataset
- ProblemFilters.exclude[Problem](
- "org.apache.spark.sql.KeyValueGroupedDataset.queryExecution"),
// RelationalGroupedDataset
ProblemFilters.exclude[MissingClassProblem](
@@ -218,36 +206,11 @@ object CheckConnectJvmClientCompatibility {
),
// SparkSession
-
ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.sparkContext"),
-
ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.sharedState"),
-
ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.sessionState"),
-
ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.sqlContext"),
-
ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.listenerManager"),
-
ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.experimental"),
-
ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.dataSource"),
-
ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.createDataFrame"),
- ProblemFilters.exclude[Problem](
+ ProblemFilters.exclude[DirectMissingMethodProblem](
"org.apache.spark.sql.SparkSession.baseRelationToDataFrame"),
-
ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.createDataset"),
-
ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.executeCommand"),
ProblemFilters.exclude[DirectMissingMethodProblem](
"org.apache.spark.sql.SparkSession.canUseSession"),
- // SparkSession#implicits
- ProblemFilters.exclude[DirectMissingMethodProblem](
- "org.apache.spark.sql.SparkSession#implicits.session"),
-
- // SparkSession#Builder
- ProblemFilters.exclude[DirectMissingMethodProblem](
- "org.apache.spark.sql.SparkSession#Builder.config"),
- ProblemFilters.exclude[DirectMissingMethodProblem](
- "org.apache.spark.sql.SparkSession#Builder.withExtensions"),
-
- // RuntimeConfig
-
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.RuntimeConfig$"),
- ProblemFilters.exclude[DirectMissingMethodProblem](
- "org.apache.spark.sql.RuntimeConfig.sqlConf"),
-
// DataStreamWriter
ProblemFilters.exclude[MissingClassProblem](
"org.apache.spark.sql.streaming.DataStreamWriter$"),
@@ -261,10 +224,6 @@ object CheckConnectJvmClientCompatibility {
ProblemFilters.exclude[MissingClassProblem](
"org.apache.spark.sql.streaming.TestGroupState$"),
- // SQLImplicits
-
ProblemFilters.exclude[Problem]("org.apache.spark.sql.SQLImplicits.rddToDatasetHolder"),
-
ProblemFilters.exclude[Problem]("org.apache.spark.sql.SQLImplicits.session"),
-
// Artifact Manager, client has a totally different implementation.
ProblemFilters.exclude[MissingClassProblem](
"org.apache.spark.sql.artifact.ArtifactManager"),
@@ -278,14 +237,6 @@ object CheckConnectJvmClientCompatibility {
"org.apache.spark.sql.SparkSession.Converter"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.SparkSession$Converter$"),
- // Missing expressions.
- ProblemFilters.exclude[MissingClassProblem](
- "org.apache.spark.sql.expressions.ReduceAggregator"),
- ProblemFilters.exclude[MissingClassProblem](
- "org.apache.spark.sql.expressions.ReduceAggregator$"),
-
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.expressions.javalang.*"),
-
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.expressions.scalalang.*"),
-
// UDFRegistration
ProblemFilters.exclude[DirectMissingMethodProblem](
"org.apache.spark.sql.UDFRegistration.register"),
@@ -332,9 +283,7 @@ object CheckConnectJvmClientCompatibility {
val excludeRules = Seq(
// Skipped packages
ProblemFilters.exclude[Problem]("org.apache.spark.sql.application.*"),
- ProblemFilters.exclude[Problem]("org.apache.spark.sql.avro.*"),
ProblemFilters.exclude[Problem]("org.apache.spark.sql.connect.*"),
- ProblemFilters.exclude[Problem]("org.apache.spark.sql.protobuf.*"),
ProblemFilters.exclude[Problem]("org.apache.spark.sql.internal.*"),
// private[sql]
@@ -350,7 +299,6 @@ object CheckConnectJvmClientCompatibility {
"org.apache.spark.sql.SparkSession#RichColumn.expr"),
ProblemFilters.exclude[DirectMissingMethodProblem](
"org.apache.spark.sql.SparkSession#RichColumn.typedExpr"),
-
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.package.column"),
// New public APIs added in the client
// Dataset
@@ -360,14 +308,6 @@ object CheckConnectJvmClientCompatibility {
ProblemFilters.exclude[DirectMissingMethodProblem](
"org.apache.spark.sql.Dataset.collectResult"),
- // RuntimeConfig
- ProblemFilters.exclude[MissingTypesProblem](
- "org.apache.spark.sql.RuntimeConfig" // Client version extends Logging
- ),
- ProblemFilters.exclude[Problem](
- "org.apache.spark.sql.RuntimeConfig.*" // Mute missing Logging methods
- ),
-
// SparkSession
// developer API
ProblemFilters.exclude[DirectMissingMethodProblem](
@@ -379,16 +319,14 @@ object CheckConnectJvmClientCompatibility {
// Experimental
ProblemFilters.exclude[DirectMissingMethodProblem](
"org.apache.spark.sql.SparkSession.registerClassFinder"),
+ ProblemFilters.exclude[IncompatibleSignatureProblem](
+ "org.apache.spark.sql.SparkSession.baseRelationToDataFrame"),
// SparkSession#Builder
- ProblemFilters.exclude[DirectMissingMethodProblem](
- "org.apache.spark.sql.SparkSession#Builder.remote"),
ProblemFilters.exclude[DirectMissingMethodProblem](
"org.apache.spark.sql.SparkSession#Builder.client"),
ProblemFilters.exclude[DirectMissingMethodProblem](
"org.apache.spark.sql.SparkSession#Builder.build" // deprecated
),
- ProblemFilters.exclude[DirectMissingMethodProblem](
- "org.apache.spark.sql.SparkSession#Builder.create"),
ProblemFilters.exclude[DirectMissingMethodProblem](
"org.apache.spark.sql.SparkSession#Builder.interceptor"),
@@ -396,23 +334,13 @@ object CheckConnectJvmClientCompatibility {
ProblemFilters.exclude[Problem]("org.apache.spark.sql.SQLImplicits.session"),
// Steaming API
- ProblemFilters.exclude[MissingTypesProblem](
- "org.apache.spark.sql.streaming.DataStreamWriter" // Client version
extends Logging
- ),
- ProblemFilters.exclude[Problem](
- "org.apache.spark.sql.streaming.DataStreamWriter.*" // Mute missing
Logging methods
- ),
ProblemFilters.exclude[MissingClassProblem](
"org.apache.spark.sql.streaming.RemoteStreamingQuery"),
ProblemFilters.exclude[MissingClassProblem](
"org.apache.spark.sql.streaming.RemoteStreamingQuery$"),
// Skip client side listener specific class
ProblemFilters.exclude[MissingClassProblem](
- "org.apache.spark.sql.streaming.StreamingQueryListenerBus"),
-
- // Encoders are in the wrong JAR
-
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.Encoders"),
-
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.Encoders$"))
+ "org.apache.spark.sql.streaming.StreamingQueryListenerBus"))
checkMiMaCompatibility(sqlJar, clientJar, includedRules, excludeRules)
}
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index d9ec66044adc..0c1b1afbdc1c 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -1373,6 +1373,7 @@ trait SharedUnidocSettings {
.map(_.filterNot(_.data.getCanonicalPath.matches(""".*kafka-clients-0\.10.*""")))
.map(_.filterNot(_.data.getCanonicalPath.matches(""".*kafka_2\..*-0\.10.*""")))
.map(_.filterNot(_.data.getCanonicalPath.contains("apache-rat")))
+ .map(_.filterNot(_.data.getCanonicalPath.contains("connect-shims")))
}
val unidocSourceBase = settingKey[String]("Base URL of source links in
Scaladoc.")
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/api/Dataset.scala
b/sql/api/src/main/scala/org/apache/spark/sql/api/Dataset.scala
index d6442930d1c5..416f89ba6f09 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/api/Dataset.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/api/Dataset.scala
@@ -21,11 +21,12 @@ import scala.reflect.runtime.universe.TypeTag
import _root_.java.util
-import org.apache.spark.annotation.{DeveloperApi, Stable}
+import org.apache.spark.annotation.{DeveloperApi, Stable, Unstable}
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.api.java.function.{FilterFunction, FlatMapFunction,
ForeachFunction, ForeachPartitionFunction, MapFunction, MapPartitionsFunction,
ReduceFunction}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{functions, AnalysisException, Column,
DataFrameWriter, DataFrameWriterV2, Encoder, MergeIntoWriter, Observation, Row,
TypedColumn}
+import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.internal.{ToScalaUDF, UDFAdaptors}
import org.apache.spark.sql.types.{Metadata, StructType}
import org.apache.spark.storage.StorageLevel
@@ -128,6 +129,8 @@ abstract class Dataset[T] extends Serializable {
val encoder: Encoder[T]
+ @DeveloperApi @Unstable def queryExecution: QueryExecution
+
/**
* Converts this strongly typed collection of data to generic Dataframe. In
contrast to the
* strongly typed objects that Dataset operations work on, a Dataframe
returns generic
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/api/SparkSession.scala
b/sql/api/src/main/scala/org/apache/spark/sql/api/SparkSession.scala
index cb8c2a66ad28..64b0a87c573d 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/api/SparkSession.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/api/SparkSession.scala
@@ -26,12 +26,15 @@ import _root_.java.net.URI
import _root_.java.util
import _root_.java.util.concurrent.atomic.AtomicReference
-import org.apache.spark.{SparkContext, SparkException}
+import org.apache.spark.{SparkConf, SparkContext, SparkException}
import org.apache.spark.annotation.{DeveloperApi, Experimental, Stable,
Unstable}
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{Encoder, Row, RuntimeConfig}
+import org.apache.spark.sql.{Encoder, ExperimentalMethods, Row, RuntimeConfig,
SparkSessionExtensions, SQLContext}
+import org.apache.spark.sql.internal.{SessionState, SharedState}
+import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.ExecutionListenerManager
import org.apache.spark.util.SparkClassUtils
/**
@@ -71,6 +74,49 @@ abstract class SparkSession extends Serializable with
Closeable {
*/
def version: String
+ /* ----------------------- *
+ | Session-related state |
+ * ----------------------- */
+
+ /**
+ * State shared across sessions, including the `SparkContext`, cached data,
listener, and a
+ * catalog that interacts with external systems.
+ *
+ * This is internal to Spark and there is no guarantee on interface
stability.
+ *
+ * @note
+ * this method is not supported in Spark Connect.
+ * @since 2.2.0
+ */
+ @Unstable
+ @transient
+ def sharedState: SharedState
+
+ /**
+ * State isolated across sessions, including SQL configurations, temporary
tables, registered
+ * functions, and everything else that accepts a
`org.apache.spark.sql.internal.SQLConf`. If
+ * `parentSessionState` is not null, the `SessionState` will be a copy of
the parent.
+ *
+ * This is internal to Spark and there is no guarantee on interface
stability.
+ *
+ * @note
+ * this method is not supported in Spark Connect.
+ * @since 2.2.0
+ */
+ @Unstable
+ @transient
+ def sessionState: SessionState
+
+ /**
+ * A wrapped version of this session in the form of a `SQLContext`, for
backward compatibility.
+ *
+ * @note
+ * this method is not supported in Spark Connect.
+ * @since 2.0.0
+ */
+ @transient
+ def sqlContext: SQLContext
+
/**
* Runtime configuration interface for Spark.
*
@@ -82,6 +128,28 @@ abstract class SparkSession extends Serializable with
Closeable {
*/
def conf: RuntimeConfig
+ /**
+ * An interface to register custom
`org.apache.spark.sql.util.QueryExecutionListener`s that
+ * listen for execution metrics.
+ *
+ * @note
+ * this method is not supported in Spark Connect.
+ * @since 2.0.0
+ */
+ def listenerManager: ExecutionListenerManager
+
+ /**
+ * :: Experimental :: A collection of methods that are considered
experimental, but can be used
+ * to hook into the query planner for advanced functionality.
+ *
+ * @note
+ * this method is not supported in Spark Connect.
+ * @since 2.0.0
+ */
+ @Experimental
+ @Unstable
+ def experimental: ExperimentalMethods
+
/**
* A collection of methods for registering user-defined functions (UDF).
*
@@ -246,6 +314,15 @@ abstract class SparkSession extends Serializable with
Closeable {
*/
def createDataFrame(rdd: JavaRDD[_], beanClass: Class[_]): Dataset[Row]
+ /**
+ * Convert a `BaseRelation` created for external data sources into a
`DataFrame`.
+ *
+ * @note
+ * this method is not supported in Spark Connect.
+ * @since 2.0.0
+ */
+ def baseRelationToDataFrame(baseRelation: BaseRelation): Dataset[Row]
+
/* ------------------------------- *
| Methods for creating DataSets |
* ------------------------------- */
@@ -442,6 +519,29 @@ abstract class SparkSession extends Serializable with
Closeable {
*/
def sql(sqlText: String): Dataset[Row] = sql(sqlText, Map.empty[String, Any])
+ /**
+ * Execute an arbitrary string command inside an external execution engine
rather than Spark.
+ * This could be useful when a user wants to execute some commands out of
Spark. For example,
+ * executing custom DDL/DML command for JDBC, creating index for
ElasticSearch, creating cores
+ * for Solr and so on.
+ *
+ * The command will be eagerly executed after this method is called and the
returned DataFrame
+ * will contain the output of the command (if any).
+ *
+ * @param runner
+ * The class name of the runner that implements `ExternalCommandRunner`.
+ * @param command
+ * The target command to be executed
+ * @param options
+ * The options for the runner.
+ *
+ * @note
+ * this method is not supported in Spark Connect.
+ * @since 3.0.0
+ */
+ @Unstable
+ def executeCommand(runner: String, command: String, options: Map[String,
String]): Dataset[Row]
+
/**
* Add a single artifact to the current session.
*
@@ -985,6 +1085,26 @@ abstract class SparkSessionBuilder {
config(map.asScala.toMap)
}
+ /**
+ * Sets a list of config options based on the given `SparkConf`.
+ *
+ * @since 2.0.0
+ */
+ def config(conf: SparkConf): this.type = synchronized {
+ conf.getAll.foreach { case (k, v) => options += k -> v }
+ this
+ }
+
+ /**
+ * Inject extensions into the [[SparkSession]]. This allows a user to add
Analyzer rules,
+ * Optimizer rules, Planning Strategies or a customized parser.
+ *
+ * @note
+ * this method is not supported in Spark Connect.
+ * @since 2.2.0
+ */
+ def withExtensions(f: SparkSessionExtensions => Unit): this.type
+
/**
* Gets an existing [[SparkSession]] or, if there is no existing one,
creates a new one based on
* the options set in this builder.
diff --git
a/sql/connect/shims/src/main/scala/org/apache/spark/api/java/shims.scala
b/sql/connect/shims/src/main/scala/org/apache/spark/api/java/shims.scala
deleted file mode 100644
index 45fae0024748..000000000000
--- a/sql/connect/shims/src/main/scala/org/apache/spark/api/java/shims.scala
+++ /dev/null
@@ -1,19 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.api.java
-
-class JavaRDD[T]
diff --git a/sql/connect/shims/src/main/scala/org/apache/spark/rdd/shims.scala
b/sql/connect/shims/src/main/scala/org/apache/spark/rdd/shims.scala
deleted file mode 100644
index b23f83fa9185..000000000000
--- a/sql/connect/shims/src/main/scala/org/apache/spark/rdd/shims.scala
+++ /dev/null
@@ -1,19 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.rdd
-
-class RDD[T]
diff --git a/sql/connect/shims/src/main/scala/org/apache/spark/shims.scala
b/sql/connect/shims/src/main/scala/org/apache/spark/shims.scala
index 813b8e4859c2..ad8771a03b28 100644
--- a/sql/connect/shims/src/main/scala/org/apache/spark/shims.scala
+++ b/sql/connect/shims/src/main/scala/org/apache/spark/shims.scala
@@ -17,3 +17,34 @@
package org.apache.spark
class SparkContext
+class SparkConf {
+ def getAll: Array[(String, String)] = Array.empty
+}
+
+package api.java {
+ class JavaRDD[T]
+}
+
+package rdd {
+ class RDD[T]
+}
+
+package sql {
+ class ExperimentalMethods
+ class SparkSessionExtensions
+ class SQLContext
+
+ package execution {
+ class QueryExecution
+ }
+ package internal {
+ class SharedState
+ class SessionState
+ }
+ package util {
+ class ExecutionListenerManager
+ }
+ package sources {
+ class BaseRelation
+ }
+}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/DataSourceRegistration.scala
b/sql/core/src/main/scala/org/apache/spark/sql/DataSourceRegistration.scala
index 3b64cb97e10b..9d763edb079e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataSourceRegistration.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataSourceRegistration.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.internal.SQLConf
* Use `SparkSession.dataSource` to access this.
*/
@Evolving
-class DataSourceRegistration private[sql] (dataSourceManager:
DataSourceManager)
+private[sql] class DataSourceRegistration private[sql] (dataSourceManager:
DataSourceManager)
extends Logging {
protected[sql] def registerPython(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 3953a5c3704f..d43274d761af 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -217,7 +217,6 @@ class Dataset[T] private[sql](
@DeveloperApi @Unstable @transient val encoder: Encoder[T])
extends api.Dataset[T] {
type DS[U] = Dataset[U]
- type RGD = RelationalGroupedDataset
@transient lazy val sparkSession: SparkSession = {
if (queryExecution == null || queryExecution.sparkSession == null) {
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
index c645ba57e8f8..392c3edab989 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
@@ -39,7 +39,7 @@ import org.apache.spark.sql.streaming.{GroupState,
GroupStateTimeout, OutputMode
class KeyValueGroupedDataset[K, V] private[sql](
kEncoder: Encoder[K],
vEncoder: Encoder[V],
- @transient val queryExecution: QueryExecution,
+ @transient private[sql] val queryExecution: QueryExecution,
private val dataAttributes: Seq[Attribute],
private val groupingAttributes: Seq[Attribute])
extends api.KeyValueGroupedDataset[K, V] {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 0c2aa6f941a2..3af4a26cf187 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -149,29 +149,14 @@ class SparkSession private(
| Session-related state |
* ----------------------- */
- /**
- * State shared across sessions, including the `SparkContext`, cached data,
listener,
- * and a catalog that interacts with external systems.
- *
- * This is internal to Spark and there is no guarantee on interface
stability.
- *
- * @since 2.2.0
- */
+ /** @inheritdoc */
@Unstable
@transient
lazy val sharedState: SharedState = {
existingSharedState.getOrElse(new SharedState(sparkContext,
initialSessionOptions))
}
- /**
- * State isolated across sessions, including SQL configurations, temporary
tables, registered
- * functions, and everything else that accepts a
[[org.apache.spark.sql.internal.SQLConf]].
- * If `parentSessionState` is not null, the `SessionState` will be a copy of
the parent.
- *
- * This is internal to Spark and there is no guarantee on interface
stability.
- *
- * @since 2.2.0
- */
+ /** @inheritdoc */
@Unstable
@transient
lazy val sessionState: SessionState = {
@@ -185,32 +170,17 @@ class SparkSession private(
}
}
- /**
- * A wrapped version of this session in the form of a [[SQLContext]], for
backward compatibility.
- *
- * @since 2.0.0
- */
+ /** @inheritdoc */
@transient
val sqlContext: SQLContext = new SQLContext(this)
/** @inheritdoc */
@transient lazy val conf: RuntimeConfig = new
RuntimeConfigImpl(sessionState.conf)
- /**
- * An interface to register custom
[[org.apache.spark.sql.util.QueryExecutionListener]]s
- * that listen for execution metrics.
- *
- * @since 2.0.0
- */
+ /** @inheritdoc */
def listenerManager: ExecutionListenerManager = sessionState.listenerManager
- /**
- * :: Experimental ::
- * A collection of methods that are considered experimental, but can be used
to hook into
- * the query planner for advanced functionality.
- *
- * @since 2.0.0
- */
+ /** @inheritdoc */
@Experimental
@Unstable
def experimental: ExperimentalMethods = sessionState.experimentalMethods
@@ -227,7 +197,7 @@ class SparkSession private(
*/
@Experimental
@Unstable
- def dataSource: DataSourceRegistration = sessionState.dataSourceRegistration
+ private[sql] def dataSource: DataSourceRegistration =
sessionState.dataSourceRegistration
/** @inheritdoc */
@Unstable
@@ -357,11 +327,7 @@ class SparkSession private(
Dataset.ofRows(self, LocalRelation(attrSeq, rows.toSeq))
}
- /**
- * Convert a `BaseRelation` created for external data sources into a
`DataFrame`.
- *
- * @since 2.0.0
- */
+ /** @inheritdoc */
def baseRelationToDataFrame(baseRelation: BaseRelation): DataFrame = {
Dataset.ofRows(self, LogicalRelation(baseRelation))
}
@@ -527,19 +493,7 @@ class SparkSession private(
override def sql(sqlText: String): DataFrame = sql(sqlText,
Map.empty[String, Any])
/**
- * Execute an arbitrary string command inside an external execution engine
rather than Spark.
- * This could be useful when user wants to execute some commands out of
Spark. For
- * example, executing custom DDL/DML command for JDBC, creating index for
ElasticSearch,
- * creating cores for Solr and so on.
- *
- * The command will be eagerly executed after this method is called and the
returned
- * DataFrame will contain the output of the command(if any).
- *
- * @param runner The class name of the runner that implements
`ExternalCommandRunner`.
- * @param command The target command to be executed
- * @param options The options for the runner.
- *
- * @since 3.0.0
+ * @inheritdoc
*/
@Unstable
def executeCommand(runner: String, command: String, options: Map[String,
String]): DataFrame = {
@@ -826,15 +780,8 @@ object SparkSession extends api.BaseSparkSessionCompanion
with Logging {
/** @inheritdoc */
override def config(map: java.util.Map[String, Any]): this.type =
super.config(map)
- /**
- * Sets a list of config options based on the given `SparkConf`.
- *
- * @since 2.0.0
- */
- def config(conf: SparkConf): this.type = synchronized {
- conf.getAll.foreach { case (k, v) => options += k -> v }
- this
- }
+ /** @inheritdoc */
+ override def config(conf: SparkConf): this.type = super.config(conf)
/** @inheritdoc */
override def master(master: String): this.type = super.master(master)
@@ -850,13 +797,8 @@ object SparkSession extends api.BaseSparkSessionCompanion
with Logging {
}
}
- /**
- * Inject extensions into the [[SparkSession]]. This allows a user to add
Analyzer rules,
- * Optimizer rules, Planning Strategies or a customized parser.
- *
- * @since 2.2.0
- */
- def withExtensions(f: SparkSessionExtensions => Unit): this.type =
synchronized {
+ /** @inheritdoc */
+ override def withExtensions(f: SparkSessionExtensions => Unit): this.type
= synchronized {
f(extensions)
this
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/UDTFRegistration.scala
b/sql/core/src/main/scala/org/apache/spark/sql/UDTFRegistration.scala
index b1666f247581..06f103c0d69a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/UDTFRegistration.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/UDTFRegistration.scala
@@ -28,7 +28,7 @@ import
org.apache.spark.sql.execution.python.UserDefinedPythonTableFunction
* @since 3.5.0
*/
@Evolving
-class UDTFRegistration private[sql] (tableFunctionRegistry:
TableFunctionRegistry)
+private[sql] class UDTFRegistration private[sql] (tableFunctionRegistry:
TableFunctionRegistry)
extends Logging {
protected[sql] def registerPython(name: String, udtf:
UserDefinedPythonTableFunction): Unit = {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]