Repository: spark
Updated Branches:
  refs/heads/master cb8ea9e1f -> 8fc267ab3


http://git-wip-us.apache.org/repos/asf/spark/blob/8fc267ab/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index 7a2b60d..b5ee9a6 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.command.RunnableCommand
 import org.apache.spark.sql.execution.datasources.{BucketSpec, DataSource, HadoopFsRelation, LogicalRelation}
-import org.apache.spark.sql.hive.{HiveContext, MetastoreRelation}
+import org.apache.spark.sql.hive.{HiveSessionState, MetastoreRelation}
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
 
@@ -45,8 +45,7 @@ private[hive]
 case class AnalyzeTable(tableName: String) extends RunnableCommand {
 
   override def run(sqlContext: SQLContext): Seq[Row] = {
-    val sessionState = sqlContext.sessionState
-    val hiveContext = sqlContext.asInstanceOf[HiveContext]
+    val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState]
     val tableIdent = sessionState.sqlParser.parseTableIdentifier(tableName)
     val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdent))
 
@@ -60,7 +59,7 @@ case class AnalyzeTable(tableName: String) extends RunnableCommand {
         // Can we use fs.getContentSummary in future?
         // Seems fs.getContentSummary returns wrong table size on Jenkins. So we use
         // countFileSize to count the table size.
-        val stagingDir = hiveContext.metadataHive.getConf(
+        val stagingDir = sessionState.metadataHive.getConf(
           HiveConf.ConfVars.STAGINGDIR.varname,
           HiveConf.ConfVars.STAGINGDIR.defaultStrVal)
 
@@ -106,7 +105,7 @@ case class AnalyzeTable(tableName: String) extends RunnableCommand {
             .map(_.toLong)
             .getOrElse(0L)
         val newTotalSize =
-          getFileSizeForTable(hiveContext.sessionState.hiveconf, relation.hiveQlTable)
+          getFileSizeForTable(sessionState.hiveconf, relation.hiveQlTable)
         // Update the Hive metastore if the total size of the table is different than the size
         // recorded in the Hive metastore.
         // This logic is based on org.apache.hadoop.hive.ql.exec.StatsTask.aggregateStats().
@@ -144,9 +143,8 @@ private[hive]
 case class AddFile(path: String) extends RunnableCommand {
 
   override def run(sqlContext: SQLContext): Seq[Row] = {
-    val hiveContext = sqlContext.asInstanceOf[HiveContext]
-    hiveContext.runSqlHive(s"ADD FILE $path")
-    hiveContext.sparkContext.addFile(path)
+    sqlContext.sessionState.runNativeSql(s"ADD FILE $path")
+    sqlContext.sparkContext.addFile(path)
     Seq.empty[Row]
   }
 }
@@ -176,9 +174,9 @@ case class CreateMetastoreDataSource(
     }
 
     val tableName = tableIdent.unquotedString
-    val hiveContext = sqlContext.asInstanceOf[HiveContext]
+    val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState]
 
-    if (hiveContext.sessionState.catalog.tableExists(tableIdent)) {
+    if (sessionState.catalog.tableExists(tableIdent)) {
       if (allowExisting) {
         return Seq.empty[Row]
       } else {
@@ -190,8 +188,7 @@ case class CreateMetastoreDataSource(
     val optionsWithPath =
       if (!options.contains("path") && managedIfNoPath) {
         isExternal = false
-        options + ("path" ->
-          hiveContext.sessionState.catalog.hiveDefaultTableFilePath(tableIdent))
+        options + ("path" -> 
sessionState.catalog.hiveDefaultTableFilePath(tableIdent))
       } else {
         options
       }
@@ -204,7 +201,7 @@ case class CreateMetastoreDataSource(
       bucketSpec = None,
       options = optionsWithPath).resolveRelation()
 
-    hiveContext.sessionState.catalog.createDataSourceTable(
+    sessionState.catalog.createDataSourceTable(
       tableIdent,
       userSpecifiedSchema,
       Array.empty[String],
@@ -243,14 +240,13 @@ case class CreateMetastoreDataSourceAsSelect(
     }
 
     val tableName = tableIdent.unquotedString
-    val hiveContext = sqlContext.asInstanceOf[HiveContext]
+    val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState]
     var createMetastoreTable = false
     var isExternal = true
     val optionsWithPath =
       if (!options.contains("path")) {
         isExternal = false
-        options + ("path" ->
-          hiveContext.sessionState.catalog.hiveDefaultTableFilePath(tableIdent))
+        options + ("path" -> 
sessionState.catalog.hiveDefaultTableFilePath(tableIdent))
       } else {
         options
       }
@@ -281,14 +277,14 @@ case class CreateMetastoreDataSourceAsSelect(
           // inserting into (i.e. using the same compression).
 
           EliminateSubqueryAliases(
-            sqlContext.sessionState.catalog.lookupRelation(tableIdent)) match {
+            sessionState.catalog.lookupRelation(tableIdent)) match {
            case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _, _) =>
               existingSchema = Some(l.schema)
             case o =>
              throw new AnalysisException(s"Saving data in ${o.toString} is not supported.")
           }
         case SaveMode.Overwrite =>
-          hiveContext.sql(s"DROP TABLE IF EXISTS $tableName")
+          sqlContext.sql(s"DROP TABLE IF EXISTS $tableName")
           // Need to create the table again.
           createMetastoreTable = true
       }
@@ -297,7 +293,7 @@ case class CreateMetastoreDataSourceAsSelect(
       createMetastoreTable = true
     }
 
-    val data = Dataset.ofRows(hiveContext, query)
+    val data = Dataset.ofRows(sqlContext, query)
     val df = existingSchema match {
      // If we are inserting into an existing table, just use the existing schema.
       case Some(s) => data.selectExpr(s.fieldNames: _*)
@@ -318,7 +314,7 @@ case class CreateMetastoreDataSourceAsSelect(
      // We will use the schema of resolved.relation as the schema of the table (instead of
      // the schema of df). It is important since the nullability may be changed by the relation
      // provider (for example, see org.apache.spark.sql.parquet.DefaultSource).
-      hiveContext.sessionState.catalog.createDataSourceTable(
+      sessionState.catalog.createDataSourceTable(
         tableIdent,
         Some(result.schema),
         partitionColumns,
@@ -329,7 +325,7 @@ case class CreateMetastoreDataSourceAsSelect(
     }
 
     // Refresh the cache of the table in the catalog.
-    hiveContext.sessionState.catalog.refreshTable(tableIdent)
+    sessionState.catalog.refreshTable(tableIdent)
     Seq.empty[Row]
   }
 }
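
[Editor's note] The pattern above repeats through this file: commands stop downcasting the whole SQLContext to HiveContext and instead cast only its sessionState to HiveSessionState, which exposes the Hive-specific pieces (metadataHive, hiveconf, runNativeSql). A minimal sketch of the resulting shape, written against the calls visible in this diff; the command name and query string are illustrative only, not part of the patch:

  import org.apache.spark.sql.{Row, SQLContext}
  import org.apache.spark.sql.execution.command.RunnableCommand
  import org.apache.spark.sql.hive.HiveSessionState

  // Illustrative command, not part of the commit.
  case class ExampleHiveCommand(statement: String) extends RunnableCommand {
    override def run(sqlContext: SQLContext): Seq[Row] = {
      // Cast the session state, not the whole context, to reach Hive internals.
      val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState]
      sessionState.runNativeSql(statement)   // was: hiveContext.runSqlHive(statement)
      Seq.empty[Row]
    }
  }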

http://git-wip-us.apache.org/repos/asf/spark/blob/8fc267ab/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 2767528..e629099 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -32,16 +32,16 @@ import org.apache.hadoop.hive.ql.processors._
 import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
 
 import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.spark.sql.SQLContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.CATALOG_IMPLEMENTATION
+import org.apache.spark.sql.{SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
 import org.apache.spark.sql.catalyst.expressions.ExpressionInfo
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.CacheManager
 import org.apache.spark.sql.execution.command.CacheTableCommand
-import org.apache.spark.sql.execution.ui.SQLListener
 import org.apache.spark.sql.hive._
-import org.apache.spark.sql.hive.client.{HiveClient, HiveClientImpl}
+import org.apache.spark.sql.hive.client.HiveClient
 import org.apache.spark.sql.hive.execution.HiveNativeCommand
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.util.{ShutdownHookManager, Utils}
@@ -71,42 +71,80 @@ object TestHive
  * hive metastore seems to lead to weird non-deterministic failures.  Therefore, the execution of
  * test cases that rely on TestHive must be serialized.
  */
-class TestHiveContext private[hive](
-    testHiveSharedState: TestHiveSharedState,
-    val warehousePath: File,
-    val scratchDirPath: File,
-    metastoreTemporaryConf: Map[String, String],
-    isRootContext: Boolean)
-  extends HiveContext(testHiveSharedState, isRootContext) { self =>
+class TestHiveContext(@transient val sparkSession: TestHiveSparkSession, isRootContext: Boolean)
+  extends HiveContext(sparkSession, isRootContext) {
 
-  private def this(
-      sc: SparkContext,
-      warehousePath: File,
-      scratchDirPath: File,
-      metastoreTemporaryConf: Map[String, String]) {
-    this(
-      new TestHiveSharedState(sc, warehousePath, scratchDirPath, metastoreTemporaryConf),
-      warehousePath,
-      scratchDirPath,
-      metastoreTemporaryConf,
-      true)
+  def this(sc: SparkContext) {
+    this(new TestHiveSparkSession(HiveContext.withHiveExternalCatalog(sc)), true)
+  }
+
+  override def newSession(): TestHiveContext = {
+    new TestHiveContext(sparkSession.newSession(), false)
+  }
+
+  override def sharedState: TestHiveSharedState = sparkSession.sharedState
+
+  override def sessionState: TestHiveSessionState = sparkSession.sessionState
+
+  def setCacheTables(c: Boolean): Unit = {
+    sparkSession.setCacheTables(c)
+  }
+
+  def getHiveFile(path: String): File = {
+    sparkSession.getHiveFile(path)
+  }
+
+  def loadTestTable(name: String): Unit = {
+    sparkSession.loadTestTable(name)
   }
 
+  def reset(): Unit = {
+    sparkSession.reset()
+  }
+
+}
+
+
+private[hive] class TestHiveSparkSession(
+    sc: SparkContext,
+    val warehousePath: File,
+    scratchDirPath: File,
+    metastoreTemporaryConf: Map[String, String],
+    existingSharedState: Option[TestHiveSharedState])
+  extends SparkSession(sc) with Logging { self =>
+
   def this(sc: SparkContext) {
     this(
       sc,
       Utils.createTempDir(namePrefix = "warehouse"),
       TestHiveContext.makeScratchDir(),
-      HiveContext.newTemporaryConfiguration(useInMemoryDerby = false))
+      HiveContext.newTemporaryConfiguration(useInMemoryDerby = false),
+      None)
   }
 
-  override def newSession(): HiveContext = {
-    new TestHiveContext(
-      testHiveSharedState,
-      warehousePath,
-      scratchDirPath,
-      metastoreTemporaryConf,
-      isRootContext = false)
+  assume(sc.conf.get(CATALOG_IMPLEMENTATION) == "hive")
+
+  // TODO: Let's remove TestHiveSharedState and TestHiveSessionState. Otherwise,
+  // we are not really testing the reflection logic based on the setting of
+  // CATALOG_IMPLEMENTATION.
+  @transient
+  override lazy val sharedState: TestHiveSharedState = {
+    existingSharedState.getOrElse(
+      new TestHiveSharedState(sc, warehousePath, scratchDirPath, metastoreTemporaryConf))
+  }
+
+  @transient
+  override lazy val sessionState: TestHiveSessionState = new TestHiveSessionState(self)
+
+  override def newSession(): TestHiveSparkSession = {
+    new TestHiveSparkSession(
+      sc, warehousePath, scratchDirPath, metastoreTemporaryConf, Some(sharedState))
+  }
+
+  private var cacheTables: Boolean = false
+
+  def setCacheTables(c: Boolean): Unit = {
+    cacheTables = c
   }
 
   // By clearing the port we force Spark to pick a new one.  This allows us to rerun tests
@@ -118,9 +156,10 @@ class TestHiveContext private[hive](
 
   // A snapshot of the entries in the starting SQLConf
   // We save this because tests can mutate this singleton object if they want
+  // This snapshot is saved when we create this TestHiveSparkSession.
   val initialSQLConf: SQLConf = {
     val snapshot = new SQLConf
-    conf.getAllConfs.foreach { case (k, v) => snapshot.setConfString(k, v) }
+    sessionState.conf.getAllConfs.foreach { case (k, v) => snapshot.setConfString(k, v) }
     snapshot
   }
 
@@ -131,42 +170,10 @@ class TestHiveContext private[hive](
 
   /** The location of the compiled hive distribution */
   lazy val hiveHome = envVarToFile("HIVE_HOME")
+
   /** The location of the hive source code. */
   lazy val hiveDevHome = envVarToFile("HIVE_DEV_HOME")
 
-  // Override so we can intercept relative paths and rewrite them to point at hive.
-  override def runSqlHive(sql: String): Seq[String] =
-    super.runSqlHive(rewritePaths(substitutor.substitute(sessionState.hiveconf, sql)))
-
-  override def executePlan(plan: LogicalPlan): this.QueryExecution =
-    new this.QueryExecution(plan)
-
-  @transient
-  protected[sql] override lazy val sessionState = new HiveSessionState(this) {
-    override lazy val conf: SQLConf = {
-      new SQLConf {
-        clear()
-        override def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE, false)
-        override def clear(): Unit = {
-          super.clear()
-          TestHiveContext.overrideConfs.map {
-            case (key, value) => setConfString(key, value)
-          }
-        }
-      }
-    }
-
-    override lazy val functionRegistry = {
-      // We use TestHiveFunctionRegistry at here to track functions that have been explicitly
-      // unregistered (through TestHiveFunctionRegistry.unregisterFunction method).
-      val fr = new TestHiveFunctionRegistry
-      org.apache.spark.sql.catalyst.analysis.FunctionRegistry.expressions.foreach {
-        case (name, (info, builder)) => fr.registerFunction(name, info, builder)
-      }
-      fr
-    }
-  }
-
   /**
    * Returns the value of specified environmental variable as a [[java.io.File]] after checking
    * to ensure it exists
@@ -179,7 +186,7 @@ class TestHiveContext private[hive](
    * Replaces relative paths to the parent directory "../" with hiveDevHome since this is how the
    * hive test cases assume the system is set up.
    */
-  private def rewritePaths(cmd: String): String =
+  private[hive] def rewritePaths(cmd: String): String =
     if (cmd.toUpperCase contains "LOAD DATA") {
       val testDataLocation =
         hiveDevHome.map(_.getCanonicalPath).getOrElse(inRepoTests.getCanonicalPath)
@@ -210,36 +217,11 @@ class TestHiveContext private[hive](
 
   val describedTable = "DESCRIBE (\\w+)".r
 
-  /**
-   * Override QueryExecution with special debug workflow.
-   */
-  class QueryExecution(logicalPlan: LogicalPlan)
-    extends super.QueryExecution(logicalPlan) {
-    def this(sql: String) = this(parseSql(sql))
-    override lazy val analyzed = {
-      val describedTables = logical match {
-        case HiveNativeCommand(describedTable(tbl)) => tbl :: Nil
-        case CacheTableCommand(tbl, _, _) => tbl :: Nil
-        case _ => Nil
-      }
-
-      // Make sure any test tables referenced are loaded.
-      val referencedTables =
-        describedTables ++
-        logical.collect { case UnresolvedRelation(tableIdent, _) => tableIdent.table }
-      val referencedTestTables = referencedTables.filter(testTables.contains)
-      logDebug(s"Query references test tables: 
${referencedTestTables.mkString(", ")}")
-      referencedTestTables.foreach(loadTestTable)
-      // Proceed with analysis.
-      sessionState.analyzer.execute(logical)
-    }
-  }
-
   case class TestTable(name: String, commands: (() => Unit)*)
 
   protected[hive] implicit class SqlCmd(sql: String) {
     def cmd: () => Unit = {
-      () => new QueryExecution(sql).stringResult(): Unit
+      () => new TestHiveQueryExecution(sql).stringResult(): Unit
     }
   }
 
@@ -266,19 +248,20 @@ class TestHiveContext private[hive](
       "CREATE TABLE src1 (key INT, value STRING)".cmd,
       s"LOAD DATA LOCAL INPATH '${getHiveFile("data/files/kv3.txt")}' INTO 
TABLE src1".cmd),
     TestTable("srcpart", () => {
-      runSqlHive(
+      sessionState.runNativeSql(
         "CREATE TABLE srcpart (key INT, value STRING) PARTITIONED BY (ds 
STRING, hr STRING)")
       for (ds <- Seq("2008-04-08", "2008-04-09"); hr <- Seq("11", "12")) {
-        runSqlHive(
+        sessionState.runNativeSql(
           s"""LOAD DATA LOCAL INPATH '${getHiveFile("data/files/kv1.txt")}'
              |OVERWRITE INTO TABLE srcpart PARTITION (ds='$ds',hr='$hr')
            """.stripMargin)
       }
     }),
     TestTable("srcpart1", () => {
-      runSqlHive("CREATE TABLE srcpart1 (key INT, value STRING) PARTITIONED BY 
(ds STRING, hr INT)")
+      sessionState.runNativeSql(
+        "CREATE TABLE srcpart1 (key INT, value STRING) PARTITIONED BY (ds 
STRING, hr INT)")
       for (ds <- Seq("2008-04-08", "2008-04-09"); hr <- 11 to 12) {
-        runSqlHive(
+        sessionState.runNativeSql(
           s"""LOAD DATA LOCAL INPATH '${getHiveFile("data/files/kv1.txt")}'
              |OVERWRITE INTO TABLE srcpart1 PARTITION (ds='$ds',hr='$hr')
            """.stripMargin)
@@ -289,7 +272,7 @@ class TestHiveContext private[hive](
       import org.apache.hadoop.mapred.{SequenceFileInputFormat, SequenceFileOutputFormat}
       import org.apache.thrift.protocol.TBinaryProtocol
 
-      runSqlHive(
+      sessionState.runNativeSql(
         s"""
          |CREATE TABLE src_thrift(fake INT)
          |ROW FORMAT SERDE '${classOf[ThriftDeserializer].getName}'
@@ -302,7 +285,7 @@ class TestHiveContext private[hive](
          |OUTPUTFORMAT '${classOf[SequenceFileOutputFormat[_, _]].getName}'
         """.stripMargin)
 
-      runSqlHive(
+      sessionState.runNativeSql(
         s"LOAD DATA LOCAL INPATH '${getHiveFile("data/files/complex.seq")}' 
INTO TABLE src_thrift")
     }),
     TestTable("serdeins",
@@ -415,7 +398,6 @@ class TestHiveContext private[hive](
 
   private val loadedTables = new collection.mutable.HashSet[String]
 
-  var cacheTables: Boolean = false
   def loadTestTable(name: String) {
     if (!(loadedTables contains name)) {
       // Marks the table as loaded first to prevent infinite mutually recursive table loading.
@@ -426,7 +408,7 @@ class TestHiveContext private[hive](
       createCmds.foreach(_())
 
       if (cacheTables) {
-        cacheTable(name)
+        new SQLContext(self).cacheTable(name)
       }
     }
   }
@@ -451,11 +433,12 @@ class TestHiveContext private[hive](
         }
       }
 
-      cacheManager.clearCache()
+      sharedState.cacheManager.clearCache()
       loadedTables.clear()
       sessionState.catalog.clearTempTables()
       sessionState.catalog.invalidateCache()
-      metadataHive.reset()
+
+      sessionState.metadataHive.reset()
 
       FunctionRegistry.getFunctionNames.asScala.filterNot(originalUDFs.contains(_)).
         foreach { udfName => FunctionRegistry.unregisterTemporaryUDF(udfName) }
@@ -464,21 +447,21 @@ class TestHiveContext private[hive](
       sessionState.hiveconf.set("fs.default.name", new 
File(".").toURI.toString)
       // It is important that we RESET first as broken hooks that might have been set could break
       // other sql exec here.
-      executionHive.runSqlHive("RESET")
-      metadataHive.runSqlHive("RESET")
+      sessionState.executionHive.runSqlHive("RESET")
+      sessionState.metadataHive.runSqlHive("RESET")
       // For some reason, RESET does not reset the following variables...
       // https://issues.apache.org/jira/browse/HIVE-9004
-      runSqlHive("set hive.table.parameters.default=")
-      runSqlHive("set datanucleus.cache.collections=true")
-      runSqlHive("set datanucleus.cache.collections.lazy=true")
+      sessionState.runNativeSql("set hive.table.parameters.default=")
+      sessionState.runNativeSql("set datanucleus.cache.collections=true")
+      sessionState.runNativeSql("set datanucleus.cache.collections.lazy=true")
       // Lots of tests fail if we do not change the partition whitelist from the default.
-      runSqlHive("set hive.metastore.partition.name.whitelist.pattern=.*")
+      sessionState.runNativeSql("set 
hive.metastore.partition.name.whitelist.pattern=.*")
 
       // In case a test changed any of these values, restore all the original ones here.
       TestHiveContext.hiveClientConfigurations(
         sessionState.hiveconf, warehousePath, scratchDirPath, metastoreTemporaryConf)
-          .foreach { case (k, v) => metadataHive.runSqlHive(s"SET $k=$v") }
-      defaultOverrides()
+          .foreach { case (k, v) => sessionState.metadataHive.runSqlHive(s"SET $k=$v") }
+      sessionState.setDefaultOverrideConfs()
 
       sessionState.catalog.setCurrentDatabase("default")
     } catch {
@@ -489,6 +472,40 @@ class TestHiveContext private[hive](
 
 }
 
+
+private[hive] class TestHiveQueryExecution(
+    sparkSession: TestHiveSparkSession,
+    logicalPlan: LogicalPlan)
+  extends HiveQueryExecution(new SQLContext(sparkSession), logicalPlan) with Logging {
+
+  def this(sparkSession: TestHiveSparkSession, sql: String) {
+    this(sparkSession, sparkSession.sessionState.sqlParser.parsePlan(sql))
+  }
+
+  def this(sql: String) {
+    this(TestHive.sparkSession, sql)
+  }
+
+  override lazy val analyzed: LogicalPlan = {
+    val describedTables = logical match {
+      case HiveNativeCommand(sparkSession.describedTable(tbl)) => tbl :: Nil
+      case CacheTableCommand(tbl, _, _) => tbl :: Nil
+      case _ => Nil
+    }
+
+    // Make sure any test tables referenced are loaded.
+    val referencedTables =
+      describedTables ++
+        logical.collect { case UnresolvedRelation(tableIdent, _) => tableIdent.table }
+    val referencedTestTables = referencedTables.filter(sparkSession.testTables.contains)
+    logDebug(s"Query references test tables: ${referencedTestTables.mkString(", ")}")
+    referencedTestTables.foreach(sparkSession.loadTestTable)
+    // Proceed with analysis.
+    sparkSession.sessionState.analyzer.execute(logical)
+  }
+}
+
+
 private[hive] class TestHiveFunctionRegistry extends SimpleFunctionRegistry {
 
   private val removedFunctions =
@@ -517,7 +534,43 @@ private[hive] class TestHiveSharedState(
     TestHiveContext.newClientForMetadata(
       sc.conf, sc.hadoopConfiguration, warehousePath, scratchDirPath, metastoreTemporaryConf)
   }
+}
+
+
+private[hive] class TestHiveSessionState(sparkSession: TestHiveSparkSession)
+  extends HiveSessionState(new SQLContext(sparkSession)) {
 
+  override lazy val conf: SQLConf = {
+    new SQLConf {
+      clear()
+      override def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE, false)
+      override def clear(): Unit = {
+        super.clear()
+        TestHiveContext.overrideConfs.map {
+          case (key, value) => setConfString(key, value)
+        }
+      }
+    }
+  }
+
+  override lazy val functionRegistry: TestHiveFunctionRegistry = {
+    // We use TestHiveFunctionRegistry at here to track functions that have been explicitly
+    // unregistered (through TestHiveFunctionRegistry.unregisterFunction method).
+    val fr = new TestHiveFunctionRegistry
+    org.apache.spark.sql.catalyst.analysis.FunctionRegistry.expressions.foreach {
+      case (name, (info, builder)) => fr.registerFunction(name, info, builder)
+    }
+    fr
+  }
+
+  override def executePlan(plan: LogicalPlan): TestHiveQueryExecution = {
+    new TestHiveQueryExecution(sparkSession, plan)
+  }
+
+  // Override so we can intercept relative paths and rewrite them to point at hive.
+  override def runNativeSql(sql: String): Seq[String] = {
+    super.runNativeSql(sparkSession.rewritePaths(substitutor.substitute(hiveconf, sql)))
+  }
 }
 
 
@@ -552,7 +605,7 @@ private[hive] object TestHiveContext {
   /**
    * Configurations needed to create a [[HiveClient]].
    */
-  private def hiveClientConfigurations(
+  def hiveClientConfigurations(
       hiveconf: HiveConf,
       warehousePath: File,
       scratchDirPath: File,
@@ -564,7 +617,7 @@ private[hive] object TestHiveContext {
       ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY.varname -> "1")
   }
 
-  private def makeScratchDir(): File = {
+  def makeScratchDir(): File = {
     val scratchDir = Utils.createTempDir(namePrefix = "scratch")
     scratchDir.delete()
     scratchDir
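
[Editor's note] After this refactoring, test code reaches the old TestHiveContext helpers through the TestHiveSparkSession and its session state. A minimal sketch of the new call sites, taken from the suites updated later in this commit (the query strings are the same canary queries used there):

  import org.apache.spark.sql.hive.test.{TestHive, TestHiveQueryExecution}

  TestHive.setCacheTables(false)                             // was: TestHive.cacheTables = false
  TestHive.sessionState.runNativeSql("SELECT key FROM src")  // was: TestHive.runSqlHive(...)
  val rows = new TestHiveQueryExecution("SELECT key FROM src").stringResult()
  TestHive.reset()                                           // delegates to TestHiveSparkSession.reset()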

http://git-wip-us.apache.org/repos/asf/spark/blob/8fc267ab/sql/hive/src/test/scala/org/apache/spark/sql/hive/ErrorPositionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ErrorPositionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ErrorPositionSuite.scala
index b9e7a36..61910b8 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ErrorPositionSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ErrorPositionSuite.scala
@@ -23,7 +23,6 @@ import org.scalatest.BeforeAndAfterEach
 
 import org.apache.spark.sql.{AnalysisException, QueryTest}
 import org.apache.spark.sql.catalyst.util.quietly
-import org.apache.spark.sql.hive.execution.HiveSqlParser
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 
 class ErrorPositionSuite extends QueryTest with TestHiveSingleton with BeforeAndAfterEach {
@@ -131,7 +130,7 @@ class ErrorPositionSuite extends QueryTest with TestHiveSingleton with BeforeAnd
    * @param token a unique token in the string that should be indicated by the exception
    */
   def positionTest(name: String, query: String, token: String): Unit = {
-    def ast = hiveContext.sessionState.sqlParser.parsePlan(query)
+    def ast = hiveContext.parseSql(query)
     def parseTree = Try(quietly(ast.treeString)).getOrElse("<failed to parse>")
 
     test(name) {

http://git-wip-us.apache.org/repos/asf/spark/blob/8fc267ab/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextSuite.scala
index b644a50..b2c0f7e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextSuite.scala
@@ -28,9 +28,12 @@ class HiveContextSuite extends SparkFunSuite {
     val sc = TestHive.sparkContext
     require(sc.conf.get("spark.sql.hive.metastore.barrierPrefixes") ==
       "org.apache.spark.sql.hive.execution.PairSerDe")
-    assert(TestHive.initialSQLConf.getConfString("spark.sql.hive.metastore.barrierPrefixes") ==
+    assert(TestHive.sparkSession.initialSQLConf.getConfString(
+      "spark.sql.hive.metastore.barrierPrefixes") ==
       "org.apache.spark.sql.hive.execution.PairSerDe")
-    assert(TestHive.metadataHive.getConf("spark.sql.hive.metastore.barrierPrefixes", "") ==
+    // This setting should be also set in the hiveconf of the current session.
+    assert(TestHive.sessionState.hiveconf.get(
+      "spark.sql.hive.metastore.barrierPrefixes", "") ==
       "org.apache.spark.sql.hive.execution.PairSerDe")
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/8fc267ab/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
index 8648834..2a201c1 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
@@ -96,7 +96,7 @@ class DataSourceWithHiveMetastoreCatalogSuite
         assert(columns.map(_.dataType) === Seq("decimal(10,3)", "string"))
 
         checkAnswer(table("t"), testDF)
-        assert(runSqlHive("SELECT * FROM t") === Seq("1.1\t1", "2.1\t2"))
+        assert(sessionState.runNativeSql("SELECT * FROM t") === Seq("1.1\t1", "2.1\t2"))
       }
     }
 
@@ -129,7 +129,7 @@ class DataSourceWithHiveMetastoreCatalogSuite
           assert(columns.map(_.dataType) === Seq("decimal(10,3)", "string"))
 
           checkAnswer(table("t"), testDF)
-          assert(runSqlHive("SELECT * FROM t") === Seq("1.1\t1", "2.1\t2"))
+          assert(sessionState.runNativeSql("SELECT * FROM t") === Seq("1.1\t1", "2.1\t2"))
         }
       }
     }
@@ -159,7 +159,7 @@ class DataSourceWithHiveMetastoreCatalogSuite
           assert(columns.map(_.dataType) === Seq("int", "string"))
 
           checkAnswer(table("t"), Row(1, "val_1"))
-          assert(runSqlHive("SELECT * FROM t") === Seq("1\tval_1"))
+          assert(sessionState.runNativeSql("SELECT * FROM t") === Seq("1\tval_1"))
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/8fc267ab/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index d0e6870..bbe135b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -253,13 +253,13 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
           sql("SELECT `c_!@(3)` FROM expectedJsonTable").collect().toSeq)
 
         // Discard the cached relation.
-        invalidateTable("jsonTable")
+        sessionState.invalidateTable("jsonTable")
 
         checkAnswer(
           sql("SELECT * FROM jsonTable"),
           sql("SELECT `c_!@(3)` FROM expectedJsonTable").collect().toSeq)
 
-        invalidateTable("jsonTable")
+        sessionState.invalidateTable("jsonTable")
        val expectedSchema = StructType(StructField("c_!@(3)", IntegerType, true) :: Nil)
 
         assert(expectedSchema === table("jsonTable").schema)
@@ -347,7 +347,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
            """.stripMargin)
 
         // Discard the cached relation.
-        invalidateTable("ctasJsonTable")
+        sessionState.invalidateTable("ctasJsonTable")
 
         // Schema should not be changed.
         assert(table("ctasJsonTable").schema === table("jsonTable").schema)
@@ -422,7 +422,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
         sql("SELECT * FROM savedJsonTable tmp where tmp.a > 5"),
         (6 to 10).map(i => Row(i, s"str$i")))
 
-      invalidateTable("savedJsonTable")
+      sessionState.invalidateTable("savedJsonTable")
 
       checkAnswer(
         sql("SELECT * FROM savedJsonTable where savedJsonTable.a < 5"),
@@ -620,7 +620,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
         .mode(SaveMode.Append)
         .saveAsTable("arrayInParquet")
 
-      refreshTable("arrayInParquet")
+      sessionState.refreshTable("arrayInParquet")
 
       checkAnswer(
         sql("SELECT a FROM arrayInParquet"),
@@ -679,7 +679,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
         .mode(SaveMode.Append)
         .saveAsTable("mapInParquet")
 
-      refreshTable("mapInParquet")
+      sessionState.refreshTable("mapInParquet")
 
       checkAnswer(
         sql("SELECT a FROM mapInParquet"),
@@ -707,7 +707,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
             options = Map("path" -> tempDir.getCanonicalPath),
             isExternal = false)
 
-          invalidateTable("wide_schema")
+          sessionState.invalidateTable("wide_schema")
 
           val actualSchema = table("wide_schema").schema
           assert(schema === actualSchema)
@@ -737,9 +737,9 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
           "spark.sql.sources.schema" -> schema.json,
           "EXTERNAL" -> "FALSE"))
 
-      hiveCatalog.createTable("default", hiveTable, ignoreIfExists = false)
+      sharedState.externalCatalog.createTable("default", hiveTable, ignoreIfExists = false)
 
-      invalidateTable(tableName)
+      sessionState.invalidateTable(tableName)
       val actualSchema = table(tableName).schema
       assert(schema === actualSchema)
     }
@@ -751,8 +751,8 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
 
     withTable(tableName) {
       df.write.format("parquet").partitionBy("d", "b").saveAsTable(tableName)
-      invalidateTable(tableName)
-      val metastoreTable = hiveCatalog.getTable("default", tableName)
+      sessionState.invalidateTable(tableName)
+      val metastoreTable = sharedState.externalCatalog.getTable("default", tableName)
       val expectedPartitionColumns = StructType(df.schema("d") :: df.schema("b") :: Nil)
 
       val numPartCols = metastoreTable.properties("spark.sql.sources.schema.numPartCols").toInt
@@ -786,8 +786,8 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
         .bucketBy(8, "d", "b")
         .sortBy("c")
         .saveAsTable(tableName)
-      invalidateTable(tableName)
-      val metastoreTable = hiveCatalog.getTable("default", tableName)
+      sessionState.invalidateTable(tableName)
+      val metastoreTable = sharedState.externalCatalog.getTable("default", tableName)
       val expectedBucketByColumns = StructType(df.schema("d") :: df.schema("b") :: Nil)
       val expectedSortByColumns = StructType(df.schema("c") :: Nil)
 
@@ -917,7 +917,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
 
       // As a proxy for verifying that the table was stored in Hive compatible format,
       // we verify that each column of the table is of native type StringType.
-      assert(hiveCatalog.getTable("default", "not_skip_hive_metadata").schema
+      assert(sharedState.externalCatalog.getTable("default", "not_skip_hive_metadata").schema
         .forall(column => HiveMetastoreTypes.toDataType(column.dataType) == StringType))
 
       sessionState.catalog.createDataSourceTable(
@@ -931,9 +931,8 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
 
       // As a proxy for verifying that the table was stored in SparkSQL format,
       // we verify that the table has a column type as array of StringType.
-      assert(hiveCatalog.getTable("default", "skip_hive_metadata").schema.forall { c =>
-        HiveMetastoreTypes.toDataType(c.dataType) == ArrayType(StringType)
-      })
+      assert(sharedState.externalCatalog.getTable("default", "skip_hive_metadata")
+        .schema.forall { c => HiveMetastoreTypes.toDataType(c.dataType) == ArrayType(StringType) })
     }
   }
 }
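
[Editor's note] Across the test suites the migration follows one pattern: the Hive metastore catalog is reached through sharedState.externalCatalog instead of the removed hiveCatalog accessor, and table cache maintenance moves onto the session state. A small sketch of the migrated call sites, using an illustrative table name and the hiveContext handle the suites already hold:

  // was: hiveCatalog.getTable(...), invalidateTable(...), refreshTable(...)
  val metastoreTable =
    hiveContext.sharedState.externalCatalog.getTable("default", "some_table")
  hiveContext.sessionState.invalidateTable("some_table")
  hiveContext.sessionState.refreshTable("some_table")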

http://git-wip-us.apache.org/repos/asf/spark/blob/8fc267ab/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
index 3c00350..850cb1e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
@@ -25,8 +25,9 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle
   private lazy val df = sqlContext.range(10).coalesce(1).toDF()
 
   private def checkTablePath(dbName: String, tableName: String): Unit = {
-    val metastoreTable = hiveContext.hiveCatalog.getTable(dbName, tableName)
-    val expectedPath = hiveContext.hiveCatalog.getDatabase(dbName).locationUri 
+ "/" + tableName
+    val metastoreTable = 
hiveContext.sharedState.externalCatalog.getTable(dbName, tableName)
+    val expectedPath =
+      hiveContext.sharedState.externalCatalog.getDatabase(dbName).locationUri + "/" + tableName
 
     assert(metastoreTable.storage.serdeProperties("path") === expectedPath)
   }
@@ -216,7 +217,7 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle
 
           df.write.parquet(s"$path/p=2")
           sql("ALTER TABLE t ADD PARTITION (p=2)")
-          hiveContext.refreshTable("t")
+          hiveContext.sessionState.refreshTable("t")
           checkAnswer(
             sqlContext.table("t"),
             df.withColumn("p", lit(1)).union(df.withColumn("p", lit(2))))
@@ -248,7 +249,7 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle
 
         df.write.parquet(s"$path/p=2")
         sql(s"ALTER TABLE $db.t ADD PARTITION (p=2)")
-        hiveContext.refreshTable(s"$db.t")
+        hiveContext.sessionState.refreshTable(s"$db.t")
         checkAnswer(
           sqlContext.table(s"$db.t"),
           df.withColumn("p", lit(1)).union(df.withColumn("p", lit(2))))

http://git-wip-us.apache.org/repos/asf/spark/blob/8fc267ab/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index d14c72b..adc7af3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -31,7 +31,7 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton {
 
   test("parse analyze commands") {
     def assertAnalyzeCommand(analyzeCommand: String, c: Class[_]) {
-      val parsed = hiveContext.sessionState.sqlParser.parsePlan(analyzeCommand)
+      val parsed = hiveContext.parseSql(analyzeCommand)
       val operators = parsed.collect {
         case a: AnalyzeTable => a
         case o => o
@@ -116,7 +116,7 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton {
     // Try to analyze a temp table
     sql("""SELECT * FROM src""").registerTempTable("tempTable")
     intercept[UnsupportedOperationException] {
-      hiveContext.analyze("tempTable")
+      hiveContext.sessionState.analyze("tempTable")
     }
     hiveContext.sessionState.catalog.dropTable(
       TableIdentifier("tempTable"), ignoreIfNotExists = true)

http://git-wip-us.apache.org/repos/asf/spark/blob/8fc267ab/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/BigDataBenchmarkSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/BigDataBenchmarkSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/BigDataBenchmarkSuite.scala
index a3f5921..c58a664 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/BigDataBenchmarkSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/BigDataBenchmarkSuite.scala
@@ -19,15 +19,15 @@ package org.apache.spark.sql.hive.execution
 
 import java.io.File
 
-import org.apache.spark.sql.hive.test.TestHive._
 
 /**
  * A set of test cases based on the big-data-benchmark.
  * https://amplab.cs.berkeley.edu/benchmark/
  */
 class BigDataBenchmarkSuite extends HiveComparisonTest {
-  val testDataDirectory = new File("target" + File.separator + "big-data-benchmark-testdata")
+  import org.apache.spark.sql.hive.test.TestHive.sparkSession._
 
+  val testDataDirectory = new File("target" + File.separator + "big-data-benchmark-testdata")
   val userVisitPath = new File(testDataDirectory, "uservisits").getCanonicalPath
   val testTables = Seq(
     TestTable(

http://git-wip-us.apache.org/repos/asf/spark/blob/8fc267ab/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
index e67fcbe..bd46cb9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
@@ -30,8 +30,9 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.execution.command.{ExplainCommand, SetCommand}
 import org.apache.spark.sql.execution.datasources.DescribeCommand
-import org.apache.spark.sql.hive.{InsertIntoHiveTable => LogicalInsertIntoHiveTable, SQLBuilder}
-import org.apache.spark.sql.hive.test.TestHive
+import org.apache.spark.sql.hive.{InsertIntoHiveTable => LogicalInsertIntoHiveTable}
+import org.apache.spark.sql.hive.SQLBuilder
+import org.apache.spark.sql.hive.test.{TestHive, TestHiveQueryExecution}
 
 /**
  * Allows the creations of tests that execute the same query against both hive
@@ -141,7 +142,7 @@ abstract class HiveComparisonTest
   }
 
   protected def prepareAnswer(
-    hiveQuery: TestHive.type#QueryExecution,
+    hiveQuery: TestHiveQueryExecution,
     answer: Seq[String]): Seq[String] = {
 
     def isSorted(plan: LogicalPlan): Boolean = plan match {
@@ -332,7 +333,7 @@ abstract class HiveComparisonTest
             hiveCachedResults
           } else {
 
-            val hiveQueries = queryList.map(new TestHive.QueryExecution(_))
+            val hiveQueries = queryList.map(new TestHiveQueryExecution(_))
            // Make sure we can at least parse everything before attempting hive execution.
            // Note this must only look at the logical plan as we might not be able to analyze if
             // other DDL has not been executed yet.
@@ -352,7 +353,7 @@ abstract class HiveComparisonTest
                     case _: ExplainCommand =>
                      // No need to execute EXPLAIN queries as we don't check the output.
                       Nil
-                    case _ => TestHive.runSqlHive(queryString)
+                    case _ => TestHive.sessionState.runNativeSql(queryString)
                   }
 
                  // We need to add a new line to non-empty answers so we can differentiate Seq()
@@ -382,10 +383,10 @@ abstract class HiveComparisonTest
 
         // Run w/ catalyst
        val catalystResults = queryList.zip(hiveResults).map { case (queryString, hive) =>
-          var query: TestHive.QueryExecution = null
+          var query: TestHiveQueryExecution = null
           try {
             query = {
-              val originalQuery = new TestHive.QueryExecution(queryString)
+              val originalQuery = new TestHiveQueryExecution(queryString)
               val containsCommands = originalQuery.analyzed.collectFirst {
                 case _: Command => ()
                 case _: LogicalInsertIntoHiveTable => ()
@@ -409,7 +410,7 @@ abstract class HiveComparisonTest
                 }
 
                 try {
-                  val queryExecution = new TestHive.QueryExecution(convertedSQL)
+                  val queryExecution = new TestHiveQueryExecution(convertedSQL)
                   // Trigger the analysis of this converted SQL query.
                   queryExecution.analyzed
                   queryExecution
@@ -472,12 +473,12 @@ abstract class HiveComparisonTest
              // If this query is reading other tables that were created during this test run
               // also print out the query plans and results for those.
               val computedTablesMessages: String = try {
-                val tablesRead = new TestHive.QueryExecution(query).executedPlan.collect {
+                val tablesRead = new TestHiveQueryExecution(query).executedPlan.collect {
                   case ts: HiveTableScan => ts.relation.tableName
                 }.toSet
 
                 TestHive.reset()
-                val executions = queryList.map(new TestHive.QueryExecution(_))
+                val executions = queryList.map(new TestHiveQueryExecution(_))
                 executions.foreach(_.toRdd)
                 val tablesGenerated = queryList.zip(executions).flatMap {
                  // We should take executedPlan instead of sparkPlan, because in following codes we
@@ -562,8 +563,8 @@ abstract class HiveComparisonTest
            // okay by running a simple query. If this fails then we halt testing since
             // something must have gone seriously wrong.
             try {
-              new TestHive.QueryExecution("SELECT key FROM src").stringResult()
-              TestHive.runSqlHive("SELECT key FROM src")
+              new TestHiveQueryExecution("SELECT key FROM src").stringResult()
+              TestHive.sessionState.runNativeSql("SELECT key FROM src")
             } catch {
               case e: Exception =>
                 logError(s"FATAL ERROR: Canary query threw $e This implies 
that the " +

http://git-wip-us.apache.org/repos/asf/spark/blob/8fc267ab/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index 2e7a1d9..93d63f2 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -49,7 +49,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
 
   override def beforeAll() {
     super.beforeAll()
-    TestHive.cacheTables = true
+    TestHive.setCacheTables(true)
     // Timezone is fixed to America/Los_Angeles for those timezone sensitive tests (timestamp_*)
     TimeZone.setDefault(TimeZone.getTimeZone("America/Los_Angeles"))
     // Add Locale setting
@@ -58,7 +58,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
 
   override def afterAll() {
     try {
-      TestHive.cacheTables = false
+      TestHive.setCacheTables(false)
       TimeZone.setDefault(originalTimeZone)
       Locale.setDefault(originalLocale)
       sql("DROP TEMPORARY FUNCTION IF EXISTS udtf_count2")
@@ -1009,7 +1009,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
         .mkString("/")
 
       // Loads partition data to a temporary table to verify contents
-      val path = s"$warehousePath/dynamic_part_table/$partFolder/part-00000"
+      val path = s"${sparkSession.warehousePath}/dynamic_part_table/$partFolder/part-00000"
 
       sql("DROP TABLE IF EXISTS dp_verify")
       sql("CREATE TABLE dp_verify(intcol INT)")

http://git-wip-us.apache.org/repos/asf/spark/blob/8fc267ab/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeSuite.scala
index 5586a79..b8af0b3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeSuite.scala
@@ -28,8 +28,8 @@ class HiveSerDeSuite extends HiveComparisonTest with BeforeAndAfterAll {
   override def beforeAll(): Unit = {
     import TestHive._
     import org.apache.hadoop.hive.serde2.RegexSerDe
-      super.beforeAll()
-    TestHive.cacheTables = false
+    super.beforeAll()
+    TestHive.setCacheTables(false)
     sql(s"""CREATE TABLE IF NOT EXISTS sales (key STRING, value INT)
        |ROW FORMAT SERDE '${classOf[RegexSerDe].getCanonicalName}'
        |WITH SERDEPROPERTIES ("input.regex" = "([^ ]*)\t([^ ]*)")

http://git-wip-us.apache.org/repos/asf/spark/blob/8fc267ab/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala
index 97cb9d9..79ac53c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala
@@ -21,18 +21,22 @@ import scala.collection.JavaConverters._
 
 import org.scalatest.BeforeAndAfter
 
-import org.apache.spark.sql.hive.test.TestHive
+import org.apache.spark.sql.hive.test.{TestHive, TestHiveQueryExecution}
 
 /**
  * A set of test cases that validate partition and column pruning.
  */
 class PruningSuite extends HiveComparisonTest with BeforeAndAfter {
-  TestHive.cacheTables = false
 
-  // Column/partition pruning is not implemented for `InMemoryColumnarTableScan` yet, need to reset
-  // the environment to ensure all referenced tables in this suites are not cached in-memory.
-  // Refer to https://issues.apache.org/jira/browse/SPARK-2283 for details.
-  TestHive.reset()
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    TestHive.setCacheTables(false)
+    // Column/partition pruning is not implemented for `InMemoryColumnarTableScan` yet,
+    // need to reset the environment to ensure all referenced tables in this suites are
+    // not cached in-memory. Refer to https://issues.apache.org/jira/browse/SPARK-2283
+    // for details.
+    TestHive.reset()
+  }
 
   // Column pruning tests
 
@@ -144,7 +148,7 @@ class PruningSuite extends HiveComparisonTest with BeforeAndAfter {
       expectedScannedColumns: Seq[String],
       expectedPartValues: Seq[Seq[String]]): Unit = {
     test(s"$testCaseName - pruning test") {
-      val plan = new TestHive.QueryExecution(sql).sparkPlan
+      val plan = new TestHiveQueryExecution(sql).sparkPlan
       val actualOutputColumns = plan.output.map(_.name)
       val (actualScannedColumns, actualPartValues) = plan.collect {
         case p @ HiveTableScan(columns, relation, _) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/8fc267ab/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 1098e74..6b71e59 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -349,7 +349,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
       }
     }
 
-    val originalConf = convertCTAS
+    val originalConf = sessionState.convertCTAS
 
     setConf(HiveContext.CONVERT_CTAS, true)
 
@@ -731,7 +731,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     // generates an invalid query plan.
     val rdd = sparkContext.makeRDD((1 to 5).map(i => s"""{"a":[$i, ${i + 1}]}"""))
     read.json(rdd).registerTempTable("data")
-    val originalConf = convertCTAS
+    val originalConf = sessionState.convertCTAS
     setConf(HiveContext.CONVERT_CTAS, false)
 
     try {

http://git-wip-us.apache.org/repos/asf/spark/blob/8fc267ab/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
index 8f163f2..00b5c8d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
@@ -58,7 +58,7 @@ class ScriptTransformationSuite extends SparkPlanTest with TestHiveSingleton {
         output = Seq(AttributeReference("a", StringType)()),
         child = child,
         ioschema = noSerdeIOSchema
-      )(hiveContext),
+      )(hiveContext.sessionState.hiveconf),
       rowsDf.collect())
   }
 
@@ -72,7 +72,7 @@ class ScriptTransformationSuite extends SparkPlanTest with TestHiveSingleton {
         output = Seq(AttributeReference("a", StringType)()),
         child = child,
         ioschema = serdeIOSchema
-      )(hiveContext),
+      )(hiveContext.sessionState.hiveconf),
       rowsDf.collect())
   }
 
@@ -87,7 +87,7 @@ class ScriptTransformationSuite extends SparkPlanTest with TestHiveSingleton {
           output = Seq(AttributeReference("a", StringType)()),
           child = ExceptionInjectingOperator(child),
           ioschema = noSerdeIOSchema
-        )(hiveContext),
+        )(hiveContext.sessionState.hiveconf),
         rowsDf.collect())
     }
     assert(e.getMessage().contains("intentional exception"))
@@ -104,7 +104,7 @@ class ScriptTransformationSuite extends SparkPlanTest with TestHiveSingleton {
           output = Seq(AttributeReference("a", StringType)()),
           child = ExceptionInjectingOperator(child),
           ioschema = serdeIOSchema
-        )(hiveContext),
+        )(hiveContext.sessionState.hiveconf),
         rowsDf.collect())
     }
     assert(e.getMessage().contains("intentional exception"))

http://git-wip-us.apache.org/repos/asf/spark/blob/8fc267ab/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index 4b2b1a1..6fa4c33 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -461,7 +461,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
     checkCached(tableIdentifier)
     // For insert into non-partitioned table, we will do the conversion,
     // so the converted test_insert_parquet should be cached.
-    invalidateTable("test_insert_parquet")
+    sessionState.invalidateTable("test_insert_parquet")
     assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null)
     sql(
       """
@@ -474,7 +474,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
       sql("select * from test_insert_parquet"),
       sql("select a, b from jt").collect())
     // Invalidate the cache.
-    invalidateTable("test_insert_parquet")
+    sessionState.invalidateTable("test_insert_parquet")
     assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null)
 
     // Create a partitioned table.
@@ -524,7 +524,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
           |select b, '2015-04-02', a FROM jt
         """.stripMargin).collect())
 
-    invalidateTable("test_parquet_partitioned_cache_test")
+    sessionState.invalidateTable("test_parquet_partitioned_cache_test")
     assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null)
 
     dropTables("test_insert_parquet", "test_parquet_partitioned_cache_test")

http://git-wip-us.apache.org/repos/asf/spark/blob/8fc267ab/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index a0be55c..aa6101f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -349,7 +349,7 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
   test("error if there exists any malformed bucket files") {
     withTable("bucketed_table") {
       df1.write.format("parquet").bucketBy(8, 
"i").saveAsTable("bucketed_table")
-      val tableDir = new File(hiveContext.warehousePath, "bucketed_table")
+      val tableDir = new File(hiveContext.sparkSession.warehousePath, "bucketed_table")
       Utils.deleteRecursively(tableDir)
       df1.write.parquet(tableDir.getAbsolutePath)
 

