Repository: spark
Updated Branches:
  refs/heads/master cb8ea9e1f -> 8fc267ab3
http://git-wip-us.apache.org/repos/asf/spark/blob/8fc267ab/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index 7a2b60d..b5ee9a6 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.command.RunnableCommand
 import org.apache.spark.sql.execution.datasources.{BucketSpec, DataSource, HadoopFsRelation, LogicalRelation}
-import org.apache.spark.sql.hive.{HiveContext, MetastoreRelation}
+import org.apache.spark.sql.hive.{HiveSessionState, MetastoreRelation}
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
 
@@ -45,8 +45,7 @@ private[hive]
 case class AnalyzeTable(tableName: String) extends RunnableCommand {
 
   override def run(sqlContext: SQLContext): Seq[Row] = {
-    val sessionState = sqlContext.sessionState
-    val hiveContext = sqlContext.asInstanceOf[HiveContext]
+    val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState]
     val tableIdent = sessionState.sqlParser.parseTableIdentifier(tableName)
     val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdent))
 
@@ -60,7 +59,7 @@ case class AnalyzeTable(tableName: String) extends RunnableCommand {
         // Can we use fs.getContentSummary in future?
         // Seems fs.getContentSummary returns wrong table size on Jenkins. So we use
         // countFileSize to count the table size.
-        val stagingDir = hiveContext.metadataHive.getConf(
+        val stagingDir = sessionState.metadataHive.getConf(
           HiveConf.ConfVars.STAGINGDIR.varname,
           HiveConf.ConfVars.STAGINGDIR.defaultStrVal)
 
@@ -106,7 +105,7 @@ case class AnalyzeTable(tableName: String) extends RunnableCommand {
           .map(_.toLong)
           .getOrElse(0L)
         val newTotalSize =
-          getFileSizeForTable(hiveContext.sessionState.hiveconf, relation.hiveQlTable)
+          getFileSizeForTable(sessionState.hiveconf, relation.hiveQlTable)
         // Update the Hive metastore if the total size of the table is different than the size
         // recorded in the Hive metastore.
         // This logic is based on org.apache.hadoop.hive.ql.exec.StatsTask.aggregateStats().
@@ -144,9 +143,8 @@ private[hive]
 case class AddFile(path: String) extends RunnableCommand {
 
   override def run(sqlContext: SQLContext): Seq[Row] = {
-    val hiveContext = sqlContext.asInstanceOf[HiveContext]
-    hiveContext.runSqlHive(s"ADD FILE $path")
-    hiveContext.sparkContext.addFile(path)
+    sqlContext.sessionState.runNativeSql(s"ADD FILE $path")
+    sqlContext.sparkContext.addFile(path)
     Seq.empty[Row]
   }
 }
@@ -176,9 +174,9 @@ case class CreateMetastoreDataSource(
     }
 
     val tableName = tableIdent.unquotedString
-    val hiveContext = sqlContext.asInstanceOf[HiveContext]
+    val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState]
 
-    if (hiveContext.sessionState.catalog.tableExists(tableIdent)) {
+    if (sessionState.catalog.tableExists(tableIdent)) {
       if (allowExisting) {
         return Seq.empty[Row]
       } else {
@@ -190,8 +188,7 @@ case class CreateMetastoreDataSource(
     val optionsWithPath =
       if (!options.contains("path") && managedIfNoPath) {
         isExternal = false
-        options + ("path" ->
-          hiveContext.sessionState.catalog.hiveDefaultTableFilePath(tableIdent))
+        options + ("path" -> sessionState.catalog.hiveDefaultTableFilePath(tableIdent))
       } else {
         options
       }
@@ -204,7 +201,7 @@ case class CreateMetastoreDataSource(
         bucketSpec = None,
         options = optionsWithPath).resolveRelation()
 
-      hiveContext.sessionState.catalog.createDataSourceTable(
+      sessionState.catalog.createDataSourceTable(
         tableIdent,
         userSpecifiedSchema,
         Array.empty[String],
@@ -243,14 +240,13 @@ case class CreateMetastoreDataSourceAsSelect(
     }
 
     val tableName = tableIdent.unquotedString
-    val hiveContext = sqlContext.asInstanceOf[HiveContext]
+    val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState]
     var createMetastoreTable = false
     var isExternal = true
     val optionsWithPath =
       if (!options.contains("path")) {
        isExternal = false
-        options + ("path" ->
-          hiveContext.sessionState.catalog.hiveDefaultTableFilePath(tableIdent))
+        options + ("path" -> sessionState.catalog.hiveDefaultTableFilePath(tableIdent))
      } else {
        options
      }
@@ -281,14 +277,14 @@ case class CreateMetastoreDataSourceAsSelect(
           // inserting into (i.e. using the same compression).
           EliminateSubqueryAliases(
-            sqlContext.sessionState.catalog.lookupRelation(tableIdent)) match {
+            sessionState.catalog.lookupRelation(tableIdent)) match {
             case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _, _) =>
               existingSchema = Some(l.schema)
             case o =>
               throw new AnalysisException(s"Saving data in ${o.toString} is not supported.")
           }
         case SaveMode.Overwrite =>
-          hiveContext.sql(s"DROP TABLE IF EXISTS $tableName")
+          sqlContext.sql(s"DROP TABLE IF EXISTS $tableName")
           // Need to create the table again.
           createMetastoreTable = true
       }
@@ -297,7 +293,7 @@ case class CreateMetastoreDataSourceAsSelect(
       createMetastoreTable = true
     }
 
-    val data = Dataset.ofRows(hiveContext, query)
+    val data = Dataset.ofRows(sqlContext, query)
     val df = existingSchema match {
       // If we are inserting into an existing table, just use the existing schema.
       case Some(s) => data.selectExpr(s.fieldNames: _*)
@@ -318,7 +314,7 @@ case class CreateMetastoreDataSourceAsSelect(
       // We will use the schema of resolved.relation as the schema of the table (instead of
       // the schema of df). It is important since the nullability may be changed by the relation
       // provider (for example, see org.apache.spark.sql.parquet.DefaultSource).
-      hiveContext.sessionState.catalog.createDataSourceTable(
+      sessionState.catalog.createDataSourceTable(
         tableIdent,
         Some(result.schema),
         partitionColumns,
@@ -329,7 +325,7 @@ case class CreateMetastoreDataSourceAsSelect(
     }
 
     // Refresh the cache of the table in the catalog.
-    hiveContext.sessionState.catalog.refreshTable(tableIdent)
+    sessionState.catalog.refreshTable(tableIdent)
     Seq.empty[Row]
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/8fc267ab/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 2767528..e629099 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -32,16 +32,16 @@ import org.apache.hadoop.hive.ql.processors._
 import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
 
 import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.spark.sql.SQLContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.CATALOG_IMPLEMENTATION
+import org.apache.spark.sql.{SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
 import org.apache.spark.sql.catalyst.expressions.ExpressionInfo
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.CacheManager
 import org.apache.spark.sql.execution.command.CacheTableCommand
-import org.apache.spark.sql.execution.ui.SQLListener
 import org.apache.spark.sql.hive._
-import org.apache.spark.sql.hive.client.{HiveClient, HiveClientImpl}
+import org.apache.spark.sql.hive.client.HiveClient
 import org.apache.spark.sql.hive.execution.HiveNativeCommand
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.util.{ShutdownHookManager, Utils}
@@ -71,42 +71,80 @@ object TestHive
  * hive metastore seems to lead to weird non-deterministic failures. Therefore, the execution of
  * test cases that rely on TestHive must be serialized.
  */
-class TestHiveContext private[hive](
-    testHiveSharedState: TestHiveSharedState,
-    val warehousePath: File,
-    val scratchDirPath: File,
-    metastoreTemporaryConf: Map[String, String],
-    isRootContext: Boolean)
-  extends HiveContext(testHiveSharedState, isRootContext) { self =>
+class TestHiveContext(@transient val sparkSession: TestHiveSparkSession, isRootContext: Boolean)
+  extends HiveContext(sparkSession, isRootContext) {
 
-  private def this(
-      sc: SparkContext,
-      warehousePath: File,
-      scratchDirPath: File,
-      metastoreTemporaryConf: Map[String, String]) {
-    this(
-      new TestHiveSharedState(sc, warehousePath, scratchDirPath, metastoreTemporaryConf),
-      warehousePath,
-      scratchDirPath,
-      metastoreTemporaryConf,
-      true)
+  def this(sc: SparkContext) {
+    this(new TestHiveSparkSession(HiveContext.withHiveExternalCatalog(sc)), true)
+  }
+
+  override def newSession(): TestHiveContext = {
+    new TestHiveContext(sparkSession.newSession(), false)
+  }
+
+  override def sharedState: TestHiveSharedState = sparkSession.sharedState
+
+  override def sessionState: TestHiveSessionState = sparkSession.sessionState
+
+  def setCacheTables(c: Boolean): Unit = {
+    sparkSession.setCacheTables(c)
+  }
+
+  def getHiveFile(path: String): File = {
+    sparkSession.getHiveFile(path)
+  }
+
+  def loadTestTable(name: String): Unit = {
+    sparkSession.loadTestTable(name)
   }
 
+  def reset(): Unit = {
+    sparkSession.reset()
+  }
+
+}
+
+
+private[hive] class TestHiveSparkSession(
+    sc: SparkContext,
+    val warehousePath: File,
+    scratchDirPath: File,
+    metastoreTemporaryConf: Map[String, String],
+    existingSharedState: Option[TestHiveSharedState])
+  extends SparkSession(sc) with Logging { self =>
+
   def this(sc: SparkContext) {
     this(
       sc,
       Utils.createTempDir(namePrefix = "warehouse"),
       TestHiveContext.makeScratchDir(),
-      HiveContext.newTemporaryConfiguration(useInMemoryDerby = false))
+      HiveContext.newTemporaryConfiguration(useInMemoryDerby = false),
+      None)
   }
 
-  override def newSession(): HiveContext = {
-    new TestHiveContext(
-      testHiveSharedState,
-      warehousePath,
-      scratchDirPath,
-      metastoreTemporaryConf,
-      isRootContext = false)
+  assume(sc.conf.get(CATALOG_IMPLEMENTATION) == "hive")
+
+  // TODO: Let's remove TestHiveSharedState and TestHiveSessionState. Otherwise,
+  // we are not really testing the reflection logic based on the setting of
+  // CATALOG_IMPLEMENTATION.
+  @transient
+  override lazy val sharedState: TestHiveSharedState = {
+    existingSharedState.getOrElse(
+      new TestHiveSharedState(sc, warehousePath, scratchDirPath, metastoreTemporaryConf))
+  }
+
+  @transient
+  override lazy val sessionState: TestHiveSessionState = new TestHiveSessionState(self)
+
+  override def newSession(): TestHiveSparkSession = {
+    new TestHiveSparkSession(
+      sc, warehousePath, scratchDirPath, metastoreTemporaryConf, Some(sharedState))
+  }
+
+  private var cacheTables: Boolean = false
+
+  def setCacheTables(c: Boolean): Unit = {
+    cacheTables = c
   }
 
   // By clearing the port we force Spark to pick a new one. This allows us to rerun tests
@@ -118,9 +156,10 @@ class TestHiveContext private[hive](
 
   // A snapshot of the entries in the starting SQLConf
   // We save this because tests can mutate this singleton object if they want
+  // This snapshot is saved when we create this TestHiveSparkSession.
   val initialSQLConf: SQLConf = {
     val snapshot = new SQLConf
-    conf.getAllConfs.foreach { case (k, v) => snapshot.setConfString(k, v) }
+    sessionState.conf.getAllConfs.foreach { case (k, v) => snapshot.setConfString(k, v) }
     snapshot
   }
 
@@ -131,42 +170,10 @@ class TestHiveContext private[hive](
 
   /** The location of the compiled hive distribution */
   lazy val hiveHome = envVarToFile("HIVE_HOME")
+
   /** The location of the hive source code. */
   lazy val hiveDevHome = envVarToFile("HIVE_DEV_HOME")
 
-  // Override so we can intercept relative paths and rewrite them to point at hive.
-  override def runSqlHive(sql: String): Seq[String] =
-    super.runSqlHive(rewritePaths(substitutor.substitute(sessionState.hiveconf, sql)))
-
-  override def executePlan(plan: LogicalPlan): this.QueryExecution =
-    new this.QueryExecution(plan)
-
-  @transient
-  protected[sql] override lazy val sessionState = new HiveSessionState(this) {
-    override lazy val conf: SQLConf = {
-      new SQLConf {
-        clear()
-        override def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE, false)
-        override def clear(): Unit = {
-          super.clear()
-          TestHiveContext.overrideConfs.map {
-            case (key, value) => setConfString(key, value)
-          }
-        }
-      }
-    }
-
-    override lazy val functionRegistry = {
-      // We use TestHiveFunctionRegistry at here to track functions that have been explicitly
-      // unregistered (through TestHiveFunctionRegistry.unregisterFunction method).
-      val fr = new TestHiveFunctionRegistry
-      org.apache.spark.sql.catalyst.analysis.FunctionRegistry.expressions.foreach {
-        case (name, (info, builder)) => fr.registerFunction(name, info, builder)
-      }
-      fr
-    }
-  }
-
   /**
    * Returns the value of specified environmental variable as a [[java.io.File]] after checking
    * to ensure it exists
@@ -179,7 +186,7 @@ class TestHiveContext private[hive](
    * Replaces relative paths to the parent directory "../" with hiveDevHome since this is how the
    * hive test cases assume the system is set up.
    */
-  private def rewritePaths(cmd: String): String =
+  private[hive] def rewritePaths(cmd: String): String =
     if (cmd.toUpperCase contains "LOAD DATA") {
       val testDataLocation =
         hiveDevHome.map(_.getCanonicalPath).getOrElse(inRepoTests.getCanonicalPath)
@@ -210,36 +217,11 @@ class TestHiveContext private[hive](
 
   val describedTable = "DESCRIBE (\\w+)".r
 
-  /**
-   * Override QueryExecution with special debug workflow.
-   */
-  class QueryExecution(logicalPlan: LogicalPlan)
-    extends super.QueryExecution(logicalPlan) {
-    def this(sql: String) = this(parseSql(sql))
-    override lazy val analyzed = {
-      val describedTables = logical match {
-        case HiveNativeCommand(describedTable(tbl)) => tbl :: Nil
-        case CacheTableCommand(tbl, _, _) => tbl :: Nil
-        case _ => Nil
-      }
-
-      // Make sure any test tables referenced are loaded.
-      val referencedTables =
-        describedTables ++
-          logical.collect { case UnresolvedRelation(tableIdent, _) => tableIdent.table }
-      val referencedTestTables = referencedTables.filter(testTables.contains)
-      logDebug(s"Query references test tables: ${referencedTestTables.mkString(", ")}")
-      referencedTestTables.foreach(loadTestTable)
-      // Proceed with analysis.
-      sessionState.analyzer.execute(logical)
-    }
-  }
-
   case class TestTable(name: String, commands: (() => Unit)*)
 
   protected[hive] implicit class SqlCmd(sql: String) {
     def cmd: () => Unit = {
-      () => new QueryExecution(sql).stringResult(): Unit
+      () => new TestHiveQueryExecution(sql).stringResult(): Unit
     }
   }
 
@@ -266,19 +248,20 @@ class TestHiveContext private[hive](
       "CREATE TABLE src1 (key INT, value STRING)".cmd,
       s"LOAD DATA LOCAL INPATH '${getHiveFile("data/files/kv3.txt")}' INTO TABLE src1".cmd),
     TestTable("srcpart", () => {
-      runSqlHive(
+      sessionState.runNativeSql(
         "CREATE TABLE srcpart (key INT, value STRING) PARTITIONED BY (ds STRING, hr STRING)")
       for (ds <- Seq("2008-04-08", "2008-04-09"); hr <- Seq("11", "12")) {
-        runSqlHive(
+        sessionState.runNativeSql(
          s"""LOAD DATA LOCAL INPATH '${getHiveFile("data/files/kv1.txt")}'
             |OVERWRITE INTO TABLE srcpart PARTITION (ds='$ds',hr='$hr')
           """.stripMargin)
       }
     }),
     TestTable("srcpart1", () => {
-      runSqlHive("CREATE TABLE srcpart1 (key INT, value STRING) PARTITIONED BY (ds STRING, hr INT)")
+      sessionState.runNativeSql(
+        "CREATE TABLE srcpart1 (key INT, value STRING) PARTITIONED BY (ds STRING, hr INT)")
       for (ds <- Seq("2008-04-08", "2008-04-09"); hr <- 11 to 12) {
-        runSqlHive(
+        sessionState.runNativeSql(
          s"""LOAD DATA LOCAL INPATH '${getHiveFile("data/files/kv1.txt")}'
             |OVERWRITE INTO TABLE srcpart1 PARTITION (ds='$ds',hr='$hr')
           """.stripMargin)
@@ -289,7 +272,7 @@ class TestHiveContext private[hive](
       import org.apache.hadoop.mapred.{SequenceFileInputFormat, SequenceFileOutputFormat}
       import org.apache.thrift.protocol.TBinaryProtocol
 
-      runSqlHive(
+      sessionState.runNativeSql(
         s"""
          |CREATE TABLE src_thrift(fake INT)
          |ROW FORMAT SERDE '${classOf[ThriftDeserializer].getName}'
@@ -302,7 +285,7 @@ class TestHiveContext private[hive](
          |OUTPUTFORMAT '${classOf[SequenceFileOutputFormat[_, _]].getName}'
         """.stripMargin)
 
-      runSqlHive(
+      sessionState.runNativeSql(
         s"LOAD DATA LOCAL INPATH '${getHiveFile("data/files/complex.seq")}' INTO TABLE src_thrift")
     }),
     TestTable("serdeins",
@@ -415,7 +398,6 @@ class TestHiveContext private[hive](
 
   private val loadedTables = new collection.mutable.HashSet[String]
 
-  var cacheTables: Boolean = false
   def loadTestTable(name: String) {
     if (!(loadedTables contains name)) {
       // Marks the table as loaded first to prevent infinite mutually recursive table loading.
@@ -426,7 +408,7 @@ class TestHiveContext private[hive](
       createCmds.foreach(_())
 
       if (cacheTables) {
-        cacheTable(name)
+        new SQLContext(self).cacheTable(name)
       }
     }
   }
@@ -451,11 +433,12 @@ class TestHiveContext private[hive](
         }
       }
 
-      cacheManager.clearCache()
+      sharedState.cacheManager.clearCache()
       loadedTables.clear()
       sessionState.catalog.clearTempTables()
       sessionState.catalog.invalidateCache()
-      metadataHive.reset()
+
+      sessionState.metadataHive.reset()
 
       FunctionRegistry.getFunctionNames.asScala.filterNot(originalUDFs.contains(_)).
         foreach { udfName => FunctionRegistry.unregisterTemporaryUDF(udfName) }
@@ -464,21 +447,21 @@ class TestHiveContext private[hive](
       sessionState.hiveconf.set("fs.default.name", new File(".").toURI.toString)
       // It is important that we RESET first as broken hooks that might have been set could break
       // other sql exec here.
-      executionHive.runSqlHive("RESET")
-      metadataHive.runSqlHive("RESET")
+      sessionState.executionHive.runSqlHive("RESET")
+      sessionState.metadataHive.runSqlHive("RESET")
       // For some reason, RESET does not reset the following variables...
      // https://issues.apache.org/jira/browse/HIVE-9004
-      runSqlHive("set hive.table.parameters.default=")
-      runSqlHive("set datanucleus.cache.collections=true")
-      runSqlHive("set datanucleus.cache.collections.lazy=true")
+      sessionState.runNativeSql("set hive.table.parameters.default=")
+      sessionState.runNativeSql("set datanucleus.cache.collections=true")
+      sessionState.runNativeSql("set datanucleus.cache.collections.lazy=true")
       // Lots of tests fail if we do not change the partition whitelist from the default.
-      runSqlHive("set hive.metastore.partition.name.whitelist.pattern=.*")
+      sessionState.runNativeSql("set hive.metastore.partition.name.whitelist.pattern=.*")
 
       // In case a test changed any of these values, restore all the original ones here.
       TestHiveContext.hiveClientConfigurations(
        sessionState.hiveconf, warehousePath, scratchDirPath, metastoreTemporaryConf)
-          .foreach { case (k, v) => metadataHive.runSqlHive(s"SET $k=$v") }
-      defaultOverrides()
+          .foreach { case (k, v) => sessionState.metadataHive.runSqlHive(s"SET $k=$v") }
+      sessionState.setDefaultOverrideConfs()
 
       sessionState.catalog.setCurrentDatabase("default")
     } catch {
@@ -489,6 +472,40 @@ class TestHiveContext private[hive](
   }
 
+
+private[hive] class TestHiveQueryExecution(
+    sparkSession: TestHiveSparkSession,
+    logicalPlan: LogicalPlan)
+  extends HiveQueryExecution(new SQLContext(sparkSession), logicalPlan) with Logging {
+
+  def this(sparkSession: TestHiveSparkSession, sql: String) {
+    this(sparkSession, sparkSession.sessionState.sqlParser.parsePlan(sql))
+  }
+
+  def this(sql: String) {
+    this(TestHive.sparkSession, sql)
+  }
+
+  override lazy val analyzed: LogicalPlan = {
+    val describedTables = logical match {
+      case HiveNativeCommand(sparkSession.describedTable(tbl)) => tbl :: Nil
+      case CacheTableCommand(tbl, _, _) => tbl :: Nil
+      case _ => Nil
+    }
+
+    // Make sure any test tables referenced are loaded.
+    val referencedTables =
+      describedTables ++
+        logical.collect { case UnresolvedRelation(tableIdent, _) => tableIdent.table }
+    val referencedTestTables = referencedTables.filter(sparkSession.testTables.contains)
+    logDebug(s"Query references test tables: ${referencedTestTables.mkString(", ")}")
+    referencedTestTables.foreach(sparkSession.loadTestTable)
+    // Proceed with analysis.
+    sparkSession.sessionState.analyzer.execute(logical)
+  }
+}
+
+
 private[hive] class TestHiveFunctionRegistry extends SimpleFunctionRegistry {
 
   private val removedFunctions =
@@ -517,7 +534,43 @@ private[hive] class TestHiveSharedState(
     TestHiveContext.newClientForMetadata(
       sc.conf, sc.hadoopConfiguration, warehousePath, scratchDirPath, metastoreTemporaryConf)
   }
+}
+
+
+private[hive] class TestHiveSessionState(sparkSession: TestHiveSparkSession)
+  extends HiveSessionState(new SQLContext(sparkSession)) {
+
+  override lazy val conf: SQLConf = {
+    new SQLConf {
+      clear()
+      override def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE, false)
+      override def clear(): Unit = {
+        super.clear()
+        TestHiveContext.overrideConfs.map {
+          case (key, value) => setConfString(key, value)
+        }
+      }
+    }
+  }
+
+  override lazy val functionRegistry: TestHiveFunctionRegistry = {
+    // We use TestHiveFunctionRegistry at here to track functions that have been explicitly
+    // unregistered (through TestHiveFunctionRegistry.unregisterFunction method).
+    val fr = new TestHiveFunctionRegistry
+    org.apache.spark.sql.catalyst.analysis.FunctionRegistry.expressions.foreach {
+      case (name, (info, builder)) => fr.registerFunction(name, info, builder)
+    }
+    fr
+  }
+
+  override def executePlan(plan: LogicalPlan): TestHiveQueryExecution = {
+    new TestHiveQueryExecution(sparkSession, plan)
+  }
+
+  // Override so we can intercept relative paths and rewrite them to point at hive.
+  override def runNativeSql(sql: String): Seq[String] = {
+    super.runNativeSql(sparkSession.rewritePaths(substitutor.substitute(hiveconf, sql)))
+  }
 }
@@ -552,7 +605,7 @@ private[hive] object TestHiveContext {
   /**
    * Configurations needed to create a [[HiveClient]].
    */
-  private def hiveClientConfigurations(
+  def hiveClientConfigurations(
       hiveconf: HiveConf,
       warehousePath: File,
       scratchDirPath: File,
@@ -564,7 +617,7 @@ private[hive] object TestHiveContext {
       ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY.varname -> "1")
   }
 
-  private def makeScratchDir(): File = {
+  def makeScratchDir(): File = {
     val scratchDir = Utils.createTempDir(namePrefix = "scratch")
     scratchDir.delete()
     scratchDir

http://git-wip-us.apache.org/repos/asf/spark/blob/8fc267ab/sql/hive/src/test/scala/org/apache/spark/sql/hive/ErrorPositionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ErrorPositionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ErrorPositionSuite.scala
index b9e7a36..61910b8 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ErrorPositionSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ErrorPositionSuite.scala
@@ -23,7 +23,6 @@ import org.scalatest.BeforeAndAfterEach
 
 import org.apache.spark.sql.{AnalysisException, QueryTest}
 import org.apache.spark.sql.catalyst.util.quietly
-import org.apache.spark.sql.hive.execution.HiveSqlParser
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 
 class ErrorPositionSuite extends QueryTest with TestHiveSingleton with BeforeAndAfterEach {
@@ -131,7 +130,7 @@ class ErrorPositionSuite extends QueryTest with TestHiveSingleton with BeforeAnd
    * @param token a unique token in the string that should be indicated by the exception
    */
   def positionTest(name: String, query: String, token: String): Unit = {
-    def ast = hiveContext.sessionState.sqlParser.parsePlan(query)
+    def ast = hiveContext.parseSql(query)
     def parseTree = Try(quietly(ast.treeString)).getOrElse("<failed to parse>")
 
     test(name) {

http://git-wip-us.apache.org/repos/asf/spark/blob/8fc267ab/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextSuite.scala
index b644a50..b2c0f7e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextSuite.scala
@@ -28,9 +28,12 @@ class HiveContextSuite extends SparkFunSuite {
     val sc = TestHive.sparkContext
     require(sc.conf.get("spark.sql.hive.metastore.barrierPrefixes") ==
       "org.apache.spark.sql.hive.execution.PairSerDe")
-    assert(TestHive.initialSQLConf.getConfString("spark.sql.hive.metastore.barrierPrefixes") ==
+    assert(TestHive.sparkSession.initialSQLConf.getConfString(
+      "spark.sql.hive.metastore.barrierPrefixes") ==
       "org.apache.spark.sql.hive.execution.PairSerDe")
-    assert(TestHive.metadataHive.getConf("spark.sql.hive.metastore.barrierPrefixes", "") ==
+    // This setting should be also set in the hiveconf of the current session.
+    assert(TestHive.sessionState.hiveconf.get(
+      "spark.sql.hive.metastore.barrierPrefixes", "") ==
       "org.apache.spark.sql.hive.execution.PairSerDe")
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/8fc267ab/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
index 8648834..2a201c1 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
@@ -96,7 +96,7 @@ class DataSourceWithHiveMetastoreCatalogSuite
         assert(columns.map(_.dataType) === Seq("decimal(10,3)", "string"))
 
         checkAnswer(table("t"), testDF)
-        assert(runSqlHive("SELECT * FROM t") === Seq("1.1\t1", "2.1\t2"))
+        assert(sessionState.runNativeSql("SELECT * FROM t") === Seq("1.1\t1", "2.1\t2"))
       }
     }
 
@@ -129,7 +129,7 @@ class DataSourceWithHiveMetastoreCatalogSuite
         assert(columns.map(_.dataType) === Seq("decimal(10,3)", "string"))
 
         checkAnswer(table("t"), testDF)
-        assert(runSqlHive("SELECT * FROM t") === Seq("1.1\t1", "2.1\t2"))
+        assert(sessionState.runNativeSql("SELECT * FROM t") === Seq("1.1\t1", "2.1\t2"))
       }
     }
   }
@@ -159,7 +159,7 @@ class DataSourceWithHiveMetastoreCatalogSuite
           assert(columns.map(_.dataType) === Seq("int", "string"))
 
           checkAnswer(table("t"), Row(1, "val_1"))
-          assert(runSqlHive("SELECT * FROM t") === Seq("1\tval_1"))
+          assert(sessionState.runNativeSql("SELECT * FROM t") === Seq("1\tval_1"))
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/8fc267ab/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index d0e6870..bbe135b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -253,13 +253,13 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
           sql("SELECT `c_!@(3)` FROM expectedJsonTable").collect().toSeq)
 
         // Discard the cached relation.
-        invalidateTable("jsonTable")
+        sessionState.invalidateTable("jsonTable")
 
         checkAnswer(
           sql("SELECT * FROM jsonTable"),
           sql("SELECT `c_!@(3)` FROM expectedJsonTable").collect().toSeq)
 
-        invalidateTable("jsonTable")
+        sessionState.invalidateTable("jsonTable")
         val expectedSchema = StructType(StructField("c_!@(3)", IntegerType, true) :: Nil)
         assert(expectedSchema === table("jsonTable").schema)
@@ -347,7 +347,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
             """.stripMargin)
 
         // Discard the cached relation.
-        invalidateTable("ctasJsonTable")
+        sessionState.invalidateTable("ctasJsonTable")
 
        // Schema should not be changed.
        assert(table("ctasJsonTable").schema === table("jsonTable").schema)
@@ -422,7 +422,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
           sql("SELECT * FROM savedJsonTable tmp where tmp.a > 5"),
           (6 to 10).map(i => Row(i, s"str$i")))
 
-        invalidateTable("savedJsonTable")
+        sessionState.invalidateTable("savedJsonTable")
 
         checkAnswer(
           sql("SELECT * FROM savedJsonTable where savedJsonTable.a < 5"),
@@ -620,7 +620,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
           .mode(SaveMode.Append)
           .saveAsTable("arrayInParquet")
 
-        refreshTable("arrayInParquet")
+        sessionState.refreshTable("arrayInParquet")
 
         checkAnswer(
           sql("SELECT a FROM arrayInParquet"),
@@ -679,7 +679,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
           .mode(SaveMode.Append)
           .saveAsTable("mapInParquet")
 
-        refreshTable("mapInParquet")
+        sessionState.refreshTable("mapInParquet")
 
         checkAnswer(
           sql("SELECT a FROM mapInParquet"),
@@ -707,7 +707,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
         options = Map("path" -> tempDir.getCanonicalPath),
         isExternal = false)
 
-      invalidateTable("wide_schema")
+      sessionState.invalidateTable("wide_schema")
 
       val actualSchema = table("wide_schema").schema
       assert(schema === actualSchema)
@@ -737,9 +737,9 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
           "spark.sql.sources.schema" -> schema.json,
           "EXTERNAL" -> "FALSE"))
 
-      hiveCatalog.createTable("default", hiveTable, ignoreIfExists = false)
+      sharedState.externalCatalog.createTable("default", hiveTable, ignoreIfExists = false)
 
-      invalidateTable(tableName)
+      sessionState.invalidateTable(tableName)
       val actualSchema = table(tableName).schema
       assert(schema === actualSchema)
     }
@@ -751,8 +751,8 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
     withTable(tableName) {
       df.write.format("parquet").partitionBy("d", "b").saveAsTable(tableName)
 
-      invalidateTable(tableName)
-      val metastoreTable = hiveCatalog.getTable("default", tableName)
+      sessionState.invalidateTable(tableName)
+      val metastoreTable = sharedState.externalCatalog.getTable("default", tableName)
       val expectedPartitionColumns = StructType(df.schema("d") :: df.schema("b") :: Nil)
 
       val numPartCols = metastoreTable.properties("spark.sql.sources.schema.numPartCols").toInt
@@ -786,8 +786,8 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
         .bucketBy(8, "d", "b")
         .sortBy("c")
         .saveAsTable(tableName)
-      invalidateTable(tableName)
-      val metastoreTable = hiveCatalog.getTable("default", tableName)
+      sessionState.invalidateTable(tableName)
+      val metastoreTable = sharedState.externalCatalog.getTable("default", tableName)
       val expectedBucketByColumns = StructType(df.schema("d") :: df.schema("b") :: Nil)
       val expectedSortByColumns = StructType(df.schema("c") :: Nil)
 
@@ -917,7 +917,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
 
       // As a proxy for verifying that the table was stored in Hive compatible format,
      // we verify that each column of the table is of native type StringType.
-      assert(hiveCatalog.getTable("default", "not_skip_hive_metadata").schema
+      assert(sharedState.externalCatalog.getTable("default", "not_skip_hive_metadata").schema
         .forall(column => HiveMetastoreTypes.toDataType(column.dataType) == StringType))
 
       sessionState.catalog.createDataSourceTable(
@@ -931,9 +931,8 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
 
       // As a proxy for verifying that the table was stored in SparkSQL format,
       // we verify that the table has a column type as array of StringType.
-      assert(hiveCatalog.getTable("default", "skip_hive_metadata").schema.forall { c =>
-        HiveMetastoreTypes.toDataType(c.dataType) == ArrayType(StringType)
-      })
+      assert(sharedState.externalCatalog.getTable("default", "skip_hive_metadata")
+        .schema.forall { c => HiveMetastoreTypes.toDataType(c.dataType) == ArrayType(StringType) })
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/8fc267ab/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
index 3c00350..850cb1e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
@@ -25,8 +25,9 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle
   private lazy val df = sqlContext.range(10).coalesce(1).toDF()
 
   private def checkTablePath(dbName: String, tableName: String): Unit = {
-    val metastoreTable = hiveContext.hiveCatalog.getTable(dbName, tableName)
-    val expectedPath = hiveContext.hiveCatalog.getDatabase(dbName).locationUri + "/" + tableName
+    val metastoreTable = hiveContext.sharedState.externalCatalog.getTable(dbName, tableName)
+    val expectedPath =
+      hiveContext.sharedState.externalCatalog.getDatabase(dbName).locationUri + "/" + tableName
 
     assert(metastoreTable.storage.serdeProperties("path") === expectedPath)
   }
@@ -216,7 +217,7 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle
         df.write.parquet(s"$path/p=2")
 
         sql("ALTER TABLE t ADD PARTITION (p=2)")
-        hiveContext.refreshTable("t")
+        hiveContext.sessionState.refreshTable("t")
 
         checkAnswer(
           sqlContext.table("t"), df.withColumn("p", lit(1)).union(df.withColumn("p", lit(2))))
@@ -248,7 +249,7 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle
         df.write.parquet(s"$path/p=2")
 
         sql(s"ALTER TABLE $db.t ADD PARTITION (p=2)")
-        hiveContext.refreshTable(s"$db.t")
+        hiveContext.sessionState.refreshTable(s"$db.t")
 
         checkAnswer(
           sqlContext.table(s"$db.t"), df.withColumn("p", lit(1)).union(df.withColumn("p", lit(2))))

http://git-wip-us.apache.org/repos/asf/spark/blob/8fc267ab/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index d14c72b..adc7af3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -31,7 +31,7 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton {
 
   test("parse analyze commands") {
    def assertAnalyzeCommand(analyzeCommand: String, c: Class[_]) {
-      val parsed = hiveContext.sessionState.sqlParser.parsePlan(analyzeCommand)
+      val parsed = hiveContext.parseSql(analyzeCommand)
       val operators = parsed.collect {
         case a: AnalyzeTable => a
         case o => o
@@ -116,7 +116,7 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton {
     // Try to analyze a temp table
     sql("""SELECT * FROM src""").registerTempTable("tempTable")
     intercept[UnsupportedOperationException] {
-      hiveContext.analyze("tempTable")
+      hiveContext.sessionState.analyze("tempTable")
     }
     hiveContext.sessionState.catalog.dropTable(
       TableIdentifier("tempTable"), ignoreIfNotExists = true)

http://git-wip-us.apache.org/repos/asf/spark/blob/8fc267ab/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/BigDataBenchmarkSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/BigDataBenchmarkSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/BigDataBenchmarkSuite.scala
index a3f5921..c58a664 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/BigDataBenchmarkSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/BigDataBenchmarkSuite.scala
@@ -19,15 +19,15 @@ package org.apache.spark.sql.hive.execution
 
 import java.io.File
 
-import org.apache.spark.sql.hive.test.TestHive._
 
 /**
  * A set of test cases based on the big-data-benchmark.
  * https://amplab.cs.berkeley.edu/benchmark/
 */
 class BigDataBenchmarkSuite extends HiveComparisonTest {
-  val testDataDirectory = new File("target" + File.separator + "big-data-benchmark-testdata")
+  import org.apache.spark.sql.hive.test.TestHive.sparkSession._
 
+  val testDataDirectory = new File("target" + File.separator + "big-data-benchmark-testdata")
   val userVisitPath = new File(testDataDirectory, "uservisits").getCanonicalPath
   val testTables = Seq(
     TestTable(

http://git-wip-us.apache.org/repos/asf/spark/blob/8fc267ab/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
index e67fcbe..bd46cb9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
@@ -30,8 +30,9 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.execution.command.{ExplainCommand, SetCommand}
 import org.apache.spark.sql.execution.datasources.DescribeCommand
-import org.apache.spark.sql.hive.{InsertIntoHiveTable => LogicalInsertIntoHiveTable, SQLBuilder}
-import org.apache.spark.sql.hive.test.TestHive
+import org.apache.spark.sql.hive.{InsertIntoHiveTable => LogicalInsertIntoHiveTable}
+import org.apache.spark.sql.hive.SQLBuilder
+import org.apache.spark.sql.hive.test.{TestHive, TestHiveQueryExecution}
 
 /**
  * Allows the creations of tests that execute the same query against both hive
@@ -141,7 +142,7 @@ abstract class HiveComparisonTest
   }
 
   protected def prepareAnswer(
-    hiveQuery: TestHive.type#QueryExecution,
+    hiveQuery: TestHiveQueryExecution,
     answer: Seq[String]): Seq[String] = {
 
     def isSorted(plan: LogicalPlan): Boolean = plan match {
@@ -332,7 +333,7 @@ abstract class HiveComparisonTest
             hiveCachedResults
           } else {
-            val hiveQueries = queryList.map(new TestHive.QueryExecution(_))
+            val hiveQueries = queryList.map(new TestHiveQueryExecution(_))
             // Make sure we can at least parse everything before attempting hive execution.
             // Note this must only look at the logical plan as we might not be able to analyze if
             // other DDL has not been executed yet.
@@ -352,7 +353,7 @@ abstract class HiveComparisonTest
                 case _: ExplainCommand =>
                   // No need to execute EXPLAIN queries as we don't check the output.
                   Nil
-                case _ => TestHive.runSqlHive(queryString)
+                case _ => TestHive.sessionState.runNativeSql(queryString)
               }
 
               // We need to add a new line to non-empty answers so we can differentiate Seq()
@@ -382,10 +383,10 @@ abstract class HiveComparisonTest
 
       // Run w/ catalyst
       val catalystResults = queryList.zip(hiveResults).map { case (queryString, hive) =>
-        var query: TestHive.QueryExecution = null
+        var query: TestHiveQueryExecution = null
         try {
           query = {
-            val originalQuery = new TestHive.QueryExecution(queryString)
+            val originalQuery = new TestHiveQueryExecution(queryString)
             val containsCommands = originalQuery.analyzed.collectFirst {
               case _: Command => ()
               case _: LogicalInsertIntoHiveTable => ()
@@ -409,7 +410,7 @@ abstract class HiveComparisonTest
             }
 
             try {
-              val queryExecution = new TestHive.QueryExecution(convertedSQL)
+              val queryExecution = new TestHiveQueryExecution(convertedSQL)
               // Trigger the analysis of this converted SQL query.
               queryExecution.analyzed
               queryExecution
@@ -472,12 +473,12 @@ abstract class HiveComparisonTest
           // If this query is reading other tables that were created during this test run
           // also print out the query plans and results for those.
           val computedTablesMessages: String = try {
-            val tablesRead = new TestHive.QueryExecution(query).executedPlan.collect {
+            val tablesRead = new TestHiveQueryExecution(query).executedPlan.collect {
               case ts: HiveTableScan => ts.relation.tableName
             }.toSet
 
            TestHive.reset()
-            val executions = queryList.map(new TestHive.QueryExecution(_))
+            val executions = queryList.map(new TestHiveQueryExecution(_))
            executions.foreach(_.toRdd)
            val tablesGenerated = queryList.zip(executions).flatMap {
              // We should take executedPlan instead of sparkPlan, because in following codes we
@@ -562,8 +563,8 @@ abstract class HiveComparisonTest
       // okay by running a simple query. If this fails then we halt testing since
      // something must have gone seriously wrong.
      try {
-        new TestHive.QueryExecution("SELECT key FROM src").stringResult()
-        TestHive.runSqlHive("SELECT key FROM src")
+        new TestHiveQueryExecution("SELECT key FROM src").stringResult()
+        TestHive.sessionState.runNativeSql("SELECT key FROM src")
      } catch {
        case e: Exception =>
          logError(s"FATAL ERROR: Canary query threw $e This implies that the " +

http://git-wip-us.apache.org/repos/asf/spark/blob/8fc267ab/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index 2e7a1d9..93d63f2 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -49,7 +49,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
 
   override def beforeAll() {
     super.beforeAll()
-    TestHive.cacheTables = true
+    TestHive.setCacheTables(true)
     // Timezone is fixed to America/Los_Angeles for those timezone sensitive tests (timestamp_*)
     TimeZone.setDefault(TimeZone.getTimeZone("America/Los_Angeles"))
     // Add Locale setting
@@ -58,7 +58,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
 
   override def afterAll() {
     try {
-      TestHive.cacheTables = false
+      TestHive.setCacheTables(false)
       TimeZone.setDefault(originalTimeZone)
       Locale.setDefault(originalLocale)
       sql("DROP TEMPORARY FUNCTION IF EXISTS udtf_count2")
@@ -1009,7 +1009,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
       .mkString("/")
 
     // Loads partition data to a temporary table to verify contents
-    val path = s"$warehousePath/dynamic_part_table/$partFolder/part-00000"
+    val path = s"${sparkSession.warehousePath}/dynamic_part_table/$partFolder/part-00000"
 
     sql("DROP TABLE IF EXISTS dp_verify")
     sql("CREATE TABLE dp_verify(intcol INT)")

http://git-wip-us.apache.org/repos/asf/spark/blob/8fc267ab/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeSuite.scala
index 5586a79..b8af0b3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeSuite.scala
@@ -28,8 +28,8 @@ class HiveSerDeSuite extends HiveComparisonTest with BeforeAndAfterAll {
   override def beforeAll(): Unit = {
     import TestHive._
     import org.apache.hadoop.hive.serde2.RegexSerDe
-    super.beforeAll()
-    TestHive.cacheTables = false
+    super.beforeAll()
+    TestHive.setCacheTables(false)
     sql(s"""CREATE TABLE IF NOT EXISTS sales (key STRING, value INT)
            |ROW FORMAT SERDE '${classOf[RegexSerDe].getCanonicalName}'
            |WITH SERDEPROPERTIES ("input.regex" = "([^ ]*)\t([^ ]*)")

http://git-wip-us.apache.org/repos/asf/spark/blob/8fc267ab/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala
index 97cb9d9..79ac53c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala
@@ -21,18 +21,22 @@ import scala.collection.JavaConverters._
 
 import org.scalatest.BeforeAndAfter
 
-import org.apache.spark.sql.hive.test.TestHive
+import org.apache.spark.sql.hive.test.{TestHive, TestHiveQueryExecution}
 
 /**
  * A set of test cases that validate partition and column pruning.
  */
 class PruningSuite extends HiveComparisonTest with BeforeAndAfter {
-  TestHive.cacheTables = false
-  // Column/partition pruning is not implemented for `InMemoryColumnarTableScan` yet, need to reset
-  // the environment to ensure all referenced tables in this suites are not cached in-memory.
-  // Refer to https://issues.apache.org/jira/browse/SPARK-2283 for details.
-  TestHive.reset()
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    TestHive.setCacheTables(false)
+    // Column/partition pruning is not implemented for `InMemoryColumnarTableScan` yet,
+    // need to reset the environment to ensure all referenced tables in this suites are
+    // not cached in-memory. Refer to https://issues.apache.org/jira/browse/SPARK-2283
+    // for details.
+    TestHive.reset()
+  }
 
   // Column pruning tests
 
@@ -144,7 +148,7 @@ class PruningSuite extends HiveComparisonTest with BeforeAndAfter {
       expectedScannedColumns: Seq[String],
       expectedPartValues: Seq[Seq[String]]): Unit = {
     test(s"$testCaseName - pruning test") {
-      val plan = new TestHive.QueryExecution(sql).sparkPlan
+      val plan = new TestHiveQueryExecution(sql).sparkPlan
       val actualOutputColumns = plan.output.map(_.name)
       val (actualScannedColumns, actualPartValues) = plan.collect {
         case p @ HiveTableScan(columns, relation, _) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/8fc267ab/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 1098e74..6b71e59 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -349,7 +349,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
       }
     }
 
-    val originalConf = convertCTAS
+    val originalConf = sessionState.convertCTAS
 
     setConf(HiveContext.CONVERT_CTAS, true)
 
@@ -731,7 +731,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
    // generates an invalid query plan.
    val rdd = sparkContext.makeRDD((1 to 5).map(i => s"""{"a":[$i, ${i + 1}]}"""))
    read.json(rdd).registerTempTable("data")
-    val originalConf = convertCTAS
+    val originalConf = sessionState.convertCTAS
    setConf(HiveContext.CONVERT_CTAS, false)
 
    try {

http://git-wip-us.apache.org/repos/asf/spark/blob/8fc267ab/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
index 8f163f2..00b5c8d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
@@ -58,7 +58,7 @@ class ScriptTransformationSuite extends SparkPlanTest with TestHiveSingleton {
         output = Seq(AttributeReference("a", StringType)()),
         child = child,
         ioschema = noSerdeIOSchema
-      )(hiveContext),
+      )(hiveContext.sessionState.hiveconf),
       rowsDf.collect())
   }
 
@@ -72,7 +72,7 @@ class ScriptTransformationSuite extends SparkPlanTest with TestHiveSingleton {
         output = Seq(AttributeReference("a", StringType)()),
         child = child,
         ioschema = serdeIOSchema
-      )(hiveContext),
+      )(hiveContext.sessionState.hiveconf),
       rowsDf.collect())
   }
 
@@ -87,7 +87,7 @@ class ScriptTransformationSuite extends SparkPlanTest with TestHiveSingleton {
         output = Seq(AttributeReference("a", StringType)()),
         child = ExceptionInjectingOperator(child),
         ioschema = noSerdeIOSchema
-      )(hiveContext),
+      )(hiveContext.sessionState.hiveconf),
       rowsDf.collect())
     }
     assert(e.getMessage().contains("intentional exception"))
@@ -104,7 +104,7 @@ class ScriptTransformationSuite extends SparkPlanTest with TestHiveSingleton {
         output = Seq(AttributeReference("a", StringType)()),
         child = ExceptionInjectingOperator(child),
         ioschema = serdeIOSchema
-      )(hiveContext),
+      )(hiveContext.sessionState.hiveconf),
       rowsDf.collect())
     }
     assert(e.getMessage().contains("intentional exception"))

http://git-wip-us.apache.org/repos/asf/spark/blob/8fc267ab/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index 4b2b1a1..6fa4c33 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -461,7 +461,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
       checkCached(tableIdentifier)
       // For insert into non-partitioned table, we will do the conversion,
       // so the converted test_insert_parquet should be cached.
-      invalidateTable("test_insert_parquet")
+      sessionState.invalidateTable("test_insert_parquet")
       assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null)
       sql(
         """
@@ -474,7 +474,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
         sql("select * from test_insert_parquet"),
         sql("select a, b from jt").collect())
       // Invalidate the cache.
-      invalidateTable("test_insert_parquet")
+      sessionState.invalidateTable("test_insert_parquet")
       assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null)
 
      // Create a partitioned table.
@@ -524,7 +524,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
           |select b, '2015-04-02', a FROM jt
         """.stripMargin).collect())
 
-      invalidateTable("test_parquet_partitioned_cache_test")
+      sessionState.invalidateTable("test_parquet_partitioned_cache_test")
       assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null)
 
       dropTables("test_insert_parquet", "test_parquet_partitioned_cache_test")

http://git-wip-us.apache.org/repos/asf/spark/blob/8fc267ab/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index a0be55c..aa6101f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -349,7 +349,7 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
   test("error if there exists any malformed bucket files") {
     withTable("bucketed_table") {
       df1.write.format("parquet").bucketBy(8, "i").saveAsTable("bucketed_table")
-      val tableDir = new File(hiveContext.warehousePath, "bucketed_table")
+      val tableDir = new File(hiveContext.sparkSession.warehousePath, "bucketed_table")
       Utils.deleteRecursively(tableDir)
       df1.write.parquet(tableDir.getAbsolutePath)