Asif Hussain Shahid created SPARK-13116:
-------------------------------------------
Summary: TungstenAggregate, though supposedly capable of processing both unsafe & safe rows, fails if the input is safe rows
Key: SPARK-13116
URL: https://issues.apache.org/jira/browse/SPARK-13116
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 1.6.0
Reporter: Asif Hussain Shahid
TungstenAggregate, though supposedly capable of processing both unsafe and safe rows, fails if the input is safe rows.
If the input to TungstenAggregateIterator is a SafeRow while the target is an UnsafeRow, the current code tries to set the fields of the UnsafeRow through UnsafeRow's update method.
That update method is reached through the InterpretedMutableProjection used by TungstenAggregateIterator. The target row of that InterpretedMutableProjection is an UnsafeRow, while the current input row is a SafeRow.
In its apply method, InterpretedMutableProjection invokes
    mutableRow(i) = exprArray(i).eval(input)
which Scala compiles to mutableRow.update(i, ...). For UnsafeRow, however, the update method throws UnsupportedOperationException, so the aggregation fails.
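To make the failure mode concrete without a Spark dependency, the following self-contained Scala sketch models it. SafeRowModel, UnsafeRowModel and projectInto are hypothetical names, not Spark classes: UnsafeRowModel imitates only the property described above, namely that its generic update throws UnsupportedOperationException while a typed setter such as setInt works. projectInto mirrors the assignment loop in apply.

```scala
// Hypothetical stand-ins for the two row kinds; these are NOT Spark classes.
trait RowModel { def update(i: Int, value: Any): Unit }

// Plays the role of a safe (generic) row: stores boxed values via update().
final class SafeRowModel(n: Int) extends RowModel {
  val values = new Array[Any](n)
  def update(i: Int, value: Any): Unit = values(i) = value
}

// Plays the role of UnsafeRow: generic update() is unsupported,
// only typed setters such as setInt() write a field.
final class UnsafeRowModel(n: Int) extends RowModel {
  val ints = new Array[Int](n)
  def update(i: Int, value: Any): Unit = throw new UnsupportedOperationException()
  def setInt(i: Int, v: Int): Unit = ints(i) = v
}

// Mirrors `mutableRow(i) = exprArray(i).eval(input)`: the assignment
// syntax `target(i) = ...` desugars to target.update(i, ...).
def projectInto(target: RowModel, evaluated: Seq[Any]): Unit =
  evaluated.indices.foreach(i => target(i) = evaluated(i))
```

Projecting into a SafeRowModel succeeds, while the same loop against an UnsafeRowModel throws, even though setInt on that row works fine.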
The fix I applied on our forked branch, in the class InterpretedMutableProjection, is:
+  private var targetUnsafe = false
+  type UnsafeSetter = (UnsafeRow, Any) => Unit
+  private var setters: Array[UnsafeSetter] = _
   private[this] val exprArray = expressions.toArray
   private[this] var mutableRow: MutableRow = new GenericMutableRow(exprArray.length)
   def currentValue: InternalRow = mutableRow
+
   override def target(row: MutableRow): MutableProjection = {
     mutableRow = row
+    targetUnsafe = row match {
+      case _: UnsafeRow =>
+        if (setters == null) {
+          setters = Array.ofDim[UnsafeSetter](exprArray.length)
+          for (i <- 0 until exprArray.length) {
+            setters(i) = exprArray(i).dataType match {
+              case IntegerType => (target: UnsafeRow, value: Any) =>
+                target.setInt(i, value.asInstanceOf[Int])
+              case LongType => (target: UnsafeRow, value: Any) =>
+                target.setLong(i, value.asInstanceOf[Long])
+              case DoubleType => (target: UnsafeRow, value: Any) =>
+                target.setDouble(i, value.asInstanceOf[Double])
+              case FloatType => (target: UnsafeRow, value: Any) =>
+                target.setFloat(i, value.asInstanceOf[Float])
+
+              case NullType => (target: UnsafeRow, value: Any) =>
+                target.setNullAt(i)
+
+              case BooleanType => (target: UnsafeRow, value: Any) =>
+                target.setBoolean(i, value.asInstanceOf[Boolean])
+
+              case ByteType => (target: UnsafeRow, value: Any) =>
+                target.setByte(i, value.asInstanceOf[Byte])
+              case ShortType => (target: UnsafeRow, value: Any) =>
+                target.setShort(i, value.asInstanceOf[Short])
+
+            }
+          }
+        }
+        true
+      case _ => false
+    }
+
     this
   }
   override def apply(input: InternalRow): InternalRow = {
     var i = 0
     while (i < exprArray.length) {
-      mutableRow(i) = exprArray(i).eval(input)
+      if (targetUnsafe) {
+        setters(i)(mutableRow.asInstanceOf[UnsafeRow], exprArray(i).eval(input))
+      } else {
+        mutableRow(i) = exprArray(i).eval(input)
+      }
       i += 1
     }
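The patch above resolves one typed setter per output column when target() receives an UnsafeRow, so apply() never goes through the generic update. Below is a minimal, Spark-free sketch of that setter-array idea; FixedWidthRow, ColType, IntCol, LongCol, DoubleCol, buildSetters and projectInto are hypothetical names, not Spark APIs. It deliberately differs from the patch in one respect: each setter checks for null before unboxing. In Scala, `null.asInstanceOf[Int]` evaluates to 0, so the patch's IntegerType setter, as written, would store 0 instead of marking the field null whenever an expression evaluates to null. Separately, because the patch's match lists only eight types, a column of any other type would make target() throw scala.MatchError.

```scala
// Hypothetical column type tags standing in for Spark's DataType objects.
sealed trait ColType
case object IntCol extends ColType
case object LongCol extends ColType
case object DoubleCol extends ColType

// Hypothetical stand-in for UnsafeRow: fixed-width slots plus a null mask;
// the generic update() is unsupported, only typed setters write a field.
final class FixedWidthRow(n: Int) {
  private val slots = new Array[Long](n)
  private val nulls = new Array[Boolean](n)
  def update(i: Int, value: Any): Unit = throw new UnsupportedOperationException()
  def setInt(i: Int, v: Int): Unit = { nulls(i) = false; slots(i) = v.toLong }
  def setLong(i: Int, v: Long): Unit = { nulls(i) = false; slots(i) = v }
  def setDouble(i: Int, v: Double): Unit = {
    nulls(i) = false; slots(i) = java.lang.Double.doubleToLongBits(v)
  }
  def setNullAt(i: Int): Unit = { nulls(i) = true; slots(i) = 0L }
  def isNullAt(i: Int): Boolean = nulls(i)
  def getInt(i: Int): Int = slots(i).toInt
  def getLong(i: Int): Long = slots(i)
  def getDouble(i: Int): Double = java.lang.Double.longBitsToDouble(slots(i))
}

type Setter = (FixedWidthRow, Any) => Unit

// Resolve one typed setter per column up front, as the patch does in target().
// Unlike the patch, every setter checks for null before unboxing the value.
def buildSetters(types: IndexedSeq[ColType]): Array[Setter] =
  types.zipWithIndex.map { case (t, i) =>
    val typed: Setter = t match {
      case IntCol    => (row, v) => row.setInt(i, v.asInstanceOf[Int])
      case LongCol   => (row, v) => row.setLong(i, v.asInstanceOf[Long])
      case DoubleCol => (row, v) => row.setDouble(i, v.asInstanceOf[Double])
    }
    val nullSafe: Setter = (row, v) => if (v == null) row.setNullAt(i) else typed(row, v)
    nullSafe
  }.toArray

// Per-row loop: no type dispatch, just an indexed call into the setter array.
def projectInto(row: FixedWidthRow, setters: Array[Setter], evaluated: IndexedSeq[Any]): Unit = {
  var i = 0
  while (i < setters.length) {
    setters(i)(row, evaluated(i))
    i += 1
  }
}
```

Resolving the setters once keeps the pattern match out of the per-row loop; the added null check costs one comparison per field.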
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)