kanika dhuria created SPARK-22205:
-------------------------------------

             Summary: Incorrect result with user defined agg function followed by a non deterministic function
                 Key: SPARK-22205
                 URL: https://issues.apache.org/jira/browse/SPARK-22205
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 2.1.0
            Reporter: kanika dhuria


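Repro:
Create a user-defined aggregate function (UDAF) like this:

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, StructField, StructType}

// Returns an arbitrary non-null value from each group (the first one encountered).
class AnyUdaf(dtype: DataType) extends UserDefinedAggregateFunction {
  def inputSchema: StructType = StructType(StructField("v", dtype) :: Nil)

  def bufferSchema: StructType = StructType(StructField("v", dtype) :: Nil)

  def dataType: DataType = dtype

  def deterministic: Boolean = true

  // Start each group with an empty (null) buffer.
  def initialize(buffer: MutableAggregationBuffer): Unit = { buffer(0) = null }

  // Keep the first input value; ignore later ones once the buffer is set.
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (buffer(0) == null) buffer(0) = input(0)
  }

  // When combining partial results, keep the left value unless it is still null.
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    if (buffer1(0) == null) buffer1(0) = buffer2(0)
  }

  def evaluate(buffer: Row): Any = { buffer(0) }
}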

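Use it in an aggregation, then follow it with a non-deterministic function such as
monotonically_increasing_id. Both result rows get the id 0, although the ids
should be unique:

import org.apache.spark.sql.functions.{col, lit, monotonically_increasing_id}
import org.apache.spark.sql.types.IntegerType

val input = Seq(0, 1).toDF("c1").select(col("c1"), lit(10)).toDF("c1", "c2")
val projected = input.select(col("c1"), col("c2")).toDF("c1", "c2")
// IntegerType is assumed here because c2 is built from lit(10).
val aggregated = projected.groupBy(col("c1")).agg(new AnyUdaf(IntegerType)(col("c2"))).toDF("c1", "c2")
aggregated.select(lit(5), col("c2"), monotonically_increasing_id()).show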
+---+---+-----------------------------+
|  5| c2|monotonically_increasing_id()|
+---+---+-----------------------------+
|  5| 10|                            0|
|  5| 10|                            0|
+---+---+-----------------------------+
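
For context on why identical ids look wrong: the Spark API documentation for monotonically_increasing_id describes the current implementation as putting the partition ID in the upper 31 bits and the record number within each partition in the lower 33 bits. The plain-Scala sketch below only models that documented layout; the names IdLayout and expectedId are hypothetical and are not part of Spark.

```scala
// Hypothetical helper, not part of Spark: models the documented id layout
// (partition ID in the upper 31 bits, record number in the lower 33 bits).
object IdLayout {
  def expectedId(partitionId: Int, recordNumber: Long): Long =
    (partitionId.toLong << 33) + recordNumber

  def main(args: Array[String]): Unit = {
    // Two rows in the same partition get consecutive ids.
    println(expectedId(0, 0L)) // 0
    println(expectedId(0, 1L)) // 1
    // The first row of partition 1 starts at 2^33.
    println(expectedId(1, 0L)) // 8589934592
  }
}
```

Under this layout, distinct rows are expected to get distinct ids whether or not they share a partition, so the two 0 values in the output above should not occur.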





