kanika dhuria created SPARK-22205:
-------------------------------------

             Summary: Incorrect result with user defined agg function followed by a non deterministic function
                 Key: SPARK-22205
                 URL: https://issues.apache.org/jira/browse/SPARK-22205
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 2.1.0
            Reporter: kanika dhuria

Repro:

Create a user-defined aggregate function like:

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.functions.{col, lit, monotonically_increasing_id}
import org.apache.spark.sql.types.{DataType, IntegerType, StructField, StructType}

// Returns the first non-null value seen in each group.
class AnyUdaf(dtype: DataType) extends UserDefinedAggregateFunction {
  def inputSchema: StructType = StructType(StructField("v", dtype) :: Nil)
  def bufferSchema: StructType = StructType(StructField("v", dtype) :: Nil)
  def dataType: DataType = dtype
  def deterministic: Boolean = true
  def initialize(buffer: MutableAggregationBuffer): Unit = { buffer(0) = null }
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (buffer(0) == null) buffer(0) = input(0)
  }
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    if (buffer1(0) == null) buffer1(0) = buffer2(0)
  }
  def evaluate(buffer: Row): Any = { buffer(0) }
}

Use it in an agg and follow it with a non-deterministic function such as monotonically_increasing_id:

// Run in spark-shell (toDF on a Seq needs spark.implicits._).
// c2 is lit(10), so the UDAF is instantiated with IntegerType.
Seq(0, 1).toDF("c1")
  .select(col("c1"), lit(10)).toDF("c1", "c2")
  .select(col("c1"), col("c2")).toDF("c1", "c2")
  .groupBy(col("c1")).agg(new AnyUdaf(IntegerType)(col("c2"))).toDF("c1", "c2")
  .select(lit(5), col("c2"), monotonically_increasing_id())
  .show

+---+---+-----------------------------+
|  5| c2|monotonically_increasing_id()|
+---+---+-----------------------------+
|  5| 10|                            0|
|  5| 10|                            0|
+---+---+-----------------------------+

Both rows get id 0, although monotonically_increasing_id is expected to return a distinct value for each row.
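
For reference, a minimal sketch of the ID layout that the Spark API documentation describes for monotonically_increasing_id: the partition ID goes in the upper 31 bits and the record number within the partition in the lower 33 bits. The object and method names below (MonotonicIdSketch, expectedId) are hypothetical and are not part of Spark; the sketch only models the documented layout in plain Scala and does not reproduce the bug or Spark's internal code path.

```scala
// Plain-Scala model of the documented ID layout (hypothetical helper, not Spark code).
object MonotonicIdSketch {
  // Partition ID in the upper 31 bits, per-partition record number in the lower 33 bits.
  def expectedId(partitionId: Int, recordNumber: Long): Long =
    (partitionId.toLong << 33) + recordNumber

  def main(args: Array[String]): Unit = {
    // Two rows that land in the same partition.
    val samePartition = Seq(expectedId(0, 0L), expectedId(0, 1L))
    // Two rows that land in different partitions.
    val differentPartitions = Seq(expectedId(0, 0L), expectedId(1, 0L))

    // Under this layout the two rows never share an id in either case.
    println(samePartition.distinct.size == 2)
    println(differentPartitions.distinct.size == 2)
  }
}
```

Under this layout two output rows can only share an id if they have the same partition ID and the same record number, so the repeated 0 in the repro output would not be expected however the two groups are partitioned.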