[ 
https://issues.apache.org/jira/browse/SPARK-28246?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16923875#comment-16923875
 ] 

Jungtaek Lim commented on SPARK-28246:
--------------------------------------

The implicit rule is when initialize or equivalent method is presented, it will 
be called and it should initialize everything to be a default value. JVM 
developers tend to expect default value is set for uninitialized thing, but 
that's the guarantee JVM is providing. Spark doesn't set default value to and 
let end users initialize it by themselves - it's natural because Spark doesn't 
know about "default value", unlike JVM's case.

I guess the explanation of flow for UDAG (the sequence of method calls in 
Spark) is missing in both the javadoc of UDAF class and sql-getting-started.md. 
So I feel that could be a documentation issue, but still not a bug.

> State of UDAF: buffer is not cleared
> ------------------------------------
>
>                 Key: SPARK-28246
>                 URL: https://issues.apache.org/jira/browse/SPARK-28246
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.4.3
>         Environment: Ubuntu Linux 16.04
> Reproducible with option --master local[1]
> {code:java}
> $ spark-shell --master local[1]
> {code}
>            Reporter: Pavel Parkhomenko
>            Priority: Major
>
> Buffer object for UserDefinedAggregateFunction contains data from previous 
> iteration. For example,
> {code:java}
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.expressions.{MutableAggregationBuffer, 
> UserDefinedAggregateFunction}
> import org.apache.spark.sql.types._
> import org.apache.spark.sql.functions.callUDF
> import java.util.Arrays.asList
> val df = spark.createDataFrame(
>   asList(
>     Row(1, "a"),
>     Row(2, "b")),
>   StructType(List(
>     StructField("id", IntegerType),
>     StructField("value", StringType))))
> trait Min extends UserDefinedAggregateFunction {
>   override val inputSchema: StructType = 
> StructType(Array(StructField("value", StringType)))
>   override val bufferSchema: StructType = StructType(Array(StructField("min", 
> StringType)))
>   override def dataType: DataType = StringType
>   override def deterministic: Boolean = true
>   override def update(buffer: MutableAggregationBuffer, input: Row): Unit =
>     if (input(0) != null && (buffer(0) == null || buffer.getString(0) > 
> input.getString(0))) buffer(0) = input(0)
>   override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = 
> update(buffer1, buffer2)
>   override def evaluate(buffer: Row): Any = buffer(0)
> }
> class GoodMin extends Min {
>   override def initialize(buffer: MutableAggregationBuffer): Unit = buffer(0) 
> = None
> }
> class BadMin extends Min {
>   override def initialize(buffer: MutableAggregationBuffer): Unit = {}
> }
> spark.udf.register("goodmin", new GoodMin)
> spark.udf.register("badmin", new BadMin)
> df groupBy "id" agg callUDF("goodmin", $"value") show false
> df groupBy "id" agg callUDF("badmin", $"value") show false
> {code}
> Output is
> {noformat}
> scala> df groupBy "id" agg callUDF("goodmin", $"value") show false
> +---+--------------+
> |id |goodmin(value)|
> +---+--------------+
> |1  |a             |
> |2  |b             |
> +---+--------------+
> scala> df groupBy "id" agg callUDF("badmin", $"value") show false
> +---+-------------+
> |id |badmin(value)|
> +---+-------------+
> |1  |a            |
> |2  |a            |
> +---+-------------+
> {noformat}
> The difference between GoodMin and BadMin is a buffer initialization.
> *This example could be reproduced with a single worker thread only*. To 
> reproduce it is mandatory to run spark shell with option
> {code:java}
> spark-shell --master local[1]
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.2#803003)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to