This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 7f6d554a4933 [SPARK-49436][CONNECT][SQL] Common interface for SQLContext
7f6d554a4933 is described below

commit 7f6d554a493330744113fa7934236d0dc5a90bc0
Author: Paddy Xu <[email protected]>
AuthorDate: Thu Dec 19 09:03:20 2024 +0900

    [SPARK-49436][CONNECT][SQL] Common interface for SQLContext
    
    ### What changes were proposed in this pull request?
    
    This PR adds an abstraction for `SQLContext` in the `spark-api` package. Both sides (Classic and Connect) maintain their own implementation.
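
    A rough sketch of the resulting shape (simplified for illustration; the exact code is in the diff below):

    ```scala
    // sql/api module: the shared, implementation-agnostic surface.
    abstract class SQLContext private[sql] (val sparkSession: SparkSession) {
      def newSession(): SQLContext // implementation-specific
      def sql(sqlText: String): Dataset[Row] = sparkSession.sql(sqlText) // shared default
    }

    // Connect (and, analogously, Classic) supplies a thin concrete subclass
    // that delegates to its own SparkSession:
    class SQLContext private[sql] (override val sparkSession: SparkSession)
        extends api.SQLContext(sparkSession) {
      def newSession(): SQLContext = sparkSession.newSession().sqlContext
    }
    ```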
    
    ### Why are the changes needed?
    
    To unify the API interface and make `SQLContext` available to Spark Connect.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. Connect users are now able to call `sparkSession.sqlContext` and the APIs it provides.
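
    For example, against a Connect server (a hypothetical session; the endpoint URL is illustrative):

    ```scala
    val spark = SparkSession.builder().remote("sc://localhost").getOrCreate()
    val ctx = spark.sqlContext   // previously threw SESSION_SQL_CONTEXT
    ctx.sql("SELECT 1").show()   // mirrors SparkSession.sql
    ctx.tables().show()          // columns: namespace, tableName, isTemporary
    ```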
    
    ### How was this patch tested?
    
    No new tests are needed: all new methods are mirrored from SparkSession except `tables()`, which is covered by existing tests in `ListTablesSuite`.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #48958 from xupefei/api-sqlcontext.
    
    Authored-by: Paddy Xu <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 .../src/main/resources/error/error-conditions.json |    5 -
 .../scala/org/apache/spark/sql/SQLContext.scala    |  336 +++++++
 .../scala/org/apache/spark/sql/SparkSession.scala  |    3 +-
 .../connect/ConnectClientUnsupportedErrors.scala   |    3 -
 .../spark/sql/UnsupportedFeaturesSuite.scala       |    4 -
 .../CheckConnectJvmClientCompatibility.scala       |    7 +-
 .../org/apache/spark/sql/api}/SQLContext.scala     |  629 +++++-------
 .../org/apache/spark/sql/api/SparkSession.scala    |    2 +-
 .../src/main/scala/org/apache/spark/shims.scala    |    1 -
 .../scala/org/apache/spark/sql/SQLContext.scala    | 1063 ++++----------------
 .../org/apache/spark/sql/SQLContextSuite.scala     |   23 +
 11 files changed, 847 insertions(+), 1229 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json
index 695f89d741c1..deb62866f072 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -5090,11 +5090,6 @@
         "message" : [
           "Access to the SparkContext."
         ]
-      },
-      "SESSION_SQL_CONTEXT" : {
-        "message" : [
-          "Access to the SparkSession SQL Context."
-        ]
       }
     },
     "sqlState" : "0A000"
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SQLContext.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SQLContext.scala
new file mode 100644
index 000000000000..3603eb6ea508
--- /dev/null
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.util.{List => JList, Map => JMap, Properties}
+
+import scala.jdk.CollectionConverters.PropertiesHasAsScala
+import scala.reflect.runtime.universe.TypeTag
+
+import org.apache.spark.SparkContext
+import org.apache.spark.annotation.Stable
+import org.apache.spark.api.java.JavaRDD
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.connect.ConnectClientUnsupportedErrors
+import org.apache.spark.sql.connect.ConnectConversions._
+import org.apache.spark.sql.sources.BaseRelation
+import org.apache.spark.sql.streaming.{DataStreamReader, StreamingQueryManager}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.ExecutionListenerManager
+
+@Stable
+class SQLContext private[sql] (override val sparkSession: SparkSession)
+    extends api.SQLContext(sparkSession) {
+
+  /** @inheritdoc */
+  def newSession(): SQLContext = sparkSession.newSession().sqlContext
+
+  /** @inheritdoc */
+  def listenerManager: ExecutionListenerManager = sparkSession.listenerManager
+
+  /** @inheritdoc */
+  def setConf(props: Properties): Unit = sparkSession.conf.synchronized {
+    props.asScala.foreach { case (k, v) => sparkSession.conf.set(k, v) }
+  }
+
+  /** @inheritdoc */
+  def experimental: ExperimentalMethods = sparkSession.experimental
+
+  /** @inheritdoc */
+  def udf: UDFRegistration = sparkSession.udf
+
+  // scalastyle:off
+  // Disable style checker so "implicits" object can start with lowercase i
+
+  /** @inheritdoc */
+  object implicits extends SQLImplicits {
+
+    /** @inheritdoc */
+    override protected def session: SparkSession = sparkSession
+  }
+
+  // scalastyle:on
+
+  /** @inheritdoc */
+  def read: DataFrameReader = sparkSession.read
+
+  /** @inheritdoc */
+  def readStream: DataStreamReader = sparkSession.readStream
+
+  /**
+   * Returns a `StreamingQueryManager` that allows managing all the
+   * [[org.apache.spark.sql.streaming.StreamingQuery StreamingQueries]] active on `this` context.
+   *
+   * @since 4.0.0
+   */
+  def streams: StreamingQueryManager = sparkSession.streams
+
+  /** @inheritdoc */
+  override def sparkContext: SparkContext = {
+    throw ConnectClientUnsupportedErrors.sparkContext()
+  }
+
+  /** @inheritdoc */
+  override def emptyDataFrame: Dataset[Row] = super.emptyDataFrame
+
+  /** @inheritdoc */
+  override def createDataFrame[A <: Product: TypeTag](rdd: RDD[A]): Dataset[Row] =
+    super.createDataFrame(rdd)
+
+  /** @inheritdoc */
+  override def createDataFrame[A <: Product: TypeTag](data: Seq[A]): Dataset[Row] =
+    super.createDataFrame(data)
+
+  /** @inheritdoc */
+  override def baseRelationToDataFrame(baseRelation: BaseRelation): Dataset[Row] =
+    super.baseRelationToDataFrame(baseRelation)
+
+  /** @inheritdoc */
+  override def createDataFrame(rowRDD: RDD[Row], schema: StructType): Dataset[Row] =
+    super.createDataFrame(rowRDD, schema)
+
+  /** @inheritdoc */
+  override def createDataset[T: Encoder](data: Seq[T]): Dataset[T] = super.createDataset(data)
+
+  /** @inheritdoc */
+  override def createDataset[T: Encoder](data: RDD[T]): Dataset[T] = super.createDataset(data)
+
+  /** @inheritdoc */
+  override def createDataset[T: Encoder](data: JList[T]): Dataset[T] =
+    super.createDataset(data)
+
+  /** @inheritdoc */
+  override def createDataFrame(rowRDD: JavaRDD[Row], schema: StructType): Dataset[Row] =
+    super.createDataFrame(rowRDD, schema)
+
+  /** @inheritdoc */
+  override def createDataFrame(rows: JList[Row], schema: StructType): Dataset[Row] =
+    super.createDataFrame(rows, schema)
+
+  /** @inheritdoc */
+  override def createDataFrame(rdd: RDD[_], beanClass: Class[_]): Dataset[Row] =
+    super.createDataFrame(rdd, beanClass)
+
+  /** @inheritdoc */
+  override def createDataFrame(rdd: JavaRDD[_], beanClass: Class[_]): Dataset[Row] =
+    super.createDataFrame(rdd, beanClass)
+
+  /** @inheritdoc */
+  override def createDataFrame(data: JList[_], beanClass: Class[_]): Dataset[Row] =
+    super.createDataFrame(data, beanClass)
+
+  /** @inheritdoc */
+  override def createExternalTable(tableName: String, path: String): Dataset[Row] =
+    super.createExternalTable(tableName, path)
+
+  /** @inheritdoc */
+  override def createExternalTable(
+      tableName: String,
+      path: String,
+      source: String): Dataset[Row] = {
+    super.createExternalTable(tableName, path, source)
+  }
+
+  /** @inheritdoc */
+  override def createExternalTable(
+      tableName: String,
+      source: String,
+      options: JMap[String, String]): Dataset[Row] = {
+    super.createExternalTable(tableName, source, options)
+  }
+
+  /** @inheritdoc */
+  override def createExternalTable(
+      tableName: String,
+      source: String,
+      options: Map[String, String]): Dataset[Row] = {
+    super.createExternalTable(tableName, source, options)
+  }
+
+  /** @inheritdoc */
+  override def createExternalTable(
+      tableName: String,
+      source: String,
+      schema: StructType,
+      options: JMap[String, String]): Dataset[Row] = {
+    super.createExternalTable(tableName, source, schema, options)
+  }
+
+  /** @inheritdoc */
+  override def createExternalTable(
+      tableName: String,
+      source: String,
+      schema: StructType,
+      options: Map[String, String]): Dataset[Row] = {
+    super.createExternalTable(tableName, source, schema, options)
+  }
+
+  /** @inheritdoc */
+  override def range(end: Long): Dataset[Row] = super.range(end)
+
+  /** @inheritdoc */
+  override def range(start: Long, end: Long): Dataset[Row] = super.range(start, end)
+
+  /** @inheritdoc */
+  override def range(start: Long, end: Long, step: Long): Dataset[Row] =
+    super.range(start, end, step)
+
+  /** @inheritdoc */
+  override def range(start: Long, end: Long, step: Long, numPartitions: Int): Dataset[Row] =
+    super.range(start, end, step, numPartitions)
+
+  /** @inheritdoc */
+  override def sql(sqlText: String): Dataset[Row] = super.sql(sqlText)
+
+  /** @inheritdoc */
+  override def table(tableName: String): Dataset[Row] = super.table(tableName)
+
+  /** @inheritdoc */
+  override def tables(): Dataset[Row] = super.tables()
+
+  /** @inheritdoc */
+  override def tables(databaseName: String): Dataset[Row] = super.tables(databaseName)
+
+  /** @inheritdoc */
+  override def applySchema(rowRDD: RDD[Row], schema: StructType): Dataset[Row] =
+    super.applySchema(rowRDD, schema)
+
+  /** @inheritdoc */
+  override def applySchema(rowRDD: JavaRDD[Row], schema: StructType): Dataset[Row] =
+    super.applySchema(rowRDD, schema)
+
+  /** @inheritdoc */
+  override def applySchema(rdd: RDD[_], beanClass: Class[_]): Dataset[Row] =
+    super.applySchema(rdd, beanClass)
+
+  /** @inheritdoc */
+  override def applySchema(rdd: JavaRDD[_], beanClass: Class[_]): Dataset[Row] =
+    super.applySchema(rdd, beanClass)
+
+  /** @inheritdoc */
+  @scala.annotation.varargs
+  override def parquetFile(paths: String*): Dataset[Row] = super.parquetFile(paths: _*)
+
+  /** @inheritdoc */
+  override def jsonFile(path: String): Dataset[Row] = super.jsonFile(path)
+
+  /** @inheritdoc */
+  override def jsonFile(path: String, schema: StructType): Dataset[Row] =
+    super.jsonFile(path, schema)
+
+  /** @inheritdoc */
+  override def jsonFile(path: String, samplingRatio: Double): Dataset[Row] =
+    super.jsonFile(path, samplingRatio)
+
+  /** @inheritdoc */
+  override def jsonRDD(json: RDD[String]): Dataset[Row] = super.jsonRDD(json)
+
+  /** @inheritdoc */
+  override def jsonRDD(json: JavaRDD[String]): Dataset[Row] = super.jsonRDD(json)
+
+  /** @inheritdoc */
+  override def jsonRDD(json: RDD[String], schema: StructType): Dataset[Row] =
+    super.jsonRDD(json, schema)
+
+  /** @inheritdoc */
+  override def jsonRDD(json: JavaRDD[String], schema: StructType): Dataset[Row] =
+    super.jsonRDD(json, schema)
+
+  /** @inheritdoc */
+  override def jsonRDD(json: RDD[String], samplingRatio: Double): Dataset[Row] =
+    super.jsonRDD(json, samplingRatio)
+
+  /** @inheritdoc */
+  override def jsonRDD(json: JavaRDD[String], samplingRatio: Double): Dataset[Row] =
+    super.jsonRDD(json, samplingRatio)
+
+  /** @inheritdoc */
+  override def load(path: String): Dataset[Row] = super.load(path)
+
+  /** @inheritdoc */
+  override def load(path: String, source: String): Dataset[Row] = super.load(path, source)
+
+  /** @inheritdoc */
+  override def load(source: String, options: JMap[String, String]): Dataset[Row] =
+    super.load(source, options)
+
+  /** @inheritdoc */
+  override def load(source: String, options: Map[String, String]): Dataset[Row] =
+    super.load(source, options)
+
+  /** @inheritdoc */
+  override def load(
+      source: String,
+      schema: StructType,
+      options: JMap[String, String]): Dataset[Row] = {
+    super.load(source, schema, options)
+  }
+
+  /** @inheritdoc */
+  override def load(
+      source: String,
+      schema: StructType,
+      options: Map[String, String]): Dataset[Row] = {
+    super.load(source, schema, options)
+  }
+
+  /** @inheritdoc */
+  override def jdbc(url: String, table: String): Dataset[Row] = super.jdbc(url, table)
+
+  /** @inheritdoc */
+  override def jdbc(
+      url: String,
+      table: String,
+      columnName: String,
+      lowerBound: Long,
+      upperBound: Long,
+      numPartitions: Int): Dataset[Row] = {
+    super.jdbc(url, table, columnName, lowerBound, upperBound, numPartitions)
+  }
+
+  /** @inheritdoc */
+  override def jdbc(url: String, table: String, theParts: Array[String]): Dataset[Row] = {
+    super.jdbc(url, table, theParts)
+  }
+}
+object SQLContext extends api.SQLContextCompanion {
+
+  override private[sql] type SQLContextImpl = SQLContext
+  override private[sql] type SparkContextImpl = SparkContext
+
+  /**
+   * Get the singleton SQLContext if it exists or create a new one.
+   *
+   * This function can be used to create a singleton SQLContext object that can be shared across
+   * the JVM.
+   *
+   * If there is an active SQLContext for current thread, it will be returned instead of the
+   * global one.
+   *
+   * @param sparkContext
+   *   The SparkContext. This parameter is not used in Spark Connect.
+   *
+   * @since 4.0.0
+   */
+  def getOrCreate(sparkContext: SparkContext): SQLContext = {
+    SparkSession.builder().getOrCreate().sqlContext
+  }
+
+  /** @inheritdoc */
+  override def setActive(sqlContext: SQLContext): Unit = super.setActive(sqlContext)
+}
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 939a1341a891..b6bba8251913 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -188,8 +188,7 @@ class SparkSession private[sql] (
     throw ConnectClientUnsupportedErrors.sessionState()
 
   /** @inheritdoc */
-  override def sqlContext: SQLContext =
-    throw ConnectClientUnsupportedErrors.sqlContext()
+  override val sqlContext: SQLContext = new SQLContext(this)
 
   /** @inheritdoc */
   override def listenerManager: ExecutionListenerManager =
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/ConnectClientUnsupportedErrors.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/ConnectClientUnsupportedErrors.scala
index e73bcb8a0059..5783a20348d7 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/ConnectClientUnsupportedErrors.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/ConnectClientUnsupportedErrors.scala
@@ -53,7 +53,4 @@ private[sql] object ConnectClientUnsupportedErrors {
 
   def sparkContext(): SparkUnsupportedOperationException =
     unsupportedFeatureException("SESSION_SPARK_CONTEXT")
-
-  def sqlContext(): SparkUnsupportedOperationException =
-    unsupportedFeatureException("SESSION_SQL_CONTEXT")
 }
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/UnsupportedFeaturesSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/UnsupportedFeaturesSuite.scala
index 6a26cf581751..42ae6987c9f3 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/UnsupportedFeaturesSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/UnsupportedFeaturesSuite.scala
@@ -79,10 +79,6 @@ class UnsupportedFeaturesSuite extends ConnectFunSuite {
     _.listenerManager
   }
 
-  testUnsupportedFeature("SparkSession.sqlContext", "SESSION_SQL_CONTEXT") {
-    _.sqlContext
-  }
-
   testUnsupportedFeature(
     "SparkSession.baseRelationToDataFrame",
     "SESSION_BASE_RELATION_TO_DATAFRAME") {
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
index b5ea973aa1d7..4ec84a4087eb 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
@@ -176,8 +176,6 @@ object CheckConnectJvmClientCompatibility {
 
       // Skip unsupported classes
       ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.ExperimentalMethods"),
-      ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.SQLContext"),
-      ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.SQLContext$*"),
       ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.SparkSessionExtensions"),
       ProblemFilters.exclude[MissingClassProblem](
         "org.apache.spark.sql.SparkSessionExtensionsProvider"),
@@ -331,6 +329,11 @@ object CheckConnectJvmClientCompatibility {
       ProblemFilters.exclude[DirectMissingMethodProblem](
         "org.apache.spark.sql.SparkSession#Builder.interceptor"),
 
+      // Private case class in SQLContext
+      ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.SQLContext$ListTableRow"),
+      ProblemFilters.exclude[MissingClassProblem](
+        "org.apache.spark.sql.SQLContext$ListTableRow$"),
+
       // SQLImplicits
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.SQLImplicits.session"),
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/api/src/main/scala/org/apache/spark/sql/api/SQLContext.scala
similarity index 55%
copy from sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
copy to sql/api/src/main/scala/org/apache/spark/sql/api/SQLContext.scala
index 636899a7acb0..50590fffa152 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/api/SQLContext.scala
@@ -15,34 +15,31 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql
-
-import java.util.Properties
+package org.apache.spark.sql.api
 
 import scala.collection.immutable
 import scala.reflect.runtime.universe.TypeTag
 
-import org.apache.spark.{SparkConf, SparkContext}
+import _root_.java.util.{List => JList, Map => JMap, Properties}
+
+import org.apache.spark.SparkContext
 import org.apache.spark.annotation.{DeveloperApi, Experimental, Stable, Unstable}
-import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
+import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.ConfigEntry
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst._
-import org.apache.spark.sql.catalyst.analysis.{CurrentNamespace, UnresolvedNamespace}
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.logical.ShowTables
-import org.apache.spark.sql.internal.{SessionState, SharedState, SQLConf}
+import org.apache.spark.sql.{Encoder, Encoders, ExperimentalMethods, Row}
+import org.apache.spark.sql.api.SQLImplicits
+import org.apache.spark.sql.catalog.Table
+import org.apache.spark.sql.functions.{array_size, coalesce, col, lit, when}
 import org.apache.spark.sql.sources.BaseRelation
-import org.apache.spark.sql.streaming.{DataStreamReader, StreamingQueryManager}
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.ExecutionListenerManager
 
 /**
  * The entry point for working with structured data (rows and columns) in Spark 1.x.
  *
- * As of Spark 2.0, this is replaced by [[SparkSession]]. However, we are keeping the class
- * here for backward compatibility.
+ * As of Spark 2.0, this is replaced by [[SparkSession]]. However, we are keeping the class here
+ * for backward compatibility.
  *
  * @groupname basic Basic Operations
  * @groupname ddl_ops Persistent Catalog DDL
@@ -56,47 +53,27 @@ import org.apache.spark.sql.util.ExecutionListenerManager
  * @since 1.0.0
  */
 @Stable
-class SQLContext private[sql](val sparkSession: SparkSession)
-  extends Logging with Serializable {
-
-  self =>
-
-  sparkSession.sparkContext.assertNotStopped()
+abstract class SQLContext private[sql] (val sparkSession: SparkSession)
+    extends Logging
+    with Serializable {
 
   // Note: Since Spark 2.0 this class has become a wrapper of SparkSession, where the
   // real functionality resides. This class remains mainly for backward compatibility.
 
-  @deprecated("Use SparkSession.builder instead", "2.0.0")
-  def this(sc: SparkContext) = {
-    this(SparkSession.builder().sparkContext(sc).getOrCreate())
-  }
-
-  @deprecated("Use SparkSession.builder instead", "2.0.0")
-  def this(sparkContext: JavaSparkContext) = this(sparkContext.sc)
-
-  // TODO: move this logic into SparkSession
-
-  private[sql] def sessionState: SessionState = sparkSession.sessionState
-  private[sql] def sharedState: SharedState = sparkSession.sharedState
-  @deprecated("Use SparkSession.sessionState.conf instead", "4.0.0")
-  private[sql] def conf: SQLConf = sessionState.conf
-
   def sparkContext: SparkContext = sparkSession.sparkContext
 
   /**
-   * Returns a [[SQLContext]] as new session, with separated SQL configurations, temporary
-   * tables, registered functions, but sharing the same `SparkContext`, cached data and
-   * other things.
+   * Returns a [[SQLContext]] as new session, with separated SQL configurations, temporary tables,
+   * registered functions, but sharing the same `SparkContext`, cached data and other things.
    *
    * @since 1.6.0
    */
-  def newSession(): SQLContext = sparkSession.newSession().sqlContext
+  def newSession(): SQLContext
 
   /**
-   * An interface to register custom [[org.apache.spark.sql.util.QueryExecutionListener]]s
-   * that listen for execution metrics.
+   * An interface to register custom QueryExecutionListener that listen for execution metrics.
    */
-  def listenerManager: ExecutionListenerManager = sparkSession.listenerManager
+  def listenerManager: ExecutionListenerManager
 
   /**
    * Set Spark SQL configuration properties.
@@ -104,16 +81,7 @@ class SQLContext private[sql](val sparkSession: SparkSession)
    * @group config
    * @since 1.0.0
    */
-  def setConf(props: Properties): Unit = {
-    sessionState.conf.setConf(props)
-  }
-
-  /**
-   * Set the given Spark SQL configuration property.
-   */
-  private[sql] def setConf[T](entry: ConfigEntry[T], value: T): Unit = {
-    sessionState.conf.setConf(entry, value)
-  }
+  def setConf(props: Properties): Unit
 
   /**
    * Set the given Spark SQL configuration property.
@@ -147,8 +115,8 @@ class SQLContext private[sql](val sparkSession: SparkSession)
   }
 
   /**
-   * Return all the configuration properties that have been set (i.e. not the default).
-   * This creates a new copy of the config properties in the form of a Map.
+   * Return all the configuration properties that have been set (i.e. not the default). This
+   * creates a new copy of the config properties in the form of a Map.
    *
    * @group config
    * @since 1.0.0
@@ -158,9 +126,8 @@ class SQLContext private[sql](val sparkSession: SparkSession)
   }
 
   /**
-   * :: Experimental ::
-   * A collection of methods that are considered experimental, but can be used to hook into
-   * the query planner for advanced functionality.
+   * :: Experimental :: A collection of methods that are considered experimental, but can be used
+   * to hook into the query planner for advanced functionality.
    *
    * @group basic
    * @since 1.3.0
@@ -168,7 +135,7 @@ class SQLContext private[sql](val sparkSession: SparkSession)
   @Experimental
   @transient
   @Unstable
-  def experimental: ExperimentalMethods = sparkSession.experimental
+  def experimental: ExperimentalMethods
 
   /**
    * Returns a `DataFrame` with no rows or columns.
@@ -176,7 +143,7 @@ class SQLContext private[sql](val sparkSession: SparkSession)
    * @group basic
    * @since 1.3.0
    */
-  def emptyDataFrame: DataFrame = sparkSession.emptyDataFrame
+  def emptyDataFrame: Dataset[Row] = sparkSession.emptyDataFrame
 
   /**
    * A collection of methods for registering user-defined functions (UDF).
@@ -193,14 +160,29 @@ class SQLContext private[sql](val sparkSession: SparkSession)
    *       DataTypes.StringType);
    * }}}
    *
-   * @note The user-defined functions must be deterministic. Due to optimization,
-   * duplicate invocations may be eliminated or the function may even be invoked more times than
-   * it is present in the query.
+   * @note
+   *   The user-defined functions must be deterministic. Due to optimization, duplicate
+   *   invocations may be eliminated or the function may even be invoked more times than it is
+   *   present in the query.
    *
    * @group basic
    * @since 1.3.0
    */
-  def udf: UDFRegistration = sparkSession.udf
+  def udf: UDFRegistration
+
+  /**
+   * (Scala-specific) Implicit methods available in Scala for converting common Scala objects into
+   * `DataFrame`s.
+   *
+   * {{{
+   *   val sqlContext = new SQLContext(sc)
+   *   import sqlContext.implicits._
+   * }}}
+   *
+   * @group basic
+   * @since 1.3.0
+   */
+  val implicits: SQLImplicits
 
   /**
    * Returns true if the table is currently cached in-memory.
@@ -237,32 +219,13 @@ class SQLContext private[sql](val sparkSession: SparkSession)
     sparkSession.catalog.clearCache()
   }
 
-  // scalastyle:off
-  // Disable style checker so "implicits" object can start with lowercase i
-  /**
-   * (Scala-specific) Implicit methods available in Scala for converting
-   * common Scala objects into `DataFrame`s.
-   *
-   * {{{
-   *   val sqlContext = new SQLContext(sc)
-   *   import sqlContext.implicits._
-   * }}}
-   *
-   * @group basic
-   * @since 1.3.0
-   */
-  object implicits extends SQLImplicits {
-    override protected def session: SparkSession = sparkSession
-  }
-  // scalastyle:on
-
   /**
    * Creates a DataFrame from an RDD of Product (e.g. case classes, tuples).
    *
    * @group dataframes
    * @since 1.3.0
    */
-  def createDataFrame[A <: Product : TypeTag](rdd: RDD[A]): DataFrame = {
+  def createDataFrame[A <: Product: TypeTag](rdd: RDD[A]): Dataset[Row] = {
     sparkSession.createDataFrame(rdd)
   }
 
@@ -272,7 +235,7 @@ class SQLContext private[sql](val sparkSession: SparkSession)
    * @group dataframes
    * @since 1.3.0
    */
-  def createDataFrame[A <: Product : TypeTag](data: Seq[A]): DataFrame = {
+  def createDataFrame[A <: Product: TypeTag](data: Seq[A]): Dataset[Row] = {
     sparkSession.createDataFrame(data)
   }
 
@@ -282,16 +245,15 @@ class SQLContext private[sql](val sparkSession: SparkSession)
    * @group dataframes
    * @since 1.3.0
    */
-  def baseRelationToDataFrame(baseRelation: BaseRelation): DataFrame = {
+  def baseRelationToDataFrame(baseRelation: BaseRelation): Dataset[Row] = {
     sparkSession.baseRelationToDataFrame(baseRelation)
   }
 
   /**
-   * :: DeveloperApi ::
-   * Creates a `DataFrame` from an `RDD` containing [[Row]]s using the given schema.
-   * It is important to make sure that the structure of every [[Row]] of the provided RDD matches
-   * the provided schema. Otherwise, there will be runtime exception.
-   * Example:
+   * :: DeveloperApi :: Creates a `DataFrame` from an `RDD` containing
+   * [[org.apache.spark.sql.Row Row]]s using the given schema. It is important to make sure that
+   * the structure of every [[org.apache.spark.sql.Row Row]] of the provided RDD matches the
+   * provided schema. Otherwise, there will be runtime exception. Example:
    * {{{
    *  import org.apache.spark.sql._
    *  import org.apache.spark.sql.types._
@@ -319,17 +281,18 @@ class SQLContext private[sql](val sparkSession: SparkSession)
    * @since 1.3.0
    */
   @DeveloperApi
-  def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame = {
+  def createDataFrame(rowRDD: RDD[Row], schema: StructType): Dataset[Row] = {
     sparkSession.createDataFrame(rowRDD, schema)
   }
 
   /**
    * Creates a [[Dataset]] from a local Seq of data of a given type. This 
method requires an
-   * encoder (to convert a JVM object of type `T` to and from the internal 
Spark SQL representation)
-   * that is generally created automatically through implicits from a 
`SparkSession`, or can be
-   * created explicitly by calling static methods on [[Encoders]].
+   * encoder (to convert a JVM object of type `T` to and from the internal 
Spark SQL
+   * representation) that is generally created automatically through implicits 
from a
+   * `SparkSession`, or can be created explicitly by calling static methods on
+   * [[org.apache.spark.sql.Encoders Encoders]].
    *
-   * == Example ==
+   * ==Example==
    *
    * {{{
    *
@@ -351,30 +314,30 @@ class SQLContext private[sql](val sparkSession: SparkSession)
    * @since 2.0.0
    * @group dataset
    */
-  def createDataset[T : Encoder](data: Seq[T]): Dataset[T] = {
+  def createDataset[T: Encoder](data: Seq[T]): Dataset[T] = {
     sparkSession.createDataset(data)
   }
 
   /**
-   * Creates a [[Dataset]] from an RDD of a given type. This method requires an
-   * encoder (to convert a JVM object of type `T` to and from the internal Spark SQL representation)
-   * that is generally created automatically through implicits from a `SparkSession`, or can be
-   * created explicitly by calling static methods on [[Encoders]].
+   * Creates a [[Dataset]] from an RDD of a given type. This method requires an encoder (to
+   * convert a JVM object of type `T` to and from the internal Spark SQL representation) that is
+   * generally created automatically through implicits from a `SparkSession`, or can be created
+   * explicitly by calling static methods on [[org.apache.spark.sql.Encoders Encoders]].
    *
    * @since 2.0.0
    * @group dataset
    */
-  def createDataset[T : Encoder](data: RDD[T]): Dataset[T] = {
+  def createDataset[T: Encoder](data: RDD[T]): Dataset[T] = {
     sparkSession.createDataset(data)
   }
 
   /**
-   * Creates a [[Dataset]] from a `java.util.List` of a given type. This method requires an
-   * encoder (to convert a JVM object of type `T` to and from the internal Spark SQL representation)
-   * that is generally created automatically through implicits from a `SparkSession`, or can be
-   * created explicitly by calling static methods on [[Encoders]].
+   * Creates a [[Dataset]] from a `JList` of a given type. This method requires an encoder (to
+   * convert a JVM object of type `T` to and from the internal Spark SQL representation) that is
+   * generally created automatically through implicits from a `SparkSession`, or can be created
+   * explicitly by calling static methods on [[org.apache.spark.sql.Encoders Encoders]].
    *
-   * == Java Example ==
+   * ==Java Example==
    *
    * {{{
    *     List<String> data = Arrays.asList("hello", "world");
@@ -384,83 +347,71 @@ class SQLContext private[sql](val sparkSession: SparkSession)
    * @since 2.0.0
    * @group dataset
    */
-  def createDataset[T : Encoder](data: java.util.List[T]): Dataset[T] = {
+  def createDataset[T: Encoder](data: JList[T]): Dataset[T] = {
     sparkSession.createDataset(data)
   }
 
   /**
-   * Creates a DataFrame from an RDD[Row]. User can specify whether the input rows should be
-   * converted to Catalyst rows.
-   */
-  private[sql]
-  def internalCreateDataFrame(
-      catalystRows: RDD[InternalRow],
-      schema: StructType,
-      isStreaming: Boolean = false) = {
-    sparkSession.internalCreateDataFrame(catalystRows, schema, isStreaming)
-  }
-
-  /**
-   * :: DeveloperApi ::
-   * Creates a `DataFrame` from a `JavaRDD` containing [[Row]]s using the given schema.
-   * It is important to make sure that the structure of every [[Row]] of the provided RDD matches
-   * the provided schema. Otherwise, there will be runtime exception.
+   * :: DeveloperApi :: Creates a `DataFrame` from a `JavaRDD` containing
+   * [[org.apache.spark.sql.Row Row]]s using the given schema. It is important to make sure that
+   * the structure of every [[org.apache.spark.sql.Row Row]] of the provided RDD matches the
+   * provided schema. Otherwise, there will be runtime exception.
    *
    * @group dataframes
    * @since 1.3.0
    */
   @DeveloperApi
-  def createDataFrame(rowRDD: JavaRDD[Row], schema: StructType): DataFrame = {
+  def createDataFrame(rowRDD: JavaRDD[Row], schema: StructType): Dataset[Row] = {
     sparkSession.createDataFrame(rowRDD, schema)
   }
 
   /**
-   * :: DeveloperApi ::
-   * Creates a `DataFrame` from a `java.util.List` containing [[Row]]s using the given schema.
-   * It is important to make sure that the structure of every [[Row]] of the provided List matches
-   * the provided schema. Otherwise, there will be runtime exception.
+   * :: DeveloperApi :: Creates a `DataFrame` from a `JList` containing
+   * [[org.apache.spark.sql.Row Row]]s using the given schema. It is important to make sure that
+   * the structure of every [[org.apache.spark.sql.Row Row]] of the provided List matches the
+   * provided schema. Otherwise, there will be runtime exception.
    *
    * @group dataframes
    * @since 1.6.0
    */
   @DeveloperApi
-  def createDataFrame(rows: java.util.List[Row], schema: StructType): DataFrame = {
+  def createDataFrame(rows: JList[Row], schema: StructType): Dataset[Row] = {
     sparkSession.createDataFrame(rows, schema)
   }
 
   /**
    * Applies a schema to an RDD of Java Beans.
    *
-   * WARNING: Since there is no guaranteed ordering for fields in a Java Bean,
-   *          SELECT * queries will return the columns in an undefined order.
+   * WARNING: Since there is no guaranteed ordering for fields in a Java Bean, SELECT * queries
+   * will return the columns in an undefined order.
    * @group dataframes
    * @since 1.3.0
    */
-  def createDataFrame(rdd: RDD[_], beanClass: Class[_]): DataFrame = {
+  def createDataFrame(rdd: RDD[_], beanClass: Class[_]): Dataset[Row] = {
     sparkSession.createDataFrame(rdd, beanClass)
   }
 
   /**
    * Applies a schema to an RDD of Java Beans.
    *
-   * WARNING: Since there is no guaranteed ordering for fields in a Java Bean,
-   *          SELECT * queries will return the columns in an undefined order.
+   * WARNING: Since there is no guaranteed ordering for fields in a Java Bean, SELECT * queries
+   * will return the columns in an undefined order.
    * @group dataframes
    * @since 1.3.0
    */
-  def createDataFrame(rdd: JavaRDD[_], beanClass: Class[_]): DataFrame = {
+  def createDataFrame(rdd: JavaRDD[_], beanClass: Class[_]): Dataset[Row] = {
     sparkSession.createDataFrame(rdd, beanClass)
   }
 
   /**
    * Applies a schema to a List of Java Beans.
    *
-   * WARNING: Since there is no guaranteed ordering for fields in a Java Bean,
-   *          SELECT * queries will return the columns in an undefined order.
+   * WARNING: Since there is no guaranteed ordering for fields in a Java Bean, SELECT * queries
+   * will return the columns in an undefined order.
    * @group dataframes
    * @since 1.6.0
    */
-  def createDataFrame(data: java.util.List[_], beanClass: Class[_]): DataFrame = {
+  def createDataFrame(data: JList[_], beanClass: Class[_]): Dataset[Row] = {
     sparkSession.createDataFrame(data, beanClass)
   }
 
@@ -475,8 +426,7 @@ class SQLContext private[sql](val sparkSession: SparkSession)
    * @group genericdata
    * @since 1.4.0
    */
-  def read: DataFrameReader = sparkSession.read
-
+  def read: DataFrameReader
 
   /**
    * Returns a `DataStreamReader` that can be used to read streaming data in 
as a `DataFrame`.
@@ -487,33 +437,29 @@ class SQLContext private[sql](val sparkSession: SparkSession)
    *
    * @since 2.0.0
    */
-  def readStream: DataStreamReader = sparkSession.readStream
-
+  def readStream: DataStreamReader
 
   /**
-   * Creates an external table from the given path and returns the corresponding DataFrame.
-   * It will use the default data source configured by spark.sql.sources.default.
+   * Creates an external table from the given path and returns the corresponding DataFrame. It
+   * will use the default data source configured by spark.sql.sources.default.
    *
    * @group ddl_ops
    * @since 1.3.0
    */
   @deprecated("use sparkSession.catalog.createTable instead.", "2.2.0")
-  def createExternalTable(tableName: String, path: String): DataFrame = {
+  def createExternalTable(tableName: String, path: String): Dataset[Row] = {
     sparkSession.catalog.createTable(tableName, path)
   }
 
   /**
-   * Creates an external table from the given path based on a data source
-   * and returns the corresponding DataFrame.
+   * Creates an external table from the given path based on a data source and returns the
+   * corresponding DataFrame.
    *
    * @group ddl_ops
    * @since 1.3.0
    */
   @deprecated("use sparkSession.catalog.createTable instead.", "2.2.0")
-  def createExternalTable(
-      tableName: String,
-      path: String,
-      source: String): DataFrame = {
+  def createExternalTable(tableName: String, path: String, source: String): Dataset[Row] = {
     sparkSession.catalog.createTable(tableName, path, source)
   }
 
@@ -528,14 +474,13 @@ class SQLContext private[sql](val sparkSession: SparkSession)
   def createExternalTable(
       tableName: String,
       source: String,
-      options: java.util.Map[String, String]): DataFrame = {
+      options: JMap[String, String]): Dataset[Row] = {
     sparkSession.catalog.createTable(tableName, source, options)
   }
 
   /**
-   * (Scala-specific)
-   * Creates an external table from the given path based on a data source and a set of options.
-   * Then, returns the corresponding DataFrame.
+   * (Scala-specific) Creates an external table from the given path based on a data source and a
+   * set of options. Then, returns the corresponding DataFrame.
    *
    * @group ddl_ops
    * @since 1.3.0
@@ -544,13 +489,13 @@ class SQLContext private[sql](val sparkSession: SparkSession)
   def createExternalTable(
       tableName: String,
       source: String,
-      options: Map[String, String]): DataFrame = {
+      options: Map[String, String]): Dataset[Row] = {
     sparkSession.catalog.createTable(tableName, source, options)
   }
 
   /**
-   * Create an external table from the given path based on a data source, a schema and
-   * a set of options. Then, returns the corresponding DataFrame.
+   * Create an external table from the given path based on a data source, a schema and a set of
+   * options. Then, returns the corresponding DataFrame.
    *
    * @group ddl_ops
    * @since 1.3.0
@@ -560,14 +505,13 @@ class SQLContext private[sql](val sparkSession: SparkSession)
       tableName: String,
       source: String,
       schema: StructType,
-      options: java.util.Map[String, String]): DataFrame = {
+      options: JMap[String, String]): Dataset[Row] = {
     sparkSession.catalog.createTable(tableName, source, schema, options)
   }
 
   /**
-   * (Scala-specific)
-   * Create an external table from the given path based on a data source, a schema and
-   * a set of options. Then, returns the corresponding DataFrame.
+   * (Scala-specific) Create an external table from the given path based on a data source, a
+   * schema and a set of options. Then, returns the corresponding DataFrame.
    *
    * @group ddl_ops
    * @since 1.3.0
@@ -577,23 +521,16 @@ class SQLContext private[sql](val sparkSession: SparkSession)
       tableName: String,
       source: String,
       schema: StructType,
-      options: Map[String, String]): DataFrame = {
+      options: Map[String, String]): Dataset[Row] = {
     sparkSession.catalog.createTable(tableName, source, schema, options)
   }
 
-  /**
-   * Registers the given `DataFrame` as a temporary table in the catalog. Temporary tables exist
-   * only during the lifetime of this instance of SQLContext.
-   */
-  private[sql] def registerDataFrameAsTable(df: DataFrame, tableName: String): Unit = {
-    df.createOrReplaceTempView(tableName)
-  }
-
   /**
    * Drops the temporary table with the given table name in the catalog. If 
the table has been
    * cached/persisted before, it's also unpersisted.
    *
-   * @param tableName the name of the table to be unregistered.
+   * @param tableName
+   *   the name of the table to be unregistered.
    * @group basic
    * @since 1.3.0
    */
@@ -602,54 +539,53 @@ class SQLContext private[sql](val sparkSession: SparkSession)
   }
 
   /**
-   * Creates a `DataFrame` with a single `LongType` column named `id`, containing elements
-   * in a range from 0 to `end` (exclusive) with step value 1.
+   * Creates a `DataFrame` with a single `LongType` column named `id`, containing elements in a
+   * range from 0 to `end` (exclusive) with step value 1.
    *
    * @since 1.4.1
    * @group dataframe
    */
-  def range(end: Long): DataFrame = sparkSession.range(end).toDF()
+  def range(end: Long): Dataset[Row] = sparkSession.range(end).toDF()
 
   /**
-   * Creates a `DataFrame` with a single `LongType` column named `id`, containing elements
-   * in a range from `start` to `end` (exclusive) with step value 1.
+   * Creates a `DataFrame` with a single `LongType` column named `id`, containing elements in a
+   * range from `start` to `end` (exclusive) with step value 1.
    *
    * @since 1.4.0
    * @group dataframe
    */
-  def range(start: Long, end: Long): DataFrame = sparkSession.range(start, end).toDF()
+  def range(start: Long, end: Long): Dataset[Row] = sparkSession.range(start, end).toDF()
 
   /**
-   * Creates a `DataFrame` with a single `LongType` column named `id`, containing elements
-   * in a range from `start` to `end` (exclusive) with a step value.
+   * Creates a `DataFrame` with a single `LongType` column named `id`, containing elements in a
+   * range from `start` to `end` (exclusive) with a step value.
    *
    * @since 2.0.0
    * @group dataframe
    */
-  def range(start: Long, end: Long, step: Long): DataFrame = {
+  def range(start: Long, end: Long, step: Long): Dataset[Row] = {
     sparkSession.range(start, end, step).toDF()
   }
 
   /**
-   * Creates a `DataFrame` with a single `LongType` column named `id`, containing elements
-   * in an range from `start` to `end` (exclusive) with an step value, with partition number
-   * specified.
+   * Creates a `DataFrame` with a single `LongType` column named `id`, containing elements in an
+   * range from `start` to `end` (exclusive) with an step value, with partition number specified.
    *
    * @since 1.4.0
    * @group dataframe
    */
-  def range(start: Long, end: Long, step: Long, numPartitions: Int): DataFrame = {
+  def range(start: Long, end: Long, step: Long, numPartitions: Int): Dataset[Row] = {
     sparkSession.range(start, end, step, numPartitions).toDF()
   }
 
   /**
-   * Executes a SQL query using Spark, returning the result as a `DataFrame`.
-   * This API eagerly runs DDL/DML commands, but not for SELECT queries.
+   * Executes a SQL query using Spark, returning the result as a `DataFrame`. This API eagerly
+   * runs DDL/DML commands, but not for SELECT queries.
    *
    * @group basic
    * @since 1.3.0
    */
-  def sql(sqlText: String): DataFrame = sparkSession.sql(sqlText)
+  def sql(sqlText: String): Dataset[Row] = sparkSession.sql(sqlText)
 
   /**
    * Returns the specified table as a `DataFrame`.
@@ -657,41 +593,55 @@ class SQLContext private[sql](val sparkSession: SparkSession)
    * @group ddl_ops
    * @since 1.3.0
    */
-  def table(tableName: String): DataFrame = {
+  def table(tableName: String): Dataset[Row] = {
     sparkSession.table(tableName)
   }
 
   /**
-   * Returns a `DataFrame` containing names of existing tables in the current database.
-   * The returned DataFrame has three columns, database, tableName and isTemporary (a Boolean
+   * Returns a `DataFrame` containing names of existing tables in the current database. The
+   * returned DataFrame has three columns, database, tableName and isTemporary (a Boolean
    * indicating if a table is a temporary one or not).
    *
    * @group ddl_ops
    * @since 1.3.0
    */
-  def tables(): DataFrame = {
-    Dataset.ofRows(sparkSession, ShowTables(CurrentNamespace, None))
+  def tables(): Dataset[Row] = {
+    mapTableDatasetOutput(sparkSession.catalog.listTables())
   }
 
   /**
-   * Returns a `DataFrame` containing names of existing tables in the given database.
-   * The returned DataFrame has three columns, database, tableName and isTemporary (a Boolean
-   * indicating if a table is a temporary one or not).
+   * Returns a `DataFrame` containing names of existing tables in the given database. The returned
+   * DataFrame has three columns, database, tableName and isTemporary (a Boolean indicating if a
+   * table is a temporary one or not).
    *
    * @group ddl_ops
    * @since 1.3.0
    */
-  def tables(databaseName: String): DataFrame = {
-    Dataset.ofRows(sparkSession, ShowTables(UnresolvedNamespace(Seq(databaseName)), None))
+  def tables(databaseName: String): Dataset[Row] = {
+    mapTableDatasetOutput(sparkSession.catalog.listTables(databaseName))
+  }
+
+  private def mapTableDatasetOutput(tables: Dataset[Table]): Dataset[Row] = {
+    tables
+      .select(
+        // Re-implement `org.apache.spark.sql.catalog.Table.database` method.
+        // Abusing `coalesce` to tell Spark all these columns are not nullable.
+        when(
+          coalesce(array_size(col("namespace")), lit(0)).equalTo(lit(1)),
+          coalesce(col("namespace")(0), lit("")))
+          .otherwise(lit(""))
+          .as("namespace"),
+        coalesce(col("name"), lit("")).as("tableName"),
+        col("isTemporary"))
   }
 
   /**
    * Returns a `StreamingQueryManager` that allows managing all the
-   * [[org.apache.spark.sql.streaming.StreamingQuery StreamingQueries]] active on `this` context.
+   * [[org.apache.spark.sql.api.StreamingQuery StreamingQueries]] active on `this` context.
    *
    * @since 2.0.0
    */
-  def streams: StreamingQueryManager = sparkSession.streams
+  def streams: StreamingQueryManager
 
   /**
    * Returns the names of tables in the current database as an array.
@@ -710,7 +660,11 @@ class SQLContext private[sql](val sparkSession: SparkSession)
    * @since 1.3.0
    */
   def tableNames(databaseName: String): Array[String] = {
-    sessionState.catalog.listTables(databaseName).map(_.table).toArray
+    sparkSession.catalog
+      .listTables(databaseName)
+      .select(col("name"))
+      .as(Encoders.STRING)
+      .collect()
   }
 
   ////////////////////////////////////////////////////////////////////////////
@@ -720,34 +674,38 @@ class SQLContext private[sql](val sparkSession: SparkSession)
   ////////////////////////////////////////////////////////////////////////////
 
   /**
-   * @deprecated As of 1.3.0, replaced by `createDataFrame()`.
+   * @deprecated
+   *   As of 1.3.0, replaced by `createDataFrame()`.
    */
   @deprecated("Use createDataFrame instead.", "1.3.0")
-  def applySchema(rowRDD: RDD[Row], schema: StructType): DataFrame = {
+  def applySchema(rowRDD: RDD[Row], schema: StructType): Dataset[Row] = {
     createDataFrame(rowRDD, schema)
   }
 
   /**
-   * @deprecated As of 1.3.0, replaced by `createDataFrame()`.
+   * @deprecated
+   *   As of 1.3.0, replaced by `createDataFrame()`.
    */
   @deprecated("Use createDataFrame instead.", "1.3.0")
-  def applySchema(rowRDD: JavaRDD[Row], schema: StructType): DataFrame = {
+  def applySchema(rowRDD: JavaRDD[Row], schema: StructType): Dataset[Row] = {
     createDataFrame(rowRDD, schema)
   }
 
   /**
-   * @deprecated As of 1.3.0, replaced by `createDataFrame()`.
+   * @deprecated
+   *   As of 1.3.0, replaced by `createDataFrame()`.
    */
   @deprecated("Use createDataFrame instead.", "1.3.0")
-  def applySchema(rdd: RDD[_], beanClass: Class[_]): DataFrame = {
+  def applySchema(rdd: RDD[_], beanClass: Class[_]): Dataset[Row] = {
     createDataFrame(rdd, beanClass)
   }
 
   /**
-   * @deprecated As of 1.3.0, replaced by `createDataFrame()`.
+   * @deprecated
+   *   As of 1.3.0, replaced by `createDataFrame()`.
    */
   @deprecated("Use createDataFrame instead.", "1.3.0")
-  def applySchema(rdd: JavaRDD[_], beanClass: Class[_]): DataFrame = {
+  def applySchema(rdd: JavaRDD[_], beanClass: Class[_]): Dataset[Row] = {
     createDataFrame(rdd, beanClass)
   }
 
@@ -756,82 +714,87 @@ class SQLContext private[sql](val sparkSession: SparkSession)
    * `DataFrame` if no paths are passed in.
    *
    * @group specificdata
-   * @deprecated As of 1.4.0, replaced by `read().parquet()`.
+   * @deprecated
+   *   As of 1.4.0, replaced by `read().parquet()`.
    */
   @deprecated("Use read.parquet() instead.", "1.4.0")
   @scala.annotation.varargs
-  def parquetFile(paths: String*): DataFrame = {
+  def parquetFile(paths: String*): Dataset[Row] = {
     if (paths.isEmpty) {
       emptyDataFrame
     } else {
-      read.parquet(paths : _*)
+      read.parquet(paths: _*)
     }
   }
 
   /**
-   * Loads a JSON file (one object per line), returning the result as a `DataFrame`.
-   * It goes through the entire dataset once to determine the schema.
+   * Loads a JSON file (one object per line), returning the result as a `DataFrame`. It goes
+   * through the entire dataset once to determine the schema.
    *
    * @group specificdata
-   * @deprecated As of 1.4.0, replaced by `read().json()`.
+   * @deprecated
+   *   As of 1.4.0, replaced by `read().json()`.
    */
   @deprecated("Use read.json() instead.", "1.4.0")
-  def jsonFile(path: String): DataFrame = {
+  def jsonFile(path: String): Dataset[Row] = {
     read.json(path)
   }
 
   /**
-   * Loads a JSON file (one object per line) and applies the given schema,
-   * returning the result as a `DataFrame`.
+   * Loads a JSON file (one object per line) and applies the given schema, returning the result as
+   * a `DataFrame`.
    *
    * @group specificdata
-   * @deprecated As of 1.4.0, replaced by `read().json()`.
+   * @deprecated
+   *   As of 1.4.0, replaced by `read().json()`.
    */
   @deprecated("Use read.json() instead.", "1.4.0")
-  def jsonFile(path: String, schema: StructType): DataFrame = {
+  def jsonFile(path: String, schema: StructType): Dataset[Row] = {
     read.schema(schema).json(path)
   }
 
   /**
    * @group specificdata
-   * @deprecated As of 1.4.0, replaced by `read().json()`.
+   * @deprecated
+   *   As of 1.4.0, replaced by `read().json()`.
    */
   @deprecated("Use read.json() instead.", "1.4.0")
-  def jsonFile(path: String, samplingRatio: Double): DataFrame = {
+  def jsonFile(path: String, samplingRatio: Double): Dataset[Row] = {
     read.option("samplingRatio", samplingRatio.toString).json(path)
   }
 
   /**
    * Loads an RDD[String] storing JSON objects (one object per record), returning the result as a
-   * `DataFrame`.
-   * It goes through the entire dataset once to determine the schema.
+   * `DataFrame`. It goes through the entire dataset once to determine the schema.
    *
    * @group specificdata
-   * @deprecated As of 1.4.0, replaced by `read().json()`.
+   * @deprecated
+   *   As of 1.4.0, replaced by `read().json()`.
    */
   @deprecated("Use read.json() instead.", "1.4.0")
-  def jsonRDD(json: RDD[String]): DataFrame = read.json(json)
+  def jsonRDD(json: RDD[String]): Dataset[Row] = read.json(json)
 
   /**
    * Loads an RDD[String] storing JSON objects (one object per record), returning the result as a
-   * `DataFrame`.
-   * It goes through the entire dataset once to determine the schema.
+   * `DataFrame`. It goes through the entire dataset once to determine the schema.
    *
    * @group specificdata
-   * @deprecated As of 1.4.0, replaced by `read().json()`.
+   * @deprecated
+   *   As of 1.4.0, replaced by `read().json()`.
    */
   @deprecated("Use read.json() instead.", "1.4.0")
-  def jsonRDD(json: JavaRDD[String]): DataFrame = read.json(json)
+  def jsonRDD(json: JavaRDD[String]): Dataset[Row] = read.json(json)
 
   /**
-   * Loads an RDD[String] storing JSON objects (one object per record) and applies the given schema,
-   * returning the result as a `DataFrame`.
+   * Loads an RDD[String] storing JSON objects (one object per record) and applies the given
+   * schema, returning the result as a `DataFrame`.
    *
    * @group specificdata
-   * @deprecated As of 1.4.0, replaced by `read().json()`.
+   * @deprecated
+   *   As of 1.4.0, replaced by `read().json()`.
    */
   @deprecated("Use read.json() instead.", "1.4.0")
-  def jsonRDD(json: RDD[String], schema: StructType): DataFrame = {
+  def jsonRDD(json: RDD[String], schema: StructType): Dataset[Row] = {
     read.schema(schema).json(json)
   }
 
@@ -840,46 +803,50 @@ class SQLContext private[sql](val sparkSession: SparkSession)
    * schema, returning the result as a `DataFrame`.
    *
    * @group specificdata
-   * @deprecated As of 1.4.0, replaced by `read().json()`.
+   * @deprecated
+   *   As of 1.4.0, replaced by `read().json()`.
    */
   @deprecated("Use read.json() instead.", "1.4.0")
-  def jsonRDD(json: JavaRDD[String], schema: StructType): DataFrame = {
+  def jsonRDD(json: JavaRDD[String], schema: StructType): Dataset[Row] = {
     read.schema(schema).json(json)
   }
 
   /**
-   * Loads an RDD[String] storing JSON objects (one object per record) inferring the
-   * schema, returning the result as a `DataFrame`.
+   * Loads an RDD[String] storing JSON objects (one object per record) inferring the schema,
+   * returning the result as a `DataFrame`.
    *
    * @group specificdata
-   * @deprecated As of 1.4.0, replaced by `read().json()`.
+   * @deprecated
+   *   As of 1.4.0, replaced by `read().json()`.
    */
   @deprecated("Use read.json() instead.", "1.4.0")
-  def jsonRDD(json: RDD[String], samplingRatio: Double): DataFrame = {
+  def jsonRDD(json: RDD[String], samplingRatio: Double): Dataset[Row] = {
     read.option("samplingRatio", samplingRatio.toString).json(json)
   }
 
   /**
-   * Loads a JavaRDD[String] storing JSON objects (one object per record) inferring the
-   * schema, returning the result as a `DataFrame`.
+   * Loads a JavaRDD[String] storing JSON objects (one object per record) inferring the schema,
+   * returning the result as a `DataFrame`.
    *
    * @group specificdata
-   * @deprecated As of 1.4.0, replaced by `read().json()`.
+   * @deprecated
+   *   As of 1.4.0, replaced by `read().json()`.
    */
   @deprecated("Use read.json() instead.", "1.4.0")
-  def jsonRDD(json: JavaRDD[String], samplingRatio: Double): DataFrame = {
+  def jsonRDD(json: JavaRDD[String], samplingRatio: Double): Dataset[Row] = {
     read.option("samplingRatio", samplingRatio.toString).json(json)
   }
 
   /**
-   * Returns the dataset stored at path as a DataFrame,
-   * using the default data source configured by spark.sql.sources.default.
+   * Returns the dataset stored at path as a DataFrame, using the default data source configured
+   * by spark.sql.sources.default.
    *
    * @group genericdata
-   * @deprecated As of 1.4.0, replaced by `read().load(path)`.
+   * @deprecated
+   *   As of 1.4.0, replaced by `read().load(path)`.
    */
   @deprecated("Use read.load(path) instead.", "1.4.0")
-  def load(path: String): DataFrame = {
+  def load(path: String): Dataset[Row] = {
     read.load(path)
   }
 
@@ -887,90 +854,96 @@ class SQLContext private[sql](val sparkSession: SparkSession)
    * Returns the dataset stored at path as a DataFrame, using the given data source.
    *
    * @group genericdata
-   * @deprecated As of 1.4.0, replaced by `read().format(source).load(path)`.
+   * @deprecated
+   *   As of 1.4.0, replaced by `read().format(source).load(path)`.
    */
   @deprecated("Use read.format(source).load(path) instead.", "1.4.0")
-  def load(path: String, source: String): DataFrame = {
+  def load(path: String, source: String): Dataset[Row] = {
     read.format(source).load(path)
   }
 
   /**
-   * (Java-specific) Returns the dataset specified by the given data source and
-   * a set of options as a DataFrame.
+   * (Java-specific) Returns the dataset specified by the given data source 
and a set of options
+   * as a DataFrame.
    *
    * @group genericdata
-   * @deprecated As of 1.4.0, replaced by 
`read().format(source).options(options).load()`.
+   * @deprecated
+   *   As of 1.4.0, replaced by 
`read().format(source).options(options).load()`.
    */
   @deprecated("Use read.format(source).options(options).load() instead.", 
"1.4.0")
-  def load(source: String, options: java.util.Map[String, String]): DataFrame 
= {
+  def load(source: String, options: JMap[String, String]): Dataset[Row] = {
     read.options(options).format(source).load()
   }
 
   /**
-   * (Scala-specific) Returns the dataset specified by the given data source 
and
-   * a set of options as a DataFrame.
+   * (Scala-specific) Returns the dataset specified by the given data source 
and a set of options
+   * as a DataFrame.
    *
    * @group genericdata
-   * @deprecated As of 1.4.0, replaced by 
`read().format(source).options(options).load()`.
+   * @deprecated
+   *   As of 1.4.0, replaced by 
`read().format(source).options(options).load()`.
    */
   @deprecated("Use read.format(source).options(options).load() instead.", 
"1.4.0")
-  def load(source: String, options: Map[String, String]): DataFrame = {
+  def load(source: String, options: Map[String, String]): Dataset[Row] = {
     read.options(options).format(source).load()
   }
 
   /**
-   * (Java-specific) Returns the dataset specified by the given data source and
-   * a set of options as a DataFrame, using the given schema as the schema of 
the DataFrame.
+   * (Java-specific) Returns the dataset specified by the given data source 
and a set of options
+   * as a DataFrame, using the given schema as the schema of the DataFrame.
    *
    * @group genericdata
-   * @deprecated As of 1.4.0, replaced by
-   *            `read().format(source).schema(schema).options(options).load()`.
+   * @deprecated
+   *   As of 1.4.0, replaced by 
`read().format(source).schema(schema).options(options).load()`.
    */
   @deprecated("Use read.format(source).schema(schema).options(options).load() 
instead.", "1.4.0")
-  def load(
-      source: String,
-      schema: StructType,
-      options: java.util.Map[String, String]): DataFrame = {
+  def load(source: String, schema: StructType, options: JMap[String, String]): 
Dataset[Row] = {
     read.format(source).schema(schema).options(options).load()
   }
 
   /**
-   * (Scala-specific) Returns the dataset specified by the given data source 
and
-   * a set of options as a DataFrame, using the given schema as the schema of 
the DataFrame.
+   * (Scala-specific) Returns the dataset specified by the given data source 
and a set of options
+   * as a DataFrame, using the given schema as the schema of the DataFrame.
    *
    * @group genericdata
-   * @deprecated As of 1.4.0, replaced by
-   *            `read().format(source).schema(schema).options(options).load()`.
+   * @deprecated
+   *   As of 1.4.0, replaced by 
`read().format(source).schema(schema).options(options).load()`.
    */
   @deprecated("Use read.format(source).schema(schema).options(options).load() 
instead.", "1.4.0")
-  def load(source: String, schema: StructType, options: Map[String, String]): 
DataFrame = {
+  def load(source: String, schema: StructType, options: Map[String, String]): 
Dataset[Row] = {
     read.format(source).schema(schema).options(options).load()
   }
 
   /**
-   * Construct a `DataFrame` representing the database table accessible via 
JDBC URL
-   * url named table.
+   * Construct a `DataFrame` representing the database table named `table`,
+   * accessible via the JDBC URL `url`.
    *
    * @group specificdata
-   * @deprecated As of 1.4.0, replaced by `read().jdbc()`.
+   * @deprecated
+   *   As of 1.4.0, replaced by `read().jdbc()`.
    */
   @deprecated("Use read.jdbc() instead.", "1.4.0")
-  def jdbc(url: String, table: String): DataFrame = {
+  def jdbc(url: String, table: String): Dataset[Row] = {
     read.jdbc(url, table, new Properties)
   }
 
   /**
-   * Construct a `DataFrame` representing the database table accessible via 
JDBC URL
-   * url named table.  Partitions of the table will be retrieved in parallel 
based on the parameters
-   * passed to this function.
+   * Construct a `DataFrame` representing the database table named `table`,
+   * accessible via the JDBC URL `url`. Partitions of the table will be retrieved
+   * in parallel based on the parameters passed to this function.
    *
-   * @param columnName the name of a column of integral type that will be used 
for partitioning.
-   * @param lowerBound the minimum value of `columnName` used to decide 
partition stride
-   * @param upperBound the maximum value of `columnName` used to decide 
partition stride
-   * @param numPartitions the number of partitions.  the range 
`minValue`-`maxValue` will be split
-   *                      evenly into this many partitions
+   * @param columnName
+   *   the name of a column of integral type that will be used for 
partitioning.
+   * @param lowerBound
+   *   the minimum value of `columnName` used to decide partition stride
+   * @param upperBound
+   *   the maximum value of `columnName` used to decide partition stride
+   * @param numPartitions
+   *   the number of partitions. The range `lowerBound`-`upperBound` will be split
+   *   evenly into this many partitions
    * @group specificdata
-   * @deprecated As of 1.4.0, replaced by `read().jdbc()`.
+   * @deprecated
+   *   As of 1.4.0, replaced by `read().jdbc()`.
    */
   @deprecated("Use read.jdbc() instead.", "1.4.0")
   def jdbc(
@@ -979,34 +952,36 @@ class SQLContext private[sql](val sparkSession: 
SparkSession)
       columnName: String,
       lowerBound: Long,
       upperBound: Long,
-      numPartitions: Int): DataFrame = {
+      numPartitions: Int): Dataset[Row] = {
     read.jdbc(url, table, columnName, lowerBound, upperBound, numPartitions, 
new Properties)
   }
 
   /**
-   * Construct a `DataFrame` representing the database table accessible via 
JDBC URL
-   * url named table. The theParts parameter gives a list expressions
-   * suitable for inclusion in WHERE clauses; each one defines one partition
-   * of the `DataFrame`.
+   * Construct a `DataFrame` representing the database table named `table`,
+   * accessible via the JDBC URL `url`. The `theParts` parameter gives a list of
+   * expressions suitable for inclusion in WHERE clauses; each one defines one
+   * partition of the `DataFrame`.
    *
    * @group specificdata
-   * @deprecated As of 1.4.0, replaced by `read().jdbc()`.
+   * @deprecated
+   *   As of 1.4.0, replaced by `read().jdbc()`.
    */
   @deprecated("Use read.jdbc() instead.", "1.4.0")
-  def jdbc(url: String, table: String, theParts: Array[String]): DataFrame = {
+  def jdbc(url: String, table: String, theParts: Array[String]): Dataset[Row] 
= {
     read.jdbc(url, table, theParts, new Properties)
   }
 }
 
 /**
- * This SQLContext object contains utility functions to create a singleton 
SQLContext instance,
- * or to get the created SQLContext instance.
+ * This SQLContext object contains utility functions to create a singleton 
SQLContext instance, or
+ * to get the created SQLContext instance.
  *
  * It also provides utility functions to support preference for threads in 
multiple sessions
  * scenario, setActive could set a SQLContext for current thread, which will 
be returned by
  * getOrCreate instead of the global one.
  */
-object SQLContext {
+trait SQLContextCompanion {
+  private[sql] type SQLContextImpl <: SQLContext
+  private[sql] type SparkContextImpl <: SparkContext
 
   /**
    * Get the singleton SQLContext if it exists or create a new one using the 
given SparkContext.
@@ -1014,31 +989,29 @@ object SQLContext {
    * This function can be used to create a singleton SQLContext object that 
can be shared across
    * the JVM.
    *
-   * If there is an active SQLContext for current thread, it will be returned 
instead of the global
-   * one.
+   * If there is an active SQLContext for current thread, it will be returned 
instead of the
+   * global one.
    *
    * @since 1.5.0
    */
   @deprecated("Use SparkSession.builder instead", "2.0.0")
-  def getOrCreate(sparkContext: SparkContext): SQLContext = {
-    SparkSession.builder().sparkContext(sparkContext).getOrCreate().sqlContext
-  }
+  def getOrCreate(sparkContext: SparkContextImpl): SQLContextImpl
 
   /**
    * Changes the SQLContext that will be returned in this thread and its 
children when
-   * SQLContext.getOrCreate() is called. This can be used to ensure that a 
given thread receives
-   * a SQLContext with an isolated session, instead of the global (first 
created) context.
+   * SQLContext.getOrCreate() is called. This can be used to ensure that a 
given thread receives a
+   * SQLContext with an isolated session, instead of the global (first 
created) context.
    *
    * @since 1.6.0
    */
   @deprecated("Use SparkSession.setActiveSession instead", "2.0.0")
-  def setActive(sqlContext: SQLContext): Unit = {
+  def setActive(sqlContext: SQLContextImpl): Unit = {
     SparkSession.setActiveSession(sqlContext.sparkSession)
   }
 
   /**
-   * Clears the active SQLContext for current thread. Subsequent calls to 
getOrCreate will
-   * return the first created context instead of a thread-local override.
+   * Clears the active SQLContext for current thread. Subsequent calls to 
getOrCreate will return
+   * the first created context instead of a thread-local override.
    *
    * @since 1.6.0
    */
@@ -1046,52 +1019,4 @@ object SQLContext {
   def clearActive(): Unit = {
     SparkSession.clearActiveSession()
   }
-
-  /**
-   * Converts an iterator of Java Beans to InternalRow using the provided
-   * bean info & schema. This is not related to the singleton, but is a static
-   * method for internal use.
-   */
-  private[sql] def beansToRows(
-      data: Iterator[_],
-      beanClass: Class[_],
-      attrs: Seq[AttributeReference]): Iterator[InternalRow] = {
-    def createStructConverter(cls: Class[_], fieldTypes: Seq[DataType]): Any 
=> InternalRow = {
-      val methodConverters =
-        JavaTypeInference.getJavaBeanReadableProperties(cls).zip(fieldTypes)
-          .map { case (property, fieldType) =>
-            val method = property.getReadMethod
-            method -> createConverter(method.getReturnType, fieldType)
-          }
-      value =>
-        if (value == null) {
-          null
-        } else {
-          new GenericInternalRow(
-            methodConverters.map { case (method, converter) =>
-              converter(method.invoke(value))
-            })
-        }
-    }
-    def createConverter(cls: Class[_], dataType: DataType): Any => Any = 
dataType match {
-      case struct: StructType => createStructConverter(cls, 
struct.map(_.dataType))
-      case _ => CatalystTypeConverters.createToCatalystConverter(dataType)
-    }
-    val dataConverter = createStructConverter(beanClass, attrs.map(_.dataType))
-    data.map(dataConverter)
-  }
-
-  /**
-   * Extract `spark.sql.*` properties from the conf and return them as a 
[[Properties]].
-   */
-  private[sql] def getSQLProperties(sparkConf: SparkConf): Properties = {
-    val properties = new Properties
-    sparkConf.getAll.foreach { case (key, value) =>
-      if (key.startsWith("spark.sql")) {
-        properties.setProperty(key, value)
-      }
-    }
-    properties
-  }
-
 }
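
A note on the companion pattern introduced above: `SQLContextCompanion` uses
abstract type members (`SQLContextImpl`, `SparkContextImpl`) so that each
implementation's companion object can narrow the types its factory methods
accept and return. A minimal standalone sketch of the same pattern; the names
`Base`, `Companion`, `ClassicCtx`, and `ClassicCompanion` are hypothetical and
not part of this commit:

    trait Base
    trait Companion {
      // Each concrete companion narrows Impl to its own context type.
      type Impl <: Base
      def getOrCreate(): Impl
    }

    class ClassicCtx extends Base
    object ClassicCompanion extends Companion {
      override type Impl = ClassicCtx
      // Callers statically receive a ClassicCtx, not just a Base.
      override def getOrCreate(): ClassicCtx = new ClassicCtx()
    }
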
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/api/SparkSession.scala 
b/sql/api/src/main/scala/org/apache/spark/sql/api/SparkSession.scala
index 35f74497b96f..af2144cb9eb4 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/api/SparkSession.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/api/SparkSession.scala
@@ -30,7 +30,7 @@ import org.apache.spark.{SparkConf, SparkContext, 
SparkException}
 import org.apache.spark.annotation.{DeveloperApi, Experimental, Stable, 
Unstable}
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{Encoder, ExperimentalMethods, Row, RuntimeConfig, 
SparkSessionExtensions, SQLContext}
+import org.apache.spark.sql.{Encoder, ExperimentalMethods, Row, RuntimeConfig, 
SparkSessionExtensions}
 import org.apache.spark.sql.internal.{SessionState, SharedState}
 import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.types.StructType
diff --git a/sql/connect/shims/src/main/scala/org/apache/spark/shims.scala 
b/sql/connect/shims/src/main/scala/org/apache/spark/shims.scala
index ad8771a03b28..9c5fb515580a 100644
--- a/sql/connect/shims/src/main/scala/org/apache/spark/shims.scala
+++ b/sql/connect/shims/src/main/scala/org/apache/spark/shims.scala
@@ -32,7 +32,6 @@ package rdd {
 package sql {
   class ExperimentalMethods
   class SparkSessionExtensions
-  class SQLContext
 
   package execution {
     class QueryExecution
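
With the `SQLContext` shim removed above, the class is now defined once behind
the shared sql/api surface, so code written against it (for example the
`tables()` and `tableNames()` helpers exercised by the new test further below)
compiles against either implementation. A hedged usage sketch, assuming a
running `SparkSession` named `spark` and an illustrative table name:

    val sqlContext = spark.sqlContext
    sqlContext.sql("CREATE TABLE IF NOT EXISTS demo_table (key INT)")
    // tables() returns a DataFrame with columns database, tableName, isTemporary.
    sqlContext.tables().show()
    // tableNames() returns the names of tables in the current database.
    println(sqlContext.tableNames().mkString(", "))
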
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 636899a7acb0..1318563f8c93 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -17,21 +17,18 @@
 
 package org.apache.spark.sql
 
-import java.util.Properties
+import java.util.{List => JList, Map => JMap, Properties}
 
-import scala.collection.immutable
 import scala.reflect.runtime.universe.TypeTag
 
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.annotation.{DeveloperApi, Experimental, Stable, 
Unstable}
 import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
-import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.ConfigEntry
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst._
-import org.apache.spark.sql.catalyst.analysis.{CurrentNamespace, 
UnresolvedNamespace}
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.logical.ShowTables
+import org.apache.spark.sql.classic.ClassicConversions._
 import org.apache.spark.sql.internal.{SessionState, SharedState, SQLConf}
 import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.streaming.{DataStreamReader, StreamingQueryManager}
@@ -41,8 +38,8 @@ import org.apache.spark.sql.util.ExecutionListenerManager
 /**
  * The entry point for working with structured data (rows and columns) in 
Spark 1.x.
  *
- * As of Spark 2.0, this is replaced by [[SparkSession]]. However, we are 
keeping the class
- * here for backward compatibility.
+ * As of Spark 2.0, this is replaced by [[SparkSession]]. However, we are 
keeping the class here
+ * for backward compatibility.
  *
  * @groupname basic Basic Operations
  * @groupname ddl_ops Persistent Catalog DDL
@@ -56,8 +53,8 @@ import org.apache.spark.sql.util.ExecutionListenerManager
  * @since 1.0.0
  */
 @Stable
-class SQLContext private[sql](val sparkSession: SparkSession)
-  extends Logging with Serializable {
+class SQLContext private[sql] (override val sparkSession: SparkSession)
+    extends api.SQLContext(sparkSession) {
 
   self =>
 
@@ -77,980 +74,325 @@ class SQLContext private[sql](val sparkSession: 
SparkSession)
   // TODO: move this logic into SparkSession
 
   private[sql] def sessionState: SessionState = sparkSession.sessionState
+
   private[sql] def sharedState: SharedState = sparkSession.sharedState
+
   @deprecated("Use SparkSession.sessionState.conf instead", "4.0.0")
   private[sql] def conf: SQLConf = sessionState.conf
 
-  def sparkContext: SparkContext = sparkSession.sparkContext
-
-  /**
-   * Returns a [[SQLContext]] as new session, with separated SQL 
configurations, temporary
-   * tables, registered functions, but sharing the same `SparkContext`, cached 
data and
-   * other things.
-   *
-   * @since 1.6.0
-   */
-  def newSession(): SQLContext = sparkSession.newSession().sqlContext
-
-  /**
-   * An interface to register custom 
[[org.apache.spark.sql.util.QueryExecutionListener]]s
-   * that listen for execution metrics.
-   */
+  /** @inheritdoc */
   def listenerManager: ExecutionListenerManager = sparkSession.listenerManager
 
-  /**
-   * Set Spark SQL configuration properties.
-   *
-   * @group config
-   * @since 1.0.0
-   */
+  /** @inheritdoc */
   def setConf(props: Properties): Unit = {
     sessionState.conf.setConf(props)
   }
 
-  /**
-   * Set the given Spark SQL configuration property.
-   */
   private[sql] def setConf[T](entry: ConfigEntry[T], value: T): Unit = {
     sessionState.conf.setConf(entry, value)
   }
 
-  /**
-   * Set the given Spark SQL configuration property.
-   *
-   * @group config
-   * @since 1.0.0
-   */
-  def setConf(key: String, value: String): Unit = {
-    sparkSession.conf.set(key, value)
-  }
-
-  /**
-   * Return the value of Spark SQL configuration property for the given key.
-   *
-   * @group config
-   * @since 1.0.0
-   */
-  def getConf(key: String): String = {
-    sparkSession.conf.get(key)
-  }
-
-  /**
-   * Return the value of Spark SQL configuration property for the given key. 
If the key is not set
-   * yet, return `defaultValue`.
-   *
-   * @group config
-   * @since 1.0.0
-   */
-  def getConf(key: String, defaultValue: String): String = {
-    sparkSession.conf.get(key, defaultValue)
-  }
-
-  /**
-   * Return all the configuration properties that have been set (i.e. not the 
default).
-   * This creates a new copy of the config properties in the form of a Map.
-   *
-   * @group config
-   * @since 1.0.0
-   */
-  def getAllConfs: immutable.Map[String, String] = {
-    sparkSession.conf.getAll
-  }
-
-  /**
-   * :: Experimental ::
-   * A collection of methods that are considered experimental, but can be used 
to hook into
-   * the query planner for advanced functionality.
-   *
-   * @group basic
-   * @since 1.3.0
-   */
+  /** @inheritdoc */
   @Experimental
   @transient
   @Unstable
   def experimental: ExperimentalMethods = sparkSession.experimental
 
-  /**
-   * Returns a `DataFrame` with no rows or columns.
-   *
-   * @group basic
-   * @since 1.3.0
-   */
-  def emptyDataFrame: DataFrame = sparkSession.emptyDataFrame
-
-  /**
-   * A collection of methods for registering user-defined functions (UDF).
-   *
-   * The following example registers a Scala closure as UDF:
-   * {{{
-   *   sqlContext.udf.register("myUDF", (arg1: Int, arg2: String) => arg2 + 
arg1)
-   * }}}
-   *
-   * The following example registers a UDF in Java:
-   * {{{
-   *   sqlContext.udf().register("myUDF",
-   *       (Integer arg1, String arg2) -> arg2 + arg1,
-   *       DataTypes.StringType);
-   * }}}
-   *
-   * @note The user-defined functions must be deterministic. Due to 
optimization,
-   * duplicate invocations may be eliminated or the function may even be 
invoked more times than
-   * it is present in the query.
-   *
-   * @group basic
-   * @since 1.3.0
-   */
+  /** @inheritdoc */
   def udf: UDFRegistration = sparkSession.udf
 
-  /**
-   * Returns true if the table is currently cached in-memory.
-   * @group cachemgmt
-   * @since 1.3.0
-   */
-  def isCached(tableName: String): Boolean = {
-    sparkSession.catalog.isCached(tableName)
-  }
-
-  /**
-   * Caches the specified table in-memory.
-   * @group cachemgmt
-   * @since 1.3.0
-   */
-  def cacheTable(tableName: String): Unit = {
-    sparkSession.catalog.cacheTable(tableName)
-  }
-
-  /**
-   * Removes the specified table from the in-memory cache.
-   * @group cachemgmt
-   * @since 1.3.0
-   */
-  def uncacheTable(tableName: String): Unit = {
-    sparkSession.catalog.uncacheTable(tableName)
-  }
-
-  /**
-   * Removes all cached tables from the in-memory cache.
-   * @since 1.3.0
-   */
-  def clearCache(): Unit = {
-    sparkSession.catalog.clearCache()
-  }
-
   // scalastyle:off
   // Disable style checker so "implicits" object can start with lowercase i
-  /**
-   * (Scala-specific) Implicit methods available in Scala for converting
-   * common Scala objects into `DataFrame`s.
-   *
-   * {{{
-   *   val sqlContext = new SQLContext(sc)
-   *   import sqlContext.implicits._
-   * }}}
-   *
-   * @group basic
-   * @since 1.3.0
-   */
+
+  /** @inheritdoc */
   object implicits extends SQLImplicits {
+    /** @inheritdoc */
     override protected def session: SparkSession = sparkSession
   }
+
   // scalastyle:on
 
   /**
-   * Creates a DataFrame from an RDD of Product (e.g. case classes, tuples).
-   *
-   * @group dataframes
-   * @since 1.3.0
+   * Creates a DataFrame from an RDD[Row]. User can specify whether the input 
rows should be
+   * converted to Catalyst rows.
    */
-  def createDataFrame[A <: Product : TypeTag](rdd: RDD[A]): DataFrame = {
-    sparkSession.createDataFrame(rdd)
+  private[sql] def internalCreateDataFrame(
+      catalystRows: RDD[InternalRow],
+      schema: StructType,
+      isStreaming: Boolean = false): DataFrame = {
+    sparkSession.internalCreateDataFrame(catalystRows, schema, isStreaming)
   }
 
-  /**
-   * Creates a DataFrame from a local Seq of Product.
-   *
-   * @group dataframes
-   * @since 1.3.0
-   */
-  def createDataFrame[A <: Product : TypeTag](data: Seq[A]): DataFrame = {
-    sparkSession.createDataFrame(data)
-  }
+  /** @inheritdoc */
+  def read: DataFrameReader = sparkSession.read
 
-  /**
-   * Convert a `BaseRelation` created for external data sources into a 
`DataFrame`.
-   *
-   * @group dataframes
-   * @since 1.3.0
-   */
-  def baseRelationToDataFrame(baseRelation: BaseRelation): DataFrame = {
-    sparkSession.baseRelationToDataFrame(baseRelation)
-  }
+  /** @inheritdoc */
+  def readStream: DataStreamReader = sparkSession.readStream
 
   /**
-   * :: DeveloperApi ::
-   * Creates a `DataFrame` from an `RDD` containing [[Row]]s using the given 
schema.
-   * It is important to make sure that the structure of every [[Row]] of the 
provided RDD matches
-   * the provided schema. Otherwise, there will be runtime exception.
-   * Example:
-   * {{{
-   *  import org.apache.spark.sql._
-   *  import org.apache.spark.sql.types._
-   *  val sqlContext = new org.apache.spark.sql.SQLContext(sc)
-   *
-   *  val schema =
-   *    StructType(
-   *      StructField("name", StringType, false) ::
-   *      StructField("age", IntegerType, true) :: Nil)
-   *
-   *  val people =
-   *    sc.textFile("examples/src/main/resources/people.txt").map(
-   *      _.split(",")).map(p => Row(p(0), p(1).trim.toInt))
-   *  val dataFrame = sqlContext.createDataFrame(people, schema)
-   *  dataFrame.printSchema
-   *  // root
-   *  // |-- name: string (nullable = false)
-   *  // |-- age: integer (nullable = true)
-   *
-   *  dataFrame.createOrReplaceTempView("people")
-   *  sqlContext.sql("select name from people").collect.foreach(println)
-   * }}}
-   *
-   * @group dataframes
-   * @since 1.3.0
+   * Registers the given `DataFrame` as a temporary table in the catalog. 
Temporary tables exist
+   * only during the lifetime of this instance of SQLContext.
    */
-  @DeveloperApi
-  def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame = {
-    sparkSession.createDataFrame(rowRDD, schema)
+  private[sql] def registerDataFrameAsTable(df: DataFrame, tableName: String): 
Unit = {
+    df.createOrReplaceTempView(tableName)
   }
 
   /**
-   * Creates a [[Dataset]] from a local Seq of data of a given type. This 
method requires an
-   * encoder (to convert a JVM object of type `T` to and from the internal 
Spark SQL representation)
-   * that is generally created automatically through implicits from a 
`SparkSession`, or can be
-   * created explicitly by calling static methods on [[Encoders]].
-   *
-   * == Example ==
-   *
-   * {{{
-   *
-   *   import spark.implicits._
-   *   case class Person(name: String, age: Long)
-   *   val data = Seq(Person("Michael", 29), Person("Andy", 30), 
Person("Justin", 19))
-   *   val ds = spark.createDataset(data)
-   *
-   *   ds.show()
-   *   // +-------+---+
-   *   // |   name|age|
-   *   // +-------+---+
-   *   // |Michael| 29|
-   *   // |   Andy| 30|
-   *   // | Justin| 19|
-   *   // +-------+---+
-   * }}}
+   * Returns a `StreamingQueryManager` that allows managing all the
+   * [[org.apache.spark.sql.streaming.StreamingQuery StreamingQueries]] active 
on `this` context.
    *
    * @since 2.0.0
-   * @group dataset
    */
-  def createDataset[T : Encoder](data: Seq[T]): Dataset[T] = {
-    sparkSession.createDataset(data)
-  }
+  def streams: StreamingQueryManager = sparkSession.streams
 
-  /**
-   * Creates a [[Dataset]] from an RDD of a given type. This method requires an
-   * encoder (to convert a JVM object of type `T` to and from the internal 
Spark SQL representation)
-   * that is generally created automatically through implicits from a 
`SparkSession`, or can be
-   * created explicitly by calling static methods on [[Encoders]].
-   *
-   * @since 2.0.0
-   * @group dataset
-   */
-  def createDataset[T : Encoder](data: RDD[T]): Dataset[T] = {
-    sparkSession.createDataset(data)
-  }
+  /** @inheritdoc */
+  override def sparkContext: SparkContext = super.sparkContext
 
-  /**
-   * Creates a [[Dataset]] from a `java.util.List` of a given type. This 
method requires an
-   * encoder (to convert a JVM object of type `T` to and from the internal 
Spark SQL representation)
-   * that is generally created automatically through implicits from a 
`SparkSession`, or can be
-   * created explicitly by calling static methods on [[Encoders]].
-   *
-   * == Java Example ==
-   *
-   * {{{
-   *     List<String> data = Arrays.asList("hello", "world");
-   *     Dataset<String> ds = spark.createDataset(data, Encoders.STRING());
-   * }}}
-   *
-   * @since 2.0.0
-   * @group dataset
-   */
-  def createDataset[T : Encoder](data: java.util.List[T]): Dataset[T] = {
-    sparkSession.createDataset(data)
-  }
+  /** @inheritdoc */
+  override def newSession(): SQLContext = sparkSession.newSession().sqlContext
 
-  /**
-   * Creates a DataFrame from an RDD[Row]. User can specify whether the input 
rows should be
-   * converted to Catalyst rows.
-   */
-  private[sql]
-  def internalCreateDataFrame(
-      catalystRows: RDD[InternalRow],
-      schema: StructType,
-      isStreaming: Boolean = false) = {
-    sparkSession.internalCreateDataFrame(catalystRows, schema, isStreaming)
-  }
+  /** @inheritdoc */
+  override def emptyDataFrame: Dataset[Row] = super.emptyDataFrame
 
-  /**
-   * :: DeveloperApi ::
-   * Creates a `DataFrame` from a `JavaRDD` containing [[Row]]s using the 
given schema.
-   * It is important to make sure that the structure of every [[Row]] of the 
provided RDD matches
-   * the provided schema. Otherwise, there will be runtime exception.
-   *
-   * @group dataframes
-   * @since 1.3.0
-   */
-  @DeveloperApi
-  def createDataFrame(rowRDD: JavaRDD[Row], schema: StructType): DataFrame = {
-    sparkSession.createDataFrame(rowRDD, schema)
-  }
+  /** @inheritdoc */
+  override def createDataFrame[A <: Product: TypeTag](rdd: RDD[A]): 
Dataset[Row] =
+    super.createDataFrame(rdd)
 
-  /**
-   * :: DeveloperApi ::
-   * Creates a `DataFrame` from a `java.util.List` containing [[Row]]s using 
the given schema.
-   * It is important to make sure that the structure of every [[Row]] of the 
provided List matches
-   * the provided schema. Otherwise, there will be runtime exception.
-   *
-   * @group dataframes
-   * @since 1.6.0
-   */
+  /** @inheritdoc */
+  override def createDataFrame[A <: Product: TypeTag](data: Seq[A]): 
Dataset[Row] =
+    super.createDataFrame(data)
+
+  /** @inheritdoc */
+  override def baseRelationToDataFrame(baseRelation: BaseRelation): 
Dataset[Row] =
+    super.baseRelationToDataFrame(baseRelation)
+
+  /** @inheritdoc */
   @DeveloperApi
-  def createDataFrame(rows: java.util.List[Row], schema: StructType): 
DataFrame = {
-    sparkSession.createDataFrame(rows, schema)
-  }
+  override def createDataFrame(rowRDD: RDD[Row], schema: StructType): 
Dataset[Row] =
+    super.createDataFrame(rowRDD, schema)
 
-  /**
-   * Applies a schema to an RDD of Java Beans.
-   *
-   * WARNING: Since there is no guaranteed ordering for fields in a Java Bean,
-   *          SELECT * queries will return the columns in an undefined order.
-   * @group dataframes
-   * @since 1.3.0
-   */
-  def createDataFrame(rdd: RDD[_], beanClass: Class[_]): DataFrame = {
-    sparkSession.createDataFrame(rdd, beanClass)
-  }
+  /** @inheritdoc */
+  override def createDataset[T: Encoder](data: Seq[T]): Dataset[T] = 
super.createDataset(data)
 
-  /**
-   * Applies a schema to an RDD of Java Beans.
-   *
-   * WARNING: Since there is no guaranteed ordering for fields in a Java Bean,
-   *          SELECT * queries will return the columns in an undefined order.
-   * @group dataframes
-   * @since 1.3.0
-   */
-  def createDataFrame(rdd: JavaRDD[_], beanClass: Class[_]): DataFrame = {
-    sparkSession.createDataFrame(rdd, beanClass)
-  }
+  /** @inheritdoc */
+  override def createDataset[T: Encoder](data: RDD[T]): Dataset[T] = 
super.createDataset(data)
 
-  /**
-   * Applies a schema to a List of Java Beans.
-   *
-   * WARNING: Since there is no guaranteed ordering for fields in a Java Bean,
-   *          SELECT * queries will return the columns in an undefined order.
-   * @group dataframes
-   * @since 1.6.0
-   */
-  def createDataFrame(data: java.util.List[_], beanClass: Class[_]): DataFrame 
= {
-    sparkSession.createDataFrame(data, beanClass)
-  }
+  /** @inheritdoc */
+  override def createDataset[T: Encoder](data: JList[T]): Dataset[T] =
+    super.createDataset(data)
 
-  /**
-   * Returns a [[DataFrameReader]] that can be used to read non-streaming data 
in as a
-   * `DataFrame`.
-   * {{{
-   *   sqlContext.read.parquet("/path/to/file.parquet")
-   *   sqlContext.read.schema(schema).json("/path/to/file.json")
-   * }}}
-   *
-   * @group genericdata
-   * @since 1.4.0
-   */
-  def read: DataFrameReader = sparkSession.read
+  /** @inheritdoc */
+  @DeveloperApi
+  override def createDataFrame(rowRDD: JavaRDD[Row], schema: StructType): 
Dataset[Row] =
+    super.createDataFrame(rowRDD, schema)
 
+  /** @inheritdoc */
+  @DeveloperApi
+  override def createDataFrame(rows: JList[Row], schema: StructType): 
Dataset[Row] =
+    super.createDataFrame(rows, schema)
 
-  /**
-   * Returns a `DataStreamReader` that can be used to read streaming data in 
as a `DataFrame`.
-   * {{{
-   *   sparkSession.readStream.parquet("/path/to/directory/of/parquet/files")
-   *   
sparkSession.readStream.schema(schema).json("/path/to/directory/of/json/files")
-   * }}}
-   *
-   * @since 2.0.0
-   */
-  def readStream: DataStreamReader = sparkSession.readStream
+  /** @inheritdoc */
+  override def createDataFrame(rdd: RDD[_], beanClass: Class[_]): Dataset[Row] 
=
+    super.createDataFrame(rdd, beanClass)
 
+  /** @inheritdoc */
+  override def createDataFrame(rdd: JavaRDD[_], beanClass: Class[_]): 
Dataset[Row] =
+    super.createDataFrame(rdd, beanClass)
 
-  /**
-   * Creates an external table from the given path and returns the 
corresponding DataFrame.
-   * It will use the default data source configured by 
spark.sql.sources.default.
-   *
-   * @group ddl_ops
-   * @since 1.3.0
-   */
-  @deprecated("use sparkSession.catalog.createTable instead.", "2.2.0")
-  def createExternalTable(tableName: String, path: String): DataFrame = {
-    sparkSession.catalog.createTable(tableName, path)
-  }
+  /** @inheritdoc */
+  override def createDataFrame(data: JList[_], beanClass: Class[_]): 
Dataset[Row] =
+    super.createDataFrame(data, beanClass)
 
-  /**
-   * Creates an external table from the given path based on a data source
-   * and returns the corresponding DataFrame.
-   *
-   * @group ddl_ops
-   * @since 1.3.0
-   */
-  @deprecated("use sparkSession.catalog.createTable instead.", "2.2.0")
-  def createExternalTable(
+  /** @inheritdoc */
+  override def createExternalTable(tableName: String, path: String): 
Dataset[Row] =
+    super.createExternalTable(tableName, path)
+
+  /** @inheritdoc */
+  override def createExternalTable(
       tableName: String,
       path: String,
-      source: String): DataFrame = {
-    sparkSession.catalog.createTable(tableName, path, source)
+      source: String): Dataset[Row] = {
+    super.createExternalTable(tableName, path, source)
   }
 
-  /**
-   * Creates an external table from the given path based on a data source and 
a set of options.
-   * Then, returns the corresponding DataFrame.
-   *
-   * @group ddl_ops
-   * @since 1.3.0
-   */
-  @deprecated("use sparkSession.catalog.createTable instead.", "2.2.0")
-  def createExternalTable(
+  /** @inheritdoc */
+  override def createExternalTable(
       tableName: String,
       source: String,
-      options: java.util.Map[String, String]): DataFrame = {
-    sparkSession.catalog.createTable(tableName, source, options)
+      options: JMap[String, String]): Dataset[Row] = {
+    super.createExternalTable(tableName, source, options)
   }
 
-  /**
-   * (Scala-specific)
-   * Creates an external table from the given path based on a data source and 
a set of options.
-   * Then, returns the corresponding DataFrame.
-   *
-   * @group ddl_ops
-   * @since 1.3.0
-   */
-  @deprecated("use sparkSession.catalog.createTable instead.", "2.2.0")
-  def createExternalTable(
+  /** @inheritdoc */
+  override def createExternalTable(
       tableName: String,
       source: String,
-      options: Map[String, String]): DataFrame = {
-    sparkSession.catalog.createTable(tableName, source, options)
+      options: Map[String, String]): Dataset[Row] = {
+    super.createExternalTable(tableName, source, options)
   }
 
-  /**
-   * Create an external table from the given path based on a data source, a 
schema and
-   * a set of options. Then, returns the corresponding DataFrame.
-   *
-   * @group ddl_ops
-   * @since 1.3.0
-   */
-  @deprecated("use sparkSession.catalog.createTable instead.", "2.2.0")
-  def createExternalTable(
+  /** @inheritdoc */
+  override def createExternalTable(
       tableName: String,
       source: String,
       schema: StructType,
-      options: java.util.Map[String, String]): DataFrame = {
-    sparkSession.catalog.createTable(tableName, source, schema, options)
+      options: JMap[String, String]): Dataset[Row] = {
+    super.createExternalTable(tableName, source, schema, options)
   }
 
-  /**
-   * (Scala-specific)
-   * Create an external table from the given path based on a data source, a 
schema and
-   * a set of options. Then, returns the corresponding DataFrame.
-   *
-   * @group ddl_ops
-   * @since 1.3.0
-   */
-  @deprecated("use sparkSession.catalog.createTable instead.", "2.2.0")
-  def createExternalTable(
+  /** @inheritdoc */
+  override def createExternalTable(
       tableName: String,
       source: String,
       schema: StructType,
-      options: Map[String, String]): DataFrame = {
-    sparkSession.catalog.createTable(tableName, source, schema, options)
-  }
-
-  /**
-   * Registers the given `DataFrame` as a temporary table in the catalog. 
Temporary tables exist
-   * only during the lifetime of this instance of SQLContext.
-   */
-  private[sql] def registerDataFrameAsTable(df: DataFrame, tableName: String): 
Unit = {
-    df.createOrReplaceTempView(tableName)
+      options: Map[String, String]): Dataset[Row] = {
+    super.createExternalTable(tableName, source, schema, options)
   }
 
-  /**
-   * Drops the temporary table with the given table name in the catalog. If 
the table has been
-   * cached/persisted before, it's also unpersisted.
-   *
-   * @param tableName the name of the table to be unregistered.
-   * @group basic
-   * @since 1.3.0
-   */
-  def dropTempTable(tableName: String): Unit = {
-    sparkSession.catalog.dropTempView(tableName)
-  }
+  /** @inheritdoc */
+  override def range(end: Long): Dataset[Row] = super.range(end)
 
-  /**
-   * Creates a `DataFrame` with a single `LongType` column named `id`, 
containing elements
-   * in a range from 0 to `end` (exclusive) with step value 1.
-   *
-   * @since 1.4.1
-   * @group dataframe
-   */
-  def range(end: Long): DataFrame = sparkSession.range(end).toDF()
-
-  /**
-   * Creates a `DataFrame` with a single `LongType` column named `id`, 
containing elements
-   * in a range from `start` to `end` (exclusive) with step value 1.
-   *
-   * @since 1.4.0
-   * @group dataframe
-   */
-  def range(start: Long, end: Long): DataFrame = sparkSession.range(start, 
end).toDF()
-
-  /**
-   * Creates a `DataFrame` with a single `LongType` column named `id`, 
containing elements
-   * in a range from `start` to `end` (exclusive) with a step value.
-   *
-   * @since 2.0.0
-   * @group dataframe
-   */
-  def range(start: Long, end: Long, step: Long): DataFrame = {
-    sparkSession.range(start, end, step).toDF()
-  }
-
-  /**
-   * Creates a `DataFrame` with a single `LongType` column named `id`, 
containing elements
-   * in an range from `start` to `end` (exclusive) with an step value, with 
partition number
-   * specified.
-   *
-   * @since 1.4.0
-   * @group dataframe
-   */
-  def range(start: Long, end: Long, step: Long, numPartitions: Int): DataFrame 
= {
-    sparkSession.range(start, end, step, numPartitions).toDF()
-  }
-
-  /**
-   * Executes a SQL query using Spark, returning the result as a `DataFrame`.
-   * This API eagerly runs DDL/DML commands, but not for SELECT queries.
-   *
-   * @group basic
-   * @since 1.3.0
-   */
-  def sql(sqlText: String): DataFrame = sparkSession.sql(sqlText)
-
-  /**
-   * Returns the specified table as a `DataFrame`.
-   *
-   * @group ddl_ops
-   * @since 1.3.0
-   */
-  def table(tableName: String): DataFrame = {
-    sparkSession.table(tableName)
-  }
+  /** @inheritdoc */
+  override def range(start: Long, end: Long): Dataset[Row] = 
super.range(start, end)
 
-  /**
-   * Returns a `DataFrame` containing names of existing tables in the current 
database.
-   * The returned DataFrame has three columns, database, tableName and 
isTemporary (a Boolean
-   * indicating if a table is a temporary one or not).
-   *
-   * @group ddl_ops
-   * @since 1.3.0
-   */
-  def tables(): DataFrame = {
-    Dataset.ofRows(sparkSession, ShowTables(CurrentNamespace, None))
-  }
+  /** @inheritdoc */
+  override def range(start: Long, end: Long, step: Long): Dataset[Row] =
+    super.range(start, end, step)
 
-  /**
-   * Returns a `DataFrame` containing names of existing tables in the given 
database.
-   * The returned DataFrame has three columns, database, tableName and 
isTemporary (a Boolean
-   * indicating if a table is a temporary one or not).
-   *
-   * @group ddl_ops
-   * @since 1.3.0
-   */
-  def tables(databaseName: String): DataFrame = {
-    Dataset.ofRows(sparkSession, 
ShowTables(UnresolvedNamespace(Seq(databaseName)), None))
-  }
+  /** @inheritdoc */
+  override def range(start: Long, end: Long, step: Long, numPartitions: Int): 
Dataset[Row] =
+    super.range(start, end, step, numPartitions)
 
-  /**
-   * Returns a `StreamingQueryManager` that allows managing all the
-   * [[org.apache.spark.sql.streaming.StreamingQuery StreamingQueries]] active 
on `this` context.
-   *
-   * @since 2.0.0
-   */
-  def streams: StreamingQueryManager = sparkSession.streams
+  /** @inheritdoc */
+  override def sql(sqlText: String): Dataset[Row] = super.sql(sqlText)
 
-  /**
-   * Returns the names of tables in the current database as an array.
-   *
-   * @group ddl_ops
-   * @since 1.3.0
-   */
-  def tableNames(): Array[String] = {
-    tableNames(sparkSession.catalog.currentDatabase)
-  }
+  /** @inheritdoc */
+  override def table(tableName: String): Dataset[Row] = super.table(tableName)
 
-  /**
-   * Returns the names of tables in the given database as an array.
-   *
-   * @group ddl_ops
-   * @since 1.3.0
-   */
-  def tableNames(databaseName: String): Array[String] = {
-    sessionState.catalog.listTables(databaseName).map(_.table).toArray
-  }
+  /** @inheritdoc */
+  override def tables(): DataFrame = super.tables()
 
-  ////////////////////////////////////////////////////////////////////////////
-  ////////////////////////////////////////////////////////////////////////////
-  // Deprecated methods
-  ////////////////////////////////////////////////////////////////////////////
-  ////////////////////////////////////////////////////////////////////////////
+  /** @inheritdoc */
+  override def tables(databaseName: String): DataFrame = 
super.tables(databaseName)
 
-  /**
-   * @deprecated As of 1.3.0, replaced by `createDataFrame()`.
-   */
-  @deprecated("Use createDataFrame instead.", "1.3.0")
-  def applySchema(rowRDD: RDD[Row], schema: StructType): DataFrame = {
-    createDataFrame(rowRDD, schema)
-  }
+  /** @inheritdoc */
+  override def applySchema(rowRDD: RDD[Row], schema: StructType): Dataset[Row] 
=
+    super.applySchema(rowRDD, schema)
 
-  /**
-   * @deprecated As of 1.3.0, replaced by `createDataFrame()`.
-   */
-  @deprecated("Use createDataFrame instead.", "1.3.0")
-  def applySchema(rowRDD: JavaRDD[Row], schema: StructType): DataFrame = {
-    createDataFrame(rowRDD, schema)
-  }
+  /** @inheritdoc */
+  override def applySchema(rowRDD: JavaRDD[Row], schema: StructType): 
Dataset[Row] =
+    super.applySchema(rowRDD, schema)
 
-  /**
-   * @deprecated As of 1.3.0, replaced by `createDataFrame()`.
-   */
-  @deprecated("Use createDataFrame instead.", "1.3.0")
-  def applySchema(rdd: RDD[_], beanClass: Class[_]): DataFrame = {
-    createDataFrame(rdd, beanClass)
-  }
+  /** @inheritdoc */
+  override def applySchema(rdd: RDD[_], beanClass: Class[_]): Dataset[Row] =
+    super.applySchema(rdd, beanClass)
 
-  /**
-   * @deprecated As of 1.3.0, replaced by `createDataFrame()`.
-   */
-  @deprecated("Use createDataFrame instead.", "1.3.0")
-  def applySchema(rdd: JavaRDD[_], beanClass: Class[_]): DataFrame = {
-    createDataFrame(rdd, beanClass)
-  }
+  /** @inheritdoc */
+  override def applySchema(rdd: JavaRDD[_], beanClass: Class[_]): Dataset[Row] 
=
+    super.applySchema(rdd, beanClass)
 
-  /**
-   * Loads a Parquet file, returning the result as a `DataFrame`. This 
function returns an empty
-   * `DataFrame` if no paths are passed in.
-   *
-   * @group specificdata
-   * @deprecated As of 1.4.0, replaced by `read().parquet()`.
-   */
-  @deprecated("Use read.parquet() instead.", "1.4.0")
+  /** @inheritdoc */
   @scala.annotation.varargs
-  def parquetFile(paths: String*): DataFrame = {
-    if (paths.isEmpty) {
-      emptyDataFrame
-    } else {
-      read.parquet(paths : _*)
-    }
-  }
+  override def parquetFile(paths: String*): Dataset[Row] = 
super.parquetFile(paths: _*)
 
-  /**
-   * Loads a JSON file (one object per line), returning the result as a 
`DataFrame`.
-   * It goes through the entire dataset once to determine the schema.
-   *
-   * @group specificdata
-   * @deprecated As of 1.4.0, replaced by `read().json()`.
-   */
-  @deprecated("Use read.json() instead.", "1.4.0")
-  def jsonFile(path: String): DataFrame = {
-    read.json(path)
-  }
+  /** @inheritdoc */
+  override def jsonFile(path: String): Dataset[Row] = super.jsonFile(path)
 
-  /**
-   * Loads a JSON file (one object per line) and applies the given schema,
-   * returning the result as a `DataFrame`.
-   *
-   * @group specificdata
-   * @deprecated As of 1.4.0, replaced by `read().json()`.
-   */
-  @deprecated("Use read.json() instead.", "1.4.0")
-  def jsonFile(path: String, schema: StructType): DataFrame = {
-    read.schema(schema).json(path)
-  }
+  /** @inheritdoc */
+  override def jsonFile(path: String, schema: StructType): Dataset[Row] =
+    super.jsonFile(path, schema)
 
-  /**
-   * @group specificdata
-   * @deprecated As of 1.4.0, replaced by `read().json()`.
-   */
-  @deprecated("Use read.json() instead.", "1.4.0")
-  def jsonFile(path: String, samplingRatio: Double): DataFrame = {
-    read.option("samplingRatio", samplingRatio.toString).json(path)
-  }
+  /** @inheritdoc */
+  override def jsonFile(path: String, samplingRatio: Double): Dataset[Row] =
+    super.jsonFile(path, samplingRatio)
 
-  /**
-   * Loads an RDD[String] storing JSON objects (one object per record), 
returning the result as a
-   * `DataFrame`.
-   * It goes through the entire dataset once to determine the schema.
-   *
-   * @group specificdata
-   * @deprecated As of 1.4.0, replaced by `read().json()`.
-   */
-  @deprecated("Use read.json() instead.", "1.4.0")
-  def jsonRDD(json: RDD[String]): DataFrame = read.json(json)
+  /** @inheritdoc */
+  override def jsonRDD(json: RDD[String]): Dataset[Row] = read.json(json)
 
-  /**
-   * Loads an RDD[String] storing JSON objects (one object per record), 
returning the result as a
-   * `DataFrame`.
-   * It goes through the entire dataset once to determine the schema.
-   *
-   * @group specificdata
-   * @deprecated As of 1.4.0, replaced by `read().json()`.
-   */
-  @deprecated("Use read.json() instead.", "1.4.0")
-  def jsonRDD(json: JavaRDD[String]): DataFrame = read.json(json)
+  /** @inheritdoc */
+  override def jsonRDD(json: JavaRDD[String]): Dataset[Row] = read.json(json)
 
-  /**
-   * Loads an RDD[String] storing JSON objects (one object per record) and 
applies the given schema,
-   * returning the result as a `DataFrame`.
-   *
-   * @group specificdata
-   * @deprecated As of 1.4.0, replaced by `read().json()`.
-   */
-  @deprecated("Use read.json() instead.", "1.4.0")
-  def jsonRDD(json: RDD[String], schema: StructType): DataFrame = {
-    read.schema(schema).json(json)
-  }
+  /** @inheritdoc */
+  override def jsonRDD(json: RDD[String], schema: StructType): Dataset[Row] =
+    super.jsonRDD(json, schema)
 
-  /**
-   * Loads an JavaRDD[String] storing JSON objects (one object per record) and 
applies the given
-   * schema, returning the result as a `DataFrame`.
-   *
-   * @group specificdata
-   * @deprecated As of 1.4.0, replaced by `read().json()`.
-   */
-  @deprecated("Use read.json() instead.", "1.4.0")
-  def jsonRDD(json: JavaRDD[String], schema: StructType): DataFrame = {
-    read.schema(schema).json(json)
-  }
+  /** @inheritdoc */
+  override def jsonRDD(json: JavaRDD[String], schema: StructType): 
Dataset[Row] =
+    super.jsonRDD(json, schema)
 
-  /**
-   * Loads an RDD[String] storing JSON objects (one object per record) 
inferring the
-   * schema, returning the result as a `DataFrame`.
-   *
-   * @group specificdata
-   * @deprecated As of 1.4.0, replaced by `read().json()`.
-   */
-  @deprecated("Use read.json() instead.", "1.4.0")
-  def jsonRDD(json: RDD[String], samplingRatio: Double): DataFrame = {
-    read.option("samplingRatio", samplingRatio.toString).json(json)
-  }
+  /** @inheritdoc */
+  override def jsonRDD(json: RDD[String], samplingRatio: Double): Dataset[Row] 
=
+    super.jsonRDD(json, samplingRatio)
 
-  /**
-   * Loads a JavaRDD[String] storing JSON objects (one object per record) 
inferring the
-   * schema, returning the result as a `DataFrame`.
-   *
-   * @group specificdata
-   * @deprecated As of 1.4.0, replaced by `read().json()`.
-   */
-  @deprecated("Use read.json() instead.", "1.4.0")
-  def jsonRDD(json: JavaRDD[String], samplingRatio: Double): DataFrame = {
-    read.option("samplingRatio", samplingRatio.toString).json(json)
-  }
+  /** @inheritdoc */
+  override def jsonRDD(json: JavaRDD[String], samplingRatio: Double): 
Dataset[Row] =
+    super.jsonRDD(json, samplingRatio)
 
-  /**
-   * Returns the dataset stored at path as a DataFrame,
-   * using the default data source configured by spark.sql.sources.default.
-   *
-   * @group genericdata
-   * @deprecated As of 1.4.0, replaced by `read().load(path)`.
-   */
-  @deprecated("Use read.load(path) instead.", "1.4.0")
-  def load(path: String): DataFrame = {
-    read.load(path)
-  }
+  /** @inheritdoc */
+  override def load(path: String): Dataset[Row] = super.load(path)
 
-  /**
-   * Returns the dataset stored at path as a DataFrame, using the given data 
source.
-   *
-   * @group genericdata
-   * @deprecated As of 1.4.0, replaced by `read().format(source).load(path)`.
-   */
-  @deprecated("Use read.format(source).load(path) instead.", "1.4.0")
-  def load(path: String, source: String): DataFrame = {
-    read.format(source).load(path)
-  }
+  /** @inheritdoc */
+  override def load(path: String, source: String): Dataset[Row] = 
super.load(path, source)
 
-  /**
-   * (Java-specific) Returns the dataset specified by the given data source and
-   * a set of options as a DataFrame.
-   *
-   * @group genericdata
-   * @deprecated As of 1.4.0, replaced by 
`read().format(source).options(options).load()`.
-   */
-  @deprecated("Use read.format(source).options(options).load() instead.", 
"1.4.0")
-  def load(source: String, options: java.util.Map[String, String]): DataFrame 
= {
-    read.options(options).format(source).load()
-  }
+  /** @inheritdoc */
+  override def load(source: String, options: JMap[String, String]): 
Dataset[Row] =
+    super.load(source, options)
 
-  /**
-   * (Scala-specific) Returns the dataset specified by the given data source 
and
-   * a set of options as a DataFrame.
-   *
-   * @group genericdata
-   * @deprecated As of 1.4.0, replaced by 
`read().format(source).options(options).load()`.
-   */
-  @deprecated("Use read.format(source).options(options).load() instead.", 
"1.4.0")
-  def load(source: String, options: Map[String, String]): DataFrame = {
-    read.options(options).format(source).load()
-  }
+  /** @inheritdoc */
+  override def load(source: String, options: Map[String, String]): 
Dataset[Row] =
+    super.load(source, options)
 
-  /**
-   * (Java-specific) Returns the dataset specified by the given data source and
-   * a set of options as a DataFrame, using the given schema as the schema of 
the DataFrame.
-   *
-   * @group genericdata
-   * @deprecated As of 1.4.0, replaced by
-   *            `read().format(source).schema(schema).options(options).load()`.
-   */
-  @deprecated("Use read.format(source).schema(schema).options(options).load() 
instead.", "1.4.0")
-  def load(
+  /** @inheritdoc */
+  override def load(
       source: String,
       schema: StructType,
-      options: java.util.Map[String, String]): DataFrame = {
-    read.format(source).schema(schema).options(options).load()
+      options: JMap[String, String]): Dataset[Row] = {
+    super.load(source, schema, options)
   }
 
-  /**
-   * (Scala-specific) Returns the dataset specified by the given data source 
and
-   * a set of options as a DataFrame, using the given schema as the schema of 
the DataFrame.
-   *
-   * @group genericdata
-   * @deprecated As of 1.4.0, replaced by
-   *            `read().format(source).schema(schema).options(options).load()`.
-   */
-  @deprecated("Use read.format(source).schema(schema).options(options).load() 
instead.", "1.4.0")
-  def load(source: String, schema: StructType, options: Map[String, String]): 
DataFrame = {
-    read.format(source).schema(schema).options(options).load()
+  /** @inheritdoc */
+  override def load(
+      source: String,
+      schema: StructType,
+      options: Map[String, String]): Dataset[Row] = {
+    super.load(source, schema, options)
   }
 
-  /**
-   * Construct a `DataFrame` representing the database table accessible via 
JDBC URL
-   * url named table.
-   *
-   * @group specificdata
-   * @deprecated As of 1.4.0, replaced by `read().jdbc()`.
-   */
-  @deprecated("Use read.jdbc() instead.", "1.4.0")
-  def jdbc(url: String, table: String): DataFrame = {
-    read.jdbc(url, table, new Properties)
-  }
+  /** @inheritdoc */
+  override def jdbc(url: String, table: String): Dataset[Row] = 
super.jdbc(url, table)
 
-  /**
-   * Construct a `DataFrame` representing the database table accessible via 
JDBC URL
-   * url named table.  Partitions of the table will be retrieved in parallel 
based on the parameters
-   * passed to this function.
-   *
-   * @param columnName the name of a column of integral type that will be used 
for partitioning.
-   * @param lowerBound the minimum value of `columnName` used to decide 
partition stride
-   * @param upperBound the maximum value of `columnName` used to decide 
partition stride
-   * @param numPartitions the number of partitions.  the range 
`minValue`-`maxValue` will be split
-   *                      evenly into this many partitions
-   * @group specificdata
-   * @deprecated As of 1.4.0, replaced by `read().jdbc()`.
-   */
-  @deprecated("Use read.jdbc() instead.", "1.4.0")
-  def jdbc(
+  /** @inheritdoc */
+  override def jdbc(
       url: String,
       table: String,
       columnName: String,
       lowerBound: Long,
       upperBound: Long,
-      numPartitions: Int): DataFrame = {
-    read.jdbc(url, table, columnName, lowerBound, upperBound, numPartitions, 
new Properties)
+      numPartitions: Int): Dataset[Row] = {
+    super.jdbc(url, table, columnName, lowerBound, upperBound, numPartitions)
   }
 
-  /**
-   * Construct a `DataFrame` representing the database table accessible via 
JDBC URL
-   * url named table. The theParts parameter gives a list expressions
-   * suitable for inclusion in WHERE clauses; each one defines one partition
-   * of the `DataFrame`.
-   *
-   * @group specificdata
-   * @deprecated As of 1.4.0, replaced by `read().jdbc()`.
-   */
-  @deprecated("Use read.jdbc() instead.", "1.4.0")
-  def jdbc(url: String, table: String, theParts: Array[String]): DataFrame = {
-    read.jdbc(url, table, theParts, new Properties)
-  }
+  /** @inheritdoc */
+  override def jdbc(url: String, table: String, theParts: Array[String]): 
Dataset[Row] =
+    super.jdbc(url, table, theParts)
 }
 
-/**
- * This SQLContext object contains utility functions to create a singleton 
SQLContext instance,
- * or to get the created SQLContext instance.
- *
- * It also provides utility functions to support preference for threads in 
multiple sessions
- * scenario, setActive could set a SQLContext for current thread, which will 
be returned by
- * getOrCreate instead of the global one.
- */
-object SQLContext {
+object SQLContext extends api.SQLContextCompanion {
 
-  /**
-   * Get the singleton SQLContext if it exists or create a new one using the 
given SparkContext.
-   *
-   * This function can be used to create a singleton SQLContext object that 
can be shared across
-   * the JVM.
-   *
-   * If there is an active SQLContext for current thread, it will be returned 
instead of the global
-   * one.
-   *
-   * @since 1.5.0
-   */
-  @deprecated("Use SparkSession.builder instead", "2.0.0")
+  override private[sql] type SQLContextImpl = SQLContext
+  override private[sql] type SparkContextImpl = SparkContext
+
+  /** @inheritdoc */
   def getOrCreate(sparkContext: SparkContext): SQLContext = {
     SparkSession.builder().sparkContext(sparkContext).getOrCreate().sqlContext
   }
 
-  /**
-   * Changes the SQLContext that will be returned in this thread and its 
children when
-   * SQLContext.getOrCreate() is called. This can be used to ensure that a 
given thread receives
-   * a SQLContext with an isolated session, instead of the global (first 
created) context.
-   *
-   * @since 1.6.0
-   */
-  @deprecated("Use SparkSession.setActiveSession instead", "2.0.0")
-  def setActive(sqlContext: SQLContext): Unit = {
-    SparkSession.setActiveSession(sqlContext.sparkSession)
-  }
-
-  /**
-   * Clears the active SQLContext for current thread. Subsequent calls to 
getOrCreate will
-   * return the first created context instead of a thread-local override.
-   *
-   * @since 1.6.0
-   */
-  @deprecated("Use SparkSession.clearActiveSession instead", "2.0.0")
-  def clearActive(): Unit = {
-    SparkSession.clearActiveSession()
-  }
+  /** @inheritdoc */
+  override def setActive(sqlContext: SQLContext): Unit = 
super.setActive(sqlContext)
 
   /**
-   * Converts an iterator of Java Beans to InternalRow using the provided
-   * bean info & schema. This is not related to the singleton, but is a static
-   * method for internal use.
+   * Converts an iterator of Java Beans to InternalRow using the provided bean 
info & schema. This
+   * is not related to the singleton, but is a static method for internal use.
    */
   private[sql] def beansToRows(
       data: Iterator[_],
@@ -1058,7 +400,9 @@ object SQLContext {
       attrs: Seq[AttributeReference]): Iterator[InternalRow] = {
     def createStructConverter(cls: Class[_], fieldTypes: Seq[DataType]): Any 
=> InternalRow = {
       val methodConverters =
-        JavaTypeInference.getJavaBeanReadableProperties(cls).zip(fieldTypes)
+        JavaTypeInference
+          .getJavaBeanReadableProperties(cls)
+          .zip(fieldTypes)
           .map { case (property, fieldType) =>
             val method = property.getReadMethod
             method -> createConverter(method.getReturnType, fieldType)
@@ -1067,16 +411,17 @@ object SQLContext {
         if (value == null) {
           null
         } else {
-          new GenericInternalRow(
-            methodConverters.map { case (method, converter) =>
-              converter(method.invoke(value))
-            })
+          new GenericInternalRow(methodConverters.map { case (method, 
converter) =>
+            converter(method.invoke(value))
+          })
         }
     }
+
     def createConverter(cls: Class[_], dataType: DataType): Any => Any = 
dataType match {
       case struct: StructType => createStructConverter(cls, 
struct.map(_.dataType))
       case _ => CatalystTypeConverters.createToCatalystConverter(dataType)
     }
+
     val dataConverter = createStructConverter(beanClass, attrs.map(_.dataType))
     data.map(dataConverter)
   }
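
The `beansToRows` helper above resolves each bean property's read method once
and then replays the cached getters for every element of the iterator. A
simplified standalone sketch of that reflection pattern, using only
`java.beans` (the helper name is hypothetical; the real code additionally
converts each value to Catalyst's `InternalRow` and recurses into nested
struct fields):

    import java.beans.Introspector

    def beanToSeq(beanClass: Class[_]): AnyRef => Seq[Any] = {
      // Discover readable properties once, mirroring
      // JavaTypeInference.getJavaBeanReadableProperties.
      val getters = Introspector
        .getBeanInfo(beanClass)
        .getPropertyDescriptors
        .toSeq
        .filter(p => p.getReadMethod != null && p.getName != "class")
        .map(_.getReadMethod)
      // Each output row is produced by invoking the cached getters on the bean.
      bean => getters.map(_.invoke(bean))
    }
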
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala
index d81768c0077e..ea0d405d2a8f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala
@@ -99,6 +99,29 @@ class SQLContextSuite extends SparkFunSuite with 
SharedSparkContext {
     assert(sqlContext.tables().filter("tableName = 
'listtablessuitetable'").count() === 0)
   }
 
+  test("get tables from a database") {
+    val sqlContext = SQLContext.getOrCreate(sc)
+
+    try {
+      sqlContext.sql("CREATE DATABASE IF NOT EXISTS temp_db_1")
+      sqlContext.sql("CREATE TABLE temp_db_1.temp_table_1 (key int)")
+      sqlContext.sql("INSERT INTO temp_db_1.temp_table_1 VALUES (1)")
+
+      
assert(sqlContext.tableNames("temp_db_1").sameElements(Array("temp_table_1")))
+
+      assert(sqlContext.tables("temp_db_1").collect().toSeq ==
+        Row("temp_db_1", "temp_table_1", false) :: Nil)
+
+      assert(sqlContext.tables().collect().toSeq == Nil)
+      sqlContext.sql("USE temp_db_1")
+      assert(sqlContext.tableNames().sameElements(Array("temp_table_1")))
+      assert(sqlContext.tables().collect().toSeq == Row("temp_db_1", 
"temp_table_1", false) :: Nil)
+    } finally {
+      sqlContext.sql("USE default")
+      sqlContext.sql("DROP DATABASE IF EXISTS temp_db_1 CASCADE")
+    }
+  }
+
   test("getting all tables with a database name has no impact on returned 
table names") {
     val sqlContext = SQLContext.getOrCreate(sc)
     val df = sqlContext.range(10)
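
As a usage note for the partitioned `jdbc` overload whose parameter docs were
reflowed above: `lowerBound` and `upperBound` only decide the partition
stride, and the `lowerBound`-`upperBound` range is split evenly across
`numPartitions`. A hedged sketch against the non-deprecated `read.jdbc` that
the deprecation notes point to (the URL, table, and column below are
placeholders, not values from this commit):

    import java.util.Properties

    val connProps = new Properties()
    val partitioned = spark.read.jdbc(
      "jdbc:postgresql://localhost:5432/db", // assumed JDBC URL
      "events",                              // assumed table name
      "id",     // integral column used for partitioning
      0L,       // lowerBound: minimum of `id`, used to decide the stride
      1000000L, // upperBound: maximum of `id`, used to decide the stride
      8,        // numPartitions: the range is split evenly into 8 partitions
      connProps)
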


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
