Repository: spark
Updated Branches:
  refs/heads/master c431a76d0 -> d7d0cad0a
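The diffs below update call sites across sql/core, sql/hive, and sql/hive-thriftserver for the rename of physical operators to carry an Exec suffix (DataSourceScanExec, FilterExec, ProjectExec, WholeStageCodegenExec, ExecutedCommandExec, HiveTableScanExec, the *JoinExec operators, InMemoryTableScanExec) and of the SparkPlan base classes to LeafExecNode and UnaryExecNode. As a minimal, hedged sketch of what caller-side plan inspection looks like after the rename (the query, table name, and SQLContext are illustrative placeholders, not taken from the commit):

    import org.apache.spark.sql.execution.{FilterExec, WholeStageCodegenExec}

    // Assumes an existing SQLContext; the table name is a placeholder.
    val df = sqlContext.sql("SELECT key FROM some_table WHERE key > 0")
    val plan = df.queryExecution.executedPlan

    // Physical operators now end in Exec, so pattern matches use the new class names.
    val filters = plan.collect { case f: FilterExec => f }
    val numOutputRows = plan match {
      case w: WholeStageCodegenExec => w.child.longMetric("numOutputRows")
      case other => other.longMetric("numOutputRows")
    }
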
http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala index 62f991f..9bb901b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala @@ -124,7 +124,7 @@ class PrunedScanSuite extends DataSourceTest with SharedSQLContext { try { val queryExecution = sql(sqlString).queryExecution val rawPlan = queryExecution.executedPlan.collect { - case p: execution.DataSourceScan => p + case p: execution.DataSourceScanExec => p } match { case Seq(p) => p case _ => fail(s"More than one PhysicalRDD found\n$queryExecution") http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala index f615019..5691105 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala @@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.analysis.NoSuchTableException import org.apache.spark.sql.catalyst.FunctionIdentifier import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.util._ -import org.apache.spark.sql.execution.Filter +import org.apache.spark.sql.execution.FilterExec import org.apache.spark.util.Utils /** @@ -242,7 +242,7 @@ private[sql] trait SQLTestUtils protected def stripSparkFilter(df: DataFrame): DataFrame = { val schema = df.schema val withoutFilters = df.queryExecution.sparkPlan transform { - case Filter(_, child) => child + case FilterExec(_, child) => child } val childRDD = withoutFilters http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala index e7d2b5a..eb25ea0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala @@ -22,7 +22,7 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark._ import org.apache.spark.sql.{functions, QueryTest} import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Project} -import org.apache.spark.sql.execution.{QueryExecution, WholeStageCodegen} +import org.apache.spark.sql.execution.{QueryExecution, WholeStageCodegenExec} import org.apache.spark.sql.test.SharedSQLContext class DataFrameCallbackSuite extends QueryTest with SharedSQLContext { @@ -93,7 +93,7 @@ class DataFrameCallbackSuite extends QueryTest with SharedSQLContext { override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = { val metric = qe.executedPlan match { - case w: WholeStageCodegen => w.child.longMetric("numOutputRows") + case w: WholeStageCodegenExec => 
w.child.longMetric("numOutputRows") case other => other.longMetric("numOutputRows") } metrics += metric.value.value http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala ---------------------------------------------------------------------- diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala index bc45334..f15f5b0 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala @@ -224,7 +224,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest { val plan = statement.executeQuery("explain select * from test_table") plan.next() plan.next() - assert(plan.getString(1).contains("InMemoryColumnarTableScan")) + assert(plan.getString(1).contains("InMemoryTableScan")) val rs1 = statement.executeQuery("SELECT key FROM test_table ORDER BY KEY DESC") val buf1 = new collection.mutable.ArrayBuffer[Int]() @@ -310,7 +310,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest { val plan = statement.executeQuery("explain select key from test_map ORDER BY key DESC") plan.next() plan.next() - assert(plan.getString(1).contains("InMemoryColumnarTableScan")) + assert(plan.getString(1).contains("InMemoryTableScan")) val rs = statement.executeQuery("SELECT key FROM test_map ORDER BY KEY DESC") val buf = new collection.mutable.ArrayBuffer[Int]() http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index 5b7fbe0..2d36dda 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -78,7 +78,7 @@ private[hive] trait HiveStrategies { projectList, otherPredicates, identity[Seq[Expression]], - HiveTableScan(_, relation, pruningPredicates)(context, hiveconf)) :: Nil + HiveTableScanExec(_, relation, pruningPredicates)(context, hiveconf)) :: Nil case _ => Nil } @@ -91,17 +91,17 @@ private[hive] trait HiveStrategies { val cmd = CreateMetastoreDataSource( tableIdent, userSpecifiedSchema, provider, opts, allowExisting, managedIfNoPath) - ExecutedCommand(cmd) :: Nil + ExecutedCommandExec(cmd) :: Nil case c: CreateTableUsingAsSelect if c.temporary => val cmd = CreateTempTableUsingAsSelect( c.tableIdent, c.provider, c.partitionColumns, c.mode, c.options, c.child) - ExecutedCommand(cmd) :: Nil + ExecutedCommandExec(cmd) :: Nil case c: CreateTableUsingAsSelect => val cmd = CreateMetastoreDataSourceAsSelect(c.tableIdent, c.provider, c.partitionColumns, c.bucketSpec, c.mode, c.options, c.child) - ExecutedCommand(cmd) :: Nil + ExecutedCommandExec(cmd) :: Nil case _ => Nil } http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala ---------------------------------------------------------------------- diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala deleted file mode 100644 index 9a83466..0000000 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala +++ /dev/null @@ -1,166 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.hive.execution - -import scala.collection.JavaConverters._ - -import org.apache.hadoop.hive.conf.HiveConf -import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition} -import org.apache.hadoop.hive.serde.serdeConstants -import org.apache.hadoop.hive.serde2.objectinspector._ -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils - -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.execution._ -import org.apache.spark.sql.execution.metric.SQLMetrics -import org.apache.spark.sql.hive._ -import org.apache.spark.sql.types.{BooleanType, DataType} -import org.apache.spark.util.Utils - -/** - * The Hive table scan operator. Column and partition pruning are both handled. - * - * @param requestedAttributes Attributes to be fetched from the Hive table. - * @param relation The Hive table be be scanned. - * @param partitionPruningPred An optional partition pruning predicate for partitioned table. - */ -private[hive] -case class HiveTableScan( - requestedAttributes: Seq[Attribute], - relation: MetastoreRelation, - partitionPruningPred: Seq[Expression])( - @transient val context: SQLContext, - @transient val hiveconf: HiveConf) - extends LeafNode { - - require(partitionPruningPred.isEmpty || relation.hiveQlTable.isPartitioned, - "Partition pruning predicates only supported for partitioned tables.") - - private[sql] override lazy val metrics = Map( - "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows")) - - override def producedAttributes: AttributeSet = outputSet ++ - AttributeSet(partitionPruningPred.flatMap(_.references)) - - // Retrieve the original attributes based on expression ID so that capitalization matches. - val attributes = requestedAttributes.map(relation.attributeMap) - - // Bind all partition key attribute references in the partition pruning predicate for later - // evaluation. 
- private[this] val boundPruningPred = partitionPruningPred.reduceLeftOption(And).map { pred => - require( - pred.dataType == BooleanType, - s"Data type of predicate $pred must be BooleanType rather than ${pred.dataType}.") - - BindReferences.bindReference(pred, relation.partitionKeys) - } - - // Create a local copy of hiveconf,so that scan specific modifications should not impact - // other queries - @transient - private[this] val hiveExtraConf = new HiveConf(hiveconf) - - // append columns ids and names before broadcast - addColumnMetadataToConf(hiveExtraConf) - - @transient - private[this] val hadoopReader = - new HadoopTableReader(attributes, relation, context, hiveExtraConf) - - private[this] def castFromString(value: String, dataType: DataType) = { - Cast(Literal(value), dataType).eval(null) - } - - private def addColumnMetadataToConf(hiveConf: HiveConf) { - // Specifies needed column IDs for those non-partitioning columns. - val neededColumnIDs = attributes.flatMap(relation.columnOrdinals.get).map(o => o: Integer) - - HiveShim.appendReadColumns(hiveConf, neededColumnIDs, attributes.map(_.name)) - - val tableDesc = relation.tableDesc - val deserializer = tableDesc.getDeserializerClass.newInstance - deserializer.initialize(hiveConf, tableDesc.getProperties) - - // Specifies types and object inspectors of columns to be scanned. - val structOI = ObjectInspectorUtils - .getStandardObjectInspector( - deserializer.getObjectInspector, - ObjectInspectorCopyOption.JAVA) - .asInstanceOf[StructObjectInspector] - - val columnTypeNames = structOI - .getAllStructFieldRefs.asScala - .map(_.getFieldObjectInspector) - .map(TypeInfoUtils.getTypeInfoFromObjectInspector(_).getTypeName) - .mkString(",") - - hiveConf.set(serdeConstants.LIST_COLUMN_TYPES, columnTypeNames) - hiveConf.set(serdeConstants.LIST_COLUMNS, relation.attributes.map(_.name).mkString(",")) - } - - /** - * Prunes partitions not involve the query plan. - * - * @param partitions All partitions of the relation. - * @return Partitions that are involved in the query plan. - */ - private[hive] def prunePartitions(partitions: Seq[HivePartition]) = { - boundPruningPred match { - case None => partitions - case Some(shouldKeep) => partitions.filter { part => - val dataTypes = relation.partitionKeys.map(_.dataType) - val castedValues = part.getValues.asScala.zip(dataTypes) - .map { case (value, dataType) => castFromString(value, dataType) } - - // Only partitioned values are needed here, since the predicate has already been bound to - // partition key attribute references. - val row = InternalRow.fromSeq(castedValues) - shouldKeep.eval(row).asInstanceOf[Boolean] - } - } - } - - protected override def doExecute(): RDD[InternalRow] = { - // Using dummyCallSite, as getCallSite can turn out to be expensive with - // with multiple partitions. 
- val rdd = if (!relation.hiveQlTable.isPartitioned) { - Utils.withDummyCallSite(sqlContext.sparkContext) { - hadoopReader.makeRDDForTable(relation.hiveQlTable) - } - } else { - Utils.withDummyCallSite(sqlContext.sparkContext) { - hadoopReader.makeRDDForPartitionedTable( - prunePartitions(relation.getHiveQlPartitions(partitionPruningPred))) - } - } - val numOutputRows = longMetric("numOutputRows") - rdd.mapPartitionsInternal { iter => - val proj = UnsafeProjection.create(schema) - iter.map { r => - numOutputRows += 1 - proj(r) - } - } - } - - override def output: Seq[Attribute] = attributes -} http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala new file mode 100644 index 0000000..0f72091 --- /dev/null +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.execution + +import scala.collection.JavaConverters._ + +import org.apache.hadoop.hive.conf.HiveConf +import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition} +import org.apache.hadoop.hive.serde.serdeConstants +import org.apache.hadoop.hive.serde2.objectinspector._ +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.execution._ +import org.apache.spark.sql.execution.metric.SQLMetrics +import org.apache.spark.sql.hive._ +import org.apache.spark.sql.types.{BooleanType, DataType} +import org.apache.spark.util.Utils + +/** + * The Hive table scan operator. Column and partition pruning are both handled. + * + * @param requestedAttributes Attributes to be fetched from the Hive table. + * @param relation The Hive table be be scanned. + * @param partitionPruningPred An optional partition pruning predicate for partitioned table. 
+ */ +private[hive] +case class HiveTableScanExec( + requestedAttributes: Seq[Attribute], + relation: MetastoreRelation, + partitionPruningPred: Seq[Expression])( + @transient val context: SQLContext, + @transient val hiveconf: HiveConf) + extends LeafExecNode { + + require(partitionPruningPred.isEmpty || relation.hiveQlTable.isPartitioned, + "Partition pruning predicates only supported for partitioned tables.") + + private[sql] override lazy val metrics = Map( + "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows")) + + override def producedAttributes: AttributeSet = outputSet ++ + AttributeSet(partitionPruningPred.flatMap(_.references)) + + // Retrieve the original attributes based on expression ID so that capitalization matches. + val attributes = requestedAttributes.map(relation.attributeMap) + + // Bind all partition key attribute references in the partition pruning predicate for later + // evaluation. + private[this] val boundPruningPred = partitionPruningPred.reduceLeftOption(And).map { pred => + require( + pred.dataType == BooleanType, + s"Data type of predicate $pred must be BooleanType rather than ${pred.dataType}.") + + BindReferences.bindReference(pred, relation.partitionKeys) + } + + // Create a local copy of hiveconf,so that scan specific modifications should not impact + // other queries + @transient + private[this] val hiveExtraConf = new HiveConf(hiveconf) + + // append columns ids and names before broadcast + addColumnMetadataToConf(hiveExtraConf) + + @transient + private[this] val hadoopReader = + new HadoopTableReader(attributes, relation, context, hiveExtraConf) + + private[this] def castFromString(value: String, dataType: DataType) = { + Cast(Literal(value), dataType).eval(null) + } + + private def addColumnMetadataToConf(hiveConf: HiveConf) { + // Specifies needed column IDs for those non-partitioning columns. + val neededColumnIDs = attributes.flatMap(relation.columnOrdinals.get).map(o => o: Integer) + + HiveShim.appendReadColumns(hiveConf, neededColumnIDs, attributes.map(_.name)) + + val tableDesc = relation.tableDesc + val deserializer = tableDesc.getDeserializerClass.newInstance + deserializer.initialize(hiveConf, tableDesc.getProperties) + + // Specifies types and object inspectors of columns to be scanned. + val structOI = ObjectInspectorUtils + .getStandardObjectInspector( + deserializer.getObjectInspector, + ObjectInspectorCopyOption.JAVA) + .asInstanceOf[StructObjectInspector] + + val columnTypeNames = structOI + .getAllStructFieldRefs.asScala + .map(_.getFieldObjectInspector) + .map(TypeInfoUtils.getTypeInfoFromObjectInspector(_).getTypeName) + .mkString(",") + + hiveConf.set(serdeConstants.LIST_COLUMN_TYPES, columnTypeNames) + hiveConf.set(serdeConstants.LIST_COLUMNS, relation.attributes.map(_.name).mkString(",")) + } + + /** + * Prunes partitions not involve the query plan. + * + * @param partitions All partitions of the relation. + * @return Partitions that are involved in the query plan. + */ + private[hive] def prunePartitions(partitions: Seq[HivePartition]) = { + boundPruningPred match { + case None => partitions + case Some(shouldKeep) => partitions.filter { part => + val dataTypes = relation.partitionKeys.map(_.dataType) + val castedValues = part.getValues.asScala.zip(dataTypes) + .map { case (value, dataType) => castFromString(value, dataType) } + + // Only partitioned values are needed here, since the predicate has already been bound to + // partition key attribute references. 
+ val row = InternalRow.fromSeq(castedValues) + shouldKeep.eval(row).asInstanceOf[Boolean] + } + } + } + + protected override def doExecute(): RDD[InternalRow] = { + // Using dummyCallSite, as getCallSite can turn out to be expensive with + // with multiple partitions. + val rdd = if (!relation.hiveQlTable.isPartitioned) { + Utils.withDummyCallSite(sqlContext.sparkContext) { + hadoopReader.makeRDDForTable(relation.hiveQlTable) + } + } else { + Utils.withDummyCallSite(sqlContext.sparkContext) { + hadoopReader.makeRDDForPartitionedTable( + prunePartitions(relation.getHiveQlPartitions(partitionPruningPred))) + } + } + val numOutputRows = longMetric("numOutputRows") + rdd.mapPartitionsInternal { iter => + val proj = UnsafeProjection.create(schema) + iter.map { r => + numOutputRows += 1 + proj(r) + } + } + } + + override def output: Seq[Attribute] = attributes +} http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index e614daa..3cb6081 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -29,7 +29,7 @@ import org.apache.hadoop.mapred.{FileOutputFormat, JobConf} import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.execution.{SparkPlan, UnaryNode} +import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} import org.apache.spark.sql.hive._ import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc} import org.apache.spark.SparkException @@ -41,7 +41,7 @@ case class InsertIntoHiveTable( partition: Map[String, Option[String]], child: SparkPlan, overwrite: Boolean, - ifNotExists: Boolean) extends UnaryNode { + ifNotExists: Boolean) extends UnaryExecNode { @transient private val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState] @transient private val client = sessionState.metadataHive http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala index 8c8becf..f27337e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala @@ -59,7 +59,7 @@ case class ScriptTransformation( output: Seq[Attribute], child: SparkPlan, ioschema: HiveScriptIOSchema)(@transient private val hiveconf: HiveConf) - extends UnaryNode { + extends UnaryExecNode { override protected def otherCopyArgs: Seq[HiveConf] = hiveconf :: Nil http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala index 11384a0..97bd47a 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.hive import java.io.File import org.apache.spark.sql.{AnalysisException, QueryTest, SaveMode} -import org.apache.spark.sql.execution.columnar.InMemoryColumnarTableScan +import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.storage.RDDBlockId import org.apache.spark.util.Utils @@ -31,7 +31,7 @@ class CachedTableSuite extends QueryTest with TestHiveSingleton { def rddIdOf(tableName: String): Int = { val plan = table(tableName).queryExecution.sparkPlan plan.collect { - case InMemoryColumnarTableScan(_, _, relation) => + case InMemoryTableScanExec(_, _, relation) => relation.cachedColumnBuffers.id case _ => fail(s"Table $tableName is not cached\n" + plan) @@ -211,7 +211,7 @@ class CachedTableSuite extends QueryTest with TestHiveSingleton { cacheTable("cachedTable") val sparkPlan = sql("SELECT * FROM cachedTable").queryExecution.sparkPlan - assert(sparkPlan.collect { case e: InMemoryColumnarTableScan => e }.size === 1) + assert(sparkPlan.collect { case e: InMemoryTableScanExec => e }.size === 1) sql("DROP TABLE cachedTable") } http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index 565b310..93a6f0b 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -153,7 +153,7 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton { // Using `sparkPlan` because for relevant patterns in HashJoin to be // matched, other strategies need to be applied. - var bhj = df.queryExecution.sparkPlan.collect { case j: BroadcastHashJoin => j } + var bhj = df.queryExecution.sparkPlan.collect { case j: BroadcastHashJoinExec => j } assert(bhj.size === 1, s"actual query plans do not contain broadcast join: ${df.queryExecution}") @@ -164,10 +164,10 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton { sql(s"""SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key}=-1""") df = sql(query) - bhj = df.queryExecution.sparkPlan.collect { case j: BroadcastHashJoin => j } + bhj = df.queryExecution.sparkPlan.collect { case j: BroadcastHashJoinExec => j } assert(bhj.isEmpty, "BroadcastHashJoin still planned even though it is switched off") - val shj = df.queryExecution.sparkPlan.collect { case j: SortMergeJoin => j } + val shj = df.queryExecution.sparkPlan.collect { case j: SortMergeJoinExec => j } assert(shj.size === 1, "SortMergeJoin should be planned when BroadcastHashJoin is turned off") @@ -210,7 +210,7 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton { // Using `sparkPlan` because for relevant patterns in HashJoin to be // matched, other strategies need to be applied. 
var bhj = df.queryExecution.sparkPlan.collect { - case j: BroadcastHashJoin => j + case j: BroadcastHashJoinExec => j } assert(bhj.size === 1, s"actual query plans do not contain broadcast join: ${df.queryExecution}") @@ -223,12 +223,12 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton { sql(s"SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key}=-1") df = sql(leftSemiJoinQuery) bhj = df.queryExecution.sparkPlan.collect { - case j: BroadcastHashJoin => j + case j: BroadcastHashJoinExec => j } assert(bhj.isEmpty, "BroadcastHashJoin still planned even though it is switched off") val shj = df.queryExecution.sparkPlan.collect { - case j: ShuffledHashJoin => j + case j: ShuffledHashJoinExec => j } assert(shj.size === 1, "LeftSemiJoinHash should be planned when BroadcastHashJoin is turned off") http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala index 3ddffeb..aac5cc6 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala @@ -485,7 +485,7 @@ abstract class HiveComparisonTest // also print out the query plans and results for those. val computedTablesMessages: String = try { val tablesRead = new TestHiveQueryExecution(query).executedPlan.collect { - case ts: HiveTableScan => ts.relation.tableName + case ts: HiveTableScanExec => ts.relation.tableName }.toSet TestHive.reset() http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index 93d63f2..467a672 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -30,7 +30,7 @@ import org.apache.spark.{SparkException, SparkFiles} import org.apache.spark.sql.{AnalysisException, DataFrame, Row} import org.apache.spark.sql.catalyst.expressions.Cast import org.apache.spark.sql.catalyst.plans.logical.Project -import org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoin +import org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec import org.apache.spark.sql.hive._ import org.apache.spark.sql.hive.test.{TestHive, TestHiveContext} import org.apache.spark.sql.hive.test.TestHive._ @@ -121,7 +121,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { test("SPARK-10484 Optimize the Cartesian (Cross) Join with broadcast based JOIN") { def assertBroadcastNestedLoopJoin(sqlText: String): Unit = { assert(sql(sqlText).queryExecution.sparkPlan.collect { - case _: BroadcastNestedLoopJoin => 1 + case _: BroadcastNestedLoopJoinExec => 1 }.nonEmpty) } http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTypeCoercionSuite.scala ---------------------------------------------------------------------- diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTypeCoercionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTypeCoercionSuite.scala index 6b424d7..2de429b 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTypeCoercionSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTypeCoercionSuite.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.hive.execution import org.apache.spark.sql.catalyst.expressions.{Cast, EqualTo} -import org.apache.spark.sql.execution.Project +import org.apache.spark.sql.execution.ProjectExec import org.apache.spark.sql.hive.test.TestHive /** @@ -50,7 +50,7 @@ class HiveTypeCoercionSuite extends HiveComparisonTest { test("[SPARK-2210] boolean cast on boolean value should be removed") { val q = "select cast(cast(key=0 as boolean) as boolean) from src" val project = TestHive.sql(q).queryExecution.sparkPlan.collect { - case e: Project => e + case e: ProjectExec => e }.head // No cast expression introduced http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala index 12f30e2..24df73b 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala @@ -151,7 +151,7 @@ class PruningSuite extends HiveComparisonTest with BeforeAndAfter { val plan = new TestHiveQueryExecution(sql).sparkPlan val actualOutputColumns = plan.output.map(_.name) val (actualScannedColumns, actualPartValues) = plan.collect { - case p @ HiveTableScan(columns, relation, _) => + case p @ HiveTableScanExec(columns, relation, _) => val columnNames = columns.map(_.name) val partValues = if (relation.catalogTable.partitionColumnNames.nonEmpty) { p.prunePartitions(relation.getHiveQlPartitions()).map(_.getValues) http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala index 00b5c8d..1a15fb7 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala @@ -24,7 +24,7 @@ import org.apache.spark.TaskContext import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} -import org.apache.spark.sql.execution.{SparkPlan, SparkPlanTest, UnaryNode} +import org.apache.spark.sql.execution.{SparkPlan, SparkPlanTest, UnaryExecNode} import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.types.StringType @@ -111,7 +111,7 @@ class ScriptTransformationSuite extends SparkPlanTest with TestHiveSingleton { } } -private case class ExceptionInjectingOperator(child: SparkPlan) extends UnaryNode { +private case class ExceptionInjectingOperator(child: SparkPlan) extends 
UnaryExecNode { override protected def doExecute(): RDD[InternalRow] = { child.execute().map { x => assert(TaskContext.get() != null) // Make sure that TaskContext is defined. http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala index 2984ee9..1c1f6d9 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala @@ -21,10 +21,10 @@ import java.io.File import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.execution.DataSourceScan -import org.apache.spark.sql.execution.command.ExecutedCommand +import org.apache.spark.sql.execution.DataSourceScanExec +import org.apache.spark.sql.execution.command.ExecutedCommandExec import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InsertIntoDataSource, InsertIntoHadoopFsRelation, LogicalRelation} -import org.apache.spark.sql.hive.execution.HiveTableScan +import org.apache.spark.sql.hive.execution.HiveTableScanExec import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SQLTestUtils @@ -192,11 +192,11 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { test(s"conversion is working") { assert( sql("SELECT * FROM normal_parquet").queryExecution.sparkPlan.collect { - case _: HiveTableScan => true + case _: HiveTableScanExec => true }.isEmpty) assert( sql("SELECT * FROM normal_parquet").queryExecution.sparkPlan.collect { - case _: DataSourceScan => true + case _: DataSourceScanExec => true }.nonEmpty) } @@ -307,7 +307,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt") df.queryExecution.sparkPlan match { - case ExecutedCommand(_: InsertIntoHadoopFsRelation) => // OK + case ExecutedCommandExec(_: InsertIntoHadoopFsRelation) => // OK case o => fail("test_insert_parquet should be converted to a " + s"${classOf[HadoopFsRelation ].getCanonicalName} and " + s"${classOf[InsertIntoDataSource].getCanonicalName} is expected as the SparkPlan. " + @@ -337,7 +337,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt_array") df.queryExecution.sparkPlan match { - case ExecutedCommand(_: InsertIntoHadoopFsRelation) => // OK + case ExecutedCommandExec(_: InsertIntoHadoopFsRelation) => // OK case o => fail("test_insert_parquet should be converted to a " + s"${classOf[HadoopFsRelation ].getCanonicalName} and " + s"${classOf[InsertIntoDataSource].getCanonicalName} is expected as the SparkPlan." 
+ http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala index aa6101f..d271e55 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala @@ -22,10 +22,10 @@ import java.io.File import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning -import org.apache.spark.sql.execution.DataSourceScan +import org.apache.spark.sql.execution.DataSourceScanExec import org.apache.spark.sql.execution.datasources.{BucketSpec, DataSourceStrategy} import org.apache.spark.sql.execution.exchange.ShuffleExchange -import org.apache.spark.sql.execution.joins.SortMergeJoin +import org.apache.spark.sql.execution.joins.SortMergeJoinExec import org.apache.spark.sql.functions._ import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.internal.SQLConf @@ -93,7 +93,7 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet // Filter could hide the bug in bucket pruning. Thus, skipping all the filters val plan = bucketedDataFrame.filter(filterCondition).queryExecution.executedPlan - val rdd = plan.find(_.isInstanceOf[DataSourceScan]) + val rdd = plan.find(_.isInstanceOf[DataSourceScanExec]) assert(rdd.isDefined, plan) val checkedResult = rdd.get.execute().mapPartitionsWithIndex { case (index, iter) => @@ -261,8 +261,8 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet joined.sort("bucketed_table1.k", "bucketed_table2.k"), df1.join(df2, joinCondition(df1, df2, joinColumns)).sort("df1.k", "df2.k")) - assert(joined.queryExecution.executedPlan.isInstanceOf[SortMergeJoin]) - val joinOperator = joined.queryExecution.executedPlan.asInstanceOf[SortMergeJoin] + assert(joined.queryExecution.executedPlan.isInstanceOf[SortMergeJoinExec]) + val joinOperator = joined.queryExecution.executedPlan.asInstanceOf[SortMergeJoinExec] assert( joinOperator.left.find(_.isInstanceOf[ShuffleExchange]).isDefined == shuffleLeft, http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala index a15bd22..19749a9 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala @@ -152,8 +152,8 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest { val df = sqlContext.read.parquet(path).filter('a === 0).select('b) val physicalPlan = df.queryExecution.sparkPlan - assert(physicalPlan.collect { case p: execution.Project => p }.length === 1) - assert(physicalPlan.collect { case p: execution.Filter => p }.length === 1) + assert(physicalPlan.collect { case p: execution.ProjectExec => p }.length === 1) + assert(physicalPlan.collect { case p: execution.FilterExec => p }.length 
=== 1) } } http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala index 089cef6..5378336 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala @@ -28,7 +28,7 @@ import org.apache.parquet.hadoop.ParquetOutputCommitter import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.sql._ -import org.apache.spark.sql.execution.DataSourceScan +import org.apache.spark.sql.execution.DataSourceScanExec import org.apache.spark.sql.execution.datasources.{FileScanRDD, HadoopFsRelation, LocalityTestFileSystem, LogicalRelation} import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.internal.SQLConf @@ -688,7 +688,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes .load(path) val Some(fileScanRDD) = df2.queryExecution.executedPlan.collectFirst { - case scan: DataSourceScan if scan.rdd.isInstanceOf[FileScanRDD] => + case scan: DataSourceScanExec if scan.rdd.isInstanceOf[FileScanRDD] => scan.rdd.asInstanceOf[FileScanRDD] }
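The renamed classes also appear when transforming a plan rather than collecting from it. A hedged sketch in the spirit of the stripSparkFilter helper and the FileScanRDD lookup above, assuming a DataFrame df read from a file-based source (df itself is a placeholder):

    import org.apache.spark.sql.execution.{DataSourceScanExec, FilterExec}
    import org.apache.spark.sql.execution.datasources.FileScanRDD

    // Drop Spark-side filters so a test can observe exactly what the data source returned.
    val withoutFilters = df.queryExecution.sparkPlan transform {
      case FilterExec(_, child) => child
    }

    // Locate the renamed scan operator; for file-based sources its rdd is a FileScanRDD.
    val fileScanRDD = df.queryExecution.executedPlan.collectFirst {
      case scan: DataSourceScanExec if scan.rdd.isInstanceOf[FileScanRDD] =>
        scan.rdd.asInstanceOf[FileScanRDD]
    }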
