http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
index 4e072a9..2988161 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
@@ -118,7 +118,7 @@ class TungstenAggregationIterator(
   private def createNewAggregationBuffer(): UnsafeRow = {
     val bufferSchema = aggregateFunctions.flatMap(_.aggBufferAttributes)
     val buffer: UnsafeRow = UnsafeProjection.create(bufferSchema.map(_.dataType))
-      .apply(new GenericMutableRow(bufferSchema.length))
+      .apply(new GenericInternalRow(bufferSchema.length))
     // Initialize declarative aggregates' buffer values
     expressionAggInitialProjection.target(buffer)(EmptyRow)
     // Initialize imperative aggregates' buffer values
@@ -127,7 +127,7 @@ class TungstenAggregationIterator(
   }
 
   // Creates a function used to generate output rows.
-  override protected def generateResultProjection(): (UnsafeRow, MutableRow) => UnsafeRow = {
+  override protected def generateResultProjection(): (UnsafeRow, InternalRow) => UnsafeRow = {
     val modes = aggregateExpressions.map(_.mode).distinct
     if (modes.nonEmpty && !modes.contains(Final) && !modes.contains(Complete)) {
       // Fast path for partial aggregation, UnsafeRowJoiner is usually faster than projection
@@ -137,7 +137,7 @@ class TungstenAggregationIterator(
       val bufferSchema = StructType.fromAttributes(bufferAttributes)
       val unsafeRowJoiner = GenerateUnsafeRowJoiner.create(groupingKeySchema, bufferSchema)
 
-      (currentGroupingKey: UnsafeRow, currentBuffer: MutableRow) => {
+      (currentGroupingKey: UnsafeRow, currentBuffer: InternalRow) => {
         unsafeRowJoiner.join(currentGroupingKey, currentBuffer.asInstanceOf[UnsafeRow])
       }
     } else {
@@ -300,7 +300,7 @@ class TungstenAggregationIterator(
   private[this] val sortBasedAggregationBuffer: UnsafeRow = createNewAggregationBuffer()
 
   // The function used to process rows in a group
-  private[this] var sortBasedProcessRow: (MutableRow, InternalRow) => Unit = null
+  private[this] var sortBasedProcessRow: (InternalRow, InternalRow) => Unit = null
 
   // Processes rows in the current group. It will stop when it find a new group.
   private def processCurrentSortedGroup(): Unit = {
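The buffer-creation pattern above survives the rename unchanged: an all-null generic row is projected once through an UnsafeProjection to obtain a fixed-layout, reusable UnsafeRow aggregation buffer. A minimal standalone sketch of the same idea (the two-column LongType schema is made up for illustration):

    import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection}
    import org.apache.spark.sql.types.{DataType, LongType}

    // Project an empty (all-null) GenericInternalRow through an UnsafeProjection
    // to get a fixed-layout UnsafeRow that can serve as a mutable buffer.
    val bufferTypes: Array[DataType] = Array(LongType, LongType) // hypothetical schema
    val buffer = UnsafeProjection.create(bufferTypes)
      .apply(new GenericInternalRow(bufferTypes.length))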
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala
index 586e145..67760f3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.aggregate
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, MutableRow, _}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, _}
 import org.apache.spark.sql.catalyst.expressions.aggregate.ImperativeAggregate
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
 import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
@@ -96,18 +96,18 @@ sealed trait BufferSetterGetterUtils {
     getters
   }
 
-  def createSetters(schema: StructType): Array[((MutableRow, Int, Any) => Unit)] = {
+  def createSetters(schema: StructType): Array[((InternalRow, Int, Any) => Unit)] = {
     val dataTypes = schema.fields.map(_.dataType)
-    val setters = new Array[(MutableRow, Int, Any) => Unit](dataTypes.length)
+    val setters = new Array[(InternalRow, Int, Any) => Unit](dataTypes.length)
 
     var i = 0
     while (i < setters.length) {
       setters(i) = dataTypes(i) match {
         case NullType =>
-          (row: MutableRow, ordinal: Int, value: Any) => row.setNullAt(ordinal)
+          (row: InternalRow, ordinal: Int, value: Any) => row.setNullAt(ordinal)
 
         case b: BooleanType =>
-          (row: MutableRow, ordinal: Int, value: Any) =>
+          (row: InternalRow, ordinal: Int, value: Any) =>
            if (value != null) {
              row.setBoolean(ordinal, value.asInstanceOf[Boolean])
            } else {
@@ -115,7 +115,7 @@ sealed trait BufferSetterGetterUtils {
           }
 
         case ByteType =>
-          (row: MutableRow, ordinal: Int, value: Any) =>
+          (row: InternalRow, ordinal: Int, value: Any) =>
            if (value != null) {
              row.setByte(ordinal, value.asInstanceOf[Byte])
            } else {
@@ -123,7 +123,7 @@ sealed trait BufferSetterGetterUtils {
           }
 
         case ShortType =>
-          (row: MutableRow, ordinal: Int, value: Any) =>
+          (row: InternalRow, ordinal: Int, value: Any) =>
            if (value != null) {
              row.setShort(ordinal, value.asInstanceOf[Short])
            } else {
@@ -131,7 +131,7 @@ sealed trait BufferSetterGetterUtils {
           }
 
         case IntegerType =>
-          (row: MutableRow, ordinal: Int, value: Any) =>
+          (row: InternalRow, ordinal: Int, value: Any) =>
            if (value != null) {
              row.setInt(ordinal, value.asInstanceOf[Int])
            } else {
@@ -139,7 +139,7 @@ sealed trait BufferSetterGetterUtils {
           }
 
         case LongType =>
-          (row: MutableRow, ordinal: Int, value: Any) =>
+          (row: InternalRow, ordinal: Int, value: Any) =>
            if (value != null) {
              row.setLong(ordinal, value.asInstanceOf[Long])
            } else {
@@ -147,7 +147,7 @@ sealed trait BufferSetterGetterUtils {
           }
 
         case FloatType =>
-          (row: MutableRow, ordinal: Int, value: Any) =>
+          (row: InternalRow, ordinal: Int, value: Any) =>
            if (value != null) {
              row.setFloat(ordinal, value.asInstanceOf[Float])
            } else {
@@ -155,7 +155,7 @@ sealed trait BufferSetterGetterUtils {
           }
 
         case DoubleType =>
-          (row: MutableRow, ordinal: Int, value: Any) =>
+          (row: InternalRow, ordinal: Int, value: Any) =>
            if (value != null) {
              row.setDouble(ordinal, value.asInstanceOf[Double])
            } else {
@@ -164,13 +164,13 @@ sealed trait BufferSetterGetterUtils {
 
         case dt: DecimalType =>
           val precision = dt.precision
-          (row: MutableRow, ordinal: Int, value: Any) =>
+          (row: InternalRow, ordinal: Int, value: Any) =>
            // To make it work with UnsafeRow, we cannot use setNullAt.
            // Please see the comment of UnsafeRow's setDecimal.
            row.setDecimal(ordinal, value.asInstanceOf[Decimal], precision)
 
         case DateType =>
-          (row: MutableRow, ordinal: Int, value: Any) =>
+          (row: InternalRow, ordinal: Int, value: Any) =>
            if (value != null) {
              row.setInt(ordinal, value.asInstanceOf[Int])
            } else {
@@ -178,7 +178,7 @@ sealed trait BufferSetterGetterUtils {
           }
 
         case TimestampType =>
-          (row: MutableRow, ordinal: Int, value: Any) =>
+          (row: InternalRow, ordinal: Int, value: Any) =>
            if (value != null) {
              row.setLong(ordinal, value.asInstanceOf[Long])
            } else {
@@ -186,7 +186,7 @@ sealed trait BufferSetterGetterUtils {
           }
 
         case other =>
-          (row: MutableRow, ordinal: Int, value: Any) =>
+          (row: InternalRow, ordinal: Int, value: Any) =>
            if (value != null) {
              row.update(ordinal, value)
            } else {
@@ -209,7 +209,7 @@ private[aggregate] class MutableAggregationBufferImpl(
     toCatalystConverters: Array[Any => Any],
     toScalaConverters: Array[Any => Any],
     bufferOffset: Int,
-    var underlyingBuffer: MutableRow)
+    var underlyingBuffer: InternalRow)
   extends MutableAggregationBuffer with BufferSetterGetterUtils {
 
   private[this] val offsets: Array[Int] = {
@@ -413,13 +413,13 @@ case class ScalaUDAF(
       null)
   }
 
-  override def initialize(buffer: MutableRow): Unit = {
+  override def initialize(buffer: InternalRow): Unit = {
     mutableAggregateBuffer.underlyingBuffer = buffer
 
     udaf.initialize(mutableAggregateBuffer)
   }
 
-  override def update(buffer: MutableRow, input: InternalRow): Unit = {
+  override def update(buffer: InternalRow, input: InternalRow): Unit = {
     mutableAggregateBuffer.underlyingBuffer = buffer
 
     udaf.update(
@@ -427,7 +427,7 @@ case class ScalaUDAF(
       inputToScalaConverters(inputProjection(input)).asInstanceOf[Row])
   }
 
-  override def merge(buffer1: MutableRow, buffer2: InternalRow): Unit = {
+  override def merge(buffer1: InternalRow, buffer2: InternalRow): Unit = {
     mutableAggregateBuffer.underlyingBuffer = buffer1
     inputAggregateBuffer.underlyingInputBuffer = buffer2
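Every createSetters case above follows one shape: dispatch once on the DataType, return a primitive-setter closure, and fall back to the generic update for everything else. A condensed toy version of that dispatch (names are illustrative, not this file's API):

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.types._

    def setterFor(dt: DataType): (InternalRow, Int, Any) => Unit = dt match {
      case IntegerType => (row, i, v) =>
        if (v != null) row.setInt(i, v.asInstanceOf[Int]) else row.setNullAt(i)
      case LongType => (row, i, v) =>
        if (v != null) row.setLong(i, v.asInstanceOf[Long]) else row.setNullAt(i)
      case _ => (row, i, v) =>
        if (v != null) row.update(i, v) else row.setNullAt(i)
    }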
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnAccessor.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnAccessor.scala
index 7cde04b..6241b79 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnAccessor.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnAccessor.scala
@@ -21,15 +21,16 @@ import java.nio.{ByteBuffer, ByteOrder}
 
 import scala.annotation.tailrec
 
-import org.apache.spark.sql.catalyst.expressions.{MutableRow, UnsafeArrayData, UnsafeMapData, UnsafeRow}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{UnsafeArrayData, UnsafeMapData, UnsafeRow}
 import org.apache.spark.sql.execution.columnar.compression.CompressibleColumnAccessor
 import org.apache.spark.sql.types._
 
 /**
  * An `Iterator` like trait used to extract values from columnar byte buffer. When a value is
  * extracted from the buffer, instead of directly returning it, the value is set into some field of
- * a [[MutableRow]]. In this way, boxing cost can be avoided by leveraging the setter methods
- * for primitive values provided by [[MutableRow]].
+ * a [[InternalRow]]. In this way, boxing cost can be avoided by leveraging the setter methods
+ * for primitive values provided by [[InternalRow]].
  */
 private[columnar] trait ColumnAccessor {
   initialize()
@@ -38,7 +39,7 @@ private[columnar] trait ColumnAccessor {
 
   def hasNext: Boolean
 
-  def extractTo(row: MutableRow, ordinal: Int): Unit
+  def extractTo(row: InternalRow, ordinal: Int): Unit
 
   protected def underlyingBuffer: ByteBuffer
 }
@@ -52,11 +53,11 @@ private[columnar] abstract class BasicColumnAccessor[JvmType](
 
   override def hasNext: Boolean = buffer.hasRemaining
 
-  override def extractTo(row: MutableRow, ordinal: Int): Unit = {
+  override def extractTo(row: InternalRow, ordinal: Int): Unit = {
     extractSingle(row, ordinal)
   }
 
-  def extractSingle(row: MutableRow, ordinal: Int): Unit = {
+  def extractSingle(row: InternalRow, ordinal: Int): Unit = {
     columnType.extract(buffer, row, ordinal)
   }
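The accessor contract above is easiest to read as a fill loop. Since ColumnAccessor is private[columnar], a real caller lives inside that package; this sketch assumes the same, and the single-column consumer is hypothetical:

    import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
    import org.apache.spark.sql.types.IntegerType

    // Drain one column into a reusable row; extractTo writes through the
    // primitive setters, so no boxing happens on this path.
    def drain(accessor: ColumnAccessor): Unit = {
      val row = new SpecificInternalRow(Seq(IntegerType))
      while (accessor.hasNext) {
        accessor.extractTo(row, 0)
        // consume row.getInt(0) / row.isNullAt(0) here
      }
    }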
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnType.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnType.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnType.scala
index d27d8c3..703bde2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnType.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnType.scala
@@ -92,7 +92,7 @@ private[columnar] sealed abstract class ColumnType[JvmType] {
    * `row(ordinal)`. Subclasses should override this method to avoid boxing/unboxing costs whenever
    * possible.
    */
-  def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = {
+  def extract(buffer: ByteBuffer, row: InternalRow, ordinal: Int): Unit = {
     setField(row, ordinal, extract(buffer))
   }
 
@@ -125,13 +125,13 @@ private[columnar] sealed abstract class ColumnType[JvmType] {
    * Sets `row(ordinal)` to `field`. Subclasses should override this method to avoid boxing/unboxing
    * costs whenever possible.
    */
-  def setField(row: MutableRow, ordinal: Int, value: JvmType): Unit
+  def setField(row: InternalRow, ordinal: Int, value: JvmType): Unit
 
   /**
    * Copies `from(fromOrdinal)` to `to(toOrdinal)`. Subclasses should override this method to avoid
    * boxing/unboxing costs whenever possible.
    */
-  def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int): Unit = {
+  def copyField(from: InternalRow, fromOrdinal: Int, to: InternalRow, toOrdinal: Int): Unit = {
     setField(to, toOrdinal, getField(from, fromOrdinal))
   }
 
@@ -149,7 +149,7 @@ private[columnar] object NULL extends ColumnType[Any] {
 
   override def defaultSize: Int = 0
   override def append(v: Any, buffer: ByteBuffer): Unit = {}
   override def extract(buffer: ByteBuffer): Any = null
-  override def setField(row: MutableRow, ordinal: Int, value: Any): Unit = row.setNullAt(ordinal)
+  override def setField(row: InternalRow, ordinal: Int, value: Any): Unit = row.setNullAt(ordinal)
   override def getField(row: InternalRow, ordinal: Int): Any = null
 }
 
@@ -177,18 +177,18 @@ private[columnar] object INT extends NativeColumnType(IntegerType, 4) {
     ByteBufferHelper.getInt(buffer)
   }
 
-  override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = {
+  override def extract(buffer: ByteBuffer, row: InternalRow, ordinal: Int): Unit = {
     row.setInt(ordinal, ByteBufferHelper.getInt(buffer))
   }
 
-  override def setField(row: MutableRow, ordinal: Int, value: Int): Unit = {
+  override def setField(row: InternalRow, ordinal: Int, value: Int): Unit = {
     row.setInt(ordinal, value)
   }
 
   override def getField(row: InternalRow, ordinal: Int): Int = row.getInt(ordinal)
 
-  override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) {
+  override def copyField(from: InternalRow, fromOrdinal: Int, to: InternalRow, toOrdinal: Int) {
     to.setInt(toOrdinal, from.getInt(fromOrdinal))
   }
 }
@@ -206,17 +206,17 @@ private[columnar] object LONG extends NativeColumnType(LongType, 8) {
     ByteBufferHelper.getLong(buffer)
   }
 
-  override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = {
+  override def extract(buffer: ByteBuffer, row: InternalRow, ordinal: Int): Unit = {
     row.setLong(ordinal, ByteBufferHelper.getLong(buffer))
   }
 
-  override def setField(row: MutableRow, ordinal: Int, value: Long): Unit = {
+  override def setField(row: InternalRow, ordinal: Int, value: Long): Unit = {
     row.setLong(ordinal, value)
   }
 
   override def getField(row: InternalRow, ordinal: Int): Long = row.getLong(ordinal)
 
-  override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) {
+  override def copyField(from: InternalRow, fromOrdinal: Int, to: InternalRow, toOrdinal: Int) {
     to.setLong(toOrdinal, from.getLong(fromOrdinal))
   }
 }
@@ -234,17 +234,17 @@ private[columnar] object FLOAT extends NativeColumnType(FloatType, 4) {
     ByteBufferHelper.getFloat(buffer)
   }
 
-  override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = {
+  override def extract(buffer: ByteBuffer, row: InternalRow, ordinal: Int): Unit = {
     row.setFloat(ordinal, ByteBufferHelper.getFloat(buffer))
   }
 
-  override def setField(row: MutableRow, ordinal: Int, value: Float): Unit = {
+  override def setField(row: InternalRow, ordinal: Int, value: Float): Unit = {
     row.setFloat(ordinal, value)
   }
 
   override def getField(row: InternalRow, ordinal: Int): Float = row.getFloat(ordinal)
 
-  override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) {
+  override def copyField(from: InternalRow, fromOrdinal: Int, to: InternalRow, toOrdinal: Int) {
     to.setFloat(toOrdinal, from.getFloat(fromOrdinal))
   }
 }
@@ -262,17 +262,17 @@ private[columnar] object DOUBLE extends NativeColumnType(DoubleType, 8) {
     ByteBufferHelper.getDouble(buffer)
   }
 
-  override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = {
+  override def extract(buffer: ByteBuffer, row: InternalRow, ordinal: Int): Unit = {
     row.setDouble(ordinal, ByteBufferHelper.getDouble(buffer))
   }
 
-  override def setField(row: MutableRow, ordinal: Int, value: Double): Unit = {
+  override def setField(row: InternalRow, ordinal: Int, value: Double): Unit = {
     row.setDouble(ordinal, value)
   }
 
   override def getField(row: InternalRow, ordinal: Int): Double = row.getDouble(ordinal)
 
-  override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) {
+  override def copyField(from: InternalRow, fromOrdinal: Int, to: InternalRow, toOrdinal: Int) {
     to.setDouble(toOrdinal, from.getDouble(fromOrdinal))
   }
 }
@@ -288,17 +288,17 @@ private[columnar] object BOOLEAN extends NativeColumnType(BooleanType, 1) {
 
   override def extract(buffer: ByteBuffer): Boolean = buffer.get() == 1
 
-  override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = {
+  override def extract(buffer: ByteBuffer, row: InternalRow, ordinal: Int): Unit = {
     row.setBoolean(ordinal, buffer.get() == 1)
   }
 
-  override def setField(row: MutableRow, ordinal: Int, value: Boolean): Unit = {
+  override def setField(row: InternalRow, ordinal: Int, value: Boolean): Unit = {
     row.setBoolean(ordinal, value)
   }
 
   override def getField(row: InternalRow, ordinal: Int): Boolean = row.getBoolean(ordinal)
 
-  override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) {
+  override def copyField(from: InternalRow, fromOrdinal: Int, to: InternalRow, toOrdinal: Int) {
     to.setBoolean(toOrdinal, from.getBoolean(fromOrdinal))
   }
 }
@@ -316,17 +316,17 @@ private[columnar] object BYTE extends NativeColumnType(ByteType, 1) {
     buffer.get()
   }
 
-  override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = {
+  override def extract(buffer: ByteBuffer, row: InternalRow, ordinal: Int): Unit = {
     row.setByte(ordinal, buffer.get())
   }
 
-  override def setField(row: MutableRow, ordinal: Int, value: Byte): Unit = {
+  override def setField(row: InternalRow, ordinal: Int, value: Byte): Unit = {
     row.setByte(ordinal, value)
   }
 
   override def getField(row: InternalRow, ordinal: Int): Byte = row.getByte(ordinal)
 
-  override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) {
+  override def copyField(from: InternalRow, fromOrdinal: Int, to: InternalRow, toOrdinal: Int) {
     to.setByte(toOrdinal, from.getByte(fromOrdinal))
   }
 }
@@ -344,17 +344,17 @@ private[columnar] object SHORT extends NativeColumnType(ShortType, 2) {
     buffer.getShort()
   }
 
-  override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = {
+  override def extract(buffer: ByteBuffer, row: InternalRow, ordinal: Int): Unit = {
     row.setShort(ordinal, buffer.getShort())
   }
 
-  override def setField(row: MutableRow, ordinal: Int, value: Short): Unit = {
+  override def setField(row: InternalRow, ordinal: Int, value: Short): Unit = {
     row.setShort(ordinal, value)
   }
 
   override def getField(row: InternalRow, ordinal: Int): Short = row.getShort(ordinal)
 
-  override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) {
+  override def copyField(from: InternalRow, fromOrdinal: Int, to: InternalRow, toOrdinal: Int) {
     to.setShort(toOrdinal, from.getShort(fromOrdinal))
   }
 }
@@ -366,7 +366,7 @@ private[columnar] object SHORT extends NativeColumnType(ShortType, 2) {
 private[columnar] trait DirectCopyColumnType[JvmType] extends ColumnType[JvmType] {
 
   // copy the bytes from ByteBuffer to UnsafeRow
-  override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = {
+  override def extract(buffer: ByteBuffer, row: InternalRow, ordinal: Int): Unit = {
     if (row.isInstanceOf[MutableUnsafeRow]) {
       val numBytes = buffer.getInt
       val cursor = buffer.position()
@@ -407,7 +407,7 @@ private[columnar] object STRING
     UTF8String.fromBytes(buffer.array(), buffer.arrayOffset() + cursor, length)
   }
 
-  override def setField(row: MutableRow, ordinal: Int, value: UTF8String): Unit = {
+  override def setField(row: InternalRow, ordinal: Int, value: UTF8String): Unit = {
     if (row.isInstanceOf[MutableUnsafeRow]) {
       row.asInstanceOf[MutableUnsafeRow].writer.write(ordinal, value)
     } else {
@@ -419,7 +419,7 @@ private[columnar] object STRING
     row.getUTF8String(ordinal)
   }
 
-  override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) {
+  override def copyField(from: InternalRow, fromOrdinal: Int, to: InternalRow, toOrdinal: Int) {
     setField(to, toOrdinal, getField(from, fromOrdinal))
   }
 
@@ -433,7 +433,7 @@ private[columnar] case class COMPACT_DECIMAL(precision: Int, scale: Int)
     Decimal(ByteBufferHelper.getLong(buffer), precision, scale)
   }
 
-  override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = {
+  override def extract(buffer: ByteBuffer, row: InternalRow, ordinal: Int): Unit = {
     if (row.isInstanceOf[MutableUnsafeRow]) {
       // copy it as Long
       row.setLong(ordinal, ByteBufferHelper.getLong(buffer))
@@ -459,11 +459,11 @@ private[columnar] case class COMPACT_DECIMAL(precision: Int, scale: Int)
     row.getDecimal(ordinal, precision, scale)
   }
 
-  override def setField(row: MutableRow, ordinal: Int, value: Decimal): Unit = {
+  override def setField(row: InternalRow, ordinal: Int, value: Decimal): Unit = {
     row.setDecimal(ordinal, value, precision)
   }
 
-  override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) {
+  override def copyField(from: InternalRow, fromOrdinal: Int, to: InternalRow, toOrdinal: Int) {
     setField(to, toOrdinal, getField(from, fromOrdinal))
   }
 }
@@ -497,7 +497,7 @@ private[columnar] object BINARY extends ByteArrayColumnType[Array[Byte]](16) {
 
   def dataType: DataType = BinaryType
 
-  override def setField(row: MutableRow, ordinal: Int, value: Array[Byte]): Unit = {
+  override def setField(row: InternalRow, ordinal: Int, value: Array[Byte]): Unit = {
     row.update(ordinal, value)
   }
 
@@ -522,7 +522,7 @@ private[columnar] case class LARGE_DECIMAL(precision: Int, scale: Int)
     row.getDecimal(ordinal, precision, scale)
   }
 
-  override def setField(row: MutableRow, ordinal: Int, value: Decimal): Unit = {
+  override def setField(row: InternalRow, ordinal: Int, value: Decimal): Unit = {
     row.setDecimal(ordinal, value, precision)
   }
 
@@ -553,7 +553,7 @@ private[columnar] case class STRUCT(dataType: StructType)
 
   override def defaultSize: Int = 20
 
-  override def setField(row: MutableRow, ordinal: Int, value: UnsafeRow): Unit = {
+  override def setField(row: InternalRow, ordinal: Int, value: UnsafeRow): Unit = {
     row.update(ordinal, value)
   }
 
@@ -591,7 +591,7 @@ private[columnar] case class ARRAY(dataType: ArrayType)
 
   override def defaultSize: Int = 28
 
-  override def setField(row: MutableRow, ordinal: Int, value: UnsafeArrayData): Unit = {
+  override def setField(row: InternalRow, ordinal: Int, value: UnsafeArrayData): Unit = {
     row.update(ordinal, value)
   }
 
@@ -630,7 +630,7 @@ private[columnar] case class MAP(dataType: MapType)
 
   override def defaultSize: Int = 68
 
-  override def setField(row: MutableRow, ordinal: Int, value: UnsafeMapData): Unit = {
+  override def setField(row: InternalRow, ordinal: Int, value: UnsafeMapData): Unit = {
     row.update(ordinal, value)
   }
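Every primitive ColumnType above instantiates the same template: decode straight from the ByteBuffer into the row's primitive slot. The template in miniature, with hypothetical names:

    import java.nio.ByteBuffer
    import org.apache.spark.sql.catalyst.InternalRow

    trait ToyColumnType[T] {
      def extract(buffer: ByteBuffer, row: InternalRow, ordinal: Int): Unit
    }

    object ToyInt extends ToyColumnType[Int] {
      // Primitive fast path: no boxed java.lang.Integer is ever created.
      override def extract(buffer: ByteBuffer, row: InternalRow, ordinal: Int): Unit =
        row.setInt(ordinal, buffer.getInt())
    }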
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
index 96bd338..14024d6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
@@ -36,8 +36,7 @@ abstract class ColumnarIterator extends Iterator[InternalRow] {
  *
  * WARNING: These setter MUST be called in increasing order of ordinals.
  */
-class MutableUnsafeRow(val writer: UnsafeRowWriter) extends GenericMutableRow(null) {
-
+class MutableUnsafeRow(val writer: UnsafeRowWriter) extends BaseGenericInternalRow {
   override def isNullAt(i: Int): Boolean = writer.isNullAt(i)
   override def setNullAt(i: Int): Unit = writer.setNullAt(i)
 
@@ -55,6 +54,9 @@ class MutableUnsafeRow(val writer: UnsafeRowWriter) extends GenericMutableRow(nu
   override def update(i: Int, v: Any): Unit = throw new UnsupportedOperationException
 
   // all other methods inherited from GenericMutableRow are not need
+  override protected def genericGet(ordinal: Int): Any = throw new UnsupportedOperationException
+  override def numFields: Int = throw new UnsupportedOperationException
+  override def copy(): InternalRow = throw new UnsupportedOperationException
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessor.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessor.scala
index 2465633..2f09757 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessor.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessor.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.columnar
 
 import java.nio.{ByteBuffer, ByteOrder}
 
-import org.apache.spark.sql.catalyst.expressions.MutableRow
+import org.apache.spark.sql.catalyst.InternalRow
 
 private[columnar] trait NullableColumnAccessor extends ColumnAccessor {
   private var nullsBuffer: ByteBuffer = _
@@ -39,7 +39,7 @@ private[columnar] trait NullableColumnAccessor extends ColumnAccessor {
     super.initialize()
   }
 
-  abstract override def extractTo(row: MutableRow, ordinal: Int): Unit = {
+  abstract override def extractTo(row: InternalRow, ordinal: Int): Unit = {
     if (pos == nextNullIndex) {
       seenNulls += 1
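NullableColumnAccessor's trick is that null ordinals are stored up front, so extraction only compares the current position against the next recorded null before delegating to the non-null decoder. A self-contained toy re-creation of that bookkeeping (buffer layout and names are illustrative):

    import java.nio.ByteBuffer

    final class ToyNullTracker(nullsBuffer: ByteBuffer) {
      private var nullCount = nullsBuffer.getInt()
      private var nextNullIndex = if (nullCount > 0) nullsBuffer.getInt() else -1
      private var pos = 0

      // Returns true when the current ordinal was recorded as null.
      def nextIsNull(): Boolean = {
        val isNull = pos == nextNullIndex
        if (isNull) {
          nullCount -= 1
          nextNullIndex = if (nullCount > 0) nullsBuffer.getInt() else -1
        }
        pos += 1
        isNull
      }
    }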
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnAccessor.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnAccessor.scala
index 6579b50..e1d13ad 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnAccessor.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnAccessor.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution.columnar.compression
 
-import org.apache.spark.sql.catalyst.expressions.MutableRow
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.columnar.{ColumnAccessor, NativeColumnAccessor}
 import org.apache.spark.sql.types.AtomicType
 
@@ -33,7 +33,7 @@ private[columnar] trait CompressibleColumnAccessor[T <: AtomicType] extends Colu
 
   abstract override def hasNext: Boolean = super.hasNext || decoder.hasNext
 
-  override def extractSingle(row: MutableRow, ordinal: Int): Unit = {
+  override def extractSingle(row: InternalRow, ordinal: Int): Unit = {
     decoder.next(row, ordinal)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressionScheme.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressionScheme.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressionScheme.scala
index b90d00b..6e4f1c5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressionScheme.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressionScheme.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.execution.columnar.compression
 import java.nio.{ByteBuffer, ByteOrder}
 
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.MutableRow
 import org.apache.spark.sql.execution.columnar.{ColumnType, NativeColumnType}
 import org.apache.spark.sql.types.AtomicType
 
@@ -39,7 +38,7 @@ private[columnar] trait Encoder[T <: AtomicType] {
 }
 
 private[columnar] trait Decoder[T <: AtomicType] {
-  def next(row: MutableRow, ordinal: Int): Unit
+  def next(row: InternalRow, ordinal: Int): Unit
 
   def hasNext: Boolean
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala
index 941f03b..ee99c90 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala
@@ -22,7 +22,7 @@ import java.nio.ByteBuffer
 
 import scala.collection.mutable
 
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{MutableRow, SpecificMutableRow}
+import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
 import org.apache.spark.sql.execution.columnar._
 import org.apache.spark.sql.types._
 
@@ -56,7 +56,7 @@ private[columnar] case object PassThrough extends CompressionScheme {
   class Decoder[T <: AtomicType](buffer: ByteBuffer, columnType: NativeColumnType[T])
     extends compression.Decoder[T] {
 
-    override def next(row: MutableRow, ordinal: Int): Unit = {
+    override def next(row: InternalRow, ordinal: Int): Unit = {
       columnType.extract(buffer, row, ordinal)
     }
 
@@ -86,7 +86,7 @@ private[columnar] case object RunLengthEncoding extends CompressionScheme {
     private var _compressedSize = 0
 
     // Using `MutableRow` to store the last value to avoid boxing/unboxing cost.
-    private val lastValue = new SpecificMutableRow(Seq(columnType.dataType))
+    private val lastValue = new SpecificInternalRow(Seq(columnType.dataType))
     private var lastRun = 0
 
     override def uncompressedSize: Int = _uncompressedSize
@@ -117,9 +117,9 @@ private[columnar] case object RunLengthEncoding extends CompressionScheme {
       to.putInt(RunLengthEncoding.typeId)
 
       if (from.hasRemaining) {
-        val currentValue = new SpecificMutableRow(Seq(columnType.dataType))
+        val currentValue = new SpecificInternalRow(Seq(columnType.dataType))
         var currentRun = 1
-        val value = new SpecificMutableRow(Seq(columnType.dataType))
+        val value = new SpecificInternalRow(Seq(columnType.dataType))
 
         columnType.extract(from, currentValue, 0)
 
@@ -156,7 +156,7 @@ private[columnar] case object RunLengthEncoding extends CompressionScheme {
     private var valueCount = 0
     private var currentValue: T#InternalType = _
 
-    override def next(row: MutableRow, ordinal: Int): Unit = {
+    override def next(row: InternalRow, ordinal: Int): Unit = {
       if (valueCount == run) {
         currentValue = columnType.extract(buffer)
         run = ByteBufferHelper.getInt(buffer)
@@ -273,7 +273,7 @@ private[columnar] case object DictionaryEncoding extends CompressionScheme {
       Array.fill[Any](elementNum)(columnType.extract(buffer).asInstanceOf[Any])
     }
 
-    override def next(row: MutableRow, ordinal: Int): Unit = {
+    override def next(row: InternalRow, ordinal: Int): Unit = {
       columnType.setField(row, ordinal, dictionary(buffer.getShort()).asInstanceOf[T#InternalType])
     }
 
@@ -356,7 +356,7 @@ private[columnar] case object BooleanBitSet extends CompressionScheme {
 
     private var visited: Int = 0
 
-    override def next(row: MutableRow, ordinal: Int): Unit = {
+    override def next(row: InternalRow, ordinal: Int): Unit = {
       val bit = visited % BITS_PER_LONG
 
       visited += 1
@@ -443,7 +443,7 @@ private[columnar] case object IntDelta extends CompressionScheme {
 
     override def hasNext: Boolean = buffer.hasRemaining
 
-    override def next(row: MutableRow, ordinal: Int): Unit = {
+    override def next(row: InternalRow, ordinal: Int): Unit = {
       val delta = buffer.get()
       prev = if (delta > Byte.MinValue) prev + delta else ByteBufferHelper.getInt(buffer)
       row.setInt(ordinal, prev)
@@ -523,7 +523,7 @@ private[columnar] case object LongDelta extends CompressionScheme {
 
     override def hasNext: Boolean = buffer.hasRemaining
 
-    override def next(row: MutableRow, ordinal: Int): Unit = {
+    override def next(row: InternalRow, ordinal: Int): Unit = {
       val delta = buffer.get()
       prev = if (delta > Byte.MinValue) prev + delta else ByteBufferHelper.getLong(buffer)
       row.setLong(ordinal, prev)
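IntDelta and LongDelta above encode a one-byte delta when the gap fits, and an escape byte followed by the full value otherwise. The decode step in isolation, runnable without any Spark internals:

    import java.nio.ByteBuffer

    // Byte.MinValue is the escape marker: a full 4-byte value follows it.
    def decodeDeltas(buffer: ByteBuffer, out: Array[Int]): Unit = {
      var prev = 0
      var i = 0
      while (buffer.hasRemaining && i < out.length) {
        val delta = buffer.get()
        prev = if (delta > Byte.MinValue) prev + delta else buffer.getInt()
        out(i) = prev
        i += 1
      }
    }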
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 693b4c4..6f9ed50 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -273,7 +273,7 @@ object DataSourceStrategy extends Strategy with Logging {
   // Get the bucket ID based on the bucketing values.
   // Restriction: Bucket pruning works iff the bucketing column has one and only one column.
   def getBucketId(bucketColumn: Attribute, numBuckets: Int, value: Any): Int = {
-    val mutableRow = new SpecificMutableRow(Seq(bucketColumn.dataType))
+    val mutableRow = new SpecificInternalRow(Seq(bucketColumn.dataType))
     mutableRow(0) = Cast(Literal(value), bucketColumn.dataType).eval(null)
     val bucketIdGeneration = UnsafeProjection.create(
       HashPartitioning(bucketColumn :: Nil, numBuckets).partitionIdExpression :: Nil,

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
index 33b170b..55cb26d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
@@ -29,7 +29,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory, PartitionedFile, WriterContainer}
 import org.apache.spark.sql.types._
@@ -88,7 +88,7 @@ object CSVRelation extends Logging {
       case (field, index) => safeRequiredIndices(safeRequiredFields.indexOf(field)) = index
     }
     val requiredSize = requiredFields.length
-    val row = new GenericMutableRow(requiredSize)
+    val row = new GenericInternalRow(requiredSize)
     (tokens: Array[String], numMalformedRows) => {
       if (params.dropMalformed && schemaFields.length != tokens.length) {
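getBucketId stages a single casted value in a one-slot mutable row and evaluates a projection against it. The same pattern in miniature; a pass-through BoundReference stands in for Spark's partitionIdExpression, which really hashes the value:

    import org.apache.spark.sql.catalyst.expressions.{BoundReference, Cast, Literal, SpecificInternalRow, UnsafeProjection}
    import org.apache.spark.sql.types.IntegerType

    val holder = new SpecificInternalRow(Seq(IntegerType))
    holder(0) = Cast(Literal("42"), IntegerType).eval(null)
    val proj = UnsafeProjection.create(Seq(BoundReference(0, IntegerType, nullable = true)))
    val projected = proj(holder) // an UnsafeRow holding the single int field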
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 66f2bad..4754963 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -30,7 +30,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{DataFrame, Row}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
-import org.apache.spark.sql.catalyst.expressions.{MutableRow, SpecificMutableRow}
+import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
 import org.apache.spark.sql.catalyst.util.{DateTimeUtils, GenericArrayData}
 import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects, JdbcType}
 import org.apache.spark.sql.types._
@@ -283,7 +283,7 @@ object JdbcUtils extends Logging {
     new NextIterator[InternalRow] {
       private[this] val rs = resultSet
       private[this] val getters: Array[JDBCValueGetter] = makeGetters(schema)
-      private[this] val mutableRow = new SpecificMutableRow(schema.fields.map(x => x.dataType))
+      private[this] val mutableRow = new SpecificInternalRow(schema.fields.map(x => x.dataType))
 
       override protected def close(): Unit = {
         try {
@@ -314,22 +314,22 @@ object JdbcUtils extends Logging {
   // A `JDBCValueGetter` is responsible for getting a value from `ResultSet` into a field
   // for `MutableRow`. The last argument `Int` means the index for the value to be set in
   // the row and also used for the value in `ResultSet`.
-  private type JDBCValueGetter = (ResultSet, MutableRow, Int) => Unit
+  private type JDBCValueGetter = (ResultSet, InternalRow, Int) => Unit
 
   /**
    * Creates `JDBCValueGetter`s according to [[StructType]], which can set
-   * each value from `ResultSet` to each field of [[MutableRow]] correctly.
+   * each value from `ResultSet` to each field of [[InternalRow]] correctly.
    */
   private def makeGetters(schema: StructType): Array[JDBCValueGetter] =
     schema.fields.map(sf => makeGetter(sf.dataType, sf.metadata))
 
   private def makeGetter(dt: DataType, metadata: Metadata): JDBCValueGetter = dt match {
     case BooleanType =>
-      (rs: ResultSet, row: MutableRow, pos: Int) =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
         row.setBoolean(pos, rs.getBoolean(pos + 1))
 
     case DateType =>
-      (rs: ResultSet, row: MutableRow, pos: Int) =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
         // DateTimeUtils.fromJavaDate does not handle null value, so we need to check it.
         val dateVal = rs.getDate(pos + 1)
         if (dateVal != null) {
@@ -347,25 +347,25 @@ object JdbcUtils extends Logging {
     // retrieve it, you will get wrong result 199.99.
     // So it is needed to set precision and scale for Decimal based on JDBC metadata.
     case DecimalType.Fixed(p, s) =>
-      (rs: ResultSet, row: MutableRow, pos: Int) =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
         val decimal =
           nullSafeConvert[java.math.BigDecimal](rs.getBigDecimal(pos + 1), d => Decimal(d, p, s))
         row.update(pos, decimal)
 
     case DoubleType =>
-      (rs: ResultSet, row: MutableRow, pos: Int) =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
         row.setDouble(pos, rs.getDouble(pos + 1))
 
     case FloatType =>
-      (rs: ResultSet, row: MutableRow, pos: Int) =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
         row.setFloat(pos, rs.getFloat(pos + 1))
 
     case IntegerType =>
-      (rs: ResultSet, row: MutableRow, pos: Int) =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
         row.setInt(pos, rs.getInt(pos + 1))
 
     case LongType if metadata.contains("binarylong") =>
-      (rs: ResultSet, row: MutableRow, pos: Int) =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
         val bytes = rs.getBytes(pos + 1)
         var ans = 0L
         var j = 0
@@ -376,20 +376,20 @@ object JdbcUtils extends Logging {
         row.setLong(pos, ans)
 
     case LongType =>
-      (rs: ResultSet, row: MutableRow, pos: Int) =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
         row.setLong(pos, rs.getLong(pos + 1))
 
     case ShortType =>
-      (rs: ResultSet, row: MutableRow, pos: Int) =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
         row.setShort(pos, rs.getShort(pos + 1))
 
     case StringType =>
-      (rs: ResultSet, row: MutableRow, pos: Int) =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
         // TODO(davies): use getBytes for better performance, if the encoding is UTF-8
         row.update(pos, UTF8String.fromString(rs.getString(pos + 1)))
 
     case TimestampType =>
-      (rs: ResultSet, row: MutableRow, pos: Int) =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
         val t = rs.getTimestamp(pos + 1)
         if (t != null) {
           row.setLong(pos, DateTimeUtils.fromJavaTimestamp(t))
@@ -398,7 +398,7 @@ object JdbcUtils extends Logging {
         }
 
     case BinaryType =>
-      (rs: ResultSet, row: MutableRow, pos: Int) =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
         row.update(pos, rs.getBytes(pos + 1))
 
     case ArrayType(et, _) =>
@@ -437,7 +437,7 @@ object JdbcUtils extends Logging {
         case _ => (array: Object) => array.asInstanceOf[Array[Any]]
       }
 
-      (rs: ResultSet, row: MutableRow, pos: Int) =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
         val array = nullSafeConvert[Object](
           rs.getArray(pos + 1).getArray,
           array => new GenericArrayData(elementConversion.apply(array)))
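These getters feed a read loop with one reusable SpecificInternalRow; ResultSet columns are 1-based while row ordinals are 0-based, hence the pos + 1 throughout. Skeleton of that loop (the two-column schema and the already-executed `rs` are assumed for illustration):

    import java.sql.ResultSet
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
    import org.apache.spark.sql.types.{IntegerType, LongType}

    def fillRow(rs: ResultSet,
        getters: Array[(ResultSet, InternalRow, Int) => Unit]): InternalRow = {
      val row = new SpecificInternalRow(Seq(IntegerType, LongType))
      var i = 0
      while (i < getters.length) {
        getters(i).apply(rs, row, i) // each getter reads rs column i + 1
        i += 1
      }
      row
    }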
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index 9ffc2b5..33dcf2f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -40,7 +40,7 @@ import org.apache.spark.unsafe.types.UTF8String
 /**
  * A [[ParentContainerUpdater]] is used by a Parquet converter to set converted values to some
  * corresponding parent container. For example, a converter for a `StructType` field may set
- * converted values to a [[MutableRow]]; or a converter for array elements may append converted
+ * converted values to a [[InternalRow]]; or a converter for array elements may append converted
  * values to an [[ArrayBuffer]].
  */
 private[parquet] trait ParentContainerUpdater {
@@ -155,7 +155,7 @@ private[parquet] class ParquetRowConverter(
    * Updater used together with field converters within a [[ParquetRowConverter]]. It propagates
    * converted filed values to the `ordinal`-th cell in `currentRow`.
    */
-  private final class RowUpdater(row: MutableRow, ordinal: Int) extends ParentContainerUpdater {
+  private final class RowUpdater(row: InternalRow, ordinal: Int) extends ParentContainerUpdater {
     override def set(value: Any): Unit = row(ordinal) = value
     override def setBoolean(value: Boolean): Unit = row.setBoolean(ordinal, value)
     override def setByte(value: Byte): Unit = row.setByte(ordinal, value)
@@ -166,7 +166,7 @@ private[parquet] class ParquetRowConverter(
     override def setFloat(value: Float): Unit = row.setFloat(ordinal, value)
   }
 
-  private val currentRow = new SpecificMutableRow(catalystType.map(_.dataType))
+  private val currentRow = new SpecificInternalRow(catalystType.map(_.dataType))
 
   private val unsafeProjection = UnsafeProjection.create(catalystType)
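RowUpdater's job is to let a child converter write into one fixed slot without knowing about the row. A toy stand-in for that shape:

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
    import org.apache.spark.sql.types.{DoubleType, IntegerType}

    // Capture (row, ordinal) once; typed setters then hit that slot directly.
    final class SlotUpdater(row: InternalRow, ordinal: Int) {
      def setInt(v: Int): Unit = row.setInt(ordinal, v)
      def setDouble(v: Double): Unit = row.setDouble(ordinal, v)
    }

    val current = new SpecificInternalRow(Seq(IntegerType, DoubleType))
    new SlotUpdater(current, 0).setInt(7) // field 0 written without boxing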
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala
index 43cdce7..bfe7e3d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala
@@ -119,7 +119,7 @@ case class BroadcastNestedLoopJoinExec(
     streamed.execute().mapPartitionsInternal { streamedIter =>
       val buildRows = relation.value
       val joinedRow = new JoinedRow
-      val nulls = new GenericMutableRow(broadcast.output.size)
+      val nulls = new GenericInternalRow(broadcast.output.size)
 
       // Returns an iterator to avoid copy the rows.
       new Iterator[InternalRow] {
@@ -205,14 +205,14 @@ case class BroadcastNestedLoopJoinExec(
       val joinedRow = new JoinedRow
 
       if (condition.isDefined) {
-        val resultRow = new GenericMutableRow(Array[Any](null))
+        val resultRow = new GenericInternalRow(Array[Any](null))
         streamedIter.map { row =>
           val result = buildRows.exists(r => boundCondition(joinedRow(row, r)))
           resultRow.setBoolean(0, result)
           joinedRow(row, resultRow)
         }
       } else {
-        val resultRow = new GenericMutableRow(Array[Any](buildRows.nonEmpty))
+        val resultRow = new GenericInternalRow(Array[Any](buildRows.nonEmpty))
         streamedIter.map { row =>
           joinedRow(row, resultRow)
         }
@@ -293,7 +293,7 @@ case class BroadcastNestedLoopJoinExec(
     }
 
     val notMatchedBroadcastRows: Seq[InternalRow] = {
-      val nulls = new GenericMutableRow(streamed.output.size)
+      val nulls = new GenericInternalRow(streamed.output.size)
       val buf: CompactBuffer[InternalRow] = new CompactBuffer()
       val joinedRow = new JoinedRow
       joinedRow.withLeft(nulls)
@@ -311,7 +311,7 @@ case class BroadcastNestedLoopJoinExec(
     val matchedStreamRows = streamRdd.mapPartitionsInternal { streamedIter =>
       val buildRows = relation.value
       val joinedRow = new JoinedRow
-      val nulls = new GenericMutableRow(broadcast.output.size)
+      val nulls = new GenericInternalRow(broadcast.output.size)
 
       streamedIter.flatMap { streamedRow =>
         var i = 0

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
index fb6bfa7..8ddac19 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
@@ -192,7 +192,7 @@ trait HashJoin {
       streamIter: Iterator[InternalRow],
       hashedRelation: HashedRelation): Iterator[InternalRow] = {
     val joinKeys = streamSideKeyGenerator()
-    val result = new GenericMutableRow(Array[Any](null))
+    val result = new GenericInternalRow(Array[Any](null))
     val joinedRow = new JoinedRow
     streamIter.map { current =>
       val key = joinKeys(current)

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
index 81b3e1d..ecf7cf2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
@@ -275,7 +275,7 @@ case class SortMergeJoinExec(
       case j: ExistenceJoin =>
         new RowIterator {
           private[this] var currentLeftRow: InternalRow = _
-          private[this] val result: MutableRow = new GenericMutableRow(Array[Any](null))
+          private[this] val result: InternalRow = new GenericInternalRow(Array[Any](null))
           private[this] val smjScanner = new SortMergeJoinScanner(
             createLeftKeyGenerator(),
             createRightKeyGenerator(),
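The join changes above share one idiom: a single-slot row reused as the "exists" flag glued onto every streamed row. With stand-in values only:

    import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, JoinedRow}

    val resultRow = new GenericInternalRow(Array[Any](null))
    val joined = new JoinedRow
    resultRow.setBoolean(0, true) // in the real code: buildRows.exists(...)
    // joined(streamedRow, resultRow) is what the iterator then yields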
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
index c7e2671..2acc511 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
@@ -141,7 +141,7 @@ object ObjectOperator {
   def serializeObjectToRow(serializer: Seq[Expression]): Any => UnsafeRow = {
     val proj = GenerateUnsafeProjection.generate(serializer)
     val objType = serializer.head.collect { case b: BoundReference => b.dataType }.head
-    val objRow = new SpecificMutableRow(objType :: Nil)
+    val objRow = new SpecificInternalRow(objType :: Nil)
     (o: Any) => {
       objRow(0) = o
       proj(objRow)
@@ -149,7 +149,7 @@ object ObjectOperator {
   }
 
   def wrapObjectToRow(objType: DataType): Any => InternalRow = {
-    val outputRow = new SpecificMutableRow(objType :: Nil)
+    val outputRow = new SpecificInternalRow(objType :: Nil)
     (o: Any) => {
       outputRow(0) = o
       outputRow

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala
index f9d20ad..dcaf2c7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala
@@ -147,7 +147,7 @@ case class BatchEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chi
       .compute(inputIterator, context.partitionId(), context)
 
     val unpickle = new Unpickler
-    val mutableRow = new GenericMutableRow(1)
+    val mutableRow = new GenericInternalRow(1)
     val joined = new JoinedRow
     val resultType = if (udfs.length == 1) {
       udfs.head.dataType
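wrapObjectToRow is a closure over one reusable single-slot row that carries a JVM object into the InternalRow world; the caller must consume each result before the next call. A sketch under that assumption (using ObjectType here is illustrative, not what the file's caller passes):

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
    import org.apache.spark.sql.types.ObjectType

    def wrap(clazz: Class[_]): Any => InternalRow = {
      val outputRow = new SpecificInternalRow(ObjectType(clazz) :: Nil)
      (o: Any) => { outputRow(0) = o; outputRow }
    }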
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
index 822f49e..c02b154 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.stat
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{Column, DataFrame, Dataset, Row}
-import org.apache.spark.sql.catalyst.expressions.{Cast, GenericMutableRow}
+import org.apache.spark.sql.catalyst.expressions.{Cast, GenericInternalRow}
 import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
 import org.apache.spark.sql.catalyst.util.QuantileSummaries
 import org.apache.spark.sql.functions._
@@ -186,7 +186,7 @@ object StatFunctions extends Logging {
     require(columnSize < 1e4, s"The number of distinct values for $col2, can't " +
       s"exceed 1e4. Currently $columnSize")
     val table = counts.groupBy(_.get(0)).map { case (col1Item, rows) =>
-      val countsRow = new GenericMutableRow(columnSize + 1)
+      val countsRow = new GenericInternalRow(columnSize + 1)
       rows.foreach { (row: Row) =>
         // row.get(0) is column 1
        // row.get(1) is column 2

http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/window/AggregateProcessor.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/AggregateProcessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/AggregateProcessor.scala
index d3a46d0..c9f5d3b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/AggregateProcessor.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/AggregateProcessor.scala
@@ -123,7 +123,7 @@ private[window] final class AggregateProcessor(
   private[this] val join = new JoinedRow
   private[this] val numImperatives = imperatives.length
 
-  private[this] val buffer = new SpecificMutableRow(bufferSchema.toSeq.map(_.dataType))
+  private[this] val buffer = new SpecificInternalRow(bufferSchema.toSeq.map(_.dataType))
   initialProjection.target(buffer)
   updateProjection.target(buffer)
 
@@ -154,6 +154,6 @@ private[window] final class AggregateProcessor(
   }
 
   /** Evaluate buffer. */
-  def evaluate(target: MutableRow): Unit =
+  def evaluate(target: InternalRow): Unit =
     evaluateProjection.target(target)(buffer)
 }
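AggregateProcessor's evaluate relies on the target(...) idiom: point a mutable projection at a reusable row, then apply it per input. A toy projection (buffer slot 0 plus one, rather than a real window aggregate) showing the mechanics; treat this as a sketch of internal codegen APIs:

    import org.apache.spark.sql.catalyst.expressions.{Add, BoundReference, Literal, SpecificInternalRow}
    import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
    import org.apache.spark.sql.types.LongType

    val buffer = new SpecificInternalRow(Seq(LongType))
    val proj = GenerateMutableProjection.generate(
      Seq(Add(BoundReference(0, LongType, nullable = false), Literal(1L))))
    proj.target(buffer)
    buffer.setLong(0, 0L)
    proj(buffer) // buffer slot 0 becomes 1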
case ("AGGREGATE", frameType, None, None) => - target: MutableRow => { + target: InternalRow => { new UnboundedWindowFunctionFrame(target, processor) } } @@ -312,7 +312,7 @@ case class WindowExec( val inputFields = child.output.length var sorter: UnsafeExternalSorter = null var rowBuffer: RowBuffer = null - val windowFunctionResult = new SpecificMutableRow(expressions.map(_.dataType)) + val windowFunctionResult = new SpecificInternalRow(expressions.map(_.dataType)) val frames = factories.map(_(windowFunctionResult)) val numFrames = frames.length private[this] def fetchNextPartition() { http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala index 2ab9faa..70efc0f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala @@ -56,7 +56,7 @@ private[window] abstract class WindowFunctionFrame { * @param offset by which rows get moved within a partition. */ private[window] final class OffsetWindowFunctionFrame( - target: MutableRow, + target: InternalRow, ordinal: Int, expressions: Array[OffsetWindowFunction], inputSchema: Seq[Attribute], @@ -136,7 +136,7 @@ private[window] final class OffsetWindowFunctionFrame( * @param ubound comparator used to identify the upper bound of an output row. */ private[window] final class SlidingWindowFunctionFrame( - target: MutableRow, + target: InternalRow, processor: AggregateProcessor, lbound: BoundOrdering, ubound: BoundOrdering) @@ -217,7 +217,7 @@ private[window] final class SlidingWindowFunctionFrame( * @param processor to calculate the row values with. */ private[window] final class UnboundedWindowFunctionFrame( - target: MutableRow, + target: InternalRow, processor: AggregateProcessor) extends WindowFunctionFrame { @@ -255,7 +255,7 @@ private[window] final class UnboundedWindowFunctionFrame( * @param ubound comparator used to identify the upper bound of an output row. */ private[window] final class UnboundedPrecedingWindowFunctionFrame( - target: MutableRow, + target: InternalRow, processor: AggregateProcessor, ubound: BoundOrdering) extends WindowFunctionFrame { @@ -317,7 +317,7 @@ private[window] final class UnboundedPrecedingWindowFunctionFrame( * @param lbound comparator used to identify the lower bound of an output row. 
*/ private[window] final class UnboundedFollowingWindowFunctionFrame( - target: MutableRow, + target: InternalRow, processor: AggregateProcessor, lbound: BoundOrdering) extends WindowFunctionFrame { http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/test/scala/org/apache/spark/sql/RowSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/RowSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/RowSuite.scala index 34936b3..7516be3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/RowSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/RowSuite.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, SpecificMutableRow} +import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, SpecificInternalRow} import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String @@ -27,7 +27,7 @@ class RowSuite extends SparkFunSuite with SharedSQLContext { import testImplicits._ test("create row") { - val expected = new GenericMutableRow(4) + val expected = new GenericInternalRow(4) expected.setInt(0, 2147483647) expected.update(1, UTF8String.fromString("this is a string")) expected.setBoolean(2, false) @@ -49,7 +49,7 @@ class RowSuite extends SparkFunSuite with SharedSQLContext { } test("SpecificMutableRow.update with null") { - val row = new SpecificMutableRow(Seq(IntegerType)) + val row = new SpecificInternalRow(Seq(IntegerType)) row(0) = null assert(row.isNullAt(0)) } http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/test/scala/org/apache/spark/sql/TypedImperativeAggregateSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/TypedImperativeAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/TypedImperativeAggregateSuite.scala index b5eb16b..ffa26f1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/TypedImperativeAggregateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/TypedImperativeAggregateSuite.scala @@ -21,7 +21,7 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, Da import org.apache.spark.sql.TypedImperativeAggregateSuite.TypedMax import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{BoundReference, Expression, GenericMutableRow, SpecificMutableRow} +import org.apache.spark.sql.catalyst.expressions.{BoundReference, Expression, GenericInternalRow, SpecificInternalRow} import org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate import org.apache.spark.sql.execution.aggregate.SortAggregateExec import org.apache.spark.sql.expressions.Window @@ -64,7 +64,7 @@ class TypedImperativeAggregateSuite extends QueryTest with SharedSQLContext { assert(agg.eval(mergeBuffer) == data.map(_._1).max) // Tests low level eval(row: InternalRow) API. - val row = new GenericMutableRow(Array(mergeBuffer): Array[Any]) + val row = new GenericInternalRow(Array(mergeBuffer): Array[Any]) // Evaluates directly on row consist of aggregation buffer object. 
assert(agg.eval(row) == data.map(_._1).max) @@ -73,7 +73,7 @@ class TypedImperativeAggregateSuite extends QueryTest with SharedSQLContext { test("supports SpecificMutableRow as mutable row") { val aggregationBufferSchema = Seq(IntegerType, LongType, BinaryType, IntegerType) val aggBufferOffset = 2 - val buffer = new SpecificMutableRow(aggregationBufferSchema) + val buffer = new SpecificInternalRow(aggregationBufferSchema) val agg = new TypedMax(BoundReference(ordinal = 1, dataType = IntegerType, nullable = false)) .withNewMutableAggBufferOffset(aggBufferOffset) http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnTypeSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnTypeSuite.scala index 805b566..8bf9f52 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnTypeSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnTypeSuite.scala @@ -24,7 +24,7 @@ import org.apache.spark.SparkFunSuite import org.apache.spark.internal.Logging import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.CatalystTypeConverters -import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, UnsafeProjection} +import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection} import org.apache.spark.sql.execution.columnar.ColumnarTestUtils._ import org.apache.spark.sql.types._ @@ -54,7 +54,7 @@ class ColumnTypeSuite extends SparkFunSuite with Logging { expected: Int): Unit = { assertResult(expected, s"Wrong actualSize for $columnType") { - val row = new GenericMutableRow(1) + val row = new GenericInternalRow(1) row.update(0, CatalystTypeConverters.convertToCatalyst(value)) val proj = UnsafeProjection.create(Array[DataType](columnType.dataType)) columnType.actualSize(proj(row), 0) http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnarTestUtils.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnarTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnarTestUtils.scala index 1529313..686c8fa 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnarTestUtils.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnarTestUtils.scala @@ -21,14 +21,14 @@ import scala.collection.immutable.HashSet import scala.util.Random import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, GenericMutableRow} +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData} import org.apache.spark.sql.types.{AtomicType, Decimal} import org.apache.spark.unsafe.types.UTF8String object ColumnarTestUtils { - def makeNullRow(length: Int): GenericMutableRow = { - val row = new GenericMutableRow(length) + def makeNullRow(length: Int): GenericInternalRow = { + val row = new GenericInternalRow(length) (0 until length).foreach(row.setNullAt) row } @@ -86,7 +86,7 @@ object ColumnarTestUtils { tail: ColumnType[_]*): InternalRow = makeRandomRow(Seq(head) ++ tail) 
def makeRandomRow(columnTypes: Seq[ColumnType[_]]): InternalRow = { - val row = new GenericMutableRow(columnTypes.length) + val row = new GenericInternalRow(columnTypes.length) makeRandomValues(columnTypes).zipWithIndex.foreach { case (value, index) => row(index) = value } @@ -95,11 +95,11 @@ object ColumnarTestUtils { def makeUniqueValuesAndSingleValueRows[T <: AtomicType]( columnType: NativeColumnType[T], - count: Int): (Seq[T#InternalType], Seq[GenericMutableRow]) = { + count: Int): (Seq[T#InternalType], Seq[GenericInternalRow]) = { val values = makeUniqueRandomValues(columnType, count) val rows = values.map { value => - val row = new GenericMutableRow(1) + val row = new GenericInternalRow(1) row(0) = value row } http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessorSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessorSuite.scala index dc22d3e..8f4ca3c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessorSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessorSuite.scala @@ -21,7 +21,7 @@ import java.nio.ByteBuffer import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.CatalystTypeConverters -import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, UnsafeProjection} +import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection} import org.apache.spark.sql.types._ class TestNullableColumnAccessor[JvmType]( @@ -72,7 +72,7 @@ class NullableColumnAccessorSuite extends SparkFunSuite { } val accessor = TestNullableColumnAccessor(builder.build(), columnType) - val row = new GenericMutableRow(1) + val row = new GenericInternalRow(1) val converter = CatalystTypeConverters.createToScalaConverter(columnType.dataType) (0 until 4).foreach { _ => http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/NullableColumnBuilderSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/NullableColumnBuilderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/NullableColumnBuilderSuite.scala index cdd4551..b2b6e92 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/NullableColumnBuilderSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/NullableColumnBuilderSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.columnar import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.CatalystTypeConverters -import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, UnsafeProjection} +import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection} import org.apache.spark.sql.types._ class TestNullableColumnBuilder[JvmType](columnType: ColumnType[JvmType]) @@ -94,7 +94,7 @@ class NullableColumnBuilderSuite extends SparkFunSuite { (1 to 7 by 2).foreach(assertResult(_, "Wrong null position")(buffer.getInt())) // For non-null values - val actual = new GenericMutableRow(new Array[Any](1)) + val actual = new GenericInternalRow(new Array[Any](1)) (0 
until 4).foreach { _ => columnType.extract(buffer, actual, 0) assert(converter(actual.get(0, dataType)) === converter(randomRow.get(0, dataType)), http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/BooleanBitSetSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/BooleanBitSetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/BooleanBitSetSuite.scala index f67e9c7..d01bf91 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/BooleanBitSetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/BooleanBitSetSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.columnar.compression import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.GenericMutableRow +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow import org.apache.spark.sql.execution.columnar.{BOOLEAN, NoopColumnStats} import org.apache.spark.sql.execution.columnar.ColumnarTestUtils._ @@ -72,7 +72,7 @@ class BooleanBitSetSuite extends SparkFunSuite { buffer.rewind().position(headerSize + 4) val decoder = BooleanBitSet.decoder(buffer, BOOLEAN) - val mutableRow = new GenericMutableRow(1) + val mutableRow = new GenericInternalRow(1) if (values.nonEmpty) { values.foreach { assert(decoder.hasNext) http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/CompressionSchemeBenchmark.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/CompressionSchemeBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/CompressionSchemeBenchmark.scala index babf944..9005ec9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/CompressionSchemeBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/CompressionSchemeBenchmark.scala @@ -23,7 +23,7 @@ import java.nio.charset.StandardCharsets import org.apache.commons.lang3.RandomStringUtils import org.apache.commons.math3.distribution.LogNormalDistribution -import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, GenericMutableRow} +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow import org.apache.spark.sql.execution.columnar.{BOOLEAN, INT, LONG, NativeColumnType, SHORT, STRING} import org.apache.spark.sql.types.AtomicType import org.apache.spark.util.Benchmark @@ -111,7 +111,7 @@ object CompressionSchemeBenchmark extends AllCompressionSchemes { input.rewind() benchmark.addCase(label)({ i: Int => - val rowBuf = new GenericMutableRow(1) + val rowBuf = new GenericInternalRow(1) for (n <- 0L until iters) { compressedBuf.rewind.position(4) http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/DictionaryEncodingSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/DictionaryEncodingSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/DictionaryEncodingSuite.scala index 830ca02..67139b1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/DictionaryEncodingSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/DictionaryEncodingSuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.columnar.compression import java.nio.ByteBuffer import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.catalyst.expressions.GenericMutableRow +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow import org.apache.spark.sql.execution.columnar._ import org.apache.spark.sql.execution.columnar.ColumnarTestUtils._ import org.apache.spark.sql.types.AtomicType @@ -97,7 +97,7 @@ class DictionaryEncodingSuite extends SparkFunSuite { buffer.rewind().position(headerSize + 4) val decoder = DictionaryEncoding.decoder(buffer, columnType) - val mutableRow = new GenericMutableRow(1) + val mutableRow = new GenericInternalRow(1) if (inputSeq.nonEmpty) { inputSeq.foreach { i => http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/IntegralDeltaSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/IntegralDeltaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/IntegralDeltaSuite.scala index a530e27..411d31f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/IntegralDeltaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/IntegralDeltaSuite.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.execution.columnar.compression import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.catalyst.expressions.GenericMutableRow +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow import org.apache.spark.sql.execution.columnar._ import org.apache.spark.sql.execution.columnar.ColumnarTestUtils._ import org.apache.spark.sql.types.IntegralType @@ -48,7 +48,7 @@ class IntegralDeltaSuite extends SparkFunSuite { } input.foreach { value => - val row = new GenericMutableRow(1) + val row = new GenericInternalRow(1) columnType.setField(row, 0, value) builder.appendFrom(row, 0) } @@ -95,7 +95,7 @@ class IntegralDeltaSuite extends SparkFunSuite { buffer.rewind().position(headerSize + 4) val decoder = scheme.decoder(buffer, columnType) - val mutableRow = new GenericMutableRow(1) + val mutableRow = new GenericInternalRow(1) if (input.nonEmpty) { input.foreach{ http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/RunLengthEncodingSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/RunLengthEncodingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/RunLengthEncodingSuite.scala index 95642e9..dffa9b3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/RunLengthEncodingSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/RunLengthEncodingSuite.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.execution.columnar.compression import 
org.apache.spark.SparkFunSuite -import org.apache.spark.sql.catalyst.expressions.GenericMutableRow +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow import org.apache.spark.sql.execution.columnar._ import org.apache.spark.sql.execution.columnar.ColumnarTestUtils._ import org.apache.spark.sql.types.AtomicType @@ -80,7 +80,7 @@ class RunLengthEncodingSuite extends SparkFunSuite { buffer.rewind().position(headerSize + 4) val decoder = RunLengthEncoding.decoder(buffer, columnType) - val mutableRow = new GenericMutableRow(1) + val mutableRow = new GenericInternalRow(1) if (inputSeq.nonEmpty) { inputSeq.foreach { i => http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index 3161a63..580eade 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -38,7 +38,7 @@ import org.apache.parquet.schema.{MessageType, MessageTypeParser} import org.apache.spark.SparkException import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.{InternalRow, ScalaReflection} -import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, UnsafeRow} +import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeRow} import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext @@ -716,7 +716,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { dataTypes.zip(constantValues).foreach { case (dt, v) => val schema = StructType(StructField("pcol", dt) :: Nil) val vectorizedReader = new VectorizedParquetRecordReader - val partitionValues = new GenericMutableRow(Array(v)) + val partitionValues = new GenericInternalRow(Array(v)) val file = SpecificParquetRecordReaderBase.listDirectory(dir).get(0) try { http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala index 9dd8d9f..4c4a7d8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -24,7 +24,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} -import org.apache.spark.sql.catalyst.expressions.SpecificMutableRow +import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow import org.apache.spark.sql.execution.FileSourceScanExec import org.apache.spark.sql.execution.datasources.parquet.TestingUDT.{NestedStruct, NestedStructUDT, SingleElement} import org.apache.spark.sql.internal.SQLConf @@ -719,7 +719,7 @@ object TestingUDT { 
.add("c", DoubleType, nullable = false) override def serialize(n: NestedStruct): Any = { - val row = new SpecificMutableRow(sqlType.asInstanceOf[StructType].map(_.dataType)) + val row = new SpecificInternalRow(sqlType.asInstanceOf[StructType].map(_.dataType)) row.setInt(0, n.a) row.setLong(1, n.b) row.setDouble(2, n.c) http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala index fe34caa..1625116 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala @@ -688,25 +688,25 @@ private[hive] trait HiveInspectors { * @return A function that performs in-place updating of a MutableRow. * Use the overloaded ObjectInspector version for assignments. */ - def unwrapperFor(field: HiveStructField): (Any, MutableRow, Int) => Unit = + def unwrapperFor(field: HiveStructField): (Any, InternalRow, Int) => Unit = field.getFieldObjectInspector match { case oi: BooleanObjectInspector => - (value: Any, row: MutableRow, ordinal: Int) => row.setBoolean(ordinal, oi.get(value)) + (value: Any, row: InternalRow, ordinal: Int) => row.setBoolean(ordinal, oi.get(value)) case oi: ByteObjectInspector => - (value: Any, row: MutableRow, ordinal: Int) => row.setByte(ordinal, oi.get(value)) + (value: Any, row: InternalRow, ordinal: Int) => row.setByte(ordinal, oi.get(value)) case oi: ShortObjectInspector => - (value: Any, row: MutableRow, ordinal: Int) => row.setShort(ordinal, oi.get(value)) + (value: Any, row: InternalRow, ordinal: Int) => row.setShort(ordinal, oi.get(value)) case oi: IntObjectInspector => - (value: Any, row: MutableRow, ordinal: Int) => row.setInt(ordinal, oi.get(value)) + (value: Any, row: InternalRow, ordinal: Int) => row.setInt(ordinal, oi.get(value)) case oi: LongObjectInspector => - (value: Any, row: MutableRow, ordinal: Int) => row.setLong(ordinal, oi.get(value)) + (value: Any, row: InternalRow, ordinal: Int) => row.setLong(ordinal, oi.get(value)) case oi: FloatObjectInspector => - (value: Any, row: MutableRow, ordinal: Int) => row.setFloat(ordinal, oi.get(value)) + (value: Any, row: InternalRow, ordinal: Int) => row.setFloat(ordinal, oi.get(value)) case oi: DoubleObjectInspector => - (value: Any, row: MutableRow, ordinal: Int) => row.setDouble(ordinal, oi.get(value)) + (value: Any, row: InternalRow, ordinal: Int) => row.setDouble(ordinal, oi.get(value)) case oi => val unwrapper = unwrapperFor(oi) - (value: Any, row: MutableRow, ordinal: Int) => row(ordinal) = unwrapper(value) + (value: Any, row: InternalRow, ordinal: Int) => row(ordinal) = unwrapper(value) } def wrap(a: Any, oi: ObjectInspector, dataType: DataType): AnyRef = { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
