http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index b44d4c8..1828ed1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -245,7 +245,7 @@ class DataFrameReader private[sql](sqlContext: SQLContext) { JsonRDD.nullTypeToStringType( JsonRDD.inferSchema(jsonRDD, 1.0, columnNameOfCorruptJsonRecord))) val rowRDD = JsonRDD.jsonStringToRow(jsonRDD, appliedSchema, columnNameOfCorruptJsonRecord) - sqlContext.createDataFrame(rowRDD, appliedSchema, needsConversion = false) + sqlContext.internalCreateDataFrame(rowRDD, appliedSchema) } }
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 5f758ad..22d0e50 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -31,7 +31,7 @@ import org.apache.spark.SparkContext import org.apache.spark.annotation.{DeveloperApi, Experimental} import org.apache.spark.api.java.{JavaRDD, JavaSparkContext} import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst._ +import org.apache.spark.sql.catalyst.{InternalRow, _} import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.errors.DialectException @@ -486,15 +486,27 @@ class SQLContext(@transient val sparkContext: SparkContext) // schema differs from the existing schema on any field data type. val catalystRows = if (needsConversion) { val converter = CatalystTypeConverters.createToCatalystConverter(schema) - rowRDD.map(converter(_).asInstanceOf[Row]) + rowRDD.map(converter(_).asInstanceOf[InternalRow]) } else { - rowRDD + rowRDD.map{r: Row => InternalRow.fromSeq(r.toSeq)} } val logicalPlan = LogicalRDD(schema.toAttributes, catalystRows)(self) DataFrame(this, logicalPlan) } /** + * Creates a DataFrame from an RDD[InternalRow] whose rows are already in Catalyst's internal + * format; the rows are used as-is, without any conversion. + */ + private[sql] + def internalCreateDataFrame(catalystRows: RDD[InternalRow], schema: StructType) = { + // TODO: use MutableProjection when rowRDD is another DataFrame and the applied + // schema differs from the existing schema on any field data type. 
+ val logicalPlan = LogicalRDD(schema.toAttributes, catalystRows)(self) + DataFrame(this, logicalPlan) + } + + /** * :: DeveloperApi :: * Creates a [[DataFrame]] from an [[JavaRDD]] containing [[Row]]s using the given schema. * It is important to make sure that the structure of every [[Row]] of the provided RDD matches @@ -531,7 +543,7 @@ class SQLContext(@transient val sparkContext: SparkContext) extractors.zip(attributeSeq).map { case (e, attr) => CatalystTypeConverters.convertToCatalyst(e.invoke(row), attr.dataType) }.toArray[Any] - ) : Row + ) : InternalRow } } DataFrame(this, LogicalRDD(attributeSeq, rowRdd)(this)) @@ -886,7 +898,7 @@ class SQLContext(@transient val sparkContext: SparkContext) protected[sql] val planner = new SparkPlanner @transient - protected[sql] lazy val emptyResult = sparkContext.parallelize(Seq.empty[Row], 1) + protected[sql] lazy val emptyResult = sparkContext.parallelize(Seq.empty[InternalRow], 1) /** * Prepares a planned SparkPlan for execution by inserting shuffle operations as needed. @@ -953,7 +965,7 @@ class SQLContext(@transient val sparkContext: SparkContext) lazy val executedPlan: SparkPlan = prepareForExecution.execute(sparkPlan) /** Internal version of the RDD. 
Avoids copies and has no schema */ - lazy val toRdd: RDD[Row] = executedPlan.execute() + lazy val toRdd: RDD[InternalRow] = executedPlan.execute() protected def stringOrError[A](f: => A): String = try f.toString catch { case e: Throwable => e.toString } @@ -1035,7 +1047,7 @@ class SQLContext(@transient val sparkContext: SparkContext) } val rowRdd = convertedRdd.mapPartitions { iter => - iter.map { m => new GenericRow(m): Row} + iter.map { m => new GenericRow(m): InternalRow} } DataFrame(this, LogicalRDD(schema.toAttributes, rowRdd)(self)) http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala index aa10af4..cc7506d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala @@ -18,8 +18,7 @@ package org.apache.spark.sql.columnar import java.nio.{ByteBuffer, ByteOrder} - -import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.columnar.ColumnBuilder._ import org.apache.spark.sql.columnar.compression.{AllCompressionSchemes, CompressibleColumnBuilder} import org.apache.spark.sql.types._ @@ -33,7 +32,7 @@ private[sql] trait ColumnBuilder { /** * Appends `row(ordinal)` to the column builder. 
*/ - def appendFrom(row: Row, ordinal: Int) + def appendFrom(row: InternalRow, ordinal: Int) /** * Column statistics information @@ -68,7 +67,7 @@ private[sql] class BasicColumnBuilder[T <: DataType, JvmType]( buffer.order(ByteOrder.nativeOrder()).putInt(columnType.typeId) } - override def appendFrom(row: Row, ordinal: Int): Unit = { + override def appendFrom(row: InternalRow, ordinal: Int): Unit = { buffer = ensureFreeSpace(buffer, columnType.actualSize(row, ordinal)) columnType.append(row, ordinal, buffer) } http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala index 11c79c8..1bce214 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.columnar -import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference} import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String @@ -53,7 +53,7 @@ private[sql] sealed trait ColumnStats extends Serializable { /** * Gathers statistics information from `row(ordinal)`. */ - def gatherStats(row: Row, ordinal: Int): Unit = { + def gatherStats(row: InternalRow, ordinal: Int): Unit = { if (row.isNullAt(ordinal)) { nullCount += 1 // 4 bytes for null position @@ -66,23 +66,23 @@ private[sql] sealed trait ColumnStats extends Serializable { * Column statistics represented as a single row, currently including closed lower bound, closed * upper bound and null count. 
*/ - def collectedStatistics: Row + def collectedStatistics: InternalRow } /** * A no-op ColumnStats only used for testing purposes. */ private[sql] class NoopColumnStats extends ColumnStats { - override def gatherStats(row: Row, ordinal: Int): Unit = super.gatherStats(row, ordinal) + override def gatherStats(row: InternalRow, ordinal: Int): Unit = super.gatherStats(row, ordinal) - override def collectedStatistics: Row = Row(null, null, nullCount, count, 0L) + override def collectedStatistics: InternalRow = InternalRow(null, null, nullCount, count, 0L) } private[sql] class BooleanColumnStats extends ColumnStats { protected var upper = false protected var lower = true - override def gatherStats(row: Row, ordinal: Int): Unit = { + override def gatherStats(row: InternalRow, ordinal: Int): Unit = { super.gatherStats(row, ordinal) if (!row.isNullAt(ordinal)) { val value = row.getBoolean(ordinal) @@ -92,14 +92,15 @@ private[sql] class BooleanColumnStats extends ColumnStats { } } - override def collectedStatistics: Row = Row(lower, upper, nullCount, count, sizeInBytes) + override def collectedStatistics: InternalRow = + InternalRow(lower, upper, nullCount, count, sizeInBytes) } private[sql] class ByteColumnStats extends ColumnStats { protected var upper = Byte.MinValue protected var lower = Byte.MaxValue - override def gatherStats(row: Row, ordinal: Int): Unit = { + override def gatherStats(row: InternalRow, ordinal: Int): Unit = { super.gatherStats(row, ordinal) if (!row.isNullAt(ordinal)) { val value = row.getByte(ordinal) @@ -109,14 +110,15 @@ private[sql] class ByteColumnStats extends ColumnStats { } } - override def collectedStatistics: Row = Row(lower, upper, nullCount, count, sizeInBytes) + override def collectedStatistics: InternalRow = + InternalRow(lower, upper, nullCount, count, sizeInBytes) } private[sql] class ShortColumnStats extends ColumnStats { protected var upper = Short.MinValue protected var lower = Short.MaxValue - override def gatherStats(row: Row, 
ordinal: Int): Unit = { + override def gatherStats(row: InternalRow, ordinal: Int): Unit = { super.gatherStats(row, ordinal) if (!row.isNullAt(ordinal)) { val value = row.getShort(ordinal) @@ -126,14 +128,15 @@ private[sql] class ShortColumnStats extends ColumnStats { } } - override def collectedStatistics: Row = Row(lower, upper, nullCount, count, sizeInBytes) + override def collectedStatistics: InternalRow = + InternalRow(lower, upper, nullCount, count, sizeInBytes) } private[sql] class LongColumnStats extends ColumnStats { protected var upper = Long.MinValue protected var lower = Long.MaxValue - override def gatherStats(row: Row, ordinal: Int): Unit = { + override def gatherStats(row: InternalRow, ordinal: Int): Unit = { super.gatherStats(row, ordinal) if (!row.isNullAt(ordinal)) { val value = row.getLong(ordinal) @@ -143,14 +146,15 @@ private[sql] class LongColumnStats extends ColumnStats { } } - override def collectedStatistics: Row = Row(lower, upper, nullCount, count, sizeInBytes) + override def collectedStatistics: InternalRow = + InternalRow(lower, upper, nullCount, count, sizeInBytes) } private[sql] class DoubleColumnStats extends ColumnStats { protected var upper = Double.MinValue protected var lower = Double.MaxValue - override def gatherStats(row: Row, ordinal: Int): Unit = { + override def gatherStats(row: InternalRow, ordinal: Int): Unit = { super.gatherStats(row, ordinal) if (!row.isNullAt(ordinal)) { val value = row.getDouble(ordinal) @@ -160,14 +164,15 @@ private[sql] class DoubleColumnStats extends ColumnStats { } } - override def collectedStatistics: Row = Row(lower, upper, nullCount, count, sizeInBytes) + override def collectedStatistics: InternalRow = + InternalRow(lower, upper, nullCount, count, sizeInBytes) } private[sql] class FloatColumnStats extends ColumnStats { protected var upper = Float.MinValue protected var lower = Float.MaxValue - override def gatherStats(row: Row, ordinal: Int): Unit = { + override def gatherStats(row: 
InternalRow, ordinal: Int): Unit = { super.gatherStats(row, ordinal) if (!row.isNullAt(ordinal)) { val value = row.getFloat(ordinal) @@ -177,14 +182,15 @@ private[sql] class FloatColumnStats extends ColumnStats { } } - override def collectedStatistics: Row = Row(lower, upper, nullCount, count, sizeInBytes) + override def collectedStatistics: InternalRow = + InternalRow(lower, upper, nullCount, count, sizeInBytes) } private[sql] class FixedDecimalColumnStats extends ColumnStats { protected var upper: Decimal = null protected var lower: Decimal = null - override def gatherStats(row: Row, ordinal: Int): Unit = { + override def gatherStats(row: InternalRow, ordinal: Int): Unit = { super.gatherStats(row, ordinal) if (!row.isNullAt(ordinal)) { val value = row(ordinal).asInstanceOf[Decimal] @@ -194,14 +200,15 @@ private[sql] class FixedDecimalColumnStats extends ColumnStats { } } - override def collectedStatistics: Row = Row(lower, upper, nullCount, count, sizeInBytes) + override def collectedStatistics: InternalRow = + InternalRow(lower, upper, nullCount, count, sizeInBytes) } private[sql] class IntColumnStats extends ColumnStats { protected var upper = Int.MinValue protected var lower = Int.MaxValue - override def gatherStats(row: Row, ordinal: Int): Unit = { + override def gatherStats(row: InternalRow, ordinal: Int): Unit = { super.gatherStats(row, ordinal) if (!row.isNullAt(ordinal)) { val value = row.getInt(ordinal) @@ -211,14 +218,15 @@ private[sql] class IntColumnStats extends ColumnStats { } } - override def collectedStatistics: Row = Row(lower, upper, nullCount, count, sizeInBytes) + override def collectedStatistics: InternalRow = + InternalRow(lower, upper, nullCount, count, sizeInBytes) } private[sql] class StringColumnStats extends ColumnStats { protected var upper: UTF8String = null protected var lower: UTF8String = null - override def gatherStats(row: Row, ordinal: Int): Unit = { + override def gatherStats(row: InternalRow, ordinal: Int): Unit = { 
super.gatherStats(row, ordinal) if (!row.isNullAt(ordinal)) { val value = row(ordinal).asInstanceOf[UTF8String] @@ -228,7 +236,8 @@ private[sql] class StringColumnStats extends ColumnStats { } } - override def collectedStatistics: Row = Row(lower, upper, nullCount, count, sizeInBytes) + override def collectedStatistics: InternalRow = + InternalRow(lower, upper, nullCount, count, sizeInBytes) } private[sql] class DateColumnStats extends IntColumnStats @@ -236,23 +245,25 @@ private[sql] class DateColumnStats extends IntColumnStats private[sql] class TimestampColumnStats extends LongColumnStats private[sql] class BinaryColumnStats extends ColumnStats { - override def gatherStats(row: Row, ordinal: Int): Unit = { + override def gatherStats(row: InternalRow, ordinal: Int): Unit = { super.gatherStats(row, ordinal) if (!row.isNullAt(ordinal)) { sizeInBytes += BINARY.actualSize(row, ordinal) } } - override def collectedStatistics: Row = Row(null, null, nullCount, count, sizeInBytes) + override def collectedStatistics: InternalRow = + InternalRow(null, null, nullCount, count, sizeInBytes) } private[sql] class GenericColumnStats extends ColumnStats { - override def gatherStats(row: Row, ordinal: Int): Unit = { + override def gatherStats(row: InternalRow, ordinal: Int): Unit = { super.gatherStats(row, ordinal) if (!row.isNullAt(ordinal)) { sizeInBytes += GENERIC.actualSize(row, ordinal) } } - override def collectedStatistics: Row = Row(null, null, nullCount, count, sizeInBytes) + override def collectedStatistics: InternalRow = + InternalRow(null, null, nullCount, count, sizeInBytes) } http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala index 3db26fa..761f427 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala @@ -19,21 +19,16 @@ package org.apache.spark.sql.columnar import java.nio.ByteBuffer -import org.apache.spark.{Accumulable, Accumulator, Accumulators} -import org.apache.spark.sql.catalyst.expressions - import scala.collection.mutable.ArrayBuffer -import scala.collection.mutable.HashMap import org.apache.spark.rdd.RDD -import org.apache.spark.sql.Row -import org.apache.spark.SparkContext import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics} import org.apache.spark.sql.execution.{LeafNode, SparkPlan} import org.apache.spark.storage.StorageLevel +import org.apache.spark.{Accumulable, Accumulator, Accumulators} private[sql] object InMemoryRelation { def apply( @@ -45,7 +40,7 @@ private[sql] object InMemoryRelation { new InMemoryRelation(child.output, useCompression, batchSize, storageLevel, child, tableName)() } -private[sql] case class CachedBatch(buffers: Array[Array[Byte]], stats: Row) +private[sql] case class CachedBatch(buffers: Array[Array[Byte]], stats: InternalRow) private[sql] case class InMemoryRelation( output: Seq[Attribute], @@ -56,12 +51,12 @@ private[sql] case class InMemoryRelation( tableName: Option[String])( private var _cachedColumnBuffers: RDD[CachedBatch] = null, private var _statistics: Statistics = null, - private var _batchStats: Accumulable[ArrayBuffer[Row], Row] = null) + private var _batchStats: Accumulable[ArrayBuffer[InternalRow], InternalRow] = null) extends LogicalPlan with MultiInstanceRelation { - private val batchStats: 
Accumulable[ArrayBuffer[Row], Row] = + private val batchStats: Accumulable[ArrayBuffer[InternalRow], InternalRow] = if (_batchStats == null) { - child.sqlContext.sparkContext.accumulableCollection(ArrayBuffer.empty[Row]) + child.sqlContext.sparkContext.accumulableCollection(ArrayBuffer.empty[InternalRow]) } else { _batchStats } @@ -151,7 +146,7 @@ private[sql] case class InMemoryRelation( rowCount += 1 } - val stats = Row.merge(columnBuilders.map(_.columnStats.collectedStatistics) : _*) + val stats = InternalRow.merge(columnBuilders.map(_.columnStats.collectedStatistics) : _*) batchStats += stats CachedBatch(columnBuilders.map(_.build().array()), stats) @@ -267,7 +262,7 @@ private[sql] case class InMemoryColumnarTableScan( private val inMemoryPartitionPruningEnabled = sqlContext.conf.inMemoryPartitionPruning - protected override def doExecute(): RDD[Row] = { + protected override def doExecute(): RDD[InternalRow] = { if (enableAccumulators) { readPartitions.setValue(0) readBatches.setValue(0) @@ -296,7 +291,7 @@ private[sql] case class InMemoryColumnarTableScan( val nextRow = new SpecificMutableRow(requestedColumnDataTypes) - def cachedBatchesToRows(cacheBatches: Iterator[CachedBatch]): Iterator[Row] = { + def cachedBatchesToRows(cacheBatches: Iterator[CachedBatch]): Iterator[InternalRow] = { val rows = cacheBatches.flatMap { cachedBatch => // Build column accessors val columnAccessors = requestedColumnIndices.map { batchColumnIndex => @@ -306,15 +301,15 @@ private[sql] case class InMemoryColumnarTableScan( } // Extract rows via column accessors - new Iterator[Row] { + new Iterator[InternalRow] { private[this] val rowLen = nextRow.length - override def next(): Row = { + override def next(): InternalRow = { var i = 0 while (i < rowLen) { columnAccessors(i).extractTo(nextRow, i) i += 1 } - if (attributes.isEmpty) Row.empty else nextRow + if (attributes.isEmpty) InternalRow.empty else nextRow } override def hasNext: Boolean = columnAccessors(0).hasNext 
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala index f1f494a..ba47bc7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.columnar import java.nio.{ByteBuffer, ByteOrder} -import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.InternalRow /** * A stackable trait used for building byte buffer for a column containing null values. Memory @@ -52,7 +52,7 @@ private[sql] trait NullableColumnBuilder extends ColumnBuilder { super.initialize(initialSize, columnName, useCompression) } - abstract override def appendFrom(row: Row, ordinal: Int): Unit = { + abstract override def appendFrom(row: InternalRow, ordinal: Int): Unit = { columnStats.gatherStats(row, ordinal) if (row.isNullAt(ordinal)) { nulls = ColumnBuilder.ensureFreeSpace(nulls, 4) http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala index 8e2a1af..39b21dd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala @@ -20,7 +20,7 @@ package 
org.apache.spark.sql.columnar.compression import java.nio.{ByteBuffer, ByteOrder} import org.apache.spark.Logging -import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.columnar.{ColumnBuilder, NativeColumnBuilder} import org.apache.spark.sql.types.AtomicType @@ -66,7 +66,7 @@ private[sql] trait CompressibleColumnBuilder[T <: AtomicType] encoder.compressionRatio < 0.8 } - private def gatherCompressibilityStats(row: Row, ordinal: Int): Unit = { + private def gatherCompressibilityStats(row: InternalRow, ordinal: Int): Unit = { var i = 0 while (i < compressionEncoders.length) { compressionEncoders(i).gatherCompressibilityStats(row, ordinal) @@ -74,7 +74,7 @@ private[sql] trait CompressibleColumnBuilder[T <: AtomicType] } } - abstract override def appendFrom(row: Row, ordinal: Int): Unit = { + abstract override def appendFrom(row: InternalRow, ordinal: Int): Unit = { super.appendFrom(row, ordinal) if (!row.isNullAt(ordinal)) { gatherCompressibilityStats(row, ordinal) http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala index 17c2d9b..4eaec6d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala @@ -18,14 +18,13 @@ package org.apache.spark.sql.columnar.compression import java.nio.{ByteBuffer, ByteOrder} - -import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.MutableRow import org.apache.spark.sql.columnar.{ColumnType, NativeColumnType} 
import org.apache.spark.sql.types.AtomicType private[sql] trait Encoder[T <: AtomicType] { - def gatherCompressibilityStats(row: Row, ordinal: Int): Unit = {} + def gatherCompressibilityStats(row: InternalRow, ordinal: Int): Unit = {} def compressedSize: Int http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala index 534ae90..5abc125 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala @@ -22,8 +22,7 @@ import java.nio.ByteBuffer import scala.collection.mutable import scala.reflect.ClassTag import scala.reflect.runtime.universe.runtimeMirror - -import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{MutableRow, SpecificMutableRow} import org.apache.spark.sql.columnar._ import org.apache.spark.sql.types._ @@ -96,7 +95,7 @@ private[sql] case object RunLengthEncoding extends CompressionScheme { override def compressedSize: Int = _compressedSize - override def gatherCompressibilityStats(row: Row, ordinal: Int): Unit = { + override def gatherCompressibilityStats(row: InternalRow, ordinal: Int): Unit = { val value = columnType.getField(row, ordinal) val actualSize = columnType.actualSize(row, ordinal) _uncompressedSize += actualSize @@ -217,7 +216,7 @@ private[sql] case object DictionaryEncoding extends CompressionScheme { // to store dictionary element count. 
private var dictionarySize = 4 - override def gatherCompressibilityStats(row: Row, ordinal: Int): Unit = { + override def gatherCompressibilityStats(row: InternalRow, ordinal: Int): Unit = { val value = columnType.getField(row, ordinal) if (!overflow) { @@ -310,7 +309,7 @@ private[sql] case object BooleanBitSet extends CompressionScheme { class Encoder extends compression.Encoder[BooleanType.type] { private var _uncompressedSize = 0 - override def gatherCompressibilityStats(row: Row, ordinal: Int): Unit = { + override def gatherCompressibilityStats(row: InternalRow, ordinal: Int): Unit = { _uncompressedSize += BOOLEAN.defaultSize } @@ -404,7 +403,7 @@ private[sql] case object IntDelta extends CompressionScheme { private var prevValue: Int = _ - override def gatherCompressibilityStats(row: Row, ordinal: Int): Unit = { + override def gatherCompressibilityStats(row: InternalRow, ordinal: Int): Unit = { val value = row.getInt(ordinal) val delta = value - prevValue @@ -484,7 +483,7 @@ private[sql] case object LongDelta extends CompressionScheme { private var prevValue: Long = _ - override def gatherCompressibilityStats(row: Row, ordinal: Int): Unit = { + override def gatherCompressibilityStats(row: InternalRow, ordinal: Int): Unit = { val value = row.getLong(ordinal) val delta = value - prevValue http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala index 8d16749..6e8a5ef 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala @@ -20,12 +20,10 @@ package org.apache.spark.sql.execution import java.util.HashMap import org.apache.spark.annotation.DeveloperApi -import 
org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.errors._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.physical._ -import org.apache.spark.sql.SQLContext /** * :: DeveloperApi :: @@ -121,11 +119,11 @@ case class Aggregate( } } - protected override def doExecute(): RDD[Row] = attachTree(this, "execute") { + protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") { if (groupingExpressions.isEmpty) { child.execute().mapPartitions { iter => val buffer = newAggregateBuffer() - var currentRow: Row = null + var currentRow: InternalRow = null while (iter.hasNext) { currentRow = iter.next() var i = 0 @@ -147,10 +145,10 @@ case class Aggregate( } } else { child.execute().mapPartitions { iter => - val hashTable = new HashMap[Row, Array[AggregateFunction]] + val hashTable = new HashMap[InternalRow, Array[AggregateFunction]] val groupingProjection = new InterpretedMutableProjection(groupingExpressions, child.output) - var currentRow: Row = null + var currentRow: InternalRow = null while (iter.hasNext) { currentRow = iter.next() val currentGroup = groupingProjection(currentRow) @@ -167,7 +165,7 @@ case class Aggregate( } } - new Iterator[Row] { + new Iterator[InternalRow] { private[this] val hashTableIter = hashTable.entrySet().iterator() private[this] val aggregateResults = new GenericMutableRow(computedAggregates.length) private[this] val resultProjection = @@ -177,7 +175,7 @@ case class Aggregate( override final def hasNext: Boolean = hashTableIter.hasNext - override final def next(): Row = { + override final def next(): InternalRow = { val currentEntry = hashTableIter.next() val currentGroup = currentEntry.getKey val currentBuffer = currentEntry.getValue http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala ---------------------------------------------------------------------- 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala index 6fa7ccc..c9a1883 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala @@ -17,19 +17,19 @@ package org.apache.spark.sql.execution -import org.apache.spark.{HashPartitioner, Partitioner, RangePartitioner, SparkEnv} import org.apache.spark.annotation.DeveloperApi import org.apache.spark.rdd.{RDD, ShuffledRDD} import org.apache.spark.serializer.Serializer import org.apache.spark.shuffle.sort.SortShuffleManager import org.apache.spark.shuffle.unsafe.UnsafeShuffleManager +import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.errors.attachTree import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.types.DataType -import org.apache.spark.sql.{SQLContext, Row} import org.apache.spark.util.MutablePair +import org.apache.spark.{HashPartitioner, Partitioner, RangePartitioner, SparkEnv} /** * :: DeveloperApi :: @@ -157,7 +157,7 @@ case class Exchange( serializer } - protected override def doExecute(): RDD[Row] = attachTree(this , "execute") { + protected override def doExecute(): RDD[InternalRow] = attachTree(this , "execute") { newPartitioning match { case HashPartitioning(expressions, numPartitions) => val keySchema = expressions.map(_.dataType).toArray @@ -173,11 +173,11 @@ case class Exchange( } else { child.execute().mapPartitions { iter => val hashExpressions = newMutableProjection(expressions, child.output)() - val mutablePair = new MutablePair[Row, Row]() + val mutablePair = new MutablePair[InternalRow, InternalRow]() iter.map(r => mutablePair.update(hashExpressions(r), r)) } } - val shuffled = new ShuffledRDD[Row, Row, Row](rdd, part) + val shuffled = new 
ShuffledRDD[InternalRow, InternalRow, InternalRow](rdd, part) shuffled.setSerializer(serializer) shuffled.map(_._2) @@ -190,7 +190,7 @@ case class Exchange( // Internally, RangePartitioner runs a job on the RDD that samples keys to compute // partition bounds. To get accurate samples, we need to copy the mutable keys. val rddForSampling = childRdd.mapPartitions { iter => - val mutablePair = new MutablePair[Row, Null]() + val mutablePair = new MutablePair[InternalRow, Null]() iter.map(row => mutablePair.update(row.copy(), null)) } // TODO: RangePartitioner should take an Ordering. @@ -202,12 +202,12 @@ case class Exchange( childRdd.mapPartitions { iter => iter.map(row => (row.copy(), null))} } else { childRdd.mapPartitions { iter => - val mutablePair = new MutablePair[Row, Null]() + val mutablePair = new MutablePair[InternalRow, Null]() iter.map(row => mutablePair.update(row, null)) } } - val shuffled = new ShuffledRDD[Row, Null, Null](rdd, part) + val shuffled = new ShuffledRDD[InternalRow, Null, Null](rdd, part) shuffled.setSerializer(serializer) shuffled.map(_._1) @@ -217,14 +217,16 @@ case class Exchange( val partitioner = new HashPartitioner(1) val rdd = if (needToCopyObjectsBeforeShuffle(partitioner, serializer)) { - child.execute().mapPartitions { iter => iter.map(r => (null, r.copy())) } + child.execute().mapPartitions { + iter => iter.map(r => (null, r.copy())) + } } else { child.execute().mapPartitions { iter => - val mutablePair = new MutablePair[Null, Row]() + val mutablePair = new MutablePair[Null, InternalRow]() iter.map(r => mutablePair.update(null, r)) } } - val shuffled = new ShuffledRDD[Null, Row, Row](rdd, partitioner) + val shuffled = new ShuffledRDD[Null, InternalRow, InternalRow](rdd, partitioner) shuffled.setSerializer(serializer) shuffled.map(_._2) http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala 
---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala index f931dc9..da27a75 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution import org.apache.spark.annotation.DeveloperApi import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.CatalystTypeConverters +import org.apache.spark.sql.catalyst.{InternalRow, CatalystTypeConverters} import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericMutableRow} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics} @@ -31,7 +31,7 @@ import org.apache.spark.sql.{Row, SQLContext} */ @DeveloperApi object RDDConversions { - def productToRowRdd[A <: Product](data: RDD[A], outputTypes: Seq[DataType]): RDD[Row] = { + def productToRowRdd[A <: Product](data: RDD[A], outputTypes: Seq[DataType]): RDD[InternalRow] = { data.mapPartitions { iterator => val numColumns = outputTypes.length val mutableRow = new GenericMutableRow(numColumns) @@ -51,7 +51,7 @@ object RDDConversions { /** * Convert the objects inside Row into the types Catalyst expected. */ - def rowToRowRdd(data: RDD[Row], outputTypes: Seq[DataType]): RDD[Row] = { + def rowToRowRdd(data: RDD[Row], outputTypes: Seq[DataType]): RDD[InternalRow] = { data.mapPartitions { iterator => val numColumns = outputTypes.length val mutableRow = new GenericMutableRow(numColumns) @@ -70,7 +70,9 @@ object RDDConversions { } /** Logical plan node for scanning data from an RDD. 
*/ -private[sql] case class LogicalRDD(output: Seq[Attribute], rdd: RDD[Row])(sqlContext: SQLContext) +private[sql] case class LogicalRDD( + output: Seq[Attribute], + rdd: RDD[InternalRow])(sqlContext: SQLContext) extends LogicalPlan with MultiInstanceRelation { override def children: Seq[LogicalPlan] = Nil @@ -91,13 +93,15 @@ private[sql] case class LogicalRDD(output: Seq[Attribute], rdd: RDD[Row])(sqlCon } /** Physical plan node for scanning data from an RDD. */ -private[sql] case class PhysicalRDD(output: Seq[Attribute], rdd: RDD[Row]) extends LeafNode { - protected override def doExecute(): RDD[Row] = rdd +private[sql] case class PhysicalRDD( + output: Seq[Attribute], + rdd: RDD[InternalRow]) extends LeafNode { + protected override def doExecute(): RDD[InternalRow] = rdd } /** Logical plan node for scanning data from a local collection. */ private[sql] -case class LogicalLocalTable(output: Seq[Attribute], rows: Seq[Row])(sqlContext: SQLContext) +case class LogicalLocalTable(output: Seq[Attribute], rows: Seq[InternalRow])(sqlContext: SQLContext) extends LogicalPlan with MultiInstanceRelation { override def children: Seq[LogicalPlan] = Nil http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala index 4b601c1..42a0c1b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala @@ -19,10 +19,9 @@ package org.apache.spark.sql.execution import org.apache.spark.annotation.DeveloperApi import org.apache.spark.rdd.RDD -import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.errors._ import org.apache.spark.sql.catalyst.expressions._ -import 
org.apache.spark.sql.catalyst.plans.physical.{UnknownPartitioning, Partitioning} +import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning} /** * Apply the all of the GroupExpressions to every input row, hence we will get @@ -43,7 +42,7 @@ case class Expand( // as UNKNOWN partitioning override def outputPartitioning: Partitioning = UnknownPartitioning(0) - protected override def doExecute(): RDD[Row] = attachTree(this, "execute") { + protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") { child.execute().mapPartitions { iter => // TODO Move out projection objects creation and transfer to // workers via closure. However we can't assume the Projection @@ -51,14 +50,14 @@ case class Expand( // create the projections within each of the partition processing. val groups = projections.map(ee => newProjection(ee, child.output)).toArray - new Iterator[Row] { - private[this] var result: Row = _ + new Iterator[InternalRow] { + private[this] var result: InternalRow = _ private[this] var idx = -1 // -1 means the initial state - private[this] var input: Row = _ + private[this] var input: InternalRow = _ override final def hasNext: Boolean = (-1 < idx && idx < groups.length) || iter.hasNext - override final def next(): Row = { + override final def next(): InternalRow = { if (idx <= 0) { // in the initial (-1) or beginning(0) of a new input row, fetch the next input tuple input = iter.next() http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala index dd02c1f..c1665f7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala @@ 
-25,12 +25,12 @@ import org.apache.spark.sql.catalyst.expressions._ * For lazy computing, be sure the generator.terminate() called in the very last * TODO reusing the CompletionIterator? */ -private[execution] sealed case class LazyIterator(func: () => TraversableOnce[Row]) - extends Iterator[Row] { +private[execution] sealed case class LazyIterator(func: () => TraversableOnce[InternalRow]) + extends Iterator[InternalRow] { lazy val results = func().toIterator override def hasNext: Boolean = results.hasNext - override def next(): Row = results.next() + override def next(): InternalRow = results.next() } /** @@ -58,11 +58,11 @@ case class Generate( val boundGenerator = BindReferences.bindReference(generator, child.output) - protected override def doExecute(): RDD[Row] = { + protected override def doExecute(): RDD[InternalRow] = { // boundGenerator.terminate() should be triggered after all of the rows in the partition if (join) { child.execute().mapPartitions { iter => - val generatorNullRow = Row.fromSeq(Seq.fill[Any](generator.elementTypes.size)(null)) + val generatorNullRow = InternalRow.fromSeq(Seq.fill[Any](generator.elementTypes.size)(null)) val joinedRow = new JoinedRow iter.flatMap { row => http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala index 1c40a92..ba2c8f5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala @@ -66,7 +66,7 @@ case class GeneratedAggregate( override def output: Seq[Attribute] = aggregateExpressions.map(_.toAttribute) - protected override def doExecute(): RDD[Row] = { + protected 
override def doExecute(): RDD[InternalRow] = { val aggregatesToCompute = aggregateExpressions.flatMap { a => a.collect { case agg: AggregateExpression => agg} } @@ -273,7 +273,7 @@ case class GeneratedAggregate( if (groupingExpressions.isEmpty) { // TODO: Codegening anything other than the updateProjection is probably over kill. val buffer = newAggregationBuffer(EmptyRow).asInstanceOf[MutableRow] - var currentRow: Row = null + var currentRow: InternalRow = null updateProjection.target(buffer) while (iter.hasNext) { @@ -295,19 +295,19 @@ case class GeneratedAggregate( ) while (iter.hasNext) { - val currentRow: Row = iter.next() - val groupKey: Row = groupProjection(currentRow) + val currentRow: InternalRow = iter.next() + val groupKey: InternalRow = groupProjection(currentRow) val aggregationBuffer = aggregationMap.getAggregationBuffer(groupKey) updateProjection.target(aggregationBuffer)(joinedRow(aggregationBuffer, currentRow)) } - new Iterator[Row] { + new Iterator[InternalRow] { private[this] val mapIterator = aggregationMap.iterator() private[this] val resultProjection = resultProjectionBuilder() def hasNext: Boolean = mapIterator.hasNext - def next(): Row = { + def next(): InternalRow = { val entry = mapIterator.next() val result = resultProjection(joinedRow(entry.key, entry.value)) if (hasNext) { @@ -326,9 +326,9 @@ case class GeneratedAggregate( if (unsafeEnabled) { log.info("Not using Unsafe-based aggregator because it is not supported for this schema") } - val buffers = new java.util.HashMap[Row, MutableRow]() + val buffers = new java.util.HashMap[InternalRow, MutableRow]() - var currentRow: Row = null + var currentRow: InternalRow = null while (iter.hasNext) { currentRow = iter.next() val currentGroup = groupProjection(currentRow) @@ -342,13 +342,13 @@ case class GeneratedAggregate( updateProjection.target(currentBuffer)(joinedRow(currentBuffer, currentRow)) } - new Iterator[Row] { + new Iterator[InternalRow] { private[this] val resultIterator = 
buffers.entrySet.iterator() private[this] val resultProjection = resultProjectionBuilder() def hasNext: Boolean = resultIterator.hasNext - def next(): Row = { + def next(): InternalRow = { val currentGroup = resultIterator.next() resultProjection(joinedRow(currentGroup.getKey, currentGroup.getValue)) } http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala index 03bee80..cd34118 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala @@ -19,18 +19,20 @@ package org.apache.spark.sql.execution import org.apache.spark.rdd.RDD import org.apache.spark.sql.Row -import org.apache.spark.sql.catalyst.CatalystTypeConverters +import org.apache.spark.sql.catalyst.{InternalRow, CatalystTypeConverters} import org.apache.spark.sql.catalyst.expressions.Attribute /** * Physical plan node for scanning data from a local collection. 
*/ -private[sql] case class LocalTableScan(output: Seq[Attribute], rows: Seq[Row]) extends LeafNode { +private[sql] case class LocalTableScan( + output: Seq[Attribute], + rows: Seq[InternalRow]) extends LeafNode { private lazy val rdd = sqlContext.sparkContext.parallelize(rows) - protected override def doExecute(): RDD[Row] = rdd + protected override def doExecute(): RDD[InternalRow] = rdd override def executeCollect(): Array[Row] = { http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala index 435ac01..7739a9f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala @@ -23,6 +23,7 @@ import org.apache.spark.rdd.{RDD, RDDOperationScope} import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.{CatalystTypeConverters, trees} import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.expressions.codegen._ import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.physical._ @@ -79,11 +80,11 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ def requiredChildOrdering: Seq[Seq[SortOrder]] = Seq.fill(children.size)(Nil) /** - * Returns the result of this query as an RDD[Row] by delegating to doExecute + * Returns the result of this query as an RDD[InternalRow] by delegating to doExecute * after adding query plan information to created RDDs for visualization. * Concrete implementations of SparkPlan should override doExecute instead. 
*/ - final def execute(): RDD[Row] = { + final def execute(): RDD[InternalRow] = { RDDOperationScope.withScope(sparkContext, nodeName, false, true) { doExecute() } @@ -91,9 +92,9 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ /** * Overridden by concrete implementations of SparkPlan. - * Produces the result of the query as an RDD[Row] + * Produces the result of the query as an RDD[InternalRow] */ - protected def doExecute(): RDD[Row] + protected def doExecute(): RDD[InternalRow] /** * Runs this query returning the result as an array. @@ -117,7 +118,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ val childRDD = execute().map(_.copy()) - val buf = new ArrayBuffer[Row] + val buf = new ArrayBuffer[InternalRow] val totalParts = childRDD.partitions.length var partsScanned = 0 while (buf.size < n && partsScanned < totalParts) { @@ -140,7 +141,8 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ val p = partsScanned until math.min(partsScanned + numPartsToTry, totalParts) val sc = sqlContext.sparkContext val res = - sc.runJob(childRDD, (it: Iterator[Row]) => it.take(left).toArray, p, allowLocal = false) + sc.runJob(childRDD, (it: Iterator[InternalRow]) => it.take(left).toArray, p, + allowLocal = false) res.foreach(buf ++= _.take(n - buf.size)) partsScanned += numPartsToTry @@ -175,7 +177,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ protected def newPredicate( - expression: Expression, inputSchema: Seq[Attribute]): (Row) => Boolean = { + expression: Expression, inputSchema: Seq[Attribute]): (InternalRow) => Boolean = { if (codegenEnabled) { GeneratePredicate.generate(expression, inputSchema) } else { @@ -183,7 +185,9 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ } } - protected def newOrdering(order: Seq[SortOrder], inputSchema: Seq[Attribute]): Ordering[Row] = { + protected def 
newOrdering( + order: Seq[SortOrder], + inputSchema: Seq[Attribute]): Ordering[InternalRow] = { if (codegenEnabled) { GenerateOrdering.generate(order, inputSchema) } else { http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 7a1331a..422992d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -203,7 +203,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { } protected lazy val singleRowRdd = - sparkContext.parallelize(Seq(new GenericRow(Array[Any]()): Row), 1) + sparkContext.parallelize(Seq(new GenericRow(Array[Any]()): InternalRow), 1) object TakeOrdered extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala index c4327ce..fd6f1d7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala @@ -20,9 +20,8 @@ package org.apache.spark.sql.execution import java.util import org.apache.spark.rdd.RDD -import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, Distribution, ClusteredDistribution, Partitioning} +import 
org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning} import org.apache.spark.util.collection.CompactBuffer /** @@ -112,16 +111,16 @@ case class Window( } } - protected override def doExecute(): RDD[Row] = { + protected override def doExecute(): RDD[InternalRow] = { child.execute().mapPartitions { iter => - new Iterator[Row] { + new Iterator[InternalRow] { // Although input rows are grouped based on windowSpec.partitionSpec, we need to // know when we have a new partition. // This is to manually construct an ordering that can be used to compare rows. // TODO: We may want to have a newOrdering that takes BoundReferences. // So, we can take advantave of code gen. - private val partitionOrdering: Ordering[Row] = + private val partitionOrdering: Ordering[InternalRow] = RowOrdering.forSchema(windowSpec.partitionSpec.map(_.dataType)) // This is used to project expressions for the partition specification. @@ -137,13 +136,13 @@ case class Window( // The number of buffered rows in the inputRowBuffer (the size of the current partition). var partitionSize: Int = 0 // The buffer used to buffer rows in a partition. - var inputRowBuffer: CompactBuffer[Row] = _ + var inputRowBuffer: CompactBuffer[InternalRow] = _ // The partition key of the current partition. - var currentPartitionKey: Row = _ + var currentPartitionKey: InternalRow = _ // The partition key of next partition. - var nextPartitionKey: Row = _ + var nextPartitionKey: InternalRow = _ // The first row of next partition. - var firstRowInNextPartition: Row = _ + var firstRowInNextPartition: InternalRow = _ // Indicates if this partition is the last one in the iter. var lastPartition: Boolean = false @@ -316,7 +315,7 @@ case class Window( !lastPartition || (rowPosition < partitionSize) } - override final def next(): Row = { + override final def next(): InternalRow = { if (hasNext) { if (rowPosition == partitionSize) { // All rows of this buffer have been consumed. 
@@ -353,7 +352,7 @@ case class Window( // Fetch the next partition. private def fetchNextPartition(): Unit = { // Create a new buffer for input rows. - inputRowBuffer = new CompactBuffer[Row]() + inputRowBuffer = new CompactBuffer[InternalRow]() // We already have the first row for this partition // (recorded in firstRowInNextPartition). Add it back. inputRowBuffer += firstRowInNextPartition http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index fb42072..7aedd63 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -17,16 +17,17 @@ package org.apache.spark.sql.execution -import org.apache.spark.{SparkEnv, HashPartitioner, SparkConf} import org.apache.spark.annotation.DeveloperApi import org.apache.spark.rdd.{RDD, ShuffledRDD} import org.apache.spark.shuffle.sort.SortShuffleManager import org.apache.spark.sql.catalyst.CatalystTypeConverters import org.apache.spark.sql.catalyst.errors._ import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.plans.physical._ -import org.apache.spark.util.{CompletionIterator, MutablePair} import org.apache.spark.util.collection.ExternalSorter +import org.apache.spark.util.{CompletionIterator, MutablePair} +import org.apache.spark.{HashPartitioner, SparkEnv} /** * :: DeveloperApi :: @@ -37,7 +38,7 @@ case class Project(projectList: Seq[NamedExpression], child: SparkPlan) extends @transient lazy val buildProjection = newMutableProjection(projectList, child.output) - protected override def doExecute(): RDD[Row] = 
child.execute().mapPartitions { iter => + protected override def doExecute(): RDD[InternalRow] = child.execute().mapPartitions { iter => val resuableProjection = buildProjection() iter.map(resuableProjection) } @@ -52,9 +53,10 @@ case class Project(projectList: Seq[NamedExpression], child: SparkPlan) extends case class Filter(condition: Expression, child: SparkPlan) extends UnaryNode { override def output: Seq[Attribute] = child.output - @transient lazy val conditionEvaluator: (Row) => Boolean = newPredicate(condition, child.output) + @transient lazy val conditionEvaluator: (InternalRow) => Boolean = + newPredicate(condition, child.output) - protected override def doExecute(): RDD[Row] = child.execute().mapPartitions { iter => + protected override def doExecute(): RDD[InternalRow] = child.execute().mapPartitions { iter => iter.filter(conditionEvaluator) } @@ -83,7 +85,7 @@ case class Sample( override def output: Seq[Attribute] = child.output // TODO: How to pick seed? - protected override def doExecute(): RDD[Row] = { + protected override def doExecute(): RDD[InternalRow] = { if (withReplacement) { child.execute().map(_.copy()).sample(withReplacement, upperBound - lowerBound, seed) } else { @@ -99,7 +101,8 @@ case class Sample( case class Union(children: Seq[SparkPlan]) extends SparkPlan { // TODO: attributes output by union should be distinct for nullability purposes override def output: Seq[Attribute] = children.head.output - protected override def doExecute(): RDD[Row] = sparkContext.union(children.map(_.execute())) + protected override def doExecute(): RDD[InternalRow] = + sparkContext.union(children.map(_.execute())) } /** @@ -124,19 +127,19 @@ case class Limit(limit: Int, child: SparkPlan) override def executeCollect(): Array[Row] = child.executeTake(limit) - protected override def doExecute(): RDD[Row] = { - val rdd: RDD[_ <: Product2[Boolean, Row]] = if (sortBasedShuffleOn) { + protected override def doExecute(): RDD[InternalRow] = { + val rdd: RDD[_ <: 
Product2[Boolean, InternalRow]] = if (sortBasedShuffleOn) { child.execute().mapPartitions { iter => iter.take(limit).map(row => (false, row.copy())) } } else { child.execute().mapPartitions { iter => - val mutablePair = new MutablePair[Boolean, Row]() + val mutablePair = new MutablePair[Boolean, InternalRow]() iter.take(limit).map(row => mutablePair.update(false, row)) } } val part = new HashPartitioner(1) - val shuffled = new ShuffledRDD[Boolean, Row, Row](rdd, part) + val shuffled = new ShuffledRDD[Boolean, InternalRow, InternalRow](rdd, part) shuffled.setSerializer(new SparkSqlSerializer(child.sqlContext.sparkContext.getConf)) shuffled.mapPartitions(_.take(limit).map(_._2)) } @@ -157,7 +160,8 @@ case class TakeOrdered(limit: Int, sortOrder: Seq[SortOrder], child: SparkPlan) private val ord: RowOrdering = new RowOrdering(sortOrder, child.output) - private def collectData(): Array[Row] = child.execute().map(_.copy()).takeOrdered(limit)(ord) + private def collectData(): Array[InternalRow] = + child.execute().map(_.copy()).takeOrdered(limit)(ord) override def executeCollect(): Array[Row] = { val converter = CatalystTypeConverters.createToScalaConverter(schema) @@ -166,7 +170,7 @@ case class TakeOrdered(limit: Int, sortOrder: Seq[SortOrder], child: SparkPlan) // TODO: Terminal split should be implemented differently from non-terminal split. // TODO: Pick num splits based on |limit|. 
- protected override def doExecute(): RDD[Row] = sparkContext.makeRDD(collectData(), 1) + protected override def doExecute(): RDD[InternalRow] = sparkContext.makeRDD(collectData(), 1) override def outputOrdering: Seq[SortOrder] = sortOrder } @@ -186,7 +190,7 @@ case class Sort( override def requiredChildDistribution: Seq[Distribution] = if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil - protected override def doExecute(): RDD[Row] = attachTree(this, "sort") { + protected override def doExecute(): RDD[InternalRow] = attachTree(this, "sort") { child.execute().mapPartitions( { iterator => val ordering = newOrdering(sortOrder, child.output) iterator.map(_.copy()).toArray.sorted(ordering).iterator @@ -214,14 +218,14 @@ case class ExternalSort( override def requiredChildDistribution: Seq[Distribution] = if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil - protected override def doExecute(): RDD[Row] = attachTree(this, "sort") { + protected override def doExecute(): RDD[InternalRow] = attachTree(this, "sort") { child.execute().mapPartitions( { iterator => val ordering = newOrdering(sortOrder, child.output) - val sorter = new ExternalSorter[Row, Null, Row](ordering = Some(ordering)) + val sorter = new ExternalSorter[InternalRow, Null, InternalRow](ordering = Some(ordering)) sorter.insertAll(iterator.map(r => (r.copy, null))) val baseIterator = sorter.iterator.map(_._1) // TODO(marmbrus): The complex type signature below thwarts inference for no reason. 
- CompletionIterator[Row, Iterator[Row]](baseIterator, sorter.stop()) + CompletionIterator[InternalRow, Iterator[InternalRow]](baseIterator, sorter.stop()) }, preservesPartitioning = true) } @@ -239,7 +243,7 @@ case class Repartition(numPartitions: Int, shuffle: Boolean, child: SparkPlan) extends UnaryNode { override def output: Seq[Attribute] = child.output - protected override def doExecute(): RDD[Row] = { + protected override def doExecute(): RDD[InternalRow] = { child.execute().map(_.copy()).coalesce(numPartitions, shuffle) } } @@ -254,7 +258,7 @@ case class Repartition(numPartitions: Int, shuffle: Boolean, child: SparkPlan) case class Except(left: SparkPlan, right: SparkPlan) extends BinaryNode { override def output: Seq[Attribute] = left.output - protected override def doExecute(): RDD[Row] = { + protected override def doExecute(): RDD[InternalRow] = { left.execute().map(_.copy()).subtract(right.execute().map(_.copy())) } } @@ -268,7 +272,7 @@ case class Except(left: SparkPlan, right: SparkPlan) extends BinaryNode { case class Intersect(left: SparkPlan, right: SparkPlan) extends BinaryNode { override def output: Seq[Attribute] = children.head.output - protected override def doExecute(): RDD[Row] = { + protected override def doExecute(): RDD[InternalRow] = { left.execute().map(_.copy()).intersection(right.execute().map(_.copy())) } } @@ -283,5 +287,5 @@ case class Intersect(left: SparkPlan, right: SparkPlan) extends BinaryNode { case class OutputFaker(output: Seq[Attribute], child: SparkPlan) extends SparkPlan { def children: Seq[SparkPlan] = child :: Nil - protected override def doExecute(): RDD[Row] = child.execute() + protected override def doExecute(): RDD[InternalRow] = child.execute() } http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala ---------------------------------------------------------------------- diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala index 49b361e..653792e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala @@ -20,13 +20,13 @@ package org.apache.spark.sql.execution import org.apache.spark.Logging import org.apache.spark.annotation.DeveloperApi import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.CatalystTypeConverters +import org.apache.spark.sql.catalyst.{InternalRow, CatalystTypeConverters} import org.apache.spark.sql.catalyst.errors.TreeNodeException -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Row} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.types._ -import org.apache.spark.sql.{DataFrame, SQLConf, SQLContext} +import org.apache.spark.sql.{DataFrame, Row, SQLConf, SQLContext} /** * A logical command that is executed for its side-effects. 
`RunnableCommand`s are @@ -64,9 +64,9 @@ private[sql] case class ExecutedCommand(cmd: RunnableCommand) extends SparkPlan override def executeTake(limit: Int): Array[Row] = sideEffectResult.take(limit).toArray - protected override def doExecute(): RDD[Row] = { + protected override def doExecute(): RDD[InternalRow] = { val converted = sideEffectResult.map(r => - CatalystTypeConverters.convertToCatalyst(r, schema).asInstanceOf[Row]) + CatalystTypeConverters.convertToCatalyst(r, schema).asInstanceOf[InternalRow]) sqlContext.sparkContext.parallelize(converted, 1) } } http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala index 83c1f65..3ee4033 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.unsafe.types.UTF8String @@ -25,7 +26,7 @@ import scala.collection.mutable.HashSet import org.apache.spark.{AccumulatorParam, Accumulator} import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.sql.{SQLConf, SQLContext, DataFrame, Row} +import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.trees.TreeNodeRef import org.apache.spark.sql.types._ @@ -126,11 +127,11 @@ package object debug { } } - protected override def doExecute(): RDD[Row] = { + protected override def doExecute(): RDD[InternalRow] = { child.execute().mapPartitions { iter => - new Iterator[Row] { + new Iterator[InternalRow] { def hasNext: Boolean 
= iter.hasNext - def next(): Row = { + def next(): InternalRow = { val currentRow = iter.next() tupleCount += 1 var i = 0 @@ -155,7 +156,7 @@ package object debug { def typeCheck(data: Any, schema: DataType): Unit = (data, schema) match { case (null, _) => - case (row: Row, StructType(fields)) => + case (row: InternalRow, StructType(fields)) => row.toSeq.zip(fields.map(_.dataType)).foreach { case(d, t) => typeCheck(d, t) } case (s: Seq[_], ArrayType(elemType, _)) => s.foreach(typeCheck(_, elemType)) @@ -196,7 +197,7 @@ package object debug { def children: List[SparkPlan] = child :: Nil - protected override def doExecute(): RDD[Row] = { + protected override def doExecute(): RDD[InternalRow] = { child.execute().map { row => try typeCheck(row, child.schema) catch { case e: Exception => http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/MonotonicallyIncreasingID.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/MonotonicallyIncreasingID.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/MonotonicallyIncreasingID.scala index e228a60..68914cf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/MonotonicallyIncreasingID.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/MonotonicallyIncreasingID.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.execution.expressions import org.apache.spark.TaskContext -import org.apache.spark.sql.catalyst.expressions.{Row, LeafExpression} +import org.apache.spark.sql.catalyst.expressions.{InternalRow, LeafExpression} import org.apache.spark.sql.types.{LongType, DataType} /** @@ -43,7 +43,7 @@ private[sql] case class MonotonicallyIncreasingID() extends LeafExpression { override def dataType: DataType = LongType - override def eval(input: Row): Long = { + override def eval(input: 
InternalRow): Long = { val currentCount = count count += 1 (TaskContext.get().partitionId().toLong << 33) + currentCount http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/SparkPartitionID.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/SparkPartitionID.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/SparkPartitionID.scala index 1272793..12c2eed 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/SparkPartitionID.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/SparkPartitionID.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.execution.expressions import org.apache.spark.TaskContext -import org.apache.spark.sql.catalyst.expressions.{LeafExpression, Row} +import org.apache.spark.sql.catalyst.expressions.{LeafExpression, InternalRow} import org.apache.spark.sql.types.{IntegerType, DataType} @@ -31,5 +31,5 @@ private[sql] case object SparkPartitionID extends LeafExpression { override def dataType: DataType = IntegerType - override def eval(input: Row): Int = TaskContext.get().partitionId() + override def eval(input: InternalRow): Int = TaskContext.get().partitionId() } http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala index b8b12be..2d2e1b9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala @@ -17,16 +17,15 @@ package 
org.apache.spark.sql.execution.joins -import org.apache.spark.rdd.RDD -import org.apache.spark.util.ThreadUtils - import scala.concurrent._ import scala.concurrent.duration._ import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.sql.catalyst.expressions.{Row, Expression} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.expressions.{Expression, InternalRow} import org.apache.spark.sql.catalyst.plans.physical.{Distribution, Partitioning, UnspecifiedDistribution} import org.apache.spark.sql.execution.{BinaryNode, SparkPlan} +import org.apache.spark.util.ThreadUtils /** * :: DeveloperApi :: @@ -61,12 +60,12 @@ case class BroadcastHashJoin( @transient private val broadcastFuture = future { // Note that we use .execute().collect() because we don't want to convert data to Scala types - val input: Array[Row] = buildPlan.execute().map(_.copy()).collect() + val input: Array[InternalRow] = buildPlan.execute().map(_.copy()).collect() val hashed = HashedRelation(input.iterator, buildSideKeyGenerator, input.length) sparkContext.broadcast(hashed) }(BroadcastHashJoin.broadcastHashJoinExecutionContext) - protected override def doExecute(): RDD[Row] = { + protected override def doExecute(): RDD[InternalRow] = { val broadcastRelation = Await.result(broadcastFuture, timeout) streamedPlan.execute().mapPartitions { streamedIter => http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala index a32e5fc..044964f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.joins import org.apache.spark.annotation.DeveloperApi import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Row} +import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.execution.{BinaryNode, SparkPlan} /** @@ -38,10 +38,10 @@ case class BroadcastLeftSemiJoinHash( override def output: Seq[Attribute] = left.output - protected override def doExecute(): RDD[Row] = { + protected override def doExecute(): RDD[InternalRow] = { val buildIter = buildPlan.execute().map(_.copy()).collect().toIterator - val hashSet = new java.util.HashSet[Row]() - var currentRow: Row = null + val hashSet = new java.util.HashSet[InternalRow]() + var currentRow: InternalRow = null // Create a Hash set of buildKeys while (buildIter.hasNext) { http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala index caad3df..0b2cf8e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala @@ -61,13 +61,14 @@ case class BroadcastNestedLoopJoin( @transient private lazy val boundCondition = newPredicate(condition.getOrElse(Literal(true)), left.output ++ right.output) - protected override def doExecute(): RDD[Row] = { + protected override def doExecute(): RDD[InternalRow] = { val broadcastedRelation = - sparkContext.broadcast(broadcast.execute().map(_.copy()).collect().toIndexedSeq) + 
sparkContext.broadcast(broadcast.execute().map(_.copy()) + .collect().toIndexedSeq) /** All rows that either match both-way, or rows from streamed joined with nulls. */ val matchesOrStreamedRowsWithNulls = streamed.execute().mapPartitions { streamedIter => - val matchedRows = new CompactBuffer[Row] + val matchedRows = new CompactBuffer[InternalRow] // TODO: Use Spark's BitSet. val includedBroadcastTuples = new scala.collection.mutable.BitSet(broadcastedRelation.value.size) @@ -118,8 +119,8 @@ case class BroadcastNestedLoopJoin( val leftNulls = new GenericMutableRow(left.output.size) val rightNulls = new GenericMutableRow(right.output.size) /** Rows from broadcasted joined with nulls. */ - val broadcastRowsWithNulls: Seq[Row] = { - val buf: CompactBuffer[Row] = new CompactBuffer() + val broadcastRowsWithNulls: Seq[InternalRow] = { + val buf: CompactBuffer[InternalRow] = new CompactBuffer() var i = 0 val rel = broadcastedRelation.value while (i < rel.length) { http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala index 191c00c..261b472 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.joins import org.apache.spark.annotation.DeveloperApi import org.apache.spark.rdd.RDD -import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Attribute, JoinedRow} import org.apache.spark.sql.execution.{BinaryNode, SparkPlan} @@ -30,7 +30,7 @@ import 
org.apache.spark.sql.execution.{BinaryNode, SparkPlan} case class CartesianProduct(left: SparkPlan, right: SparkPlan) extends BinaryNode { override def output: Seq[Attribute] = left.output ++ right.output - protected override def doExecute(): RDD[Row] = { + protected override def doExecute(): RDD[InternalRow] = { val leftResults = left.execute().map(_.copy()) val rightResults = right.execute().map(_.copy()) http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala index 851de16..3a4196a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala @@ -49,11 +49,13 @@ trait HashJoin { @transient protected lazy val streamSideKeyGenerator: () => MutableProjection = newMutableProjection(streamedKeys, streamedPlan.output) - protected def hashJoin(streamIter: Iterator[Row], hashedRelation: HashedRelation): Iterator[Row] = + protected def hashJoin( + streamIter: Iterator[InternalRow], + hashedRelation: HashedRelation): Iterator[InternalRow] = { - new Iterator[Row] { - private[this] var currentStreamedRow: Row = _ - private[this] var currentHashMatches: CompactBuffer[Row] = _ + new Iterator[InternalRow] { + private[this] var currentStreamedRow: InternalRow = _ + private[this] var currentHashMatches: CompactBuffer[InternalRow] = _ private[this] var currentMatchPosition: Int = -1 // Mutable per row objects. 
@@ -65,7 +67,7 @@ trait HashJoin { (currentMatchPosition != -1 && currentMatchPosition < currentHashMatches.size) || (streamIter.hasNext && fetchNext()) - override final def next(): Row = { + override final def next(): InternalRow = { val ret = buildSide match { case BuildRight => joinRow(currentStreamedRow, currentHashMatches(currentMatchPosition)) case BuildLeft => joinRow(currentHashMatches(currentMatchPosition), currentStreamedRow) http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala index c21a453..19aef99 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala @@ -68,26 +68,29 @@ case class HashOuterJoin( } } - @transient private[this] lazy val DUMMY_LIST = Seq[Row](null) - @transient private[this] lazy val EMPTY_LIST = Seq.empty[Row] + @transient private[this] lazy val DUMMY_LIST = Seq[InternalRow](null) + @transient private[this] lazy val EMPTY_LIST = Seq.empty[InternalRow] @transient private[this] lazy val leftNullRow = new GenericRow(left.output.length) @transient private[this] lazy val rightNullRow = new GenericRow(right.output.length) @transient private[this] lazy val boundCondition = - condition.map(newPredicate(_, left.output ++ right.output)).getOrElse((row: Row) => true) + condition.map( + newPredicate(_, left.output ++ right.output)).getOrElse((row: InternalRow) => true) // TODO we need to rewrite all of the iterators with our own implementation instead of the Scala // iterator for performance purpose. 
private[this] def leftOuterIterator( - key: Row, joinedRow: JoinedRow, rightIter: Iterable[Row]): Iterator[Row] = { - val ret: Iterable[Row] = { + key: InternalRow, + joinedRow: JoinedRow, + rightIter: Iterable[InternalRow]): Iterator[InternalRow] = { + val ret: Iterable[InternalRow] = { if (!key.anyNull) { val temp = rightIter.collect { case r if boundCondition(joinedRow.withRight(r)) => joinedRow.copy() } if (temp.size == 0) { - joinedRow.withRight(rightNullRow).copy :: Nil + joinedRow.withRight(rightNullRow).copy.asInstanceOf[InternalRow] :: Nil } else { temp } @@ -99,12 +102,15 @@ case class HashOuterJoin( } private[this] def rightOuterIterator( - key: Row, leftIter: Iterable[Row], joinedRow: JoinedRow): Iterator[Row] = { + key: InternalRow, + leftIter: Iterable[InternalRow], + joinedRow: JoinedRow): Iterator[InternalRow] = { - val ret: Iterable[Row] = { + val ret: Iterable[InternalRow] = { if (!key.anyNull) { val temp = leftIter.collect { - case l if boundCondition(joinedRow.withLeft(l)) => joinedRow.copy + case l if boundCondition(joinedRow.withLeft(l)) => + joinedRow.copy } if (temp.size == 0) { joinedRow.withLeft(leftNullRow).copy :: Nil @@ -119,14 +125,14 @@ case class HashOuterJoin( } private[this] def fullOuterIterator( - key: Row, leftIter: Iterable[Row], rightIter: Iterable[Row], - joinedRow: JoinedRow): Iterator[Row] = { + key: InternalRow, leftIter: Iterable[InternalRow], rightIter: Iterable[InternalRow], + joinedRow: JoinedRow): Iterator[InternalRow] = { if (!key.anyNull) { // Store the positions of records in right, if one of its associated row satisfy // the join condition. 
val rightMatchedSet = scala.collection.mutable.Set[Int]() - leftIter.iterator.flatMap[Row] { l => + leftIter.iterator.flatMap[InternalRow] { l => joinedRow.withLeft(l) var matched = false rightIter.zipWithIndex.collect { @@ -157,24 +163,25 @@ case class HashOuterJoin( joinedRow(leftNullRow, r).copy() } } else { - leftIter.iterator.map[Row] { l => + leftIter.iterator.map[InternalRow] { l => joinedRow(l, rightNullRow).copy() - } ++ rightIter.iterator.map[Row] { r => + } ++ rightIter.iterator.map[InternalRow] { r => joinedRow(leftNullRow, r).copy() } } } private[this] def buildHashTable( - iter: Iterator[Row], keyGenerator: Projection): JavaHashMap[Row, CompactBuffer[Row]] = { - val hashTable = new JavaHashMap[Row, CompactBuffer[Row]]() + iter: Iterator[InternalRow], + keyGenerator: Projection): JavaHashMap[InternalRow, CompactBuffer[InternalRow]] = { + val hashTable = new JavaHashMap[InternalRow, CompactBuffer[InternalRow]]() while (iter.hasNext) { val currentRow = iter.next() val rowKey = keyGenerator(currentRow) var existingMatchList = hashTable.get(rowKey) if (existingMatchList == null) { - existingMatchList = new CompactBuffer[Row]() + existingMatchList = new CompactBuffer[InternalRow]() hashTable.put(rowKey, existingMatchList) } @@ -184,7 +191,7 @@ case class HashOuterJoin( hashTable } - protected override def doExecute(): RDD[Row] = { + protected override def doExecute(): RDD[InternalRow] = { val joinedRow = new JoinedRow() left.execute().zipPartitions(right.execute()) { (leftIter, rightIter) => // TODO this probably can be replaced by external sort (sort merged join?) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
