This is an automated email from the ASF dual-hosted git repository.
hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new fec68678c0cd [SPARK-49028][CONNECT][SQL] Create a shared SparkSession
fec68678c0cd is described below
commit fec68678c0cd2d60d6e851f429e60d2f8d057cf2
Author: Herman van Hovell <[email protected]>
AuthorDate: Thu Aug 29 08:40:52 2024 -0400
[SPARK-49028][CONNECT][SQL] Create a shared SparkSession
### What changes were proposed in this pull request?
This PR creates a common base class for `SparkSession`.
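For reference, a minimal sketch of the new shared base class (the full version is added under `sql/api/src/main/scala/org/apache/spark/sql/api/SparkSession.scala` in this diff). The `DS` type parameter is F-bounded so each implementation keeps returning its own concrete `Dataset` type from the shared methods:

```scala
import java.io.Closeable
import org.apache.spark.sql.{Encoder, Row}
import org.apache.spark.sql.api.Dataset

// Sketch only: a subset of the methods declared by the new shared base class.
abstract class SparkSession[DS[U] <: Dataset[U, DS]] extends Serializable with Closeable {
  def version: String
  def emptyDataset[T: Encoder]: DS[T]
  def range(end: Long): DS[java.lang.Long]
  def table(tableName: String): DS[Row]
  def sql(sqlText: String, args: Map[String, Any]): DS[Row]
  def sql(sqlText: String): DS[Row] = sql(sqlText, Map.empty[String, Any])
  def stop(): Unit = close()
  // ... remaining shared methods elided
}
```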
### Why are the changes needed?
We are creating a unified Scala SQL client interface that is shared by the Classic and Connect implementations.
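As a hypothetical illustration (the helper and the `orders` table below are made up, not part of this PR), code written once against the shared API works with both implementations, because the Classic and the Connect `SparkSession` now both extend `api.SparkSession[Dataset]`:

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.api.{Dataset, SparkSession}

object SharedApiExample {
  // Accepts either the Classic or the Connect session, since both extend the shared base.
  def openOrders[DS[U] <: Dataset[U, DS]](spark: SparkSession[DS]): DS[Row] =
    spark.sql("SELECT * FROM orders WHERE status = 'OPEN'")
}
```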
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Compilation and existing tests.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #47894 from hvanhovell/SPARK-49028.
Authored-by: Herman van Hovell <[email protected]>
Signed-off-by: Herman van Hovell <[email protected]>
---
.../scala/org/apache/spark/sql/SparkSession.scala | 234 ++------------
.../scala/org/apache/spark/sql/api/Dataset.scala | 3 +-
.../org/apache/spark/sql/api/SparkSession.scala | 337 +++++++++++++++++++++
.../scala/org/apache/spark/sql/functions.scala | 2 +-
.../scala/org/apache/spark/sql/SparkSession.scala | 257 ++--------------
5 files changed, 385 insertions(+), 448 deletions(-)
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 50985d4373d6..e16ff2e39173 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -16,10 +16,8 @@
*/
package org.apache.spark.sql
-import java.io.Closeable
import java.net.URI
import java.util.concurrent.ConcurrentHashMap
-import java.util.concurrent.TimeUnit._
import java.util.concurrent.atomic.{AtomicLong, AtomicReference}
import scala.jdk.CollectionConverters._
@@ -69,8 +67,7 @@ import org.apache.spark.util.ArrayImplicits._
class SparkSession private[sql] (
private[sql] val client: SparkConnectClient,
private val planIdGenerator: AtomicLong)
- extends Serializable
- with Closeable
+ extends api.SparkSession[Dataset]
with Logging {
private[this] val allocator = new RootAllocator()
@@ -100,35 +97,11 @@ class SparkSession private[sql] (
*/
val conf: RuntimeConfig = new RuntimeConfig(client)
- /**
- * Executes some code block and prints to stdout the time taken to execute
the block. This is
- * available in Scala only and is used primarily for interactive testing and
debugging.
- *
- * @since 3.4.0
- */
- def time[T](f: => T): T = {
- val start = System.nanoTime()
- val ret = f
- val end = System.nanoTime()
- // scalastyle:off println
- println(s"Time taken: ${NANOSECONDS.toMillis(end - start)} ms")
- // scalastyle:on println
- ret
- }
-
- /**
- * Returns a `DataFrame` with no rows or columns.
- *
- * @since 3.4.0
- */
+ /** @inheritdoc */
@transient
val emptyDataFrame: DataFrame = emptyDataset(UnboundRowEncoder)
- /**
- * Creates a new [[Dataset]] of type T containing zero elements.
- *
- * @since 3.4.0
- */
+ /** @inheritdoc */
def emptyDataset[T: Encoder]: Dataset[T] = createDataset[T](Nil)
private def createDataset[T](encoder: AgnosticEncoder[T], data: Iterator[T]): Dataset[T] = {
@@ -151,104 +124,33 @@ class SparkSession private[sql] (
}
}
- /**
- * Creates a `DataFrame` from a local Seq of Product.
- *
- * @since 3.4.0
- */
+ /** @inheritdoc */
def createDataFrame[A <: Product: TypeTag](data: Seq[A]): DataFrame = {
createDataset(ScalaReflection.encoderFor[A], data.iterator).toDF()
}
- /**
- * :: DeveloperApi :: Creates a `DataFrame` from a `java.util.List`
containing [[Row]]s using
- * the given schema. It is important to make sure that the structure of
every [[Row]] of the
- * provided List matches the provided schema. Otherwise, there will be
runtime exception.
- *
- * @since 3.4.0
- */
+ /** @inheritdoc */
def createDataFrame(rows: java.util.List[Row], schema: StructType): DataFrame = {
createDataset(RowEncoder.encoderFor(schema), rows.iterator().asScala).toDF()
}
- /**
- * Applies a schema to a List of Java Beans.
- *
- * WARNING: Since there is no guaranteed ordering for fields in a Java Bean,
SELECT * queries
- * will return the columns in an undefined order.
- * @since 3.4.0
- */
+ /** @inheritdoc */
def createDataFrame(data: java.util.List[_], beanClass: Class[_]): DataFrame = {
val encoder =
JavaTypeInference.encoderFor(beanClass.asInstanceOf[Class[Any]])
createDataset(encoder, data.iterator().asScala).toDF()
}
- /**
- * Creates a [[Dataset]] from a local Seq of data of a given type. This
method requires an
- * encoder (to convert a JVM object of type `T` to and from the internal
Spark SQL
- * representation) that is generally created automatically through implicits
from a
- * `SparkSession`, or can be created explicitly by calling static methods on
[[Encoders]].
- *
- * ==Example==
- *
- * {{{
- *
- * import spark.implicits._
- * case class Person(name: String, age: Long)
- * val data = Seq(Person("Michael", 29), Person("Andy", 30),
Person("Justin", 19))
- * val ds = spark.createDataset(data)
- *
- * ds.show()
- * // +-------+---+
- * // | name|age|
- * // +-------+---+
- * // |Michael| 29|
- * // | Andy| 30|
- * // | Justin| 19|
- * // +-------+---+
- * }}}
- *
- * @since 3.4.0
- */
+ /** @inheritdoc */
def createDataset[T: Encoder](data: Seq[T]): Dataset[T] = {
createDataset(encoderFor[T], data.iterator)
}
- /**
- * Creates a [[Dataset]] from a `java.util.List` of a given type. This
method requires an
- * encoder (to convert a JVM object of type `T` to and from the internal
Spark SQL
- * representation) that is generally created automatically through implicits
from a
- * `SparkSession`, or can be created explicitly by calling static methods on
[[Encoders]].
- *
- * ==Java Example==
- *
- * {{{
- * List<String> data = Arrays.asList("hello", "world");
- * Dataset<String> ds = spark.createDataset(data, Encoders.STRING());
- * }}}
- *
- * @since 3.4.0
- */
+ /** @inheritdoc */
def createDataset[T: Encoder](data: java.util.List[T]): Dataset[T] = {
createDataset(data.asScala.toSeq)
}
- /**
- * Executes a SQL query substituting positional parameters by the given
arguments, returning the
- * result as a `DataFrame`. This API eagerly runs DDL/DML commands, but not
for SELECT queries.
- *
- * @param sqlText
- * A SQL statement with positional parameters to execute.
- * @param args
- * An array of Java/Scala objects that can be converted to SQL literal
expressions. See <a
- * href="https://spark.apache.org/docs/latest/sql-ref-datatypes.html">
Supported Data
- * Types</a> for supported value types in Scala/Java. For example: 1,
"Steven",
- * LocalDate.of(2023, 4, 2). A value can be also a `Column` of a literal
or collection
- * constructor functions such as `map()`, `array()`, `struct()`, in that
case it is taken as
- * is.
- *
- * @since 3.5.0
- */
+ /** @inheritdoc */
@Experimental
def sql(sqlText: String, args: Array[_]): DataFrame = newDataFrame { builder =>
// Send the SQL once to the server and then check the output.
@@ -273,45 +175,15 @@ class SparkSession private[sql] (
}
}
- /**
- * Executes a SQL query substituting named parameters by the given
arguments, returning the
- * result as a `DataFrame`. This API eagerly runs DDL/DML commands, but not
for SELECT queries.
- *
- * @param sqlText
- * A SQL statement with named parameters to execute.
- * @param args
- * A map of parameter names to Java/Scala objects that can be converted to
SQL literal
- * expressions. See <a
href="https://spark.apache.org/docs/latest/sql-ref-datatypes.html">
- * Supported Data Types</a> for supported value types in Scala/Java. For
example, map keys:
- * "rank", "name", "birthdate"; map values: 1, "Steven",
LocalDate.of(2023, 4, 2). Map value
- * can be also a `Column` of a literal or collection constructor functions
such as `map()`,
- * `array()`, `struct()`, in that case it is taken as is.
- *
- * @since 3.4.0
- */
+ /** @inheritdoc */
@Experimental
def sql(sqlText: String, args: Map[String, Any]): DataFrame = {
sql(sqlText, args.asJava)
}
- /**
- * Executes a SQL query substituting named parameters by the given
arguments, returning the
- * result as a `DataFrame`. This API eagerly runs DDL/DML commands, but not
for SELECT queries.
- *
- * @param sqlText
- * A SQL statement with named parameters to execute.
- * @param args
- * A map of parameter names to Java/Scala objects that can be converted to
SQL literal
- * expressions. See <a
href="https://spark.apache.org/docs/latest/sql-ref-datatypes.html">
- * Supported Data Types</a> for supported value types in Scala/Java. For
example, map keys:
- * "rank", "name", "birthdate"; map values: 1, "Steven",
LocalDate.of(2023, 4, 2). Map value
- * can be also a `Column` of a literal or collection constructor functions
such as `map()`,
- * `array()`, `struct()`, in that case it is taken as is.
- *
- * @since 3.4.0
- */
+ /** @inheritdoc */
@Experimental
- def sql(sqlText: String, args: java.util.Map[String, Any]): DataFrame = newDataFrame {
+ override def sql(sqlText: String, args: java.util.Map[String, Any]): DataFrame = newDataFrame {
builder =>
// Send the SQL once to the server and then check the output.
val cmd = newCommand(b =>
@@ -335,13 +207,8 @@ class SparkSession private[sql] (
}
}
- /**
- * Executes a SQL query using Spark, returning the result as a `DataFrame`.
This API eagerly
- * runs DDL/DML commands, but not for SELECT queries.
- *
- * @since 3.4.0
- */
- def sql(query: String): DataFrame = {
+ /** @inheritdoc */
+ override def sql(query: String): DataFrame = {
sql(query, Array.empty)
}
@@ -378,83 +245,30 @@ class SparkSession private[sql] (
*/
lazy val catalog: Catalog = new CatalogImpl(this)
- /**
- * Returns the specified table/view as a `DataFrame`. If it's a table, it
must support batch
- * reading and the returned DataFrame is the batch scan query plan of this
table. If it's a
- * view, the returned DataFrame is simply the query plan of the view, which
can either be a
- * batch or streaming query plan.
- *
- * @param tableName
- * is either a qualified or unqualified name that designates a table or
view. If a database is
- * specified, it identifies the table/view from the database. Otherwise,
it first attempts to
- * find a temporary view with the given name and then match the table/view
from the current
- * database. Note that, the global temporary view database is also valid
here.
- * @since 3.4.0
- */
+ /** @inheritdoc */
def table(tableName: String): DataFrame = {
read.table(tableName)
}
- /**
- * Creates a [[Dataset]] with a single `LongType` column named `id`,
containing elements in a
- * range from 0 to `end` (exclusive) with step value 1.
- *
- * @since 3.4.0
- */
+ /** @inheritdoc */
def range(end: Long): Dataset[java.lang.Long] = range(0, end)
- /**
- * Creates a [[Dataset]] with a single `LongType` column named `id`,
containing elements in a
- * range from `start` to `end` (exclusive) with step value 1.
- *
- * @since 3.4.0
- */
+ /** @inheritdoc */
def range(start: Long, end: Long): Dataset[java.lang.Long] = {
range(start, end, step = 1)
}
- /**
- * Creates a [[Dataset]] with a single `LongType` column named `id`,
containing elements in a
- * range from `start` to `end` (exclusive) with a step value.
- *
- * @since 3.4.0
- */
+ /** @inheritdoc */
def range(start: Long, end: Long, step: Long): Dataset[java.lang.Long] = {
range(start, end, step, None)
}
- /**
- * Creates a [[Dataset]] with a single `LongType` column named `id`,
containing elements in a
- * range from `start` to `end` (exclusive) with a step value, with partition
number specified.
- *
- * @since 3.4.0
- */
+ /** @inheritdoc */
def range(start: Long, end: Long, step: Long, numPartitions: Int): Dataset[java.lang.Long] = {
range(start, end, step, Option(numPartitions))
}
- /**
- * A collection of methods for registering user-defined functions (UDF).
- *
- * The following example registers a Scala closure as UDF:
- * {{{
- * sparkSession.udf.register("myUDF", (arg1: Int, arg2: String) => arg2 +
arg1)
- * }}}
- *
- * The following example registers a UDF in Java:
- * {{{
- * sparkSession.udf().register("myUDF",
- * (Integer arg1, String arg2) -> arg2 + arg1,
- * DataTypes.StringType);
- * }}}
- *
- * @note
- * The user-defined functions must be deterministic. Due to optimization,
duplicate
- * invocations may be eliminated or the function may even be invoked more
times than it is
- * present in the query.
- *
- * @since 3.5.0
- */
+ /** @inheritdoc */
lazy val udf: UDFRegistration = new UDFRegistration(this)
// scalastyle:off
@@ -473,6 +287,7 @@ class SparkSession private[sql] (
object implicits extends SQLImplicits(this) with Serializable
// scalastyle:on
+ /** @inheritdoc */
def newSession(): SparkSession = {
SparkSession.builder().client(client.copy()).create()
}
@@ -702,13 +517,6 @@ class SparkSession private[sql] (
client.interruptOperation(operationId).getInterruptedIdsList.asScala.toSeq
}
- /**
- * Synonym for `close()`.
- *
- * @since 3.4.0
- */
- def stop(): Unit = close()
-
/**
* Close the [[SparkSession]].
*
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/api/Dataset.scala b/sql/api/src/main/scala/org/apache/spark/sql/api/Dataset.scala
index 633881f233c7..5b4ebed12c17 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/api/Dataset.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/api/Dataset.scala
@@ -119,7 +119,8 @@ import org.apache.spark.util.SparkClassUtils
* @since 1.6.0
*/
@Stable
-abstract class Dataset[T, DS[_] <: Dataset[_, DS]] extends Serializable {
+abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable {
+ def sparkSession: SparkSession[DS]
val encoder: Encoder[T]
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/api/SparkSession.scala b/sql/api/src/main/scala/org/apache/spark/sql/api/SparkSession.scala
new file mode 100644
index 000000000000..12a1a1361903
--- /dev/null
+++ b/sql/api/src/main/scala/org/apache/spark/sql/api/SparkSession.scala
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.api
+
+import scala.concurrent.duration.NANOSECONDS
+import scala.jdk.CollectionConverters._
+import scala.reflect.runtime.universe.TypeTag
+
+import _root_.java.io.Closeable
+import _root_.java.lang
+import _root_.java.util
+
+import org.apache.spark.annotation.{DeveloperApi, Experimental}
+import org.apache.spark.sql.{Encoder, Row}
+import org.apache.spark.sql.types.StructType
+
+/**
+ * The entry point to programming Spark with the Dataset and DataFrame API.
+ *
+ * In environments that this has been created upfront (e.g. REPL, notebooks), use the builder
+ * to get an existing session:
+ *
+ * {{{
+ * SparkSession.builder().getOrCreate()
+ * }}}
+ *
+ * The builder can also be used to create a new session:
+ *
+ * {{{
+ * SparkSession.builder
+ * .master("local")
+ * .appName("Word Count")
+ * .config("spark.some.config.option", "some-value")
+ * .getOrCreate()
+ * }}}
+ */
+abstract class SparkSession[DS[U] <: Dataset[U, DS]] extends Serializable with Closeable {
+ /**
+ * The version of Spark on which this application is running.
+ *
+ * @since 2.0.0
+ */
+ def version: String
+
+ /**
+ * A collection of methods for registering user-defined functions (UDF).
+ *
+ * The following example registers a Scala closure as UDF:
+ * {{{
+ * sparkSession.udf.register("myUDF", (arg1: Int, arg2: String) => arg2 + arg1)
+ * }}}
+ *
+ * The following example registers a UDF in Java:
+ * {{{
+ * sparkSession.udf().register("myUDF",
+ * (Integer arg1, String arg2) -> arg2 + arg1,
+ * DataTypes.StringType);
+ * }}}
+ *
+ * @note The user-defined functions must be deterministic. Due to optimization,
+ * duplicate invocations may be eliminated or the function may even be invoked more times
+ * than it is present in the query.
+ * @since 2.0.0
+ */
+ def udf: UDFRegistration
+
+ /**
+ * Start a new session with isolated SQL configurations, temporary tables, registered
+ * functions are isolated, but sharing the underlying `SparkContext` and cached data.
+ *
+ * @note Other than the `SparkContext`, all shared state is initialized lazily.
+ * This method will force the initialization of the shared state to ensure that parent
+ * and child sessions are set up with the same shared state. If the underlying catalog
+ * implementation is Hive, this will initialize the metastore, which may take some time.
+ * @since 2.0.0
+ */
+ def newSession(): SparkSession[DS]
+
+ /* --------------------------------- *
+ | Methods for creating DataFrames |
+ * --------------------------------- */
+
+ /**
+ * Returns a `DataFrame` with no rows or columns.
+ *
+ * @since 2.0.0
+ */
+ @transient
+ def emptyDataFrame: DS[Row]
+
+ /**
+ * Creates a `DataFrame` from a local Seq of Product.
+ *
+ * @since 2.0.0
+ */
+ def createDataFrame[A <: Product : TypeTag](data: Seq[A]): DS[Row]
+
+ /**
+ * :: DeveloperApi ::
+ * Creates a `DataFrame` from a `java.util.List` containing [[org.apache.spark.sql.Row]]s using
+ * the given schema.It is important to make sure that the structure of every
+ * [[org.apache.spark.sql.Row]] of the provided List matches the provided schema. Otherwise,
+ * there will be runtime exception.
+ *
+ * @since 2.0.0
+ */
+ @DeveloperApi
+ def createDataFrame(rows: util.List[Row], schema: StructType): DS[Row]
+
+ /**
+ * Applies a schema to a List of Java Beans.
+ *
+ * WARNING: Since there is no guaranteed ordering for fields in a Java Bean,
+ * SELECT * queries will return the columns in an undefined order.
+ *
+ * @since 1.6.0
+ */
+ def createDataFrame(data: util.List[_], beanClass: Class[_]): DS[Row]
+
+ /* ------------------------------- *
+ | Methods for creating DataSets |
+ * ------------------------------- */
+
+ /**
+ * Creates a new [[Dataset]] of type T containing zero elements.
+ *
+ * @since 2.0.0
+ */
+ def emptyDataset[T: Encoder]: DS[T]
+
+ /**
+ * Creates a [[Dataset]] from a local Seq of data of a given type. This method requires an
+ * encoder (to convert a JVM object of type `T` to and from the internal Spark SQL representation)
+ * that is generally created automatically through implicits from a `SparkSession`, or can be
+ * created explicitly by calling static methods on `Encoders`.
+ *
+ * == Example ==
+ *
+ * {{{
+ *
+ * import spark.implicits._
+ * case class Person(name: String, age: Long)
+ * val data = Seq(Person("Michael", 29), Person("Andy", 30), Person("Justin", 19))
+ * val ds = spark.createDataset(data)
+ *
+ * ds.show()
+ * // +-------+---+
+ * // | name|age|
+ * // +-------+---+
+ * // |Michael| 29|
+ * // | Andy| 30|
+ * // | Justin| 19|
+ * // +-------+---+
+ * }}}
+ *
+ * @since 2.0.0
+ */
+ def createDataset[T: Encoder](data: Seq[T]): DS[T]
+
+ /**
+ * Creates a [[Dataset]] from a `java.util.List` of a given type. This method requires an
+ * encoder (to convert a JVM object of type `T` to and from the internal Spark SQL representation)
+ * that is generally created automatically through implicits from a `SparkSession`, or can be
+ * created explicitly by calling static methods on `Encoders`.
+ *
+ * == Java Example ==
+ *
+ * {{{
+ * List<String> data = Arrays.asList("hello", "world");
+ * Dataset<String> ds = spark.createDataset(data, Encoders.STRING());
+ * }}}
+ *
+ * @since 2.0.0
+ */
+ def createDataset[T: Encoder](data: util.List[T]): DS[T]
+
+ /**
+ * Creates a [[Dataset]] with a single `LongType` column named `id`, containing elements
+ * in a range from 0 to `end` (exclusive) with step value 1.
+ *
+ * @since 2.0.0
+ */
+ def range(end: Long): DS[lang.Long]
+
+ /**
+ * Creates a [[Dataset]] with a single `LongType` column named `id`, containing elements
+ * in a range from `start` to `end` (exclusive) with step value 1.
+ *
+ * @since 2.0.0
+ */
+ def range(start: Long, end: Long): DS[lang.Long]
+
+ /**
+ * Creates a [[Dataset]] with a single `LongType` column named `id`, containing elements
+ * in a range from `start` to `end` (exclusive) with a step value.
+ *
+ * @since 2.0.0
+ */
+ def range(start: Long, end: Long, step: Long): DS[lang.Long]
+
+ /**
+ * Creates a [[Dataset]] with a single `LongType` column named `id`, containing elements
+ * in a range from `start` to `end` (exclusive) with a step value, with partition number
+ * specified.
+ *
+ * @since 2.0.0
+ */
+ def range(start: Long, end: Long, step: Long, numPartitions: Int): DS[lang.Long]
+
+ /* ------------------------- *
+ | Catalog-related methods |
+ * ------------------------- */
+
+ /**
+ * Returns the specified table/view as a `DataFrame`. If it's a table, it must support batch
+ * reading and the returned DataFrame is the batch scan query plan of this table. If it's a view,
+ * the returned DataFrame is simply the query plan of the view, which can either be a batch or
+ * streaming query plan.
+ *
+ * @param tableName is either a qualified or unqualified name that designates a table or view.
+ * If a database is specified, it identifies the table/view from the database.
+ * Otherwise, it first attempts to find a temporary view with the given name
+ * and then match the table/view from the current database.
+ * Note that, the global temporary view database is also valid here.
+ * @since 2.0.0
+ */
+ def table(tableName: String): DS[Row]
+
+ /* ----------------- *
+ | Everything else |
+ * ----------------- */
+
+ /**
+ * Executes a SQL query substituting positional parameters by the given arguments,
+ * returning the result as a `DataFrame`.
+ * This API eagerly runs DDL/DML commands, but not for SELECT queries.
+ *
+ * @param sqlText A SQL statement with positional parameters to execute.
+ * @param args An array of Java/Scala objects that can be converted to
+ * SQL literal expressions. See
+ * <a href="https://spark.apache.org/docs/latest/sql-ref-datatypes.html">
+ * Supported Data Types</a> for supported value types in Scala/Java.
+ * For example, 1, "Steven", LocalDate.of(2023, 4, 2).
+ * A value can be also a `Column` of a literal or collection constructor functions
+ * such as `map()`, `array()`, `struct()`, in that case it is taken as is.
+ * @since 3.5.0
+ */
+ @Experimental
+ def sql(sqlText: String, args: Array[_]): DS[Row]
+
+ /**
+ * Executes a SQL query substituting named parameters by the given arguments,
+ * returning the result as a `DataFrame`.
+ * This API eagerly runs DDL/DML commands, but not for SELECT queries.
+ *
+ * @param sqlText A SQL statement with named parameters to execute.
+ * @param args A map of parameter names to Java/Scala objects that can be converted to
+ * SQL literal expressions. See
+ * <a href="https://spark.apache.org/docs/latest/sql-ref-datatypes.html">
+ * Supported Data Types</a> for supported value types in Scala/Java.
+ * For example, map keys: "rank", "name", "birthdate";
+ * map values: 1, "Steven", LocalDate.of(2023, 4, 2).
+ * Map value can be also a `Column` of a literal or collection constructor
+ * functions such as `map()`, `array()`, `struct()`, in that case it is taken
+ * as is.
+ * @since 3.4.0
+ */
+ @Experimental
+ def sql(sqlText: String, args: Map[String, Any]): DS[Row]
+
+ /**
+ * Executes a SQL query substituting named parameters by the given arguments,
+ * returning the result as a `DataFrame`.
+ * This API eagerly runs DDL/DML commands, but not for SELECT queries.
+ *
+ * @param sqlText A SQL statement with named parameters to execute.
+ * @param args A map of parameter names to Java/Scala objects that can be converted to
+ * SQL literal expressions. See
+ * <a href="https://spark.apache.org/docs/latest/sql-ref-datatypes.html">
+ * Supported Data Types</a> for supported value types in Scala/Java.
+ * For example, map keys: "rank", "name", "birthdate";
+ * map values: 1, "Steven", LocalDate.of(2023, 4, 2).
+ * Map value can be also a `Column` of a literal or collection constructor
+ * functions such as `map()`, `array()`, `struct()`, in that case it is taken
+ * as is.
+ * @since 3.4.0
+ */
+ @Experimental
+ def sql(sqlText: String, args: util.Map[String, Any]): DS[Row] = {
+ sql(sqlText, args.asScala.toMap)
+ }
+
+ /**
+ * Executes a SQL query using Spark, returning the result as a `DataFrame`.
+ * This API eagerly runs DDL/DML commands, but not for SELECT queries.
+ *
+ * @since 2.0.0
+ */
+ def sql(sqlText: String): DS[Row] = sql(sqlText, Map.empty[String, Any])
+
+ /**
+ * Executes some code block and prints to stdout the time taken to execute the block. This is
+ * available in Scala only and is used primarily for interactive testing and debugging.
+ *
+ * @since 2.1.0
+ */
+ def time[T](f: => T): T = {
+ val start = System.nanoTime()
+ val ret = f
+ val end = System.nanoTime()
+ // scalastyle:off println
+ println(s"Time taken: ${NANOSECONDS.toMillis(end - start)} ms")
+ // scalastyle:on println
+ ret
+ }
+
+ /**
+ * Synonym for `close()`.
+ *
+ * @since 2.0.0
+ */
+ def stop(): Unit = close()
+}
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/functions.scala b/sql/api/src/main/scala/org/apache/spark/sql/functions.scala
index 6749ef9eb1b1..e46d6c95b31a 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/functions.scala
@@ -1702,7 +1702,7 @@ object functions {
* @group normal_funcs
* @since 1.5.0
*/
- def broadcast[DS[_] <: api.Dataset[_, DS]](df: DS[_]): df.type = {
+ def broadcast[DS[U] <: api.Dataset[U, DS]](df: DS[_]): df.type = {
df.hint("broadcast").asInstanceOf[df.type]
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index a236577cba43..358541c942f1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -17,9 +17,7 @@
package org.apache.spark.sql
-import java.io.Closeable
import java.util.{ServiceLoader, UUID}
-import java.util.concurrent.TimeUnit._
import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
import scala.jdk.CollectionConverters._
@@ -93,7 +91,7 @@ class SparkSession private(
@transient private val parentSessionState: Option[SessionState],
@transient private[sql] val extensions: SparkSessionExtensions,
@transient private[sql] val initialSessionOptions: Map[String, String])
- extends Serializable with Closeable with Logging { self =>
+ extends api.SparkSession[Dataset] with Logging { self =>
// The call site where this SparkSession was constructed.
private val creationSite: CallSite = Utils.getCallSite()
@@ -122,11 +120,7 @@ class SparkSession private(
.getOrElse(SQLConf.getFallbackConf)
})
- /**
- * The version of Spark on which this application is running.
- *
- * @since 2.0.0
- */
+ /** @inheritdoc */
def version: String = SPARK_VERSION
/* ----------------------- *
@@ -207,27 +201,7 @@ class SparkSession private(
@Unstable
def experimental: ExperimentalMethods = sessionState.experimentalMethods
- /**
- * A collection of methods for registering user-defined functions (UDF).
- *
- * The following example registers a Scala closure as UDF:
- * {{{
- * sparkSession.udf.register("myUDF", (arg1: Int, arg2: String) => arg2 +
arg1)
- * }}}
- *
- * The following example registers a UDF in Java:
- * {{{
- * sparkSession.udf().register("myUDF",
- * (Integer arg1, String arg2) -> arg2 + arg1,
- * DataTypes.StringType);
- * }}}
- *
- * @note The user-defined functions must be deterministic. Due to
optimization,
- * duplicate invocations may be eliminated or the function may even be
invoked more times than
- * it is present in the query.
- *
- * @since 2.0.0
- */
+ /** @inheritdoc */
def udf: UDFRegistration = sessionState.udfRegistration
private[sql] def udtf: UDTFRegistration = sessionState.udtfRegistration
@@ -260,17 +234,7 @@ class SparkSession private(
@Unstable
private[sql] def artifactManager: ArtifactManager =
sessionState.artifactManager
- /**
- * Start a new session with isolated SQL configurations, temporary tables,
registered
- * functions are isolated, but sharing the underlying `SparkContext` and
cached data.
- *
- * @note Other than the `SparkContext`, all shared state is initialized
lazily.
- * This method will force the initialization of the shared state to ensure
that parent
- * and child sessions are set up with the same shared state. If the
underlying catalog
- * implementation is Hive, this will initialize the metastore, which may
take some time.
- *
- * @since 2.0.0
- */
+ /** @inheritdoc */
def newSession(): SparkSession = {
new SparkSession(
sparkContext,
@@ -308,19 +272,11 @@ class SparkSession private(
| Methods for creating DataFrames |
* --------------------------------- */
- /**
- * Returns a `DataFrame` with no rows or columns.
- *
- * @since 2.0.0
- */
+ /** @inheritdoc */
@transient
lazy val emptyDataFrame: DataFrame = Dataset.ofRows(self, LocalRelation())
- /**
- * Creates a new [[Dataset]] of type T containing zero elements.
- *
- * @since 2.0.0
- */
+ /** @inheritdoc */
def emptyDataset[T: Encoder]: Dataset[T] = {
val encoder = implicitly[Encoder[T]]
new Dataset(self, LocalRelation(encoder.schema), encoder)
@@ -336,11 +292,7 @@ class SparkSession private(
Dataset.ofRows(self, ExternalRDD(rdd, self)(encoder))
}
- /**
- * Creates a `DataFrame` from a local Seq of Product.
- *
- * @since 2.0.0
- */
+ /** @inheritdoc */
def createDataFrame[A <: Product : TypeTag](data: Seq[A]): DataFrame = withActive {
val schema = ScalaReflection.schemaFor[A].dataType.asInstanceOf[StructType]
val attributeSeq = toAttributes(schema)
@@ -403,14 +355,7 @@ class SparkSession private(
createDataFrame(rowRDD.rdd, replaced)
}
- /**
- * :: DeveloperApi ::
- * Creates a `DataFrame` from a `java.util.List` containing [[Row]]s using
the given schema.
- * It is important to make sure that the structure of every [[Row]] of the
provided List matches
- * the provided schema. Otherwise, there will be runtime exception.
- *
- * @since 2.0.0
- */
+ /** @inheritdoc */
@DeveloperApi
def createDataFrame(rows: java.util.List[Row], schema: StructType): DataFrame = withActive {
val replaced =
CharVarcharUtils.failIfHasCharVarchar(schema).asInstanceOf[StructType]
@@ -447,13 +392,7 @@ class SparkSession private(
createDataFrame(rdd.rdd, beanClass)
}
- /**
- * Applies a schema to a List of Java Beans.
- *
- * WARNING: Since there is no guaranteed ordering for fields in a Java Bean,
- * SELECT * queries will return the columns in an undefined order.
- * @since 1.6.0
- */
+ /** @inheritdoc */
def createDataFrame(data: java.util.List[_], beanClass: Class[_]): DataFrame = withActive {
val attrSeq = getSchema(beanClass)
val rows = SQLContext.beansToRows(data.asScala.iterator, beanClass,
attrSeq)
@@ -473,33 +412,7 @@ class SparkSession private(
| Methods for creating DataSets |
* ------------------------------- */
- /**
- * Creates a [[Dataset]] from a local Seq of data of a given type. This
method requires an
- * encoder (to convert a JVM object of type `T` to and from the internal
Spark SQL representation)
- * that is generally created automatically through implicits from a
`SparkSession`, or can be
- * created explicitly by calling static methods on [[Encoders]].
- *
- * == Example ==
- *
- * {{{
- *
- * import spark.implicits._
- * case class Person(name: String, age: Long)
- * val data = Seq(Person("Michael", 29), Person("Andy", 30),
Person("Justin", 19))
- * val ds = spark.createDataset(data)
- *
- * ds.show()
- * // +-------+---+
- * // | name|age|
- * // +-------+---+
- * // |Michael| 29|
- * // | Andy| 30|
- * // | Justin| 19|
- * // +-------+---+
- * }}}
- *
- * @since 2.0.0
- */
+ /** @inheritdoc */
def createDataset[T : Encoder](data: Seq[T]): Dataset[T] = {
val enc = encoderFor[T]
val toRow = enc.createSerializer()
@@ -521,60 +434,25 @@ class SparkSession private(
Dataset[T](self, ExternalRDD(data, self))
}
- /**
- * Creates a [[Dataset]] from a `java.util.List` of a given type. This
method requires an
- * encoder (to convert a JVM object of type `T` to and from the internal
Spark SQL representation)
- * that is generally created automatically through implicits from a
`SparkSession`, or can be
- * created explicitly by calling static methods on [[Encoders]].
- *
- * == Java Example ==
- *
- * {{{
- * List<String> data = Arrays.asList("hello", "world");
- * Dataset<String> ds = spark.createDataset(data, Encoders.STRING());
- * }}}
- *
- * @since 2.0.0
- */
+ /** @inheritdoc */
def createDataset[T : Encoder](data: java.util.List[T]): Dataset[T] = {
createDataset(data.asScala.toSeq)
}
- /**
- * Creates a [[Dataset]] with a single `LongType` column named `id`,
containing elements
- * in a range from 0 to `end` (exclusive) with step value 1.
- *
- * @since 2.0.0
- */
+ /** @inheritdoc */
def range(end: Long): Dataset[java.lang.Long] = range(0, end)
- /**
- * Creates a [[Dataset]] with a single `LongType` column named `id`,
containing elements
- * in a range from `start` to `end` (exclusive) with step value 1.
- *
- * @since 2.0.0
- */
+ /** @inheritdoc */
def range(start: Long, end: Long): Dataset[java.lang.Long] = {
range(start, end, step = 1, numPartitions = leafNodeDefaultParallelism)
}
- /**
- * Creates a [[Dataset]] with a single `LongType` column named `id`,
containing elements
- * in a range from `start` to `end` (exclusive) with a step value.
- *
- * @since 2.0.0
- */
+ /** @inheritdoc */
def range(start: Long, end: Long, step: Long): Dataset[java.lang.Long] = {
range(start, end, step, numPartitions = leafNodeDefaultParallelism)
}
- /**
- * Creates a [[Dataset]] with a single `LongType` column named `id`,
containing elements
- * in a range from `start` to `end` (exclusive) with a step value, with
partition number
- * specified.
- *
- * @since 2.0.0
- */
+ /** @inheritdoc */
def range(start: Long, end: Long, step: Long, numPartitions: Int): Dataset[java.lang.Long] = {
new Dataset(self, Range(start, end, step, numPartitions), Encoders.LONG)
}
@@ -608,19 +486,7 @@ class SparkSession private(
*/
@transient lazy val catalog: Catalog = new CatalogImpl(self)
- /**
- * Returns the specified table/view as a `DataFrame`. If it's a table, it
must support batch
- * reading and the returned DataFrame is the batch scan query plan of this
table. If it's a view,
- * the returned DataFrame is simply the query plan of the view, which can
either be a batch or
- * streaming query plan.
- *
- * @param tableName is either a qualified or unqualified name that
designates a table or view.
- * If a database is specified, it identifies the table/view
from the database.
- * Otherwise, it first attempts to find a temporary view
with the given name
- * and then match the table/view from the current database.
- * Note that, the global temporary view database is also
valid here.
- * @since 2.0.0
- */
+ /** @inheritdoc */
def table(tableName: String): DataFrame = {
read.table(tableName)
}
@@ -661,22 +527,7 @@ class SparkSession private(
Dataset.ofRows(self, plan, tracker)
}
- /**
- * Executes a SQL query substituting positional parameters by the given
arguments,
- * returning the result as a `DataFrame`.
- * This API eagerly runs DDL/DML commands, but not for SELECT queries.
- *
- * @param sqlText A SQL statement with positional parameters to execute.
- * @param args An array of Java/Scala objects that can be converted to
- * SQL literal expressions. See
- * <a
href="https://spark.apache.org/docs/latest/sql-ref-datatypes.html">
- * Supported Data Types</a> for supported value types in
Scala/Java.
- * For example, 1, "Steven", LocalDate.of(2023, 4, 2).
- * A value can be also a `Column` of a literal or collection
constructor functions
- * such as `map()`, `array()`, `struct()`, in that case it is
taken as is.
- *
- * @since 3.5.0
- */
+ /** @inheritdoc */
@Experimental
def sql(sqlText: String, args: Array[_]): DataFrame = {
sql(sqlText, args, new QueryPlanningTracker)
@@ -714,57 +565,20 @@ class SparkSession private(
Dataset.ofRows(self, plan, tracker)
}
- /**
- * Executes a SQL query substituting named parameters by the given arguments,
- * returning the result as a `DataFrame`.
- * This API eagerly runs DDL/DML commands, but not for SELECT queries.
- *
- * @param sqlText A SQL statement with named parameters to execute.
- * @param args A map of parameter names to Java/Scala objects that can be
converted to
- * SQL literal expressions. See
- * <a
href="https://spark.apache.org/docs/latest/sql-ref-datatypes.html">
- * Supported Data Types</a> for supported value types in
Scala/Java.
- * For example, map keys: "rank", "name", "birthdate";
- * map values: 1, "Steven", LocalDate.of(2023, 4, 2).
- * Map value can be also a `Column` of a literal or collection
constructor functions
- * such as `map()`, `array()`, `struct()`, in that case it is
taken as is.
- *
- * @since 3.4.0
- */
+ /** @inheritdoc */
@Experimental
def sql(sqlText: String, args: Map[String, Any]): DataFrame = {
sql(sqlText, args, new QueryPlanningTracker)
}
- /**
- * Executes a SQL query substituting named parameters by the given arguments,
- * returning the result as a `DataFrame`.
- * This API eagerly runs DDL/DML commands, but not for SELECT queries.
- *
- * @param sqlText A SQL statement with named parameters to execute.
- * @param args A map of parameter names to Java/Scala objects that can be
converted to
- * SQL literal expressions. See
- * <a
href="https://spark.apache.org/docs/latest/sql-ref-datatypes.html">
- * Supported Data Types</a> for supported value types in
Scala/Java.
- * For example, map keys: "rank", "name", "birthdate";
- * map values: 1, "Steven", LocalDate.of(2023, 4, 2).
- * Map value can be also a `Column` of a literal or collection
constructor functions
- * such as `map()`, `array()`, `struct()`, in that case it is
taken as is.
- *
- * @since 3.4.0
- */
+ /** @inheritdoc */
@Experimental
- def sql(sqlText: String, args: java.util.Map[String, Any]): DataFrame = {
+ override def sql(sqlText: String, args: java.util.Map[String, Any]): DataFrame = {
sql(sqlText, args.asScala.toMap)
}
- /**
- * Executes a SQL query using Spark, returning the result as a `DataFrame`.
- * This API eagerly runs DDL/DML commands, but not for SELECT queries.
- *
- * @since 2.0.0
- */
- def sql(sqlText: String): DataFrame = sql(sqlText, Map.empty[String, Any])
+ /** @inheritdoc */
+ override def sql(sqlText: String): DataFrame = sql(sqlText, Map.empty[String, Any])
/**
* Execute an arbitrary string command inside an external execution engine rather than Spark.
@@ -817,22 +631,6 @@ class SparkSession private(
*/
def readStream: DataStreamReader = new DataStreamReader(self)
- /**
- * Executes some code block and prints to stdout the time taken to execute
the block. This is
- * available in Scala only and is used primarily for interactive testing and
debugging.
- *
- * @since 2.1.0
- */
- def time[T](f: => T): T = {
- val start = System.nanoTime()
- val ret = f
- val end = System.nanoTime()
- // scalastyle:off println
- println(s"Time taken: ${NANOSECONDS.toMillis(end - start)} ms")
- // scalastyle:on println
- ret
- }
-
// scalastyle:off
// Disable style checker so "implicits" object can start with lowercase i
/**
@@ -854,19 +652,12 @@ class SparkSession private(
/**
* Stop the underlying `SparkContext`.
*
- * @since 2.0.0
+ * @since 2.1.0
*/
- def stop(): Unit = {
+ override def close(): Unit = {
sparkContext.stop()
}
- /**
- * Synonym for `stop()`.
- *
- * @since 2.1.0
- */
- override def close(): Unit = stop()
-
/**
* Parses the data type in our internal string representation. The data type string should
* have the same format as the one generated by `toString` in scala.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]