This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new fec68678c0cd [SPARK-49028][CONNECT][SQL] Create a shared SparkSession
fec68678c0cd is described below

commit fec68678c0cd2d60d6e851f429e60d2f8d057cf2
Author: Herman van Hovell <[email protected]>
AuthorDate: Thu Aug 29 08:40:52 2024 -0400

    [SPARK-49028][CONNECT][SQL] Create a shared SparkSession
    
    ### What changes were proposed in this pull request?
    This PR creates a common base class for `SparkSession`.
    
    ### Why are the changes needed?
    We are creating a unified interface for the Scala SQL Client API for Classic and Connect.
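
    As an illustration (not part of this patch; the object and method names below are
    hypothetical), the shared base class lets utility code be written once against
    `org.apache.spark.sql.api.SparkSession` and used with either implementation:

    ```scala
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.api.{Dataset, SparkSession}

    object SharedSessionExample {
      // Works for any session whose Dataset type constructor is DS, mirroring the
      // `SparkSession[DS[U] <: Dataset[U, DS]]` signature introduced in sql/api.
      def firstTen[DS[U] <: Dataset[U, DS]](spark: SparkSession[DS]): DS[java.lang.Long] =
        spark.range(10)

      def selectOne[DS[U] <: Dataset[U, DS]](spark: SparkSession[DS]): DS[Row] =
        spark.sql("SELECT 1")
    }
    ```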
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Compilation and existing tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #47894 from hvanhovell/SPARK-49028.
    
    Authored-by: Herman van Hovell <[email protected]>
    Signed-off-by: Herman van Hovell <[email protected]>
---
 .../scala/org/apache/spark/sql/SparkSession.scala  | 234 ++------------
 .../scala/org/apache/spark/sql/api/Dataset.scala   |   3 +-
 .../org/apache/spark/sql/api/SparkSession.scala    | 337 +++++++++++++++++++++
 .../scala/org/apache/spark/sql/functions.scala     |   2 +-
 .../scala/org/apache/spark/sql/SparkSession.scala  | 257 ++--------------
 5 files changed, 385 insertions(+), 448 deletions(-)

diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 50985d4373d6..e16ff2e39173 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -16,10 +16,8 @@
  */
 package org.apache.spark.sql
 
-import java.io.Closeable
 import java.net.URI
 import java.util.concurrent.ConcurrentHashMap
-import java.util.concurrent.TimeUnit._
 import java.util.concurrent.atomic.{AtomicLong, AtomicReference}
 
 import scala.jdk.CollectionConverters._
@@ -69,8 +67,7 @@ import org.apache.spark.util.ArrayImplicits._
 class SparkSession private[sql] (
     private[sql] val client: SparkConnectClient,
     private val planIdGenerator: AtomicLong)
-    extends Serializable
-    with Closeable
+    extends api.SparkSession[Dataset]
     with Logging {
 
   private[this] val allocator = new RootAllocator()
@@ -100,35 +97,11 @@ class SparkSession private[sql] (
    */
   val conf: RuntimeConfig = new RuntimeConfig(client)
 
-  /**
-   * Executes some code block and prints to stdout the time taken to execute the block. This is
-   * available in Scala only and is used primarily for interactive testing and debugging.
-   *
-   * @since 3.4.0
-   */
-  def time[T](f: => T): T = {
-    val start = System.nanoTime()
-    val ret = f
-    val end = System.nanoTime()
-    // scalastyle:off println
-    println(s"Time taken: ${NANOSECONDS.toMillis(end - start)} ms")
-    // scalastyle:on println
-    ret
-  }
-
-  /**
-   * Returns a `DataFrame` with no rows or columns.
-   *
-   * @since 3.4.0
-   */
+  /** @inheritdoc */
   @transient
   val emptyDataFrame: DataFrame = emptyDataset(UnboundRowEncoder)
 
-  /**
-   * Creates a new [[Dataset]] of type T containing zero elements.
-   *
-   * @since 3.4.0
-   */
+  /** @inheritdoc */
   def emptyDataset[T: Encoder]: Dataset[T] = createDataset[T](Nil)
 
   private def createDataset[T](encoder: AgnosticEncoder[T], data: Iterator[T]): Dataset[T] = {
@@ -151,104 +124,33 @@ class SparkSession private[sql] (
     }
   }
 
-  /**
-   * Creates a `DataFrame` from a local Seq of Product.
-   *
-   * @since 3.4.0
-   */
+  /** @inheritdoc */
   def createDataFrame[A <: Product: TypeTag](data: Seq[A]): DataFrame = {
     createDataset(ScalaReflection.encoderFor[A], data.iterator).toDF()
   }
 
-  /**
-   * :: DeveloperApi :: Creates a `DataFrame` from a `java.util.List` containing [[Row]]s using
-   * the given schema. It is important to make sure that the structure of every [[Row]] of the
-   * provided List matches the provided schema. Otherwise, there will be runtime exception.
-   *
-   * @since 3.4.0
-   */
+  /** @inheritdoc */
   def createDataFrame(rows: java.util.List[Row], schema: StructType): DataFrame = {
     createDataset(RowEncoder.encoderFor(schema), rows.iterator().asScala).toDF()
   }
 
-  /**
-   * Applies a schema to a List of Java Beans.
-   *
-   * WARNING: Since there is no guaranteed ordering for fields in a Java Bean, SELECT * queries
-   * will return the columns in an undefined order.
-   * @since 3.4.0
-   */
+  /** @inheritdoc */
   def createDataFrame(data: java.util.List[_], beanClass: Class[_]): DataFrame = {
     val encoder = JavaTypeInference.encoderFor(beanClass.asInstanceOf[Class[Any]])
     createDataset(encoder, data.iterator().asScala).toDF()
   }
 
-  /**
-   * Creates a [[Dataset]] from a local Seq of data of a given type. This method requires an
-   * encoder (to convert a JVM object of type `T` to and from the internal Spark SQL
-   * representation) that is generally created automatically through implicits from a
-   * `SparkSession`, or can be created explicitly by calling static methods on [[Encoders]].
-   *
-   * ==Example==
-   *
-   * {{{
-   *
-   *   import spark.implicits._
-   *   case class Person(name: String, age: Long)
-   *   val data = Seq(Person("Michael", 29), Person("Andy", 30), Person("Justin", 19))
-   *   val ds = spark.createDataset(data)
-   *
-   *   ds.show()
-   *   // +-------+---+
-   *   // |   name|age|
-   *   // +-------+---+
-   *   // |Michael| 29|
-   *   // |   Andy| 30|
-   *   // | Justin| 19|
-   *   // +-------+---+
-   * }}}
-   *
-   * @since 3.4.0
-   */
+  /** @inheritdoc */
   def createDataset[T: Encoder](data: Seq[T]): Dataset[T] = {
     createDataset(encoderFor[T], data.iterator)
   }
 
-  /**
-   * Creates a [[Dataset]] from a `java.util.List` of a given type. This method requires an
-   * encoder (to convert a JVM object of type `T` to and from the internal Spark SQL
-   * representation) that is generally created automatically through implicits from a
-   * `SparkSession`, or can be created explicitly by calling static methods on [[Encoders]].
-   *
-   * ==Java Example==
-   *
-   * {{{
-   *     List<String> data = Arrays.asList("hello", "world");
-   *     Dataset<String> ds = spark.createDataset(data, Encoders.STRING());
-   * }}}
-   *
-   * @since 3.4.0
-   */
+  /** @inheritdoc */
   def createDataset[T: Encoder](data: java.util.List[T]): Dataset[T] = {
     createDataset(data.asScala.toSeq)
   }
 
-  /**
-   * Executes a SQL query substituting positional parameters by the given arguments, returning the
-   * result as a `DataFrame`. This API eagerly runs DDL/DML commands, but not for SELECT queries.
-   *
-   * @param sqlText
-   *   A SQL statement with positional parameters to execute.
-   * @param args
-   *   An array of Java/Scala objects that can be converted to SQL literal expressions. See <a
-   *   href="https://spark.apache.org/docs/latest/sql-ref-datatypes.html"> Supported Data
-   *   Types</a> for supported value types in Scala/Java. For example: 1, "Steven",
-   *   LocalDate.of(2023, 4, 2). A value can be also a `Column` of a literal or collection
-   *   constructor functions such as `map()`, `array()`, `struct()`, in that case it is taken as
-   *   is.
-   *
-   * @since 3.5.0
-   */
+  /** @inheritdoc */
   @Experimental
   def sql(sqlText: String, args: Array[_]): DataFrame = newDataFrame { builder =>
     // Send the SQL once to the server and then check the output.
@@ -273,45 +175,15 @@ class SparkSession private[sql] (
     }
   }
 
-  /**
-   * Executes a SQL query substituting named parameters by the given arguments, returning the
-   * result as a `DataFrame`. This API eagerly runs DDL/DML commands, but not for SELECT queries.
-   *
-   * @param sqlText
-   *   A SQL statement with named parameters to execute.
-   * @param args
-   *   A map of parameter names to Java/Scala objects that can be converted to SQL literal
-   *   expressions. See <a href="https://spark.apache.org/docs/latest/sql-ref-datatypes.html">
-   *   Supported Data Types</a> for supported value types in Scala/Java. For example, map keys:
-   *   "rank", "name", "birthdate"; map values: 1, "Steven", LocalDate.of(2023, 4, 2). Map value
-   *   can be also a `Column` of a literal or collection constructor functions such as `map()`,
-   *   `array()`, `struct()`, in that case it is taken as is.
-   *
-   * @since 3.4.0
-   */
+  /** @inheritdoc */
   @Experimental
   def sql(sqlText: String, args: Map[String, Any]): DataFrame = {
     sql(sqlText, args.asJava)
   }
 
-  /**
-   * Executes a SQL query substituting named parameters by the given arguments, returning the
-   * result as a `DataFrame`. This API eagerly runs DDL/DML commands, but not for SELECT queries.
-   *
-   * @param sqlText
-   *   A SQL statement with named parameters to execute.
-   * @param args
-   *   A map of parameter names to Java/Scala objects that can be converted to SQL literal
-   *   expressions. See <a href="https://spark.apache.org/docs/latest/sql-ref-datatypes.html">
-   *   Supported Data Types</a> for supported value types in Scala/Java. For example, map keys:
-   *   "rank", "name", "birthdate"; map values: 1, "Steven", LocalDate.of(2023, 4, 2). Map value
-   *   can be also a `Column` of a literal or collection constructor functions such as `map()`,
-   *   `array()`, `struct()`, in that case it is taken as is.
-   *
-   * @since 3.4.0
-   */
+  /** @inheritdoc */
   @Experimental
-  def sql(sqlText: String, args: java.util.Map[String, Any]): DataFrame = newDataFrame {
+  override def sql(sqlText: String, args: java.util.Map[String, Any]): DataFrame = newDataFrame {
     builder =>
       // Send the SQL once to the server and then check the output.
       val cmd = newCommand(b =>
@@ -335,13 +207,8 @@ class SparkSession private[sql] (
       }
   }
 
-  /**
-   * Executes a SQL query using Spark, returning the result as a `DataFrame`. This API eagerly
-   * runs DDL/DML commands, but not for SELECT queries.
-   *
-   * @since 3.4.0
-   */
-  def sql(query: String): DataFrame = {
+  /** @inheritdoc */
+  override def sql(query: String): DataFrame = {
     sql(query, Array.empty)
   }
 
@@ -378,83 +245,30 @@ class SparkSession private[sql] (
    */
   lazy val catalog: Catalog = new CatalogImpl(this)
 
-  /**
-   * Returns the specified table/view as a `DataFrame`. If it's a table, it must support batch
-   * reading and the returned DataFrame is the batch scan query plan of this table. If it's a
-   * view, the returned DataFrame is simply the query plan of the view, which can either be a
-   * batch or streaming query plan.
-   *
-   * @param tableName
-   *   is either a qualified or unqualified name that designates a table or view. If a database is
-   *   specified, it identifies the table/view from the database. Otherwise, it first attempts to
-   *   find a temporary view with the given name and then match the table/view from the current
-   *   database. Note that, the global temporary view database is also valid here.
-   * @since 3.4.0
-   */
+  /** @inheritdoc */
   def table(tableName: String): DataFrame = {
     read.table(tableName)
   }
 
-  /**
-   * Creates a [[Dataset]] with a single `LongType` column named `id`, containing elements in a
-   * range from 0 to `end` (exclusive) with step value 1.
-   *
-   * @since 3.4.0
-   */
+  /** @inheritdoc */
   def range(end: Long): Dataset[java.lang.Long] = range(0, end)
 
-  /**
-   * Creates a [[Dataset]] with a single `LongType` column named `id`, containing elements in a
-   * range from `start` to `end` (exclusive) with step value 1.
-   *
-   * @since 3.4.0
-   */
+  /** @inheritdoc */
   def range(start: Long, end: Long): Dataset[java.lang.Long] = {
     range(start, end, step = 1)
   }
 
-  /**
-   * Creates a [[Dataset]] with a single `LongType` column named `id`, containing elements in a
-   * range from `start` to `end` (exclusive) with a step value.
-   *
-   * @since 3.4.0
-   */
+  /** @inheritdoc */
   def range(start: Long, end: Long, step: Long): Dataset[java.lang.Long] = {
     range(start, end, step, None)
   }
 
-  /**
-   * Creates a [[Dataset]] with a single `LongType` column named `id`, containing elements in a
-   * range from `start` to `end` (exclusive) with a step value, with partition number specified.
-   *
-   * @since 3.4.0
-   */
+  /** @inheritdoc */
   def range(start: Long, end: Long, step: Long, numPartitions: Int): Dataset[java.lang.Long] = {
     range(start, end, step, Option(numPartitions))
   }
 
-  /**
-   * A collection of methods for registering user-defined functions (UDF).
-   *
-   * The following example registers a Scala closure as UDF:
-   * {{{
-   *   sparkSession.udf.register("myUDF", (arg1: Int, arg2: String) => arg2 + arg1)
-   * }}}
-   *
-   * The following example registers a UDF in Java:
-   * {{{
-   *   sparkSession.udf().register("myUDF",
-   *       (Integer arg1, String arg2) -> arg2 + arg1,
-   *       DataTypes.StringType);
-   * }}}
-   *
-   * @note
-   *   The user-defined functions must be deterministic. Due to optimization, duplicate
-   *   invocations may be eliminated or the function may even be invoked more times than it is
-   *   present in the query.
-   *
-   * @since 3.5.0
-   */
+  /** @inheritdoc */
   lazy val udf: UDFRegistration = new UDFRegistration(this)
 
   // scalastyle:off
@@ -473,6 +287,7 @@ class SparkSession private[sql] (
   object implicits extends SQLImplicits(this) with Serializable
   // scalastyle:on
 
+  /** @inheritdoc */
   def newSession(): SparkSession = {
     SparkSession.builder().client(client.copy()).create()
   }
@@ -702,13 +517,6 @@ class SparkSession private[sql] (
     client.interruptOperation(operationId).getInterruptedIdsList.asScala.toSeq
   }
 
-  /**
-   * Synonym for `close()`.
-   *
-   * @since 3.4.0
-   */
-  def stop(): Unit = close()
-
   /**
    * Close the [[SparkSession]].
    *
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/api/Dataset.scala b/sql/api/src/main/scala/org/apache/spark/sql/api/Dataset.scala
index 633881f233c7..5b4ebed12c17 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/api/Dataset.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/api/Dataset.scala
@@ -119,7 +119,8 @@ import org.apache.spark.util.SparkClassUtils
  * @since 1.6.0
  */
 @Stable
-abstract class Dataset[T, DS[_] <: Dataset[_, DS]] extends Serializable {
+abstract class Dataset[T, DS[U] <: Dataset[U, DS]] extends Serializable {
+  def sparkSession: SparkSession[DS]
 
   val encoder: Encoder[T]
 
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/api/SparkSession.scala b/sql/api/src/main/scala/org/apache/spark/sql/api/SparkSession.scala
new file mode 100644
index 000000000000..12a1a1361903
--- /dev/null
+++ b/sql/api/src/main/scala/org/apache/spark/sql/api/SparkSession.scala
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.api
+
+import scala.concurrent.duration.NANOSECONDS
+import scala.jdk.CollectionConverters._
+import scala.reflect.runtime.universe.TypeTag
+
+import _root_.java.io.Closeable
+import _root_.java.lang
+import _root_.java.util
+
+import org.apache.spark.annotation.{DeveloperApi, Experimental}
+import org.apache.spark.sql.{Encoder, Row}
+import org.apache.spark.sql.types.StructType
+
+/**
+ * The entry point to programming Spark with the Dataset and DataFrame API.
+ *
+ * In environments that this has been created upfront (e.g. REPL, notebooks), use the builder
+ * to get an existing session:
+ *
+ * {{{
+ *   SparkSession.builder().getOrCreate()
+ * }}}
+ *
+ * The builder can also be used to create a new session:
+ *
+ * {{{
+ *   SparkSession.builder
+ *     .master("local")
+ *     .appName("Word Count")
+ *     .config("spark.some.config.option", "some-value")
+ *     .getOrCreate()
+ * }}}
+ */
+abstract class SparkSession[DS[U] <: Dataset[U, DS]] extends Serializable with Closeable {
+  /**
+   * The version of Spark on which this application is running.
+   *
+   * @since 2.0.0
+   */
+  def version: String
+
+  /**
+   * A collection of methods for registering user-defined functions (UDF).
+   *
+   * The following example registers a Scala closure as UDF:
+   * {{{
+   *   sparkSession.udf.register("myUDF", (arg1: Int, arg2: String) => arg2 + arg1)
+   * }}}
+   *
+   * The following example registers a UDF in Java:
+   * {{{
+   *   sparkSession.udf().register("myUDF",
+   *       (Integer arg1, String arg2) -> arg2 + arg1,
+   *       DataTypes.StringType);
+   * }}}
+   *
+   * @note The user-defined functions must be deterministic. Due to optimization,
+   *       duplicate invocations may be eliminated or the function may even be invoked more times
+   *       than it is present in the query.
+   * @since 2.0.0
+   */
+  def udf: UDFRegistration
+
+  /**
+   * Start a new session with isolated SQL configurations, temporary tables, registered
+   * functions are isolated, but sharing the underlying `SparkContext` and cached data.
+   *
+   * @note Other than the `SparkContext`, all shared state is initialized lazily.
+   *       This method will force the initialization of the shared state to ensure that parent
+   *       and child sessions are set up with the same shared state. If the underlying catalog
+   *       implementation is Hive, this will initialize the metastore, which may take some time.
+   * @since 2.0.0
+   */
+  def newSession(): SparkSession[DS]
+
+  /* --------------------------------- *
+   |  Methods for creating DataFrames  |
+   * --------------------------------- */
+
+  /**
+   * Returns a `DataFrame` with no rows or columns.
+   *
+   * @since 2.0.0
+   */
+  @transient
+  def emptyDataFrame: DS[Row]
+
+  /**
+   * Creates a `DataFrame` from a local Seq of Product.
+   *
+   * @since 2.0.0
+   */
+  def createDataFrame[A <: Product : TypeTag](data: Seq[A]): DS[Row]
+
+  /**
+   * :: DeveloperApi ::
+   * Creates a `DataFrame` from a `java.util.List` containing [[org.apache.spark.sql.Row]]s using
+   * the given schema. It is important to make sure that the structure of every
+   * [[org.apache.spark.sql.Row]] of the provided List matches the provided schema. Otherwise,
+   * there will be a runtime exception.
+   *
+   * @since 2.0.0
+   */
+  @DeveloperApi
+  def createDataFrame(rows: util.List[Row], schema: StructType): DS[Row]
+
+  /**
+   * Applies a schema to a List of Java Beans.
+   *
+   * WARNING: Since there is no guaranteed ordering for fields in a Java Bean,
+   * SELECT * queries will return the columns in an undefined order.
+   *
+   * @since 1.6.0
+   */
+  def createDataFrame(data: util.List[_], beanClass: Class[_]): DS[Row]
+
+  /* ------------------------------- *
+   |  Methods for creating DataSets  |
+   * ------------------------------- */
+
+  /**
+   * Creates a new [[Dataset]] of type T containing zero elements.
+   *
+   * @since 2.0.0
+   */
+  def emptyDataset[T: Encoder]: DS[T]
+
+  /**
+   * Creates a [[Dataset]] from a local Seq of data of a given type. This method requires an
+   * encoder (to convert a JVM object of type `T` to and from the internal Spark SQL representation)
+   * that is generally created automatically through implicits from a `SparkSession`, or can be
+   * created explicitly by calling static methods on `Encoders`.
+   *
+   * == Example ==
+   *
+   * {{{
+   *
+   *   import spark.implicits._
+   *   case class Person(name: String, age: Long)
+   *   val data = Seq(Person("Michael", 29), Person("Andy", 30), Person("Justin", 19))
+   *   val ds = spark.createDataset(data)
+   *
+   *   ds.show()
+   *   // +-------+---+
+   *   // |   name|age|
+   *   // +-------+---+
+   *   // |Michael| 29|
+   *   // |   Andy| 30|
+   *   // | Justin| 19|
+   *   // +-------+---+
+   * }}}
+   *
+   * @since 2.0.0
+   */
+  def createDataset[T: Encoder](data: Seq[T]): DS[T]
+
+  /**
+   * Creates a [[Dataset]] from a `java.util.List` of a given type. This method requires an
+   * encoder (to convert a JVM object of type `T` to and from the internal Spark SQL representation)
+   * that is generally created automatically through implicits from a `SparkSession`, or can be
+   * created explicitly by calling static methods on `Encoders`.
+   *
+   * == Java Example ==
+   *
+   * {{{
+   *     List<String> data = Arrays.asList("hello", "world");
+   *     Dataset<String> ds = spark.createDataset(data, Encoders.STRING());
+   * }}}
+   *
+   * @since 2.0.0
+   */
+  def createDataset[T: Encoder](data: util.List[T]): DS[T]
+
+  /**
+   * Creates a [[Dataset]] with a single `LongType` column named `id`, containing elements
+   * in a range from 0 to `end` (exclusive) with step value 1.
+   *
+   * @since 2.0.0
+   */
+  def range(end: Long): DS[lang.Long]
+
+  /**
+   * Creates a [[Dataset]] with a single `LongType` column named `id`, containing elements
+   * in a range from `start` to `end` (exclusive) with step value 1.
+   *
+   * @since 2.0.0
+   */
+  def range(start: Long, end: Long): DS[lang.Long]
+
+  /**
+   * Creates a [[Dataset]] with a single `LongType` column named `id`, containing elements
+   * in a range from `start` to `end` (exclusive) with a step value.
+   *
+   * @since 2.0.0
+   */
+  def range(start: Long, end: Long, step: Long): DS[lang.Long]
+
+  /**
+   * Creates a [[Dataset]] with a single `LongType` column named `id`, containing elements
+   * in a range from `start` to `end` (exclusive) with a step value, with partition number
+   * specified.
+   *
+   * @since 2.0.0
+   */
+  def range(start: Long, end: Long, step: Long, numPartitions: Int): DS[lang.Long]
+
+  /* ------------------------- *
+   |  Catalog-related methods  |
+   * ------------------------- */
+
+  /**
+   * Returns the specified table/view as a `DataFrame`. If it's a table, it must support batch
+   * reading and the returned DataFrame is the batch scan query plan of this table. If it's a view,
+   * the returned DataFrame is simply the query plan of the view, which can either be a batch or
+   * streaming query plan.
+   *
+   * @param tableName is either a qualified or unqualified name that designates a table or view.
+   *                  If a database is specified, it identifies the table/view from the database.
+   *                  Otherwise, it first attempts to find a temporary view with the given name
+   *                  and then match the table/view from the current database.
+   *                  Note that, the global temporary view database is also valid here.
+   * @since 2.0.0
+   */
+  def table(tableName: String): DS[Row]
+
+  /* ----------------- *
+   |  Everything else  |
+   * ----------------- */
+
+  /**
+   * Executes a SQL query substituting positional parameters by the given arguments,
+   * returning the result as a `DataFrame`.
+   * This API eagerly runs DDL/DML commands, but not for SELECT queries.
+   *
+   * @param sqlText A SQL statement with positional parameters to execute.
+   * @param args    An array of Java/Scala objects that can be converted to
+   *                SQL literal expressions. See
+   *                <a href="https://spark.apache.org/docs/latest/sql-ref-datatypes.html">
+   *                Supported Data Types</a> for supported value types in Scala/Java.
+   *                For example, 1, "Steven", LocalDate.of(2023, 4, 2).
+   *                A value can be also a `Column` of a literal or collection constructor functions
+   *                such as `map()`, `array()`, `struct()`, in that case it is taken as is.
+   * @since 3.5.0
+   */
+  @Experimental
+  def sql(sqlText: String, args: Array[_]): DS[Row]
+
+  /**
+   * Executes a SQL query substituting named parameters by the given arguments,
+   * returning the result as a `DataFrame`.
+   * This API eagerly runs DDL/DML commands, but not for SELECT queries.
+   *
+   * @param sqlText A SQL statement with named parameters to execute.
+   * @param args    A map of parameter names to Java/Scala objects that can be converted to
+   *                SQL literal expressions. See
+   *                <a href="https://spark.apache.org/docs/latest/sql-ref-datatypes.html">
+   *                Supported Data Types</a> for supported value types in Scala/Java.
+   *                For example, map keys: "rank", "name", "birthdate";
+   *                map values: 1, "Steven", LocalDate.of(2023, 4, 2).
+   *                Map value can be also a `Column` of a literal or collection constructor
+   *                functions such as `map()`, `array()`, `struct()`, in that case it is taken
+   *                as is.
+   * @since 3.4.0
+   */
+  @Experimental
+  def sql(sqlText: String, args: Map[String, Any]): DS[Row]
+
+  /**
+   * Executes a SQL query substituting named parameters by the given arguments,
+   * returning the result as a `DataFrame`.
+   * This API eagerly runs DDL/DML commands, but not for SELECT queries.
+   *
+   * @param sqlText A SQL statement with named parameters to execute.
+   * @param args    A map of parameter names to Java/Scala objects that can be converted to
+   *                SQL literal expressions. See
+   *                <a href="https://spark.apache.org/docs/latest/sql-ref-datatypes.html">
+   *                Supported Data Types</a> for supported value types in Scala/Java.
+   *                For example, map keys: "rank", "name", "birthdate";
+   *                map values: 1, "Steven", LocalDate.of(2023, 4, 2).
+   *                Map value can be also a `Column` of a literal or collection constructor
+   *                functions such as `map()`, `array()`, `struct()`, in that case it is taken
+   *                as is.
+   * @since 3.4.0
+   */
+  @Experimental
+  def sql(sqlText: String, args: util.Map[String, Any]): DS[Row] = {
+    sql(sqlText, args.asScala.toMap)
+  }
+
+  /**
+   * Executes a SQL query using Spark, returning the result as a `DataFrame`.
+   * This API eagerly runs DDL/DML commands, but not for SELECT queries.
+   *
+   * @since 2.0.0
+   */
+  def sql(sqlText: String): DS[Row] = sql(sqlText, Map.empty[String, Any])
+
+  /**
+   * Executes some code block and prints to stdout the time taken to execute the block. This is
+   * available in Scala only and is used primarily for interactive testing and debugging.
+   *
+   * @since 2.1.0
+   */
+  def time[T](f: => T): T = {
+    val start = System.nanoTime()
+    val ret = f
+    val end = System.nanoTime()
+    // scalastyle:off println
+    println(s"Time taken: ${NANOSECONDS.toMillis(end - start)} ms")
+    // scalastyle:on println
+    ret
+  }
+
+  /**
+   * Synonym for `close()`.
+   *
+   * @since 2.0.0
+   */
+  def stop(): Unit = close()
+}
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/functions.scala b/sql/api/src/main/scala/org/apache/spark/sql/functions.scala
index 6749ef9eb1b1..e46d6c95b31a 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/functions.scala
@@ -1702,7 +1702,7 @@ object functions {
    * @group normal_funcs
    * @since 1.5.0
    */
-  def broadcast[DS[_] <: api.Dataset[_, DS]](df: DS[_]): df.type = {
+  def broadcast[DS[U] <: api.Dataset[U, DS]](df: DS[_]): df.type = {
     df.hint("broadcast").asInstanceOf[df.type]
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index a236577cba43..358541c942f1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -17,9 +17,7 @@
 
 package org.apache.spark.sql
 
-import java.io.Closeable
 import java.util.{ServiceLoader, UUID}
-import java.util.concurrent.TimeUnit._
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
 
 import scala.jdk.CollectionConverters._
@@ -93,7 +91,7 @@ class SparkSession private(
     @transient private val parentSessionState: Option[SessionState],
     @transient private[sql] val extensions: SparkSessionExtensions,
     @transient private[sql] val initialSessionOptions: Map[String, String])
-  extends Serializable with Closeable with Logging { self =>
+  extends api.SparkSession[Dataset] with Logging { self =>
 
   // The call site where this SparkSession was constructed.
   private val creationSite: CallSite = Utils.getCallSite()
@@ -122,11 +120,7 @@ class SparkSession private(
       .getOrElse(SQLConf.getFallbackConf)
   })
 
-  /**
-   * The version of Spark on which this application is running.
-   *
-   * @since 2.0.0
-   */
+  /** @inheritdoc */
   def version: String = SPARK_VERSION
 
   /* ----------------------- *
@@ -207,27 +201,7 @@ class SparkSession private(
   @Unstable
   def experimental: ExperimentalMethods = sessionState.experimentalMethods
 
-  /**
-   * A collection of methods for registering user-defined functions (UDF).
-   *
-   * The following example registers a Scala closure as UDF:
-   * {{{
-   *   sparkSession.udf.register("myUDF", (arg1: Int, arg2: String) => arg2 + arg1)
-   * }}}
-   *
-   * The following example registers a UDF in Java:
-   * {{{
-   *   sparkSession.udf().register("myUDF",
-   *       (Integer arg1, String arg2) -> arg2 + arg1,
-   *       DataTypes.StringType);
-   * }}}
-   *
-   * @note The user-defined functions must be deterministic. Due to optimization,
-   * duplicate invocations may be eliminated or the function may even be invoked more times than
-   * it is present in the query.
-   *
-   * @since 2.0.0
-   */
+  /** @inheritdoc */
   def udf: UDFRegistration = sessionState.udfRegistration
 
   private[sql] def udtf: UDTFRegistration = sessionState.udtfRegistration
@@ -260,17 +234,7 @@ class SparkSession private(
   @Unstable
   private[sql] def artifactManager: ArtifactManager = sessionState.artifactManager
 
-  /**
-   * Start a new session with isolated SQL configurations, temporary tables, registered
-   * functions are isolated, but sharing the underlying `SparkContext` and cached data.
-   *
-   * @note Other than the `SparkContext`, all shared state is initialized lazily.
-   * This method will force the initialization of the shared state to ensure that parent
-   * and child sessions are set up with the same shared state. If the underlying catalog
-   * implementation is Hive, this will initialize the metastore, which may take some time.
-   *
-   * @since 2.0.0
-   */
+  /** @inheritdoc */
   def newSession(): SparkSession = {
     new SparkSession(
       sparkContext,
@@ -308,19 +272,11 @@ class SparkSession private(
    |  Methods for creating DataFrames  |
    * --------------------------------- */
 
-  /**
-   * Returns a `DataFrame` with no rows or columns.
-   *
-   * @since 2.0.0
-   */
+  /** @inheritdoc */
   @transient
   lazy val emptyDataFrame: DataFrame = Dataset.ofRows(self, LocalRelation())
 
-  /**
-   * Creates a new [[Dataset]] of type T containing zero elements.
-   *
-   * @since 2.0.0
-   */
+  /** @inheritdoc */
   def emptyDataset[T: Encoder]: Dataset[T] = {
     val encoder = implicitly[Encoder[T]]
     new Dataset(self, LocalRelation(encoder.schema), encoder)
@@ -336,11 +292,7 @@ class SparkSession private(
     Dataset.ofRows(self, ExternalRDD(rdd, self)(encoder))
   }
 
-  /**
-   * Creates a `DataFrame` from a local Seq of Product.
-   *
-   * @since 2.0.0
-   */
+  /** @inheritdoc */
   def createDataFrame[A <: Product : TypeTag](data: Seq[A]): DataFrame = withActive {
     val schema = ScalaReflection.schemaFor[A].dataType.asInstanceOf[StructType]
     val attributeSeq = toAttributes(schema)
@@ -403,14 +355,7 @@ class SparkSession private(
     createDataFrame(rowRDD.rdd, replaced)
   }
 
-  /**
-   * :: DeveloperApi ::
-   * Creates a `DataFrame` from a `java.util.List` containing [[Row]]s using the given schema.
-   * It is important to make sure that the structure of every [[Row]] of the provided List matches
-   * the provided schema. Otherwise, there will be runtime exception.
-   *
-   * @since 2.0.0
-   */
+  /** @inheritdoc */
   @DeveloperApi
   def createDataFrame(rows: java.util.List[Row], schema: StructType): DataFrame = withActive {
     val replaced = CharVarcharUtils.failIfHasCharVarchar(schema).asInstanceOf[StructType]
@@ -447,13 +392,7 @@ class SparkSession private(
     createDataFrame(rdd.rdd, beanClass)
   }
 
-  /**
-   * Applies a schema to a List of Java Beans.
-   *
-   * WARNING: Since there is no guaranteed ordering for fields in a Java Bean,
-   *          SELECT * queries will return the columns in an undefined order.
-   * @since 1.6.0
-   */
+  /** @inheritdoc */
   def createDataFrame(data: java.util.List[_], beanClass: Class[_]): DataFrame = withActive {
     val attrSeq = getSchema(beanClass)
     val rows = SQLContext.beansToRows(data.asScala.iterator, beanClass, attrSeq)
@@ -473,33 +412,7 @@ class SparkSession private(
    |  Methods for creating DataSets  |
    * ------------------------------- */
 
-  /**
-   * Creates a [[Dataset]] from a local Seq of data of a given type. This method requires an
-   * encoder (to convert a JVM object of type `T` to and from the internal Spark SQL representation)
-   * that is generally created automatically through implicits from a `SparkSession`, or can be
-   * created explicitly by calling static methods on [[Encoders]].
-   *
-   * == Example ==
-   *
-   * {{{
-   *
-   *   import spark.implicits._
-   *   case class Person(name: String, age: Long)
-   *   val data = Seq(Person("Michael", 29), Person("Andy", 30), Person("Justin", 19))
-   *   val ds = spark.createDataset(data)
-   *
-   *   ds.show()
-   *   // +-------+---+
-   *   // |   name|age|
-   *   // +-------+---+
-   *   // |Michael| 29|
-   *   // |   Andy| 30|
-   *   // | Justin| 19|
-   *   // +-------+---+
-   * }}}
-   *
-   * @since 2.0.0
-   */
+  /** @inheritdoc */
   def createDataset[T : Encoder](data: Seq[T]): Dataset[T] = {
     val enc = encoderFor[T]
     val toRow = enc.createSerializer()
@@ -521,60 +434,25 @@ class SparkSession private(
     Dataset[T](self, ExternalRDD(data, self))
   }
 
-  /**
-   * Creates a [[Dataset]] from a `java.util.List` of a given type. This method requires an
-   * encoder (to convert a JVM object of type `T` to and from the internal Spark SQL representation)
-   * that is generally created automatically through implicits from a `SparkSession`, or can be
-   * created explicitly by calling static methods on [[Encoders]].
-   *
-   * == Java Example ==
-   *
-   * {{{
-   *     List<String> data = Arrays.asList("hello", "world");
-   *     Dataset<String> ds = spark.createDataset(data, Encoders.STRING());
-   * }}}
-   *
-   * @since 2.0.0
-   */
+  /** @inheritdoc */
   def createDataset[T : Encoder](data: java.util.List[T]): Dataset[T] = {
     createDataset(data.asScala.toSeq)
   }
 
-  /**
-   * Creates a [[Dataset]] with a single `LongType` column named `id`, containing elements
-   * in a range from 0 to `end` (exclusive) with step value 1.
-   *
-   * @since 2.0.0
-   */
+  /** @inheritdoc */
   def range(end: Long): Dataset[java.lang.Long] = range(0, end)
 
-  /**
-   * Creates a [[Dataset]] with a single `LongType` column named `id`, containing elements
-   * in a range from `start` to `end` (exclusive) with step value 1.
-   *
-   * @since 2.0.0
-   */
+  /** @inheritdoc */
   def range(start: Long, end: Long): Dataset[java.lang.Long] = {
     range(start, end, step = 1, numPartitions = leafNodeDefaultParallelism)
   }
 
-  /**
-   * Creates a [[Dataset]] with a single `LongType` column named `id`, containing elements
-   * in a range from `start` to `end` (exclusive) with a step value.
-   *
-   * @since 2.0.0
-   */
+  /** @inheritdoc */
   def range(start: Long, end: Long, step: Long): Dataset[java.lang.Long] = {
     range(start, end, step, numPartitions = leafNodeDefaultParallelism)
   }
 
-  /**
-   * Creates a [[Dataset]] with a single `LongType` column named `id`, containing elements
-   * in a range from `start` to `end` (exclusive) with a step value, with partition number
-   * specified.
-   *
-   * @since 2.0.0
-   */
+  /** @inheritdoc */
   def range(start: Long, end: Long, step: Long, numPartitions: Int): Dataset[java.lang.Long] = {
     new Dataset(self, Range(start, end, step, numPartitions), Encoders.LONG)
   }
@@ -608,19 +486,7 @@ class SparkSession private(
    */
   @transient lazy val catalog: Catalog = new CatalogImpl(self)
 
-  /**
-   * Returns the specified table/view as a `DataFrame`. If it's a table, it must support batch
-   * reading and the returned DataFrame is the batch scan query plan of this table. If it's a view,
-   * the returned DataFrame is simply the query plan of the view, which can either be a batch or
-   * streaming query plan.
-   *
-   * @param tableName is either a qualified or unqualified name that designates a table or view.
-   *                  If a database is specified, it identifies the table/view from the database.
-   *                  Otherwise, it first attempts to find a temporary view with the given name
-   *                  and then match the table/view from the current database.
-   *                  Note that, the global temporary view database is also valid here.
-   * @since 2.0.0
-   */
+  /** @inheritdoc */
   def table(tableName: String): DataFrame = {
     read.table(tableName)
   }
@@ -661,22 +527,7 @@ class SparkSession private(
       Dataset.ofRows(self, plan, tracker)
     }
 
-  /**
-   * Executes a SQL query substituting positional parameters by the given arguments,
-   * returning the result as a `DataFrame`.
-   * This API eagerly runs DDL/DML commands, but not for SELECT queries.
-   *
-   * @param sqlText A SQL statement with positional parameters to execute.
-   * @param args An array of Java/Scala objects that can be converted to
-   *             SQL literal expressions. See
-   *             <a href="https://spark.apache.org/docs/latest/sql-ref-datatypes.html">
-   *             Supported Data Types</a> for supported value types in Scala/Java.
-   *             For example, 1, "Steven", LocalDate.of(2023, 4, 2).
-   *             A value can be also a `Column` of a literal or collection constructor functions
-   *             such as `map()`, `array()`, `struct()`, in that case it is taken as is.
-   *
-   * @since 3.5.0
-   */
+  /** @inheritdoc */
   @Experimental
   def sql(sqlText: String, args: Array[_]): DataFrame = {
     sql(sqlText, args, new QueryPlanningTracker)
@@ -714,57 +565,20 @@ class SparkSession private(
       Dataset.ofRows(self, plan, tracker)
     }
 
-  /**
-   * Executes a SQL query substituting named parameters by the given arguments,
-   * returning the result as a `DataFrame`.
-   * This API eagerly runs DDL/DML commands, but not for SELECT queries.
-   *
-   * @param sqlText A SQL statement with named parameters to execute.
-   * @param args A map of parameter names to Java/Scala objects that can be converted to
-   *             SQL literal expressions. See
-   *             <a href="https://spark.apache.org/docs/latest/sql-ref-datatypes.html">
-   *             Supported Data Types</a> for supported value types in Scala/Java.
-   *             For example, map keys: "rank", "name", "birthdate";
-   *             map values: 1, "Steven", LocalDate.of(2023, 4, 2).
-   *             Map value can be also a `Column` of a literal or collection constructor functions
-   *             such as `map()`, `array()`, `struct()`, in that case it is taken as is.
-   *
-   * @since 3.4.0
-   */
+  /** @inheritdoc */
   @Experimental
   def sql(sqlText: String, args: Map[String, Any]): DataFrame = {
     sql(sqlText, args, new QueryPlanningTracker)
   }
 
-  /**
-   * Executes a SQL query substituting named parameters by the given arguments,
-   * returning the result as a `DataFrame`.
-   * This API eagerly runs DDL/DML commands, but not for SELECT queries.
-   *
-   * @param sqlText A SQL statement with named parameters to execute.
-   * @param args A map of parameter names to Java/Scala objects that can be converted to
-   *             SQL literal expressions. See
-   *             <a href="https://spark.apache.org/docs/latest/sql-ref-datatypes.html">
-   *             Supported Data Types</a> for supported value types in Scala/Java.
-   *             For example, map keys: "rank", "name", "birthdate";
-   *             map values: 1, "Steven", LocalDate.of(2023, 4, 2).
-   *             Map value can be also a `Column` of a literal or collection constructor functions
-   *             such as `map()`, `array()`, `struct()`, in that case it is taken as is.
-   *
-   * @since 3.4.0
-   */
+  /** @inheritdoc */
   @Experimental
-  def sql(sqlText: String, args: java.util.Map[String, Any]): DataFrame = {
+  override def sql(sqlText: String, args: java.util.Map[String, Any]): DataFrame = {
     sql(sqlText, args.asScala.toMap)
   }
 
-  /**
-   * Executes a SQL query using Spark, returning the result as a `DataFrame`.
-   * This API eagerly runs DDL/DML commands, but not for SELECT queries.
-   *
-   * @since 2.0.0
-   */
-  def sql(sqlText: String): DataFrame = sql(sqlText, Map.empty[String, Any])
+  /** @inheritdoc */
+  override def sql(sqlText: String): DataFrame = sql(sqlText, Map.empty[String, Any])
 
   /**
    * Execute an arbitrary string command inside an external execution engine rather than Spark.
@@ -817,22 +631,6 @@ class SparkSession private(
    */
   def readStream: DataStreamReader = new DataStreamReader(self)
 
-  /**
-   * Executes some code block and prints to stdout the time taken to execute the block. This is
-   * available in Scala only and is used primarily for interactive testing and debugging.
-   *
-   * @since 2.1.0
-   */
-  def time[T](f: => T): T = {
-    val start = System.nanoTime()
-    val ret = f
-    val end = System.nanoTime()
-    // scalastyle:off println
-    println(s"Time taken: ${NANOSECONDS.toMillis(end - start)} ms")
-    // scalastyle:on println
-    ret
-  }
-
   // scalastyle:off
   // Disable style checker so "implicits" object can start with lowercase i
   /**
@@ -854,19 +652,12 @@ class SparkSession private(
   /**
    * Stop the underlying `SparkContext`.
    *
-   * @since 2.0.0
+   * @since 2.1.0
    */
-  def stop(): Unit = {
+  override def close(): Unit = {
     sparkContext.stop()
   }
 
-  /**
-   * Synonym for `stop()`.
-   *
-   * @since 2.1.0
-   */
-  override def close(): Unit = stop()
-
   /**
    * Parses the data type in our internal string representation. The data type string should
    * have the same format as the one generated by `toString` in scala.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

