[
https://issues.apache.org/jira/browse/SPARK-13116?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Asif Hussain Shahid updated SPARK-13116:
----------------------------------------
Attachment: SPARK_13116_Test.scala
A test that reproduces the issue.
Please note that to reproduce the issue you will have to tweak the SparkPlan code
a little, as explained in the ticket.
> TungstenAggregate though it is supposedly capable of all processing unsafe &
> safe rows, fails if the input is safe rows
> -----------------------------------------------------------------------------------------------------------------------
>
> Key: SPARK-13116
> URL: https://issues.apache.org/jira/browse/SPARK-13116
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 1.6.0
> Reporter: Asif Hussain Shahid
> Attachments: SPARK_13116_Test.scala
>
>
> TungstenAggregate, though supposedly capable of processing both unsafe and
> safe rows, fails if the input consists of safe rows.
> If the input to TungstenAggregateIterator is a SafeRow while the target is
> an UnsafeRow, the current code tries to set the fields in the UnsafeRow
> using the update method of UnsafeRow.
> This method is called via TungstenAggregateIterator on the
> InterpretedMutableProjection. The target row in the
> InterpretedMutableProjection is an UnsafeRow, while the current row is a
> SafeRow.
> The apply method of InterpretedMutableProjection invokes
> mutableRow(i) = exprArray(i).eval(input)
> which is Scala shorthand for mutableRow.update(i, ...). For an UnsafeRow, the
> update method throws UnsupportedOperationException.
> The fix I propose, which I applied on our forked branch to the class
> InterpretedMutableProjection, is:
> +  private var targetUnsafe = false
> +  type UnsafeSetter = (UnsafeRow, Any) => Unit
> +  private var setters: Array[UnsafeSetter] = _
>    private[this] val exprArray = expressions.toArray
>    private[this] var mutableRow: MutableRow = new GenericMutableRow(exprArray.length)
>    def currentValue: InternalRow = mutableRow
>
> +
>    override def target(row: MutableRow): MutableProjection = {
>      mutableRow = row
> +    targetUnsafe = row match {
> +      case _: UnsafeRow => {
> +        if (setters == null) {
> +          setters = Array.ofDim[UnsafeSetter](exprArray.length)
> +          for (i <- 0 until exprArray.length) {
> +            setters(i) = exprArray(i).dataType match {
> +              case IntegerType => (target: UnsafeRow, value: Any) =>
> +                target.setInt(i, value.asInstanceOf[Int])
> +              case LongType => (target: UnsafeRow, value: Any) =>
> +                target.setLong(i, value.asInstanceOf[Long])
> +              case DoubleType => (target: UnsafeRow, value: Any) =>
> +                target.setDouble(i, value.asInstanceOf[Double])
> +              case FloatType => (target: UnsafeRow, value: Any) =>
> +                target.setFloat(i, value.asInstanceOf[Float])
> +
> +              case NullType => (target: UnsafeRow, value: Any) =>
> +                target.setNullAt(i)
> +
> +              case BooleanType => (target: UnsafeRow, value: Any) =>
> +                target.setBoolean(i, value.asInstanceOf[Boolean])
> +
> +              case ByteType => (target: UnsafeRow, value: Any) =>
> +                target.setByte(i, value.asInstanceOf[Byte])
> +              case ShortType => (target: UnsafeRow, value: Any) =>
> +                target.setShort(i, value.asInstanceOf[Short])
> +
> +            }
> +          }
> +        }
> +        true
> +      }
> +      case _ => false
> +    }
> +
>      this
>    }
>
>    override def apply(input: InternalRow): InternalRow = {
>      var i = 0
>      while (i < exprArray.length) {
> -      mutableRow(i) = exprArray(i).eval(input)
> +      if (targetUnsafe) {
> +        setters(i)(mutableRow.asInstanceOf[UnsafeRow], exprArray(i).eval(input))
> +      } else {
> +        mutableRow(i) = exprArray(i).eval(input)
> +      }
>        i += 1
>      }
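
The idea behind the quoted patch can be illustrated without Spark on the
classpath. The sketch below is a toy model only: FixedWidthRow, ColType and
TypedSetters are hypothetical stand-ins invented for this example, not Spark
classes. It shows a row whose generic update is unsupported while its typed
setters work, and a per-column setter array that is built once so the per-row
loop does no type matching. The null check in the setters is an assumption
added for the sketch; it is not part of the quoted patch.

```scala
// Hypothetical column types for the toy model (not Spark's DataType).
sealed trait ColType
case object IntCol extends ColType
case object LongCol extends ColType
case object DoubleCol extends ColType
case object BoolCol extends ColType

// Hypothetical fixed-width row: typed setters work, generic update does not.
final class FixedWidthRow(numFields: Int) {
  private val words = new Array[Long](numFields)
  private val nulls = Array.fill(numFields)(true)

  def update(i: Int, value: Any): Unit =
    throw new UnsupportedOperationException("generic update is not supported")

  def setNullAt(i: Int): Unit = { nulls(i) = true; words(i) = 0L }
  def setInt(i: Int, v: Int): Unit = { nulls(i) = false; words(i) = v.toLong }
  def setLong(i: Int, v: Long): Unit = { nulls(i) = false; words(i) = v }
  def setDouble(i: Int, v: Double): Unit = {
    nulls(i) = false
    words(i) = java.lang.Double.doubleToLongBits(v)
  }
  def setBoolean(i: Int, v: Boolean): Unit = {
    nulls(i) = false
    words(i) = if (v) 1L else 0L
  }

  def isNullAt(i: Int): Boolean = nulls(i)
  def getInt(i: Int): Int = words(i).toInt
  def getDouble(i: Int): Double = java.lang.Double.longBitsToDouble(words(i))
}

object TypedSetters {
  type Setter = (FixedWidthRow, Any) => Unit

  // Resolve the setter for each column once, up front.
  def build(schema: Array[ColType]): Array[Setter] =
    schema.zipWithIndex.map { case (tpe, i) =>
      val typed: Setter = tpe match {
        case IntCol    => (row, v) => row.setInt(i, v.asInstanceOf[Int])
        case LongCol   => (row, v) => row.setLong(i, v.asInstanceOf[Long])
        case DoubleCol => (row, v) => row.setDouble(i, v.asInstanceOf[Double])
        case BoolCol   => (row, v) => row.setBoolean(i, v.asInstanceOf[Boolean])
      }
      // Assumption of this sketch: a null value marks the field as null
      // instead of being unboxed to a default value.
      val nullSafe: Setter =
        (row, v) => if (v == null) row.setNullAt(i) else typed(row, v)
      nullSafe
    }

  // Per-row loop: one indexed call per field, no matching on types.
  def write(row: FixedWidthRow, setters: Array[Setter], values: Array[Any]): Unit = {
    var i = 0
    while (i < setters.length) {
      setters(i)(row, values(i))
      i += 1
    }
  }
}
```

A call such as TypedSetters.write(row, TypedSetters.build(Array(IntCol, DoubleCol)), Array[Any](42, null))
stores 42 in field 0 and marks field 1 as null, whereas row(0) = 42 on the same
toy row throws UnsupportedOperationException. Because ColType is sealed, the
compiler can check that the match covers every type; a match over an open set
of types, as in the quoted patch, would need a decision on how to handle the
types it does not list.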