http://git-wip-us.apache.org/repos/asf/spark/blob/c3c0e431/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala index 5dc3a2c..e23ee66 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala @@ -22,13 +22,12 @@ import java.util.Properties import org.scalatest.BeforeAndAfter -import org.apache.spark.SparkFunSuite import org.apache.spark.sql.{Row, SaveMode} import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types._ import org.apache.spark.util.Utils -class JDBCWriteSuite extends SparkFunSuite with BeforeAndAfter with SharedSQLContext { +class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter { val url = "jdbc:h2:mem:testdb2" var conn: java.sql.Connection = null @@ -76,8 +75,6 @@ class JDBCWriteSuite extends SparkFunSuite with BeforeAndAfter with SharedSQLCon conn1.close() } - private lazy val sc = ctx.sparkContext - private lazy val arr2x2 = Array[Row](Row.apply("dave", 42), Row.apply("mary", 222)) private lazy val arr1x2 = Array[Row](Row.apply("fred", 3)) private lazy val schema2 = StructType( @@ -91,49 +88,50 @@ class JDBCWriteSuite extends SparkFunSuite with BeforeAndAfter with SharedSQLCon StructField("seq", IntegerType) :: Nil) test("Basic CREATE") { - val df = ctx.createDataFrame(sc.parallelize(arr2x2), schema2) + val df = sqlContext.createDataFrame(sparkContext.parallelize(arr2x2), schema2) df.write.jdbc(url, "TEST.BASICCREATETEST", new Properties) - assert(2 === ctx.read.jdbc(url, "TEST.BASICCREATETEST", new Properties).count) - assert(2 === ctx.read.jdbc(url, "TEST.BASICCREATETEST", new Properties).collect()(0).length) + assert(2 === sqlContext.read.jdbc(url, "TEST.BASICCREATETEST", new Properties).count) + assert( + 2 === sqlContext.read.jdbc(url, "TEST.BASICCREATETEST", new Properties).collect()(0).length) } test("CREATE with overwrite") { - val df = ctx.createDataFrame(sc.parallelize(arr2x3), schema3) - val df2 = ctx.createDataFrame(sc.parallelize(arr1x2), schema2) + val df = sqlContext.createDataFrame(sparkContext.parallelize(arr2x3), schema3) + val df2 = sqlContext.createDataFrame(sparkContext.parallelize(arr1x2), schema2) df.write.jdbc(url1, "TEST.DROPTEST", properties) - assert(2 === ctx.read.jdbc(url1, "TEST.DROPTEST", properties).count) - assert(3 === ctx.read.jdbc(url1, "TEST.DROPTEST", properties).collect()(0).length) + assert(2 === sqlContext.read.jdbc(url1, "TEST.DROPTEST", properties).count) + assert(3 === sqlContext.read.jdbc(url1, "TEST.DROPTEST", properties).collect()(0).length) df2.write.mode(SaveMode.Overwrite).jdbc(url1, "TEST.DROPTEST", properties) - assert(1 === ctx.read.jdbc(url1, "TEST.DROPTEST", properties).count) - assert(2 === ctx.read.jdbc(url1, "TEST.DROPTEST", properties).collect()(0).length) + assert(1 === sqlContext.read.jdbc(url1, "TEST.DROPTEST", properties).count) + assert(2 === sqlContext.read.jdbc(url1, "TEST.DROPTEST", properties).collect()(0).length) } test("CREATE then INSERT to append") { - val df = ctx.createDataFrame(sc.parallelize(arr2x2), schema2) - val df2 = ctx.createDataFrame(sc.parallelize(arr1x2), schema2) + val df = sqlContext.createDataFrame(sparkContext.parallelize(arr2x2), schema2) + val df2 = sqlContext.createDataFrame(sparkContext.parallelize(arr1x2), schema2) df.write.jdbc(url, "TEST.APPENDTEST", new Properties) df2.write.mode(SaveMode.Append).jdbc(url, "TEST.APPENDTEST", new Properties) - assert(3 === ctx.read.jdbc(url, "TEST.APPENDTEST", new Properties).count) - assert(2 === ctx.read.jdbc(url, "TEST.APPENDTEST", new Properties).collect()(0).length) + assert(3 === sqlContext.read.jdbc(url, "TEST.APPENDTEST", new Properties).count) + assert(2 === sqlContext.read.jdbc(url, "TEST.APPENDTEST", new Properties).collect()(0).length) } test("CREATE then INSERT to truncate") { - val df = ctx.createDataFrame(sc.parallelize(arr2x2), schema2) - val df2 = ctx.createDataFrame(sc.parallelize(arr1x2), schema2) + val df = sqlContext.createDataFrame(sparkContext.parallelize(arr2x2), schema2) + val df2 = sqlContext.createDataFrame(sparkContext.parallelize(arr1x2), schema2) df.write.jdbc(url1, "TEST.TRUNCATETEST", properties) df2.write.mode(SaveMode.Overwrite).jdbc(url1, "TEST.TRUNCATETEST", properties) - assert(1 === ctx.read.jdbc(url1, "TEST.TRUNCATETEST", properties).count) - assert(2 === ctx.read.jdbc(url1, "TEST.TRUNCATETEST", properties).collect()(0).length) + assert(1 === sqlContext.read.jdbc(url1, "TEST.TRUNCATETEST", properties).count) + assert(2 === sqlContext.read.jdbc(url1, "TEST.TRUNCATETEST", properties).collect()(0).length) } test("Incompatible INSERT to append") { - val df = ctx.createDataFrame(sc.parallelize(arr2x2), schema2) - val df2 = ctx.createDataFrame(sc.parallelize(arr2x3), schema3) + val df = sqlContext.createDataFrame(sparkContext.parallelize(arr2x2), schema2) + val df2 = sqlContext.createDataFrame(sparkContext.parallelize(arr2x3), schema3) df.write.jdbc(url, "TEST.INCOMPATIBLETEST", new Properties) intercept[org.apache.spark.SparkException] { @@ -143,14 +141,14 @@ class JDBCWriteSuite extends SparkFunSuite with BeforeAndAfter with SharedSQLCon test("INSERT to JDBC Datasource") { sql("INSERT INTO TABLE PEOPLE1 SELECT * FROM PEOPLE") - assert(2 === ctx.read.jdbc(url1, "TEST.PEOPLE1", properties).count) - assert(2 === ctx.read.jdbc(url1, "TEST.PEOPLE1", properties).collect()(0).length) + assert(2 === sqlContext.read.jdbc(url1, "TEST.PEOPLE1", properties).count) + assert(2 === sqlContext.read.jdbc(url1, "TEST.PEOPLE1", properties).collect()(0).length) } test("INSERT to JDBC Datasource with overwrite") { sql("INSERT INTO TABLE PEOPLE1 SELECT * FROM PEOPLE") sql("INSERT OVERWRITE TABLE PEOPLE1 SELECT * FROM PEOPLE") - assert(2 === ctx.read.jdbc(url1, "TEST.PEOPLE1", properties).count) - assert(2 === ctx.read.jdbc(url1, "TEST.PEOPLE1", properties).collect()(0).length) + assert(2 === sqlContext.read.jdbc(url1, "TEST.PEOPLE1", properties).count) + assert(2 === sqlContext.read.jdbc(url1, "TEST.PEOPLE1", properties).collect()(0).length) } }
http://git-wip-us.apache.org/repos/asf/spark/blob/c3c0e431/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala index 9bc3f6b..6fc9feb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala @@ -26,10 +26,8 @@ import org.apache.spark.sql.execution.datasources.DDLException import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.util.Utils - class CreateTableAsSelectSuite extends DataSourceTest with SharedSQLContext with BeforeAndAfter { protected override lazy val sql = caseInsensitiveContext.sql _ - private lazy val sparkContext = caseInsensitiveContext.sparkContext private var path: File = null override def beforeAll(): Unit = { http://git-wip-us.apache.org/repos/asf/spark/blob/c3c0e431/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceTest.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceTest.scala index d74d29f..af04079 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceTest.scala @@ -19,13 +19,11 @@ package org.apache.spark.sql.sources import org.apache.spark.sql._ - private[sql] abstract class DataSourceTest extends QueryTest { - protected def _sqlContext: SQLContext // We want to test some edge cases. protected lazy val caseInsensitiveContext: SQLContext = { - val ctx = new SQLContext(_sqlContext.sparkContext) + val ctx = new SQLContext(sqlContext.sparkContext) ctx.setConf(SQLConf.CASE_SENSITIVE, false) ctx } http://git-wip-us.apache.org/repos/asf/spark/blob/c3c0e431/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala index 084d83f..5b70d25 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala @@ -19,13 +19,12 @@ package org.apache.spark.sql.sources import java.io.File -import org.apache.spark.sql.{SaveMode, AnalysisException, Row} +import org.apache.spark.sql.{AnalysisException, Row} import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.util.Utils class InsertSuite extends DataSourceTest with SharedSQLContext { protected override lazy val sql = caseInsensitiveContext.sql _ - private lazy val sparkContext = caseInsensitiveContext.sparkContext private var path: File = null override def beforeAll(): Unit = { http://git-wip-us.apache.org/repos/asf/spark/blob/c3c0e431/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala index 79b6e9b..c979187 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala @@ -29,11 +29,11 @@ class PartitionedWriteSuite extends QueryTest with SharedSQLContext { val path = Utils.createTempDir() path.delete() - val df = ctx.range(100).select($"id", lit(1).as("data")) + val df = sqlContext.range(100).select($"id", lit(1).as("data")) df.write.partitionBy("id").save(path.getCanonicalPath) checkAnswer( - ctx.read.load(path.getCanonicalPath), + sqlContext.read.load(path.getCanonicalPath), (0 to 99).map(Row(1, _)).toSeq) Utils.deleteRecursively(path) @@ -43,12 +43,12 @@ class PartitionedWriteSuite extends QueryTest with SharedSQLContext { val path = Utils.createTempDir() path.delete() - val base = ctx.range(100) + val base = sqlContext.range(100) val df = base.unionAll(base).select($"id", lit(1).as("data")) df.write.partitionBy("id").save(path.getCanonicalPath) checkAnswer( - ctx.read.load(path.getCanonicalPath), + sqlContext.read.load(path.getCanonicalPath), (0 to 99).map(Row(1, _)).toSeq ++ (0 to 99).map(Row(1, _)).toSeq) Utils.deleteRecursively(path) http://git-wip-us.apache.org/repos/asf/spark/blob/c3c0e431/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala index f18546b..10d2613 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala @@ -28,7 +28,6 @@ import org.apache.spark.util.Utils class SaveLoadSuite extends DataSourceTest with SharedSQLContext with BeforeAndAfter { protected override lazy val sql = caseInsensitiveContext.sql _ - private lazy val sparkContext = caseInsensitiveContext.sparkContext private var originalDefaultSource: String = null private var path: File = null private var df: DataFrame = null http://git-wip-us.apache.org/repos/asf/spark/blob/c3c0e431/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestData.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestData.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestData.scala index 3fc02df..520dea7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestData.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestData.scala @@ -24,11 +24,11 @@ import org.apache.spark.sql.{DataFrame, SQLContext, SQLImplicits} * A collection of sample data used in SQL tests. */ private[sql] trait SQLTestData { self => - protected def _sqlContext: SQLContext + protected def sqlContext: SQLContext // Helper object to import SQL implicits without a concrete SQLContext private object internalImplicits extends SQLImplicits { - protected override def _sqlContext: SQLContext = self._sqlContext + protected override def _sqlContext: SQLContext = self.sqlContext } import internalImplicits._ @@ -37,21 +37,21 @@ private[sql] trait SQLTestData { self => // Note: all test data should be lazy because the SQLContext is not set up yet. protected lazy val emptyTestData: DataFrame = { - val df = _sqlContext.sparkContext.parallelize( + val df = sqlContext.sparkContext.parallelize( Seq.empty[Int].map(i => TestData(i, i.toString))).toDF() df.registerTempTable("emptyTestData") df } protected lazy val testData: DataFrame = { - val df = _sqlContext.sparkContext.parallelize( + val df = sqlContext.sparkContext.parallelize( (1 to 100).map(i => TestData(i, i.toString))).toDF() df.registerTempTable("testData") df } protected lazy val testData2: DataFrame = { - val df = _sqlContext.sparkContext.parallelize( + val df = sqlContext.sparkContext.parallelize( TestData2(1, 1) :: TestData2(1, 2) :: TestData2(2, 1) :: @@ -63,7 +63,7 @@ private[sql] trait SQLTestData { self => } protected lazy val testData3: DataFrame = { - val df = _sqlContext.sparkContext.parallelize( + val df = sqlContext.sparkContext.parallelize( TestData3(1, None) :: TestData3(2, Some(2)) :: Nil).toDF() df.registerTempTable("testData3") @@ -71,14 +71,14 @@ private[sql] trait SQLTestData { self => } protected lazy val negativeData: DataFrame = { - val df = _sqlContext.sparkContext.parallelize( + val df = sqlContext.sparkContext.parallelize( (1 to 100).map(i => TestData(-i, (-i).toString))).toDF() df.registerTempTable("negativeData") df } protected lazy val largeAndSmallInts: DataFrame = { - val df = _sqlContext.sparkContext.parallelize( + val df = sqlContext.sparkContext.parallelize( LargeAndSmallInts(2147483644, 1) :: LargeAndSmallInts(1, 2) :: LargeAndSmallInts(2147483645, 1) :: @@ -90,7 +90,7 @@ private[sql] trait SQLTestData { self => } protected lazy val decimalData: DataFrame = { - val df = _sqlContext.sparkContext.parallelize( + val df = sqlContext.sparkContext.parallelize( DecimalData(1, 1) :: DecimalData(1, 2) :: DecimalData(2, 1) :: @@ -102,7 +102,7 @@ private[sql] trait SQLTestData { self => } protected lazy val binaryData: DataFrame = { - val df = _sqlContext.sparkContext.parallelize( + val df = sqlContext.sparkContext.parallelize( BinaryData("12".getBytes, 1) :: BinaryData("22".getBytes, 5) :: BinaryData("122".getBytes, 3) :: @@ -113,7 +113,7 @@ private[sql] trait SQLTestData { self => } protected lazy val upperCaseData: DataFrame = { - val df = _sqlContext.sparkContext.parallelize( + val df = sqlContext.sparkContext.parallelize( UpperCaseData(1, "A") :: UpperCaseData(2, "B") :: UpperCaseData(3, "C") :: @@ -125,7 +125,7 @@ private[sql] trait SQLTestData { self => } protected lazy val lowerCaseData: DataFrame = { - val df = _sqlContext.sparkContext.parallelize( + val df = sqlContext.sparkContext.parallelize( LowerCaseData(1, "a") :: LowerCaseData(2, "b") :: LowerCaseData(3, "c") :: @@ -135,7 +135,7 @@ private[sql] trait SQLTestData { self => } protected lazy val arrayData: RDD[ArrayData] = { - val rdd = _sqlContext.sparkContext.parallelize( + val rdd = sqlContext.sparkContext.parallelize( ArrayData(Seq(1, 2, 3), Seq(Seq(1, 2, 3))) :: ArrayData(Seq(2, 3, 4), Seq(Seq(2, 3, 4))) :: Nil) rdd.toDF().registerTempTable("arrayData") @@ -143,7 +143,7 @@ private[sql] trait SQLTestData { self => } protected lazy val mapData: RDD[MapData] = { - val rdd = _sqlContext.sparkContext.parallelize( + val rdd = sqlContext.sparkContext.parallelize( MapData(Map(1 -> "a1", 2 -> "b1", 3 -> "c1", 4 -> "d1", 5 -> "e1")) :: MapData(Map(1 -> "a2", 2 -> "b2", 3 -> "c2", 4 -> "d2")) :: MapData(Map(1 -> "a3", 2 -> "b3", 3 -> "c3")) :: @@ -154,13 +154,13 @@ private[sql] trait SQLTestData { self => } protected lazy val repeatedData: RDD[StringData] = { - val rdd = _sqlContext.sparkContext.parallelize(List.fill(2)(StringData("test"))) + val rdd = sqlContext.sparkContext.parallelize(List.fill(2)(StringData("test"))) rdd.toDF().registerTempTable("repeatedData") rdd } protected lazy val nullableRepeatedData: RDD[StringData] = { - val rdd = _sqlContext.sparkContext.parallelize( + val rdd = sqlContext.sparkContext.parallelize( List.fill(2)(StringData(null)) ++ List.fill(2)(StringData("test"))) rdd.toDF().registerTempTable("nullableRepeatedData") @@ -168,7 +168,7 @@ private[sql] trait SQLTestData { self => } protected lazy val nullInts: DataFrame = { - val df = _sqlContext.sparkContext.parallelize( + val df = sqlContext.sparkContext.parallelize( NullInts(1) :: NullInts(2) :: NullInts(3) :: @@ -178,7 +178,7 @@ private[sql] trait SQLTestData { self => } protected lazy val allNulls: DataFrame = { - val df = _sqlContext.sparkContext.parallelize( + val df = sqlContext.sparkContext.parallelize( NullInts(null) :: NullInts(null) :: NullInts(null) :: @@ -188,7 +188,7 @@ private[sql] trait SQLTestData { self => } protected lazy val nullStrings: DataFrame = { - val df = _sqlContext.sparkContext.parallelize( + val df = sqlContext.sparkContext.parallelize( NullStrings(1, "abc") :: NullStrings(2, "ABC") :: NullStrings(3, null) :: Nil).toDF() @@ -197,13 +197,13 @@ private[sql] trait SQLTestData { self => } protected lazy val tableName: DataFrame = { - val df = _sqlContext.sparkContext.parallelize(TableName("test") :: Nil).toDF() + val df = sqlContext.sparkContext.parallelize(TableName("test") :: Nil).toDF() df.registerTempTable("tableName") df } protected lazy val unparsedStrings: RDD[String] = { - _sqlContext.sparkContext.parallelize( + sqlContext.sparkContext.parallelize( "1, A1, true, null" :: "2, B2, false, null" :: "3, C3, true, null" :: @@ -212,13 +212,13 @@ private[sql] trait SQLTestData { self => // An RDD with 4 elements and 8 partitions protected lazy val withEmptyParts: RDD[IntField] = { - val rdd = _sqlContext.sparkContext.parallelize((1 to 4).map(IntField), 8) + val rdd = sqlContext.sparkContext.parallelize((1 to 4).map(IntField), 8) rdd.toDF().registerTempTable("withEmptyParts") rdd } protected lazy val person: DataFrame = { - val df = _sqlContext.sparkContext.parallelize( + val df = sqlContext.sparkContext.parallelize( Person(0, "mike", 30) :: Person(1, "jim", 20) :: Nil).toDF() df.registerTempTable("person") @@ -226,7 +226,7 @@ private[sql] trait SQLTestData { self => } protected lazy val salary: DataFrame = { - val df = _sqlContext.sparkContext.parallelize( + val df = sqlContext.sparkContext.parallelize( Salary(0, 2000.0) :: Salary(1, 1000.0) :: Nil).toDF() df.registerTempTable("salary") @@ -234,7 +234,7 @@ private[sql] trait SQLTestData { self => } protected lazy val complexData: DataFrame = { - val df = _sqlContext.sparkContext.parallelize( + val df = sqlContext.sparkContext.parallelize( ComplexData(Map("1" -> 1), TestData(1, "1"), Seq(1, 1, 1), true) :: ComplexData(Map("2" -> 2), TestData(2, "2"), Seq(2, 2, 2), false) :: Nil).toDF() @@ -246,7 +246,7 @@ private[sql] trait SQLTestData { self => * Initialize all test data such that all temp tables are properly registered. */ def loadTestData(): Unit = { - assert(_sqlContext != null, "attempted to initialize test data before SQLContext.") + assert(sqlContext != null, "attempted to initialize test data before SQLContext.") emptyTestData testData testData2 http://git-wip-us.apache.org/repos/asf/spark/blob/c3c0e431/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala index dc08306..9214569 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala @@ -27,7 +27,7 @@ import org.apache.hadoop.conf.Configuration import org.scalatest.BeforeAndAfterAll import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.{DataFrame, Row, SQLContext, SQLImplicits} +import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.util._ import org.apache.spark.util.Utils @@ -47,13 +47,13 @@ private[sql] trait SQLTestUtils with BeforeAndAfterAll with SQLTestData { self => - protected def _sqlContext: SQLContext + protected def sparkContext = sqlContext.sparkContext // Whether to materialize all test data before the first test is run private var loadTestDataBeforeTests = false // Shorthand for running a query using our SQLContext - protected lazy val sql = _sqlContext.sql _ + protected lazy val sql = sqlContext.sql _ /** * A helper object for importing SQL implicits. @@ -63,7 +63,14 @@ private[sql] trait SQLTestUtils * but the implicits import is needed in the constructor. */ protected object testImplicits extends SQLImplicits { - protected override def _sqlContext: SQLContext = self._sqlContext + protected override def _sqlContext: SQLContext = self.sqlContext + + // This must live here to preserve binary compatibility with Spark < 1.5. + implicit class StringToColumn(val sc: StringContext) { + def $(args: Any*): ColumnName = { + new ColumnName(sc.s(args: _*)) + } + } } /** @@ -84,8 +91,8 @@ private[sql] trait SQLTestUtils /** * The Hadoop configuration used by the active [[SQLContext]]. */ - protected def configuration: Configuration = { - _sqlContext.sparkContext.hadoopConfiguration + protected def hadoopConfiguration: Configuration = { + sparkContext.hadoopConfiguration } /** @@ -96,12 +103,12 @@ private[sql] trait SQLTestUtils */ protected def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = { val (keys, values) = pairs.unzip - val currentValues = keys.map(key => Try(_sqlContext.conf.getConfString(key)).toOption) - (keys, values).zipped.foreach(_sqlContext.conf.setConfString) + val currentValues = keys.map(key => Try(sqlContext.conf.getConfString(key)).toOption) + (keys, values).zipped.foreach(sqlContext.conf.setConfString) try f finally { keys.zip(currentValues).foreach { - case (key, Some(value)) => _sqlContext.conf.setConfString(key, value) - case (key, None) => _sqlContext.conf.unsetConf(key) + case (key, Some(value)) => sqlContext.conf.setConfString(key, value) + case (key, None) => sqlContext.conf.unsetConf(key) } } } @@ -133,7 +140,7 @@ private[sql] trait SQLTestUtils * Drops temporary table `tableName` after calling `f`. */ protected def withTempTable(tableNames: String*)(f: => Unit): Unit = { - try f finally tableNames.foreach(_sqlContext.dropTempTable) + try f finally tableNames.foreach(sqlContext.dropTempTable) } /** @@ -142,7 +149,7 @@ private[sql] trait SQLTestUtils protected def withTable(tableNames: String*)(f: => Unit): Unit = { try f finally { tableNames.foreach { name => - _sqlContext.sql(s"DROP TABLE IF EXISTS $name") + sqlContext.sql(s"DROP TABLE IF EXISTS $name") } } } @@ -155,12 +162,12 @@ private[sql] trait SQLTestUtils val dbName = s"db_${UUID.randomUUID().toString.replace('-', '_')}" try { - _sqlContext.sql(s"CREATE DATABASE $dbName") + sqlContext.sql(s"CREATE DATABASE $dbName") } catch { case cause: Throwable => fail("Failed to create temporary database", cause) } - try f(dbName) finally _sqlContext.sql(s"DROP DATABASE $dbName CASCADE") + try f(dbName) finally sqlContext.sql(s"DROP DATABASE $dbName CASCADE") } /** @@ -168,8 +175,8 @@ private[sql] trait SQLTestUtils * `f` returns. */ protected def activateDatabase(db: String)(f: => Unit): Unit = { - _sqlContext.sql(s"USE $db") - try f finally _sqlContext.sql(s"USE default") + sqlContext.sql(s"USE $db") + try f finally sqlContext.sql(s"USE default") } /** @@ -177,7 +184,7 @@ private[sql] trait SQLTestUtils * way to construct [[DataFrame]] directly out of local data without relying on implicits. */ protected implicit def logicalPlanToSparkQuery(plan: LogicalPlan): DataFrame = { - DataFrame(_sqlContext, plan) + DataFrame(sqlContext, plan) } } http://git-wip-us.apache.org/repos/asf/spark/blob/c3c0e431/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala index d23c6a0..963d10e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.test -import org.apache.spark.sql.{ColumnName, SQLContext} +import org.apache.spark.sql.SQLContext /** @@ -36,9 +36,7 @@ trait SharedSQLContext extends SQLTestUtils { /** * The [[TestSQLContext]] to use for all tests in this suite. */ - protected def ctx: TestSQLContext = _ctx - protected def sqlContext: TestSQLContext = _ctx - protected override def _sqlContext: SQLContext = _ctx + protected def sqlContext: SQLContext = _ctx /** * Initialize the [[TestSQLContext]]. @@ -64,15 +62,4 @@ trait SharedSQLContext extends SQLTestUtils { super.afterAll() } } - - /** - * Converts $"col name" into an [[Column]]. - * @since 1.3.0 - */ - // This must be duplicated here to preserve binary compatibility with Spark < 1.5. - implicit class StringToColumn(val sc: StringContext) { - def $(args: Any*): ColumnName = { - new ColumnName(sc.s(args: _*)) - } - } } http://git-wip-us.apache.org/repos/asf/spark/blob/c3c0e431/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala index 92ef2f7..d99d191 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala @@ -47,6 +47,6 @@ private[sql] class TestSQLContext(sc: SparkContext) extends SQLContext(sc) { sel } private object testData extends SQLTestData { - protected override def _sqlContext: SQLContext = self + protected override def sqlContext: SQLContext = self } } http://git-wip-us.apache.org/repos/asf/spark/blob/c3c0e431/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala index 57fea5d..77f43f9 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -29,7 +29,7 @@ import org.apache.hadoop.hive.ql.exec.FunctionRegistry import org.apache.hadoop.hive.ql.processors._ import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe -import org.apache.spark.sql.SQLConf +import org.apache.spark.sql.{SQLContext, SQLConf} import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.CacheTableCommand @@ -51,6 +51,11 @@ object TestHive // SPARK-8910 .set("spark.ui.enabled", "false"))) +trait TestHiveSingleton { + protected val sqlContext: SQLContext = TestHive + protected val hiveContext: TestHiveContext = TestHive +} + /** * A locally running test instance of Spark's Hive execution engine. * http://git-wip-us.apache.org/repos/asf/spark/blob/c3c0e431/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala index 39d315a..9adb378 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala @@ -19,14 +19,14 @@ package org.apache.spark.sql.hive import java.io.File -import org.apache.spark.sql.columnar.{InMemoryColumnarTableScan, InMemoryRelation} -import org.apache.spark.sql.hive.test.TestHive -import org.apache.spark.sql.hive.test.TestHive._ -import org.apache.spark.sql.{SaveMode, AnalysisException, DataFrame, QueryTest} +import org.apache.spark.sql.columnar.InMemoryColumnarTableScan +import org.apache.spark.sql.hive.test.TestHiveSingleton +import org.apache.spark.sql.{AnalysisException, QueryTest, SaveMode} import org.apache.spark.storage.RDDBlockId import org.apache.spark.util.Utils -class CachedTableSuite extends QueryTest { +class CachedTableSuite extends QueryTest with TestHiveSingleton { + import hiveContext._ def rddIdOf(tableName: String): Int = { val executedPlan = table(tableName).queryExecution.executedPlan @@ -95,18 +95,18 @@ class CachedTableSuite extends QueryTest { test("correct error on uncache of non-cached table") { intercept[IllegalArgumentException] { - TestHive.uncacheTable("src") + hiveContext.uncacheTable("src") } } test("'CACHE TABLE' and 'UNCACHE TABLE' HiveQL statement") { - TestHive.sql("CACHE TABLE src") + sql("CACHE TABLE src") assertCached(table("src")) - assert(TestHive.isCached("src"), "Table 'src' should be cached") + assert(hiveContext.isCached("src"), "Table 'src' should be cached") - TestHive.sql("UNCACHE TABLE src") + sql("UNCACHE TABLE src") assertCached(table("src"), 0) - assert(!TestHive.isCached("src"), "Table 'src' should not be cached") + assert(!hiveContext.isCached("src"), "Table 'src' should not be cached") } test("CACHE TABLE tableName AS SELECT * FROM anotherTable") { http://git-wip-us.apache.org/repos/asf/spark/blob/c3c0e431/sql/hive/src/test/scala/org/apache/spark/sql/hive/ErrorPositionSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ErrorPositionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ErrorPositionSuite.scala index 30f5313..cf73783 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ErrorPositionSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ErrorPositionSuite.scala @@ -22,12 +22,12 @@ import scala.util.Try import org.scalatest.BeforeAndAfter import org.apache.spark.sql.catalyst.util.quietly -import org.apache.spark.sql.hive.test.TestHive._ -import org.apache.spark.sql.hive.test.TestHive.implicits._ +import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.{AnalysisException, QueryTest} -class ErrorPositionSuite extends QueryTest with BeforeAndAfter { +class ErrorPositionSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter { + import hiveContext.implicits._ before { Seq((1, 1, 1)).toDF("a", "a", "b").registerTempTable("dupAttributes") @@ -122,7 +122,7 @@ class ErrorPositionSuite extends QueryTest with BeforeAndAfter { test(name) { val error = intercept[AnalysisException] { - quietly(sql(query)) + quietly(hiveContext.sql(query)) } assert(!error.getMessage.contains("Seq(")) http://git-wip-us.apache.org/repos/asf/spark/blob/c3c0e431/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameAnalyticsSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameAnalyticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameAnalyticsSuite.scala index fb10f85..2e5cae4 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameAnalyticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameAnalyticsSuite.scala @@ -19,24 +19,25 @@ package org.apache.spark.sql.hive import org.apache.spark.sql.{DataFrame, QueryTest} import org.apache.spark.sql.functions._ -import org.apache.spark.sql.hive.test.TestHive -import org.apache.spark.sql.hive.test.TestHive._ -import org.apache.spark.sql.hive.test.TestHive.implicits._ +import org.apache.spark.sql.hive.test.TestHiveSingleton import org.scalatest.BeforeAndAfterAll // TODO ideally we should put the test suite into the package `sql`, as // `hive` package is optional in compiling, however, `SQLContext.sql` doesn't // support the `cube` or `rollup` yet. -class HiveDataFrameAnalyticsSuite extends QueryTest with BeforeAndAfterAll { +class HiveDataFrameAnalyticsSuite extends QueryTest with TestHiveSingleton with BeforeAndAfterAll { + import hiveContext.implicits._ + import hiveContext.sql + private var testData: DataFrame = _ override def beforeAll() { testData = Seq((1, 2), (2, 4)).toDF("a", "b") - TestHive.registerDataFrameAsTable(testData, "mytable") + hiveContext.registerDataFrameAsTable(testData, "mytable") } override def afterAll(): Unit = { - TestHive.dropTempTable("mytable") + hiveContext.dropTempTable("mytable") } test("rollup") { http://git-wip-us.apache.org/repos/asf/spark/blob/c3c0e431/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameJoinSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameJoinSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameJoinSuite.scala index 52e7827..f621367 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameJoinSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameJoinSuite.scala @@ -18,10 +18,10 @@ package org.apache.spark.sql.hive import org.apache.spark.sql.{Row, QueryTest} -import org.apache.spark.sql.hive.test.TestHive.implicits._ +import org.apache.spark.sql.hive.test.TestHiveSingleton - -class HiveDataFrameJoinSuite extends QueryTest { +class HiveDataFrameJoinSuite extends QueryTest with TestHiveSingleton { + import hiveContext.implicits._ // We should move this into SQL package if we make case sensitivity configurable in SQL. test("join - self join auto resolve ambiguity with case insensitivity") { http://git-wip-us.apache.org/repos/asf/spark/blob/c3c0e431/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameWindowSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameWindowSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameWindowSuite.scala index c177cbd..2c98f1c 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameWindowSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameWindowSuite.scala @@ -20,10 +20,11 @@ package org.apache.spark.sql.hive import org.apache.spark.sql.{Row, QueryTest} import org.apache.spark.sql.expressions.Window import org.apache.spark.sql.functions._ -import org.apache.spark.sql.hive.test.TestHive._ -import org.apache.spark.sql.hive.test.TestHive.implicits._ +import org.apache.spark.sql.hive.test.TestHiveSingleton -class HiveDataFrameWindowSuite extends QueryTest { +class HiveDataFrameWindowSuite extends QueryTest with TestHiveSingleton { + import hiveContext.implicits._ + import hiveContext.sql test("reuse window partitionBy") { val df = Seq((1, "1"), (2, "2"), (1, "1"), (2, "2")).toDF("key", "value") http://git-wip-us.apache.org/repos/asf/spark/blob/c3c0e431/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala index 574624d..107457f 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala @@ -19,18 +19,15 @@ package org.apache.spark.sql.hive import java.io.File +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.{QueryTest, Row, SaveMode} import org.apache.spark.sql.hive.client.{ExternalTable, ManagedTable} -import org.apache.spark.sql.hive.test.TestHive -import org.apache.spark.sql.hive.test.TestHive._ -import org.apache.spark.sql.hive.test.TestHive.implicits._ -import org.apache.spark.sql.sources.DataSourceTest +import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.test.{ExamplePointUDT, SQLTestUtils} import org.apache.spark.sql.types.{DecimalType, StringType, StructType} -import org.apache.spark.sql.{Row, SaveMode, SQLContext} -import org.apache.spark.{Logging, SparkFunSuite} - -class HiveMetastoreCatalogSuite extends SparkFunSuite with Logging { +class HiveMetastoreCatalogSuite extends SparkFunSuite with TestHiveSingleton { + import hiveContext.implicits._ test("struct field should accept underscore in sub-column name") { val hiveTypeStr = "struct<a: int, b_1: string, c: string>" @@ -46,14 +43,15 @@ class HiveMetastoreCatalogSuite extends SparkFunSuite with Logging { } test("duplicated metastore relations") { - val df = sql("SELECT * FROM src") + val df = hiveContext.sql("SELECT * FROM src") logInfo(df.queryExecution.toString) df.as('a).join(df.as('b), $"a.key" === $"b.key") } } -class DataSourceWithHiveMetastoreCatalogSuite extends DataSourceTest with SQLTestUtils { - override def _sqlContext: SQLContext = TestHive +class DataSourceWithHiveMetastoreCatalogSuite + extends QueryTest with SQLTestUtils with TestHiveSingleton { + import hiveContext._ import testImplicits._ private val testDF = range(1, 3).select( http://git-wip-us.apache.org/repos/asf/spark/blob/c3c0e431/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala index fe0db52..5596ec6 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala @@ -17,15 +17,13 @@ package org.apache.spark.sql.hive -import org.apache.spark.sql.hive.test.TestHive import org.apache.spark.sql.execution.datasources.parquet.ParquetTest -import org.apache.spark.sql.{QueryTest, Row, SQLContext} +import org.apache.spark.sql.hive.test.TestHiveSingleton +import org.apache.spark.sql.{QueryTest, Row} case class Cases(lower: String, UPPER: String) -class HiveParquetSuite extends QueryTest with ParquetTest { - private val ctx = TestHive - override def _sqlContext: SQLContext = ctx +class HiveParquetSuite extends QueryTest with ParquetTest with TestHiveSingleton { test("Case insensitive attribute names") { withParquetTable((1 to 4).map(i => Cases(i.toString, i.toString)), "cases") { @@ -53,7 +51,7 @@ class HiveParquetSuite extends QueryTest with ParquetTest { test("Converting Hive to Parquet Table via saveAsParquetFile") { withTempPath { dir => sql("SELECT * FROM src").write.parquet(dir.getCanonicalPath) - ctx.read.parquet(dir.getCanonicalPath).registerTempTable("p") + hiveContext.read.parquet(dir.getCanonicalPath).registerTempTable("p") withTempTable("p") { checkAnswer( sql("SELECT * FROM src ORDER BY key"), @@ -66,7 +64,7 @@ class HiveParquetSuite extends QueryTest with ParquetTest { withParquetTable((1 to 10).map(i => (i, s"val_$i")), "t") { withTempPath { file => sql("SELECT * FROM t LIMIT 1").write.parquet(file.getCanonicalPath) - ctx.read.parquet(file.getCanonicalPath).registerTempTable("p") + hiveContext.read.parquet(file.getCanonicalPath).registerTempTable("p") withTempTable("p") { // let's do three overwrites for good measure sql("INSERT OVERWRITE TABLE p SELECT * FROM t") http://git-wip-us.apache.org/repos/asf/spark/blob/c3c0e431/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala index dc2d85f..84f3db4 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala @@ -29,7 +29,7 @@ import org.scalatest.exceptions.TestFailedDueToTimeoutException import org.scalatest.time.SpanSugar._ import org.apache.spark._ -import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.{SQLContext, QueryTest} import org.apache.spark.sql.hive.test.{TestHive, TestHiveContext} import org.apache.spark.sql.test.ProcessTestUtils.ProcessOutputCapturer import org.apache.spark.sql.types.DecimalType @@ -272,7 +272,11 @@ object SparkSQLConfTest extends Logging { } } -object SPARK_9757 extends QueryTest with Logging { +object SPARK_9757 extends QueryTest { + import org.apache.spark.sql.functions._ + + protected var sqlContext: SQLContext = _ + def main(args: Array[String]): Unit = { Utils.configTestLog4j("INFO") @@ -282,10 +286,9 @@ object SPARK_9757 extends QueryTest with Logging { .set("spark.sql.hive.metastore.jars", "maven")) val hiveContext = new TestHiveContext(sparkContext) + sqlContext = hiveContext import hiveContext.implicits._ - import org.apache.spark.sql.functions._ - val dir = Utils.createTempDir() dir.delete() http://git-wip-us.apache.org/repos/asf/spark/blob/c3c0e431/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala index d33e812..80a61f8 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala @@ -24,28 +24,25 @@ import org.scalatest.BeforeAndAfter import org.apache.spark.sql.execution.QueryExecutionException import org.apache.spark.sql.{QueryTest, _} -import org.apache.spark.sql.hive.test.TestHive +import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.types._ import org.apache.spark.util.Utils -/* Implicits */ -import org.apache.spark.sql.hive.test.TestHive._ - case class TestData(key: Int, value: String) case class ThreeCloumntable(key: Int, value: String, key1: String) -class InsertIntoHiveTableSuite extends QueryTest with BeforeAndAfter { - import org.apache.spark.sql.hive.test.TestHive.implicits._ - +class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter { + import hiveContext.implicits._ + import hiveContext.sql - val testData = TestHive.sparkContext.parallelize( + val testData = hiveContext.sparkContext.parallelize( (1 to 100).map(i => TestData(i, i.toString))).toDF() before { // Since every we are doing tests for DDL statements, // it is better to reset before every test. - TestHive.reset() + hiveContext.reset() // Register the testData, which will be used in every test. testData.registerTempTable("testData") } @@ -96,9 +93,9 @@ class InsertIntoHiveTableSuite extends QueryTest with BeforeAndAfter { test("SPARK-4052: scala.collection.Map as value type of MapType") { val schema = StructType(StructField("m", MapType(StringType, StringType), true) :: Nil) - val rowRDD = TestHive.sparkContext.parallelize( + val rowRDD = hiveContext.sparkContext.parallelize( (1 to 100).map(i => Row(scala.collection.mutable.HashMap(s"key$i" -> s"value$i")))) - val df = TestHive.createDataFrame(rowRDD, schema) + val df = hiveContext.createDataFrame(rowRDD, schema) df.registerTempTable("tableWithMapValue") sql("CREATE TABLE hiveTableWithMapValue(m MAP <STRING, STRING>)") sql("INSERT OVERWRITE TABLE hiveTableWithMapValue SELECT m FROM tableWithMapValue") @@ -169,8 +166,8 @@ class InsertIntoHiveTableSuite extends QueryTest with BeforeAndAfter { test("Insert ArrayType.containsNull == false") { val schema = StructType(Seq( StructField("a", ArrayType(StringType, containsNull = false)))) - val rowRDD = TestHive.sparkContext.parallelize((1 to 100).map(i => Row(Seq(s"value$i")))) - val df = TestHive.createDataFrame(rowRDD, schema) + val rowRDD = hiveContext.sparkContext.parallelize((1 to 100).map(i => Row(Seq(s"value$i")))) + val df = hiveContext.createDataFrame(rowRDD, schema) df.registerTempTable("tableWithArrayValue") sql("CREATE TABLE hiveTableWithArrayValue(a Array <STRING>)") sql("INSERT OVERWRITE TABLE hiveTableWithArrayValue SELECT a FROM tableWithArrayValue") @@ -185,9 +182,9 @@ class InsertIntoHiveTableSuite extends QueryTest with BeforeAndAfter { test("Insert MapType.valueContainsNull == false") { val schema = StructType(Seq( StructField("m", MapType(StringType, StringType, valueContainsNull = false)))) - val rowRDD = TestHive.sparkContext.parallelize( + val rowRDD = hiveContext.sparkContext.parallelize( (1 to 100).map(i => Row(Map(s"key$i" -> s"value$i")))) - val df = TestHive.createDataFrame(rowRDD, schema) + val df = hiveContext.createDataFrame(rowRDD, schema) df.registerTempTable("tableWithMapValue") sql("CREATE TABLE hiveTableWithMapValue(m Map <STRING, STRING>)") sql("INSERT OVERWRITE TABLE hiveTableWithMapValue SELECT m FROM tableWithMapValue") @@ -202,9 +199,9 @@ class InsertIntoHiveTableSuite extends QueryTest with BeforeAndAfter { test("Insert StructType.fields.exists(_.nullable == false)") { val schema = StructType(Seq( StructField("s", StructType(Seq(StructField("f", StringType, nullable = false)))))) - val rowRDD = TestHive.sparkContext.parallelize( + val rowRDD = hiveContext.sparkContext.parallelize( (1 to 100).map(i => Row(Row(s"value$i")))) - val df = TestHive.createDataFrame(rowRDD, schema) + val df = hiveContext.createDataFrame(rowRDD, schema) df.registerTempTable("tableWithStructValue") sql("CREATE TABLE hiveTableWithStructValue(s Struct <f: STRING>)") sql("INSERT OVERWRITE TABLE hiveTableWithStructValue SELECT s FROM tableWithStructValue") @@ -217,11 +214,11 @@ class InsertIntoHiveTableSuite extends QueryTest with BeforeAndAfter { } test("SPARK-5498:partition schema does not match table schema") { - val testData = TestHive.sparkContext.parallelize( + val testData = hiveContext.sparkContext.parallelize( (1 to 10).map(i => TestData(i, i.toString))).toDF() testData.registerTempTable("testData") - val testDatawithNull = TestHive.sparkContext.parallelize( + val testDatawithNull = hiveContext.sparkContext.parallelize( (1 to 10).map(i => ThreeCloumntable(i, i.toString, null))).toDF() val tmpDir = Utils.createTempDir() http://git-wip-us.apache.org/repos/asf/spark/blob/c3c0e431/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala index d3388a9..579631d 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala @@ -19,17 +19,15 @@ package org.apache.spark.sql.hive import org.scalatest.BeforeAndAfterAll -import org.apache.spark.sql.hive.test.TestHive -import org.apache.spark.sql.hive.test.TestHive._ +import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.QueryTest import org.apache.spark.sql.Row -class ListTablesSuite extends QueryTest with BeforeAndAfterAll { +class ListTablesSuite extends QueryTest with TestHiveSingleton with BeforeAndAfterAll { + import hiveContext._ + import hiveContext.implicits._ - import org.apache.spark.sql.hive.test.TestHive.implicits._ - - val df = - sparkContext.parallelize((1 to 10).map(i => (i, s"str$i"))).toDF("key", "value") + val df = sparkContext.parallelize((1 to 10).map(i => (i, s"str$i"))).toDF("key", "value") override def beforeAll(): Unit = { // The catalog in HiveContext is a case insensitive one. http://git-wip-us.apache.org/repos/asf/spark/blob/c3c0e431/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index 20a5058..bf0db08 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -22,15 +22,11 @@ import java.io.{IOException, File} import scala.collection.mutable.ArrayBuffer import org.apache.hadoop.fs.Path -import org.scalatest.BeforeAndAfterAll -import org.apache.spark.Logging import org.apache.spark.sql._ import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.hive.client.{HiveTable, ManagedTable} -import org.apache.spark.sql.hive.test.TestHive -import org.apache.spark.sql.hive.test.TestHive._ -import org.apache.spark.sql.hive.test.TestHive.implicits._ +import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation import org.apache.spark.sql.test.SQLTestUtils import org.apache.spark.sql.types._ @@ -39,10 +35,9 @@ import org.apache.spark.util.Utils /** * Tests for persisting tables created though the data sources API into the metastore. */ -class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with BeforeAndAfterAll - with Logging { - override def _sqlContext: SQLContext = TestHive - private val sqlContext = _sqlContext +class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { + import hiveContext._ + import hiveContext.implicits._ var jsonFilePath: String = _ http://git-wip-us.apache.org/repos/asf/spark/blob/c3c0e431/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala index 997c667..f16c257 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala @@ -17,20 +17,16 @@ package org.apache.spark.sql.hive -import org.apache.spark.sql.hive.test.TestHive +import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.test.SQLTestUtils -import org.apache.spark.sql.{AnalysisException, QueryTest, SQLContext, SaveMode} +import org.apache.spark.sql.{AnalysisException, QueryTest, SaveMode} -class MultiDatabaseSuite extends QueryTest with SQLTestUtils { - override val _sqlContext: HiveContext = TestHive - private val sqlContext = _sqlContext - - private val df = sqlContext.range(10).coalesce(1) +class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { + private lazy val df = sqlContext.range(10).coalesce(1) private def checkTablePath(dbName: String, tableName: String): Unit = { - // val hiveContext = sqlContext.asInstanceOf[HiveContext] - val metastoreTable = sqlContext.catalog.client.getTable(dbName, tableName) - val expectedPath = sqlContext.catalog.client.getDatabase(dbName).location + "/" + tableName + val metastoreTable = hiveContext.catalog.client.getTable(dbName, tableName) + val expectedPath = hiveContext.catalog.client.getDatabase(dbName).location + "/" + tableName assert(metastoreTable.serdeProperties("path") === expectedPath) } @@ -220,7 +216,7 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils { df.write.parquet(s"$path/p=2") sql("ALTER TABLE t ADD PARTITION (p=2)") - sqlContext.refreshTable("t") + hiveContext.refreshTable("t") checkAnswer( sqlContext.table("t"), df.withColumn("p", lit(1)).unionAll(df.withColumn("p", lit(2)))) @@ -252,7 +248,7 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils { df.write.parquet(s"$path/p=2") sql(s"ALTER TABLE $db.t ADD PARTITION (p=2)") - sqlContext.refreshTable(s"$db.t") + hiveContext.refreshTable(s"$db.t") checkAnswer( sqlContext.table(s"$db.t"), df.withColumn("p", lit(1)).unionAll(df.withColumn("p", lit(2)))) http://git-wip-us.apache.org/repos/asf/spark/blob/c3c0e431/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala index 91d7a48..49aab85 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala @@ -18,38 +18,20 @@ package org.apache.spark.sql.hive import java.sql.Timestamp -import java.util.{Locale, TimeZone} import org.apache.hadoop.hive.conf.HiveConf -import org.scalatest.BeforeAndAfterAll import org.apache.spark.sql.execution.datasources.parquet.ParquetCompatibilityTest -import org.apache.spark.sql.hive.test.TestHive -import org.apache.spark.sql.{Row, SQLConf, SQLContext} - -class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with BeforeAndAfterAll { - override def _sqlContext: SQLContext = TestHive - private val sqlContext = _sqlContext +import org.apache.spark.sql.{Row, SQLConf} +import org.apache.spark.sql.hive.test.TestHiveSingleton +class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with TestHiveSingleton { /** * Set the staging directory (and hence path to ignore Parquet files under) * to that set by [[HiveConf.ConfVars.STAGINGDIR]]. */ private val stagingDir = new HiveConf().getVar(HiveConf.ConfVars.STAGINGDIR) - private val originalTimeZone = TimeZone.getDefault - private val originalLocale = Locale.getDefault - - protected override def beforeAll(): Unit = { - TimeZone.setDefault(TimeZone.getTimeZone("America/Los_Angeles")) - Locale.setDefault(Locale.US) - } - - override protected def afterAll(): Unit = { - TimeZone.setDefault(originalTimeZone) - Locale.setDefault(originalLocale) - } - override protected def logParquetSchema(path: String): Unit = { val schema = readParquetSchema(path, { path => !path.getName.startsWith("_") && !path.getName.startsWith(stagingDir) http://git-wip-us.apache.org/repos/asf/spark/blob/c3c0e431/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala index 1cc8a93..f542a5a 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala @@ -18,22 +18,18 @@ package org.apache.spark.sql.hive import com.google.common.io.Files -import org.apache.spark.sql.test.SQLTestUtils -import org.apache.spark.sql.{QueryTest, _} import org.apache.spark.util.Utils +import org.apache.spark.sql.{QueryTest, _} +import org.apache.spark.sql.hive.test.TestHiveSingleton +import org.apache.spark.sql.test.SQLTestUtils +class QueryPartitionSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { + import hiveContext.implicits._ -class QueryPartitionSuite extends QueryTest with SQLTestUtils { - - private lazy val ctx = org.apache.spark.sql.hive.test.TestHive - import ctx.implicits._ - - protected def _sqlContext = ctx - - test("SPARK-5068: query data when path doesn't exist"){ + test("SPARK-5068: query data when path doesn't exist") { withSQLConf((SQLConf.HIVE_VERIFY_PARTITION_PATH.key, "true")) { - val testData = ctx.sparkContext.parallelize( + val testData = sparkContext.parallelize( (1 to 10).map(i => TestData(i, i.toString))).toDF() testData.registerTempTable("testData") http://git-wip-us.apache.org/repos/asf/spark/blob/c3c0e431/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index e4fec7e..6a692d6 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -17,24 +17,15 @@ package org.apache.spark.sql.hive -import org.scalatest.BeforeAndAfterAll - import scala.reflect.ClassTag import org.apache.spark.sql.{Row, SQLConf, QueryTest} import org.apache.spark.sql.execution.joins._ import org.apache.spark.sql.hive.execution._ +import org.apache.spark.sql.hive.test.TestHiveSingleton -class StatisticsSuite extends QueryTest with BeforeAndAfterAll { - - private lazy val ctx: HiveContext = { - val ctx = org.apache.spark.sql.hive.test.TestHive - ctx.reset() - ctx.cacheTables = false - ctx - } - - import ctx.sql +class StatisticsSuite extends QueryTest with TestHiveSingleton { + import hiveContext.sql test("parse analyze commands") { def assertAnalyzeCommand(analyzeCommand: String, c: Class[_]) { @@ -54,9 +45,6 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll { } } - // Ensure session state is initialized. - ctx.parseSql("use default") - assertAnalyzeCommand( "ANALYZE TABLE Table1 COMPUTE STATISTICS", classOf[HiveNativeCommand]) @@ -80,7 +68,7 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll { test("analyze MetastoreRelations") { def queryTotalSize(tableName: String): BigInt = - ctx.catalog.lookupRelation(Seq(tableName)).statistics.sizeInBytes + hiveContext.catalog.lookupRelation(Seq(tableName)).statistics.sizeInBytes // Non-partitioned table sql("CREATE TABLE analyzeTable (key STRING, value STRING)").collect() @@ -114,7 +102,7 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll { |SELECT * FROM src """.stripMargin).collect() - assert(queryTotalSize("analyzeTable_part") === ctx.conf.defaultSizeInBytes) + assert(queryTotalSize("analyzeTable_part") === hiveContext.conf.defaultSizeInBytes) sql("ANALYZE TABLE analyzeTable_part COMPUTE STATISTICS noscan") @@ -125,9 +113,9 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll { // Try to analyze a temp table sql("""SELECT * FROM src""").registerTempTable("tempTable") intercept[UnsupportedOperationException] { - ctx.analyze("tempTable") + hiveContext.analyze("tempTable") } - ctx.catalog.unregisterTable(Seq("tempTable")) + hiveContext.catalog.unregisterTable(Seq("tempTable")) } test("estimates the size of a test MetastoreRelation") { @@ -155,8 +143,8 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll { val sizes = df.queryExecution.analyzed.collect { case r if ct.runtimeClass.isAssignableFrom(r.getClass) => r.statistics.sizeInBytes } - assert(sizes.size === 2 && sizes(0) <= ctx.conf.autoBroadcastJoinThreshold - && sizes(1) <= ctx.conf.autoBroadcastJoinThreshold, + assert(sizes.size === 2 && sizes(0) <= hiveContext.conf.autoBroadcastJoinThreshold + && sizes(1) <= hiveContext.conf.autoBroadcastJoinThreshold, s"query should contain two relations, each of which has size smaller than autoConvertSize") // Using `sparkPlan` because for relevant patterns in HashJoin to be @@ -167,8 +155,8 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll { checkAnswer(df, expectedAnswer) // check correctness of output - ctx.conf.settings.synchronized { - val tmp = ctx.conf.autoBroadcastJoinThreshold + hiveContext.conf.settings.synchronized { + val tmp = hiveContext.conf.autoBroadcastJoinThreshold sql(s"""SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key}=-1""") df = sql(query) @@ -211,8 +199,8 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll { .isAssignableFrom(r.getClass) => r.statistics.sizeInBytes } - assert(sizes.size === 2 && sizes(1) <= ctx.conf.autoBroadcastJoinThreshold - && sizes(0) <= ctx.conf.autoBroadcastJoinThreshold, + assert(sizes.size === 2 && sizes(1) <= hiveContext.conf.autoBroadcastJoinThreshold + && sizes(0) <= hiveContext.conf.autoBroadcastJoinThreshold, s"query should contain two relations, each of which has size smaller than autoConvertSize") // Using `sparkPlan` because for relevant patterns in HashJoin to be @@ -225,8 +213,8 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll { checkAnswer(df, answer) // check correctness of output - ctx.conf.settings.synchronized { - val tmp = ctx.conf.autoBroadcastJoinThreshold + hiveContext.conf.settings.synchronized { + val tmp = hiveContext.conf.autoBroadcastJoinThreshold sql(s"SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key}=-1") df = sql(leftSemiJoinQuery) http://git-wip-us.apache.org/repos/asf/spark/blob/c3c0e431/sql/hive/src/test/scala/org/apache/spark/sql/hive/UDFSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/UDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/UDFSuite.scala index 7ee1c8d..3ab4576 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/UDFSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/UDFSuite.scala @@ -18,18 +18,18 @@ package org.apache.spark.sql.hive import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.hive.test.TestHiveSingleton case class FunctionResult(f1: String, f2: String) -class UDFSuite extends QueryTest { - private lazy val ctx = org.apache.spark.sql.hive.test.TestHive +class UDFSuite extends QueryTest with TestHiveSingleton { test("UDF case insensitive") { - ctx.udf.register("random0", () => { Math.random() }) - ctx.udf.register("RANDOM1", () => { Math.random() }) - ctx.udf.register("strlenScala", (_: String).length + (_: Int)) - assert(ctx.sql("SELECT RANDOM0() FROM src LIMIT 1").head().getDouble(0) >= 0.0) - assert(ctx.sql("SELECT RANDOm1() FROM src LIMIT 1").head().getDouble(0) >= 0.0) - assert(ctx.sql("SELECT strlenscala('test', 1) FROM src LIMIT 1").head().getInt(0) === 5) + hiveContext.udf.register("random0", () => { Math.random() }) + hiveContext.udf.register("RANDOM1", () => { Math.random() }) + hiveContext.udf.register("strlenScala", (_: String).length + (_: Int)) + assert(hiveContext.sql("SELECT RANDOM0() FROM src LIMIT 1").head().getDouble(0) >= 0.0) + assert(hiveContext.sql("SELECT RANDOm1() FROM src LIMIT 1").head().getDouble(0) >= 0.0) + assert(hiveContext.sql("SELECT strlenscala('test', 1) FROM src LIMIT 1").head().getInt(0) === 5) } } http://git-wip-us.apache.org/repos/asf/spark/blob/c3c0e431/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala index 4886a85..b126ec4 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala @@ -17,19 +17,15 @@ package org.apache.spark.sql.hive.execution -import org.scalatest.BeforeAndAfterAll - import org.apache.spark.sql._ import org.apache.spark.sql.execution.aggregate -import org.apache.spark.sql.hive.test.TestHive import org.apache.spark.sql.test.SQLTestUtils import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} import org.apache.spark.sql.hive.aggregate.{MyDoubleAvg, MyDoubleSum} +import org.apache.spark.sql.hive.test.TestHiveSingleton -abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with BeforeAndAfterAll { - override def _sqlContext: SQLContext = TestHive - protected val sqlContext = _sqlContext - import sqlContext.implicits._ +abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { + import testImplicits._ var originalUseAggregate2: Boolean = _ @@ -69,7 +65,7 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Be data2.write.saveAsTable("agg2") val emptyDF = sqlContext.createDataFrame( - sqlContext.sparkContext.emptyRDD[Row], + sparkContext.emptyRDD[Row], StructType(StructField("key", StringType) :: StructField("value", IntegerType) :: Nil)) emptyDF.registerTempTable("emptyTable") @@ -597,7 +593,7 @@ class TungstenAggregationQueryWithControlledFallbackSuite extends AggregationQue sqlContext.conf.unsetConf("spark.sql.TungstenAggregate.testFallbackStartsAt") } - override protected def checkAnswer(actual: DataFrame, expectedAnswer: Seq[Row]): Unit = { + override protected def checkAnswer(actual: => DataFrame, expectedAnswer: Seq[Row]): Unit = { (0 to 2).foreach { fallbackStartsAt => sqlContext.setConf( "spark.sql.TungstenAggregate.testFallbackStartsAt", @@ -605,6 +601,7 @@ class TungstenAggregationQueryWithControlledFallbackSuite extends AggregationQue // Create a new df to make sure its physical operator picks up // spark.sql.TungstenAggregate.testFallbackStartsAt. + // todo: remove it? val newActual = DataFrame(sqlContext, actual.logicalPlan) QueryTest.checkAnswer(newActual, expectedAnswer) match { @@ -626,12 +623,12 @@ class TungstenAggregationQueryWithControlledFallbackSuite extends AggregationQue } // Override it to make sure we call the actually overridden checkAnswer. - override protected def checkAnswer(df: DataFrame, expectedAnswer: Row): Unit = { + override protected def checkAnswer(df: => DataFrame, expectedAnswer: Row): Unit = { checkAnswer(df, Seq(expectedAnswer)) } // Override it to make sure we call the actually overridden checkAnswer. - override protected def checkAnswer(df: DataFrame, expectedAnswer: DataFrame): Unit = { + override protected def checkAnswer(df: => DataFrame, expectedAnswer: DataFrame): Unit = { checkAnswer(df, expectedAnswer.collect()) } } http://git-wip-us.apache.org/repos/asf/spark/blob/c3c0e431/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala index 4d45249..aa95ba9 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala @@ -23,7 +23,7 @@ import scala.util.control.NonFatal import org.scalatest.{BeforeAndAfterAll, GivenWhenThen} -import org.apache.spark.{Logging, SparkFunSuite} +import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.util._ @@ -42,7 +42,7 @@ import org.apache.spark.sql.hive.test.TestHive * configured using system properties. */ abstract class HiveComparisonTest - extends SparkFunSuite with BeforeAndAfterAll with GivenWhenThen with Logging { + extends SparkFunSuite with BeforeAndAfterAll with GivenWhenThen { /** * When set, any cache files that result in test failures will be deleted. Used when the test http://git-wip-us.apache.org/repos/asf/spark/blob/c3c0e431/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala index 11d7a87..94162da 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala @@ -17,17 +17,14 @@ package org.apache.spark.sql.hive.execution -import org.apache.spark.sql.{SQLContext, QueryTest} -import org.apache.spark.sql.hive.test.TestHive -import org.apache.spark.sql.hive.test.TestHive._ +import org.apache.spark.sql.QueryTest import org.apache.spark.sql.test.SQLTestUtils +import org.apache.spark.sql.hive.test.TestHiveSingleton /** * A set of tests that validates support for Hive Explain command. */ -class HiveExplainSuite extends QueryTest with SQLTestUtils { - override def _sqlContext: SQLContext = TestHive - private val sqlContext = _sqlContext +class HiveExplainSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { test("explain extended command") { checkExistence(sql(" explain select * from src where key=123 "), true, @@ -83,7 +80,7 @@ class HiveExplainSuite extends QueryTest with SQLTestUtils { test("SPARK-6212: The EXPLAIN output of CTAS only shows the analyzed plan") { withTempTable("jt") { val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str$i"}""")) - read.json(rdd).registerTempTable("jt") + hiveContext.read.json(rdd).registerTempTable("jt") val outputs = sql( s""" |EXPLAIN EXTENDED http://git-wip-us.apache.org/repos/asf/spark/blob/c3c0e431/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveOperatorQueryableSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveOperatorQueryableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveOperatorQueryableSuite.scala index efbef68..0d4c7f8 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveOperatorQueryableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveOperatorQueryableSuite.scala @@ -18,14 +18,16 @@ package org.apache.spark.sql.hive.execution import org.apache.spark.sql.{Row, QueryTest} -import org.apache.spark.sql.hive.test.TestHive._ +import org.apache.spark.sql.hive.test.{TestHive, TestHiveSingleton} /** * A set of tests that validates commands can also be queried by like a table */ -class HiveOperatorQueryableSuite extends QueryTest { +class HiveOperatorQueryableSuite extends QueryTest with TestHiveSingleton { + import hiveContext._ + test("SPARK-5324 query result of describe command") { - loadTestTable("src") + hiveContext.loadTestTable("src") // register a describe command to be a temp table sql("desc src").registerTempTable("mydesc") http://git-wip-us.apache.org/repos/asf/spark/blob/c3c0e431/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HivePlanTest.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HivePlanTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HivePlanTest.scala index ba56a8a..cd055f9 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HivePlanTest.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HivePlanTest.scala @@ -21,11 +21,11 @@ import org.apache.spark.sql.functions._ import org.apache.spark.sql.QueryTest import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.expressions.Window -import org.apache.spark.sql.hive.test.TestHive +import org.apache.spark.sql.hive.test.TestHiveSingleton -class HivePlanTest extends QueryTest { - import TestHive._ - import TestHive.implicits._ +class HivePlanTest extends QueryTest with TestHiveSingleton { + import hiveContext.sql + import hiveContext.implicits._ test("udf constant folding") { Seq.empty[Tuple1[Int]].toDF("a").registerTempTable("t") http://git-wip-us.apache.org/repos/asf/spark/blob/c3c0e431/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala index 9c10ffe..d9ba895 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala @@ -28,7 +28,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, ObjectIns import org.apache.hadoop.hive.serde2.{AbstractSerDe, SerDeStats} import org.apache.hadoop.io.Writable import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SQLConf} -import org.apache.spark.sql.hive.test.TestHive +import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.util.Utils @@ -43,10 +43,10 @@ case class ListStringCaseClass(l: Seq[String]) /** * A test suite for Hive custom UDFs. */ -class HiveUDFSuite extends QueryTest { +class HiveUDFSuite extends QueryTest with TestHiveSingleton { - import TestHive.{udf, sql} - import TestHive.implicits._ + import hiveContext.{udf, sql} + import hiveContext.implicits._ test("spark sql udf test that returns a struct") { udf.register("getStruct", (_: Int) => Fields(1, 2, 3, 4, 5)) @@ -123,12 +123,12 @@ class HiveUDFSuite extends QueryTest { | "value", value)).value FROM src """.stripMargin), Seq(Row("val_0"))) } - val codegenDefault = TestHive.getConf(SQLConf.CODEGEN_ENABLED) - TestHive.setConf(SQLConf.CODEGEN_ENABLED, true) + val codegenDefault = hiveContext.getConf(SQLConf.CODEGEN_ENABLED) + hiveContext.setConf(SQLConf.CODEGEN_ENABLED, true) testOrderInStruct() - TestHive.setConf(SQLConf.CODEGEN_ENABLED, false) + hiveContext.setConf(SQLConf.CODEGEN_ENABLED, false) testOrderInStruct() - TestHive.setConf(SQLConf.CODEGEN_ENABLED, codegenDefault) + hiveContext.setConf(SQLConf.CODEGEN_ENABLED, codegenDefault) } test("SPARK-6409 UDAFAverage test") { @@ -137,7 +137,7 @@ class HiveUDFSuite extends QueryTest { sql("SELECT test_avg(1), test_avg(substr(value,5)) FROM src"), Seq(Row(1.0, 260.182))) sql("DROP TEMPORARY FUNCTION IF EXISTS test_avg") - TestHive.reset() + hiveContext.reset() } test("SPARK-2693 udaf aggregates test") { @@ -157,7 +157,7 @@ class HiveUDFSuite extends QueryTest { } test("UDFIntegerToString") { - val testData = TestHive.sparkContext.parallelize( + val testData = hiveContext.sparkContext.parallelize( IntegerCaseClass(1) :: IntegerCaseClass(2) :: Nil).toDF() testData.registerTempTable("integerTable") @@ -168,11 +168,11 @@ class HiveUDFSuite extends QueryTest { Seq(Row("1"), Row("2"))) sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFIntegerToString") - TestHive.reset() + hiveContext.reset() } test("UDFToListString") { - val testData = TestHive.sparkContext.parallelize(StringCaseClass("") :: Nil).toDF() + val testData = hiveContext.sparkContext.parallelize(StringCaseClass("") :: Nil).toDF() testData.registerTempTable("inputTable") sql(s"CREATE TEMPORARY FUNCTION testUDFToListString AS '${classOf[UDFToListString].getName}'") @@ -183,11 +183,11 @@ class HiveUDFSuite extends QueryTest { "JVM type erasure makes spark fail to catch a component type in List<>;") sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFToListString") - TestHive.reset() + hiveContext.reset() } test("UDFToListInt") { - val testData = TestHive.sparkContext.parallelize(StringCaseClass("") :: Nil).toDF() + val testData = hiveContext.sparkContext.parallelize(StringCaseClass("") :: Nil).toDF() testData.registerTempTable("inputTable") sql(s"CREATE TEMPORARY FUNCTION testUDFToListInt AS '${classOf[UDFToListInt].getName}'") @@ -198,11 +198,11 @@ class HiveUDFSuite extends QueryTest { "JVM type erasure makes spark fail to catch a component type in List<>;") sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFToListInt") - TestHive.reset() + hiveContext.reset() } test("UDFToStringIntMap") { - val testData = TestHive.sparkContext.parallelize(StringCaseClass("") :: Nil).toDF() + val testData = hiveContext.sparkContext.parallelize(StringCaseClass("") :: Nil).toDF() testData.registerTempTable("inputTable") sql(s"CREATE TEMPORARY FUNCTION testUDFToStringIntMap " + @@ -214,11 +214,11 @@ class HiveUDFSuite extends QueryTest { "JVM type erasure makes spark fail to catch key and value types in Map<>;") sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFToStringIntMap") - TestHive.reset() + hiveContext.reset() } test("UDFToIntIntMap") { - val testData = TestHive.sparkContext.parallelize(StringCaseClass("") :: Nil).toDF() + val testData = hiveContext.sparkContext.parallelize(StringCaseClass("") :: Nil).toDF() testData.registerTempTable("inputTable") sql(s"CREATE TEMPORARY FUNCTION testUDFToIntIntMap " + @@ -230,11 +230,11 @@ class HiveUDFSuite extends QueryTest { "JVM type erasure makes spark fail to catch key and value types in Map<>;") sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFToIntIntMap") - TestHive.reset() + hiveContext.reset() } test("UDFListListInt") { - val testData = TestHive.sparkContext.parallelize( + val testData = hiveContext.sparkContext.parallelize( ListListIntCaseClass(Nil) :: ListListIntCaseClass(Seq((1, 2, 3))) :: ListListIntCaseClass(Seq((4, 5, 6), (7, 8, 9))) :: Nil).toDF() @@ -246,11 +246,11 @@ class HiveUDFSuite extends QueryTest { Seq(Row(0), Row(2), Row(13))) sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFListListInt") - TestHive.reset() + hiveContext.reset() } test("UDFListString") { - val testData = TestHive.sparkContext.parallelize( + val testData = hiveContext.sparkContext.parallelize( ListStringCaseClass(Seq("a", "b", "c")) :: ListStringCaseClass(Seq("d", "e")) :: Nil).toDF() testData.registerTempTable("listStringTable") @@ -261,11 +261,11 @@ class HiveUDFSuite extends QueryTest { Seq(Row("a,b,c"), Row("d,e"))) sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFListString") - TestHive.reset() + hiveContext.reset() } test("UDFStringString") { - val testData = TestHive.sparkContext.parallelize( + val testData = hiveContext.sparkContext.parallelize( StringCaseClass("world") :: StringCaseClass("goodbye") :: Nil).toDF() testData.registerTempTable("stringTable") @@ -280,11 +280,11 @@ class HiveUDFSuite extends QueryTest { sql("DROP TEMPORARY FUNCTION IF EXISTS testStringStringUDF") - TestHive.reset() + hiveContext.reset() } test("UDFTwoListList") { - val testData = TestHive.sparkContext.parallelize( + val testData = hiveContext.sparkContext.parallelize( ListListIntCaseClass(Nil) :: ListListIntCaseClass(Seq((1, 2, 3))) :: ListListIntCaseClass(Seq((4, 5, 6), (7, 8, 9))) :: @@ -297,7 +297,7 @@ class HiveUDFSuite extends QueryTest { Seq(Row("0, 0"), Row("2, 2"), Row("13, 13"))) sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFTwoListList") - TestHive.reset() + hiveContext.reset() } } http://git-wip-us.apache.org/repos/asf/spark/blob/c3c0e431/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 1ff1d9a..8126d02 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -26,9 +26,7 @@ import org.apache.spark.sql.catalyst.DefaultParserDialect import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, EliminateSubQueries} import org.apache.spark.sql.catalyst.errors.DialectException import org.apache.spark.sql.execution.datasources.LogicalRelation -import org.apache.spark.sql.hive.test.TestHive -import org.apache.spark.sql.hive.test.TestHive._ -import org.apache.spark.sql.hive.test.TestHive.implicits._ +import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.hive.{HiveContext, HiveQLDialect, MetastoreRelation} import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation import org.apache.spark.sql.test.SQLTestUtils @@ -65,12 +63,12 @@ class MyDialect extends DefaultParserDialect * Hive to generate them (in contrast to HiveQuerySuite). Often this is because the query is * valid, but Hive currently cannot execute it. */ -class SQLQuerySuite extends QueryTest with SQLTestUtils { - override def _sqlContext: SQLContext = TestHive - private val sqlContext = _sqlContext +class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { + import hiveContext._ + import hiveContext.implicits._ test("UDTF") { - sql(s"ADD JAR ${TestHive.getHiveFile("TestUDTF.jar").getCanonicalPath()}") + sql(s"ADD JAR ${hiveContext.getHiveFile("TestUDTF.jar").getCanonicalPath()}") // The function source code can be found at: // https://cwiki.apache.org/confluence/display/Hive/DeveloperGuide+UDTF sql( @@ -509,19 +507,19 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils { checkAnswer( sql("SELECT f1.f2.f3 FROM nested"), Row(1)) - checkAnswer(sql("CREATE TABLE test_ctas_1234 AS SELECT * from nested"), - Seq.empty[Row]) + + sql("CREATE TABLE test_ctas_1234 AS SELECT * from nested") checkAnswer( sql("SELECT * FROM test_ctas_1234"), sql("SELECT * FROM nested").collect().toSeq) intercept[AnalysisException] { - sql("CREATE TABLE test_ctas_12345 AS SELECT * from notexists").collect() + sql("CREATE TABLE test_ctas_1234 AS SELECT * from notexists").collect() } } test("test CTAS") { - checkAnswer(sql("CREATE TABLE test_ctas_123 AS SELECT key, value FROM src"), Seq.empty[Row]) + sql("CREATE TABLE test_ctas_123 AS SELECT key, value FROM src") checkAnswer( sql("SELECT key, value FROM test_ctas_123 ORDER BY key"), sql("SELECT key, value FROM src ORDER BY key").collect().toSeq) @@ -614,7 +612,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils { val rowRdd = sparkContext.parallelize(row :: Nil) - TestHive.createDataFrame(rowRdd, schema).registerTempTable("testTable") + hiveContext.createDataFrame(rowRdd, schema).registerTempTable("testTable") sql( """CREATE TABLE nullValuesInInnerComplexTypes @@ -1044,10 +1042,10 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils { val thread = new Thread { override def run() { // To make sure this test works, this jar should not be loaded in another place. - TestHive.sql( - s"ADD JAR ${TestHive.getHiveFile("hive-contrib-0.13.1.jar").getCanonicalPath()}") + sql( + s"ADD JAR ${hiveContext.getHiveFile("hive-contrib-0.13.1.jar").getCanonicalPath()}") try { - TestHive.sql( + sql( """ |CREATE TEMPORARY FUNCTION example_max |AS 'org.apache.hadoop.hive.contrib.udaf.example.UDAFExampleMax' @@ -1097,21 +1095,21 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils { test("SPARK-8588 HiveTypeCoercion.inConversion fires too early") { val df = - TestHive.createDataFrame(Seq((1, "2014-01-01"), (2, "2015-01-01"), (3, "2016-01-01"))) + createDataFrame(Seq((1, "2014-01-01"), (2, "2015-01-01"), (3, "2016-01-01"))) df.toDF("id", "datef").registerTempTable("test_SPARK8588") checkAnswer( - TestHive.sql( + sql( """ |select id, concat(year(datef)) |from test_SPARK8588 where concat(year(datef), ' year') in ('2015 year', '2014 year') """.stripMargin), Row(1, "2014") :: Row(2, "2015") :: Nil ) - TestHive.dropTempTable("test_SPARK8588") + dropTempTable("test_SPARK8588") } test("SPARK-9371: fix the support for special chars in column names for hive context") { - TestHive.read.json(TestHive.sparkContext.makeRDD( + read.json(sparkContext.makeRDD( """{"a": {"c.b": 1}, "b.$q": [{"a@!.q": 1}], "q.w": {"w.i&": [1]}}""" :: Nil)) .registerTempTable("t") @@ -1142,8 +1140,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils { test("specifying database name for a temporary table is not allowed") { withTempPath { dir => val path = dir.getCanonicalPath - val df = - sqlContext.sparkContext.parallelize(1 to 10).map(i => (i, i.toString)).toDF("num", "str") + val df = sparkContext.parallelize(1 to 10).map(i => (i, i.toString)).toDF("num", "str") df .write .format("parquet") --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
