[ https://issues.apache.org/jira/browse/SPARK-28246?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16923850#comment-16923850 ]
Pavel Parkhomenko commented on SPARK-28246:
-------------------------------------------

It is about initialization, not merge.

> State of UDAF: buffer is not cleared
> ------------------------------------
>
>                 Key: SPARK-28246
>                 URL: https://issues.apache.org/jira/browse/SPARK-28246
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.4.3
>         Environment: Ubuntu Linux 16.04
>                      Reproducible with the option --master local[1]
> {code:java}
> $ spark-shell --master local[1]
> {code}
>            Reporter: Pavel Parkhomenko
>            Priority: Major
>
> The buffer object of a UserDefinedAggregateFunction can contain data from a previous group. For example:
> {code:java}
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
> import org.apache.spark.sql.types._
> import org.apache.spark.sql.functions.callUDF
> import java.util.Arrays.asList
>
> val df = spark.createDataFrame(
>   asList(
>     Row(1, "a"),
>     Row(2, "b")),
>   StructType(List(
>     StructField("id", IntegerType),
>     StructField("value", StringType))))
>
> trait Min extends UserDefinedAggregateFunction {
>   override val inputSchema: StructType =
>     StructType(Array(StructField("value", StringType)))
>   override val bufferSchema: StructType =
>     StructType(Array(StructField("min", StringType)))
>   override def dataType: DataType = StringType
>   override def deterministic: Boolean = true
>   // Keep the smaller of the buffered value and the incoming value.
>   override def update(buffer: MutableAggregationBuffer, input: Row): Unit =
>     if (input(0) != null && (buffer(0) == null || buffer.getString(0) > input.getString(0)))
>       buffer(0) = input(0)
>   override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit =
>     update(buffer1, buffer2)
>   override def evaluate(buffer: Row): Any = buffer(0)
> }
>
> // Resets the buffer slot for every new group.
> class GoodMin extends Min {
>   override def initialize(buffer: MutableAggregationBuffer): Unit = buffer(0) = None
> }
>
> // Leaves whatever the buffer already holds.
> class BadMin extends Min {
>   override def initialize(buffer: MutableAggregationBuffer): Unit = {}
> }
>
> spark.udf.register("goodmin", new GoodMin)
> spark.udf.register("badmin", new BadMin)
>
> df groupBy "id" agg callUDF("goodmin", $"value") show false
> df groupBy "id" agg callUDF("badmin", $"value") show false
> {code}
> The output is
> {noformat}
> scala> df groupBy "id" agg callUDF("goodmin", $"value") show false
> +---+--------------+
> |id |goodmin(value)|
> +---+--------------+
> |1  |a             |
> |2  |b             |
> +---+--------------+
>
> scala> df groupBy "id" agg callUDF("badmin", $"value") show false
> +---+-------------+
> |id |badmin(value)|
> +---+-------------+
> |1  |a            |
> |2  |a            |
> +---+-------------+
> {noformat}
> The only difference between GoodMin and BadMin is the buffer initialization.
> *This example can be reproduced with a single worker thread only.* To reproduce it, it is mandatory to run the Spark shell with the option
> {code:java}
> spark-shell --master local[1]
> {code}

--
This message was sent by Atlassian Jira
(v8.3.2#803003)
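The behavior in the repro can be modeled without Spark. The sketch below is an illustration of the contract, not Spark's actual execution path: it assumes the runtime recycles a single mutable buffer across groups (plausible for a single-threaded `--master local[1]` run), so `initialize` is the only chance to clear the previous group's state, and a no-op `initialize` leaks it. `BufferReuseDemo`, `Buffer`, and `aggregate` are hypothetical names invented for this sketch.

```scala
// Hedged sketch: models why a no-op initialize() leaks state when one
// mutable buffer is reused across groups. Not Spark's real internals.
object BufferReuseDemo {
  // Minimal stand-in for MutableAggregationBuffer: a single String slot.
  final class Buffer { var min: String = null }

  trait MinAgg {
    def initialize(b: Buffer): Unit
    // Same logic as the update() in the issue: keep the smaller string.
    def update(b: Buffer, v: String): Unit =
      if (v != null && (b.min == null || b.min > v)) b.min = v
  }
  // Clears the slot for every group, like GoodMin.
  object GoodMin extends MinAgg { def initialize(b: Buffer): Unit = b.min = null }
  // No-op, like BadMin: the previous group's minimum survives.
  object BadMin extends MinAgg { def initialize(b: Buffer): Unit = () }

  // One shared buffer, reused across groups, as a single worker thread might.
  def aggregate(agg: MinAgg, groups: Seq[Seq[String]]): Seq[String] = {
    val shared = new Buffer
    groups.map { g =>
      agg.initialize(shared)          // expected to clear leftover state
      g.foreach(agg.update(shared, _))
      shared.min
    }
  }

  def main(args: Array[String]): Unit = {
    println(aggregate(GoodMin, Seq(Seq("a"), Seq("b"))))  // List(a, b)
    println(aggregate(BadMin, Seq(Seq("a"), Seq("b"))))   // List(a, a): "a" leaks into group 2
  }
}
```

With `BadMin`, the second group starts with `min = "a"` left over from the first group, and since `"a" > "b"` is false, `update` never overwrites it, reproducing the `|2 |a |` row in the issue's output.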