Repository: spark Updated Branches: refs/heads/master fa3c06987 -> 18c2c9258
http://git-wip-us.apache.org/repos/asf/spark/blob/18c2c925/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index 2bea32b..7d1f87f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -33,7 +33,7 @@ private[hive] trait HiveStrategies { // Possibly being too clever with types here... or not clever enough. self: SparkPlanner => - val context: SQLContext + val sparkSession: SparkSession val hiveconf: HiveConf object Scripts extends Strategy { @@ -78,7 +78,7 @@ private[hive] trait HiveStrategies { projectList, otherPredicates, identity[Seq[Expression]], - HiveTableScanExec(_, relation, pruningPredicates)(context, hiveconf)) :: Nil + HiveTableScanExec(_, relation, pruningPredicates)(sparkSession, hiveconf)) :: Nil case _ => Nil } http://git-wip-us.apache.org/repos/asf/spark/blob/18c2c925/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala index cd45706..0520e75 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala @@ -26,7 +26,7 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema import org.apache.hadoop.hive.ql.metadata.{Partition, Table => HiveTable} import org.apache.hadoop.hive.ql.plan.TableDesc -import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation 
import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions.{AttributeMap, AttributeReference, Expression} @@ -42,7 +42,7 @@ private[hive] case class MetastoreRelation( alias: Option[String]) (val catalogTable: CatalogTable, @transient private val client: HiveClient, - @transient private val sqlContext: SQLContext) + @transient private val sparkSession: SparkSession) extends LeafNode with MultiInstanceRelation with FileRelation with CatalogRelation { override def equals(other: Any): Boolean = other match { @@ -58,7 +58,7 @@ private[hive] case class MetastoreRelation( Objects.hashCode(databaseName, tableName, alias, output) } - override protected def otherCopyArgs: Seq[AnyRef] = catalogTable :: sqlContext :: Nil + override protected def otherCopyArgs: Seq[AnyRef] = catalogTable :: sparkSession :: Nil private def toHiveColumn(c: CatalogColumn): FieldSchema = { new FieldSchema(c.name, c.dataType, c.comment.orNull) @@ -124,7 +124,7 @@ private[hive] case class MetastoreRelation( // if the size is still less than zero, we use default size Option(totalSize).map(_.toLong).filter(_ > 0) .getOrElse(Option(rawDataSize).map(_.toLong).filter(_ > 0) - .getOrElse(sqlContext.conf.defaultSizeInBytes))) + .getOrElse(sparkSession.sessionState.conf.defaultSizeInBytes))) } ) @@ -133,7 +133,7 @@ private[hive] case class MetastoreRelation( private lazy val allPartitions: Seq[CatalogTablePartition] = client.getAllPartitions(catalogTable) def getHiveQlPartitions(predicates: Seq[Expression] = Nil): Seq[Partition] = { - val rawPartitions = if (sqlContext.conf.metastorePartitionPruning) { + val rawPartitions = if (sparkSession.sessionState.conf.metastorePartitionPruning) { client.getPartitionsByFilter(catalogTable, predicates) } else { allPartitions @@ -226,6 +226,6 @@ private[hive] case class MetastoreRelation( } override def newInstance(): MetastoreRelation = { - MetastoreRelation(databaseName, tableName, alias)(catalogTable, client, sqlContext) + 
MetastoreRelation(databaseName, tableName, alias)(catalogTable, client, sparkSession) } } http://git-wip-us.apache.org/repos/asf/spark/blob/18c2c925/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala index e95069e..af0317f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala @@ -36,7 +36,7 @@ import org.apache.spark.broadcast.Broadcast import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, RDD, UnionRDD} -import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.util.DateTimeUtils @@ -61,7 +61,7 @@ private[hive] class HadoopTableReader( @transient private val attributes: Seq[Attribute], @transient private val relation: MetastoreRelation, - @transient private val sc: SQLContext, + @transient private val sparkSession: SparkSession, hiveconf: HiveConf) extends TableReader with Logging { @@ -69,15 +69,15 @@ class HadoopTableReader( // https://hadoop.apache.org/docs/r1.0.4/mapred-default.html // // In order keep consistency with Hive, we will let it be 0 in local mode also. - private val _minSplitsPerRDD = if (sc.sparkContext.isLocal) { + private val _minSplitsPerRDD = if (sparkSession.sparkContext.isLocal) { 0 // will splitted based on block by default. 
} else { - math.max(hiveconf.getInt("mapred.map.tasks", 1), sc.sparkContext.defaultMinPartitions) + math.max(hiveconf.getInt("mapred.map.tasks", 1), sparkSession.sparkContext.defaultMinPartitions) } - SparkHadoopUtil.get.appendS3AndSparkHadoopConfigurations(sc.sparkContext.conf, hiveconf) + SparkHadoopUtil.get.appendS3AndSparkHadoopConfigurations(sparkSession.sparkContext.conf, hiveconf) private val _broadcastedHiveConf = - sc.sparkContext.broadcast(new SerializableConfiguration(hiveconf)) + sparkSession.sparkContext.broadcast(new SerializableConfiguration(hiveconf)) override def makeRDDForTable(hiveTable: HiveTable): RDD[InternalRow] = makeRDDForTable( @@ -153,7 +153,7 @@ class HadoopTableReader( def verifyPartitionPath( partitionToDeserializer: Map[HivePartition, Class[_ <: Deserializer]]): Map[HivePartition, Class[_ <: Deserializer]] = { - if (!sc.conf.verifyPartitionPath) { + if (!sparkSession.sessionState.conf.verifyPartitionPath) { partitionToDeserializer } else { var existPathSet = collection.mutable.Set[String]() @@ -246,7 +246,7 @@ class HadoopTableReader( // Even if we don't use any partitions, we still need an empty RDD if (hivePartitionRDDs.size == 0) { - new EmptyRDD[InternalRow](sc.sparkContext) + new EmptyRDD[InternalRow](sparkSession.sparkContext) } else { new UnionRDD(hivePartitionRDDs(0).context, hivePartitionRDDs) } @@ -278,7 +278,7 @@ class HadoopTableReader( val initializeJobConfFunc = HadoopTableReader.initializeLocalJobConfFunc(path, tableDesc) _ val rdd = new HadoopRDD( - sc.sparkContext, + sparkSession.sparkContext, _broadcastedHiveConf.asInstanceOf[Broadcast[SerializableConfiguration]], Some(initializeJobConfFunc), inputFormatClass, http://git-wip-us.apache.org/repos/asf/spark/blob/18c2c925/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala ---------------------------------------------------------------------- diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala index 9240f9c..08d4b99 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.hive.execution -import org.apache.spark.sql.{AnalysisException, Row, SQLContext} +import org.apache.spark.sql.{AnalysisException, Row, SparkSession} import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable} import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan} import org.apache.spark.sql.execution.command.RunnableCommand @@ -42,7 +42,7 @@ case class CreateTableAsSelect( override def children: Seq[LogicalPlan] = Seq(query) - override def run(sqlContext: SQLContext): Seq[Row] = { + override def run(sparkSession: SparkSession): Seq[Row] = { lazy val metastoreRelation: MetastoreRelation = { import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe @@ -68,24 +68,24 @@ case class CreateTableAsSelect( withFormat } - sqlContext.sessionState.catalog.createTable(withSchema, ignoreIfExists = false) + sparkSession.sessionState.catalog.createTable(withSchema, ignoreIfExists = false) // Get the Metastore Relation - sqlContext.sessionState.catalog.lookupRelation(tableIdentifier) match { + sparkSession.sessionState.catalog.lookupRelation(tableIdentifier) match { case r: MetastoreRelation => r } } // TODO ideally, we should get the output data ready first and then // add the relation into catalog, just in case of failure occurs while data // processing. 
- if (sqlContext.sessionState.catalog.tableExists(tableIdentifier)) { + if (sparkSession.sessionState.catalog.tableExists(tableIdentifier)) { if (allowExisting) { // table already exists, will do nothing, to keep consistent with Hive } else { throw new AnalysisException(s"$tableIdentifier already exists.") } } else { - sqlContext.executePlan(InsertIntoTable( + sparkSession.executePlan(InsertIntoTable( metastoreRelation, Map(), query, overwrite = true, ifNotExists = false)).toRdd } http://git-wip-us.apache.org/repos/asf/spark/blob/18c2c925/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala index 0f72091..cc5bbf5 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala @@ -27,7 +27,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.Object import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils import org.apache.spark.rdd.RDD -import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.execution._ @@ -48,8 +48,8 @@ case class HiveTableScanExec( requestedAttributes: Seq[Attribute], relation: MetastoreRelation, partitionPruningPred: Seq[Expression])( - @transient val context: SQLContext, - @transient val hiveconf: HiveConf) + @transient private val sparkSession: SparkSession, + @transient private val hiveconf: HiveConf) extends LeafExecNode { require(partitionPruningPred.isEmpty || relation.hiveQlTable.isPartitioned, @@ -84,7 +84,7 @@ case class HiveTableScanExec( @transient 
private[this] val hadoopReader = - new HadoopTableReader(attributes, relation, context, hiveExtraConf) + new HadoopTableReader(attributes, relation, sparkSession, hiveExtraConf) private[this] def castFromString(value: String, dataType: DataType) = { Cast(Literal(value), dataType).eval(null) http://git-wip-us.apache.org/repos/asf/spark/blob/18c2c925/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala index 1095f5f..cb49fc9 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala @@ -34,7 +34,7 @@ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit} import org.apache.spark.internal.Logging import org.apache.spark.rdd.{HadoopRDD, RDD} -import org.apache.spark.sql.{Row, SQLContext} +import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection @@ -52,17 +52,17 @@ private[sql] class DefaultSource override def toString: String = "ORC" override def inferSchema( - sqlContext: SQLContext, + sparkSession: SparkSession, options: Map[String, String], files: Seq[FileStatus]): Option[StructType] = { OrcFileOperator.readSchema( files.map(_.getPath.toUri.toString), - Some(new Configuration(sqlContext.sessionState.hadoopConf)) + Some(new Configuration(sparkSession.sessionState.hadoopConf)) ) } override def prepareWrite( - sqlContext: SQLContext, + sparkSession: SparkSession, job: Job, options: Map[String, String], dataSchema: StructType): OutputWriterFactory = { @@ -109,15 +109,15 @@ private[sql] class DefaultSource } override def buildReader( - 
sqlContext: SQLContext, + sparkSession: SparkSession, dataSchema: StructType, partitionSchema: StructType, requiredSchema: StructType, filters: Seq[Filter], options: Map[String, String]): (PartitionedFile) => Iterator[InternalRow] = { - val orcConf = new Configuration(sqlContext.sessionState.hadoopConf) + val orcConf = new Configuration(sparkSession.sessionState.hadoopConf) - if (sqlContext.conf.orcFilterPushDown) { + if (sparkSession.sessionState.conf.orcFilterPushDown) { // Sets pushed predicates OrcFilters.createFilter(filters.toArray).foreach { f => orcConf.set(OrcTableScan.SARG_PUSHDOWN, f.toKryo) @@ -125,7 +125,8 @@ private[sql] class DefaultSource } } - val broadcastedConf = sqlContext.sparkContext.broadcast(new SerializableConfiguration(orcConf)) + val broadcastedConf = + sparkSession.sparkContext.broadcast(new SerializableConfiguration(orcConf)) (file: PartitionedFile) => { val conf = broadcastedConf.value.value @@ -270,7 +271,7 @@ private[orc] class OrcOutputWriter( } private[orc] case class OrcTableScan( - @transient sqlContext: SQLContext, + @transient sparkSession: SparkSession, attributes: Seq[Attribute], filters: Array[Filter], @transient inputPaths: Seq[FileStatus]) @@ -278,11 +279,11 @@ private[orc] case class OrcTableScan( with HiveInspectors { def execute(): RDD[InternalRow] = { - val job = Job.getInstance(new Configuration(sqlContext.sessionState.hadoopConf)) + val job = Job.getInstance(new Configuration(sparkSession.sessionState.hadoopConf)) val conf = job.getConfiguration // Tries to push down filters if ORC filter push-down is enabled - if (sqlContext.conf.orcFilterPushDown) { + if (sparkSession.sessionState.conf.orcFilterPushDown) { OrcFilters.createFilter(filters).foreach { f => conf.set(OrcTableScan.SARG_PUSHDOWN, f.toKryo) conf.setBoolean(ConfVars.HIVEOPTINDEXFILTER.varname, true) @@ -294,14 +295,14 @@ private[orc] case class OrcTableScan( val orcFormat = new DefaultSource val dataSchema = orcFormat - .inferSchema(sqlContext, Map.empty, 
inputPaths) + .inferSchema(sparkSession, Map.empty, inputPaths) .getOrElse(sys.error("Failed to read schema from target ORC files.")) // Sets requested columns OrcRelation.setRequiredColumns(conf, dataSchema, StructType.fromAttributes(attributes)) if (inputPaths.isEmpty) { // the input path probably be pruned, return an empty RDD. - return sqlContext.sparkContext.emptyRDD[InternalRow] + return sparkSession.sparkContext.emptyRDD[InternalRow] } FileInputFormat.setInputPaths(job, inputPaths.map(_.getPath): _*) @@ -309,7 +310,7 @@ private[orc] case class OrcTableScan( classOf[OrcInputFormat] .asInstanceOf[Class[_ <: MapRedInputFormat[NullWritable, Writable]]] - val rdd = sqlContext.sparkContext.hadoopRDD( + val rdd = sparkSession.sparkContext.hadoopRDD( conf.asInstanceOf[JobConf], inputFormatClass, classOf[NullWritable], http://git-wip-us.apache.org/repos/asf/spark/blob/18c2c925/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala index 04b2494..f74e5cd 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -71,7 +71,9 @@ object TestHive * hive metastore seems to lead to weird non-deterministic failures. Therefore, the execution of * test cases that rely on TestHive must be serialized. 
*/ -class TestHiveContext(@transient val sparkSession: TestHiveSparkSession, isRootContext: Boolean) +class TestHiveContext( + @transient override val sparkSession: TestHiveSparkSession, + isRootContext: Boolean) extends SQLContext(sparkSession, isRootContext) { def this(sc: SparkContext) { @@ -106,11 +108,11 @@ class TestHiveContext(@transient val sparkSession: TestHiveSparkSession, isRootC private[hive] class TestHiveSparkSession( - sc: SparkContext, + @transient private val sc: SparkContext, val warehousePath: File, scratchDirPath: File, metastoreTemporaryConf: Map[String, String], - existingSharedState: Option[TestHiveSharedState]) + @transient private val existingSharedState: Option[TestHiveSharedState]) extends SparkSession(sc) with Logging { self => def this(sc: SparkContext) { @@ -463,7 +465,7 @@ private[hive] class TestHiveSparkSession( private[hive] class TestHiveQueryExecution( sparkSession: TestHiveSparkSession, logicalPlan: LogicalPlan) - extends QueryExecution(new SQLContext(sparkSession), logicalPlan) with Logging { + extends QueryExecution(sparkSession, logicalPlan) with Logging { def this(sparkSession: TestHiveSparkSession, sql: String) { this(sparkSession, sparkSession.sessionState.sqlParser.parsePlan(sql)) @@ -525,7 +527,7 @@ private[hive] class TestHiveSharedState( private[hive] class TestHiveSessionState(sparkSession: TestHiveSparkSession) - extends HiveSessionState(new SQLContext(sparkSession)) { + extends HiveSessionState(sparkSession) { override lazy val conf: SQLConf = { new SQLConf { http://git-wip-us.apache.org/repos/asf/spark/blob/18c2c925/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/SQLBuilderTest.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/SQLBuilderTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/SQLBuilderTest.scala index b121600..27c9e99 100644 --- 
a/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/SQLBuilderTest.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/SQLBuilderTest.scala @@ -64,7 +64,7 @@ abstract class SQLBuilderTest extends QueryTest with TestHiveSingleton { """.stripMargin) } - checkAnswer(sqlContext.sql(generatedSQL), Dataset.ofRows(sqlContext, plan)) + checkAnswer(sqlContext.sql(generatedSQL), Dataset.ofRows(sqlContext.sparkSession, plan)) } protected def checkSQL(df: DataFrame, expectedSQL: String): Unit = { http://git-wip-us.apache.org/repos/asf/spark/blob/18c2c925/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index 5965cdc..7cd01c9 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -701,7 +701,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv // Manually create a metastore data source table. 
CreateDataSourceTableUtils.createDataSourceTable( - sqlContext = sqlContext, + sparkSession = sqlContext.sparkSession, tableIdent = TableIdentifier("wide_schema"), userSpecifiedSchema = Some(schema), partitionColumns = Array.empty[String], @@ -910,7 +910,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv val schema = StructType((1 to 5).map(i => StructField(s"c_$i", StringType))) CreateDataSourceTableUtils.createDataSourceTable( - sqlContext = sqlContext, + sparkSession = sqlContext.sparkSession, tableIdent = TableIdentifier("not_skip_hive_metadata"), userSpecifiedSchema = Some(schema), partitionColumns = Array.empty[String], @@ -925,7 +925,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv .forall(column => DataTypeParser.parse(column.dataType) == StringType)) CreateDataSourceTableUtils.createDataSourceTable( - sqlContext = sqlContext, + sparkSession = sqlContext.sparkSession, tableIdent = TableIdentifier("skip_hive_metadata"), userSpecifiedSchema = Some(schema), partitionColumns = Array.empty[String], http://git-wip-us.apache.org/repos/asf/spark/blob/18c2c925/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala index bc87d3e..b16c9c1 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala @@ -975,7 +975,7 @@ class TungstenAggregationQueryWithControlledFallbackSuite extends AggregationQue // Create a new df to make sure its physical operator picks up // spark.sql.TungstenAggregate.testFallbackStartsAt. // todo: remove it? 
- val newActual = Dataset.ofRows(sqlContext, actual.logicalPlan) + val newActual = Dataset.ofRows(sqlContext.sparkSession, actual.logicalPlan) QueryTest.checkAnswer(newActual, expectedAnswer) match { case Some(errorMessage) => http://git-wip-us.apache.org/repos/asf/spark/blob/18c2c925/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestSource.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestSource.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestSource.scala index 4a2d190..5a8a7f0 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestSource.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestSource.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.sources import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext} import org.apache.spark.TaskContext -import org.apache.spark.sql.{Row, SQLContext} +import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory} import org.apache.spark.sql.types.StructType @@ -33,7 +33,7 @@ class CommitFailureTestSource extends SimpleTextSource { * by setting the output committer class in the conf of spark.sql.sources.outputCommitterClass. 
*/ override def prepareWrite( - sqlContext: SQLContext, + sparkSession: SparkSession, job: Job, options: Map[String, String], dataSchema: StructType): OutputWriterFactory = http://git-wip-us.apache.org/repos/asf/spark/blob/18c2c925/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala index eced8ed..e4bd1f9 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala @@ -25,7 +25,7 @@ import org.apache.hadoop.io.{NullWritable, Text} import org.apache.hadoop.mapreduce.{Job, RecordWriter, TaskAttemptContext} import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, TextOutputFormat} -import org.apache.spark.sql.{sources, Row, SQLContext} +import org.apache.spark.sql.{sources, Row, SparkSession} import org.apache.spark.sql.catalyst.{expressions, InternalRow} import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, GenericInternalRow, InterpretedPredicate, InterpretedProjection, JoinedRow, Literal} import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection @@ -37,14 +37,14 @@ class SimpleTextSource extends FileFormat with DataSourceRegister { override def shortName(): String = "test" override def inferSchema( - sqlContext: SQLContext, + sparkSession: SparkSession, options: Map[String, String], files: Seq[FileStatus]): Option[StructType] = { Some(DataType.fromJson(options("dataSchema")).asInstanceOf[StructType]) } override def prepareWrite( - sqlContext: SQLContext, + sparkSession: SparkSession, job: Job, options: Map[String, String], dataSchema: StructType): OutputWriterFactory = new OutputWriterFactory { @@ -58,7 +58,7 @@ class SimpleTextSource extends 
FileFormat with DataSourceRegister { } override def buildReader( - sqlContext: SQLContext, + sparkSession: SparkSession, dataSchema: StructType, partitionSchema: StructType, requiredSchema: StructType, @@ -74,9 +74,9 @@ class SimpleTextSource extends FileFormat with DataSourceRegister { inputAttributes.find(_.name == field.name) } - val conf = new Configuration(sqlContext.sessionState.hadoopConf) + val conf = new Configuration(sparkSession.sessionState.hadoopConf) val broadcastedConf = - sqlContext.sparkContext.broadcast(new SerializableConfiguration(conf)) + sparkSession.sparkContext.broadcast(new SerializableConfiguration(conf)) (file: PartitionedFile) => { val predicate = { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org For additional commands, e-mail: commits-help@spark.apache.org
