[ 
https://issues.apache.org/jira/browse/SPARK-35371?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Benedeki updated SPARK-35371:
-----------------------------------
    Description: 
When using a UDF returning a string or a complex type (Struct) on array members,
the resulting array consists of copies of the last array member's UDF result.
h3. *Example code:*

{code:spark}
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.functions.{callUDF, col, transform, udf}


val sparkBuilder: SparkSession.Builder = SparkSession.builder()
  .master("local[*]")
  .appName(s"Udf Bug Demo")
  .config("spark.ui.enabled", "false")
  .config("spark.debug.maxToStringFields", 100)

val spark: SparkSession = sparkBuilder
  .config("spark.driver.bindAddress", "127.0.0.1")
  .config("spark.driver.host", "127.0.0.1")
  .getOrCreate()

import spark.implicits._

case class Foo(num: Int, s: String)

val src  = Seq(
  (1, 2, Array(1, 2, 3)),
  (2, 2, Array(2, 2, 2)),
  (3, 4, Array(3, 4, 3, 4))
).toDF("A", "B", "C")

val udfStringName = "UdfString"
val udfIntName = "UdfInt"
val udfStructName = "UdfStruct"

val udfString = udf((num: Int) => {
  (num + 1).toString
})
spark.udf.register(udfStringName, udfString)

val udfInt = udf((num: Int) => {
  num + 1
})
spark.udf.register(udfIntName, udfInt)

val udfStruct = udf((num: Int) => {
  Foo(num + 1, (num + 1).toString)
})
spark.udf.register(udfStructName, udfStruct)


val lambdaString = (forCol: Column) => callUDF(udfStringName, forCol)
val lambdaInt = (forCol: Column) => callUDF(udfIntName, forCol)
val lambdaStruct = (forCol: Column) => callUDF(udfStructName, forCol)

val cA = callUDF(udfStringName, col("A"))
val cB = callUDF(udfStringName, col("B"))
val cCString: Column = transform(col("C"), lambdaString)
val cCInt: Column = transform(col("C"), lambdaInt)
val cCStruc: Column = transform(col("C"), lambdaStruct)
val dest = src.withColumn("AStr", cA)
  .withColumn("BStr", cB)
  .withColumn("CString (Wrong)", cCString)
  .withColumn("CInt (OK)", cCInt)
  .withColumn("CStruct (Wrong)", cCStruc)

dest.show(false)
dest.printSchema()
{code}

h3. *Expected:*
{noformat}
+---+---+------------+----+----+---------------+------------+--------------------------------+
|A  |B  |C           |AStr|BStr|CString        |CInt        |CStruct            
          |
+---+---+------------+----+----+---------------+------------+--------------------------------+
|1  |2  |[1, 2, 3]   |2   |3   |[2, 3, 4]      |[2, 3, 4]   |[{2, 2}, {3, 3}, 
{4, 4}]        |
|2  |2  |[2, 2, 2]   |3   |3   |[3, 3, 3]      |[3, 3, 3]   |[{3, 3}, {3, 3}, 
{3, 3}]        |
|3  |4  |[3, 4, 3, 4]|4   |5   |[4, 5, 4, 5]   |[4, 5, 4, 5]|[{4, 4}, {5, 5}, 
{4, 4}, {5, 5}]|
+---+---+------------+----+----+---------------+------------+--------------------------------+
{noformat}
h3. *Got:*
{noformat}
+---+---+------------+----+----+---------------+------------+--------------------------------+
|A  |B  |C           |AStr|BStr|CString (Wrong)|CInt (Ok)   |CStruct (Wrong)    
             |
+---+---+------------+----+----+---------------+------------+--------------------------------+
|1  |2  |[1, 2, 3]   |2   |3   |[4, 4, 4]      |[2, 3, 4]   |[{4, 4}, {4, 4}, 
{4, 4}]        |
|2  |2  |[2, 2, 2]   |3   |3   |[3, 3, 3]      |[3, 3, 3]   |[{3, 3}, {3, 3}, 
{3, 3}]        |
|3  |4  |[3, 4, 3, 4]|4   |5   |[5, 5, 5, 5]   |[4, 5, 4, 5]|[{5, 5}, {5, 5}, 
{5, 5}, {5, 5}]|
+---+---+------------+----+----+---------------+------------+--------------------------------+
{noformat}
h3. *Observation*
 * Works correctly on Spark 3.0.2
 * When the UDF is registered as a Java UDF, it works as expected
 * The UDF is called the appropriate number of times (regardless of whether the UDF is 
marked as deterministic or non-deterministic).
 * When debugged, the correct value is actually saved into the result array at 
first, but the processing of every subsequent item overwrites the previous result 
values as well. Therefore the final result is the array filled with the last 
item's values.
 * When the UDF returns NULL/None it does not "overwrite" the prior array 
values, nor is it "overwritten" by subsequent non-NULL values. See the following 
UDF implementation:

{color:#0033b3}val {color}{color:#000000}udfString {color}= udf((num: Int) => {
   {color:#0033b3}if {color}(num == {color:#1750eb}3{color}) {
      {color:#000000}None{color}  } {color:#0033b3}else {color}{
      Some((num + {color:#1750eb}1{color}).toString)
   }
 })


  was:
When using a UDF returning a string or a complex type (Struct) on array members,
the resulting array consists of copies of the last array member's UDF result.
h3. *Example code:*

{color:#0033b3}import {color}org.apache.spark.sql.\{Column, SparkSession}
 import org.apache.spark.sql.functions.\{callUDF, col, transform, udf}
 
 val {color:#000000}sparkBuilder{color}: 
{color:#000000}SparkSession{color}.{color:#000000}Builder {color}= 
{color:#000000}SparkSession{color}.builder()
   .master({color:#067d17}"local[*]"{color})
   .appName({color:#067d17}s"Udf Bug Demo"{color})
   .config({color:#067d17}"spark.ui.enabled"{color}, 
{color:#067d17}"false"{color})
   .config({color:#067d17}"spark.debug.maxToStringFields"{color}, 
{color:#1750eb}100{color})

{color:#0033b3}val {color}{color:#000000}spark{color}: 
{color:#000000}SparkSession {color}= {color:#000000}sparkBuilder{color} 
.config({color:#067d17}"spark.driver.bindAddress"{color}, 
{color:#067d17}"127.0.0.1"{color})
   .config({color:#067d17}"spark.driver.host"{color}, 
{color:#067d17}"127.0.0.1"{color})
   .getOrCreate()

{color:#0033b3}import 
{color}{color:#000000}spark{color}.{color:#000000}implicits{color}._

{color:#0033b3}case class {color}{color:#000000}Foo{color}(num: Int, s: 
{color:#007e8a}String{color})

{color:#0033b3}val {color}{color:#000000}src {color}= {color:#871094}Seq{color}(
   ({color:#1750eb}1{color}, {color:#1750eb}2{color}, 
Array({color:#1750eb}1{color}, {color:#1750eb}2{color}, 
{color:#1750eb}3{color})),
   ({color:#1750eb}2{color}, {color:#1750eb}2{color}, 
Array({color:#1750eb}2{color}, {color:#1750eb}2{color}, 
{color:#1750eb}2{color})),
   ({color:#1750eb}3{color}, {color:#1750eb}4{color}, 
Array({color:#1750eb}3{color}, {color:#1750eb}4{color}, 
{color:#1750eb}3{color}, {color:#1750eb}4{color}))
 ).toDF({color:#067d17}"A"{color}, {color:#067d17}"B"{color}, 
{color:#067d17}"C"{color})

{color:#0033b3}val {color}{color:#000000}udfStringName {color}= 
{color:#067d17}"UdfString"{color}{color:#0033b3}val 
{color}{color:#000000}udfIntName {color}= 
{color:#067d17}"UdfInt"{color}{color:#0033b3}val 
{color}{color:#000000}udfStructName {color}= 
{color:#067d17}"UdfStruct"{color}{color:#0033b3}val 
{color}{color:#000000}udfString {color}= udf((num: Int) => {
   (num + {color:#1750eb}1{color}).toString
 })
 {color:#000000}spark{color}.udf.register({color:#000000}udfStringName{color}, 
{color:#000000}udfString{color})

{color:#0033b3}val {color}{color:#000000}udfInt {color}= udf((num: Int) => {
   num + {color:#1750eb}1
 })
 spark{color}.udf.register({color:#000000}udfIntName{color}, 
{color:#000000}udfInt{color})

{color:#0033b3}val {color}{color:#000000}udfStruct {color}= udf((num: Int) => {
   Foo(num + {color:#1750eb}1{color}, (num + {color:#1750eb}1{color}).toString)
 })
 {color:#000000}spark{color}.udf.register({color:#000000}udfStructName{color}, 
{color:#000000}udfStruct{color})

{color:#0033b3}val {color}{color:#000000}lambdaString {color}= (forCol: 
{color:#000000}Column{color}) => callUDF({color:#000000}udfStringName{color}, 
forCol)
 {color:#0033b3}val {color}{color:#000000}lambdaInt {color}= (forCol: 
{color:#000000}Column{color}) => callUDF({color:#000000}udfIntName{color}, 
forCol)
 {color:#0033b3}val {color}{color:#000000}lambdaStruct {color}= (forCol: 
{color:#000000}Column{color}) => callUDF({color:#000000}udfStructName{color}, 
forCol)

{color:#0033b3}val {color}{color:#000000}cA {color}= 
callUDF({color:#000000}udfStringName{color}, col({color:#067d17}"A"{color}))
 {color:#0033b3}val {color}{color:#000000}cB {color}= 
callUDF({color:#000000}udfStringName{color}, col({color:#067d17}"B"{color}))
 {color:#0033b3}val {color}{color:#000000}cCString{color}: 
{color:#000000}Column {color}= transform(col({color:#067d17}"C"{color}), 
{color:#000000}lambdaString{color})
 {color:#0033b3}val {color}{color:#000000}cCInt{color}: {color:#000000}Column 
{color}= transform(col({color:#067d17}"C"{color}), 
{color:#000000}lambdaInt{color})
 {color:#0033b3}val {color}{color:#000000}cCStruc{color}: {color:#000000}Column 
{color}= transform(col({color:#067d17}"C"{color}), 
{color:#000000}lambdaStruct{color})
 {color:#0033b3}val {color}{color:#000000}dest {color}= 
{color:#000000}src{color}.withColumn({color:#067d17}"AStr"{color}, 
{color:#000000}cA{color})
   .withColumn({color:#067d17}"BStr"{color}, {color:#000000}cB{color})
   .withColumn({color:#067d17}"CString (Wrong)"{color}, 
{color:#000000}cCString{color})
   .withColumn({color:#067d17}"CInt (OK)"{color}, {color:#000000}cCInt{color})
   .withColumn({color:#067d17}"CStruct (Wrong)"{color}, 
{color:#000000}cCStruc{color})

{color:#000000}dest{color}.show({color:#0033b3}false{color})
 {color:#000000}dest{color}.printSchema()
h3. *Expected:*
{noformat}
+---+---+------------+----+----+---------------+------------+--------------------------------+
|A  |B  |C           |AStr|BStr|CString        |CInt        |CStruct            
          |
+---+---+------------+----+----+---------------+------------+--------------------------------+
|1  |2  |[1, 2, 3]   |2   |3   |[2, 3, 4]      |[2, 3, 4]   |[{2, 2}, {3, 3}, 
{4, 4}]        |
|2  |2  |[2, 2, 2]   |3   |3   |[3, 3, 3]      |[3, 3, 3]   |[{3, 3}, {3, 3}, 
{3, 3}]        |
|3  |4  |[3, 4, 3, 4]|4   |5   |[4, 5, 4, 5]   |[4, 5, 4, 5]|[{4, 4}, {5, 5}, 
{4, 4}, {5, 5}]|
+---+---+------------+----+----+---------------+------------+--------------------------------+
{noformat}
h3. *Got:*
{noformat}
+---+---+------------+----+----+---------------+------------+--------------------------------+
|A  |B  |C           |AStr|BStr|CString (Wrong)|CInt (Ok)   |CStruct (Wrong)    
             |
+---+---+------------+----+----+---------------+------------+--------------------------------+
|1  |2  |[1, 2, 3]   |2   |3   |[4, 4, 4]      |[2, 3, 4]   |[{4, 4}, {4, 4}, 
{4, 4}]        |
|2  |2  |[2, 2, 2]   |3   |3   |[3, 3, 3]      |[3, 3, 3]   |[{3, 3}, {3, 3}, 
{3, 3}]        |
|3  |4  |[3, 4, 3, 4]|4   |5   |[5, 5, 5, 5]   |[4, 5, 4, 5]|[{5, 5}, {5, 5}, 
{5, 5}, {5, 5}]|
+---+---+------------+----+----+---------------+------------+--------------------------------+
{noformat}
h3. *Observation*
 * Works correctly on Spark 3.0.2
 * When the UDF is registered as a Java UDF, it works as expected
 * The UDF is called the appropriate number of times (regardless of whether the UDF is 
marked as deterministic or non-deterministic).
 * When debugged, the correct value is actually saved into the result array at 
first, but the processing of every subsequent item overwrites the previous result 
values as well. Therefore the final result is the array filled with the last 
item's values.
 * When the UDF returns NULL/None it does not "overwrite" the prior array 
values, nor is it "overwritten" by subsequent non-NULL values. See the following 
UDF implementation:

{color:#0033b3}val {color}{color:#000000}udfString {color}= udf((num: Int) => {
   {color:#0033b3}if {color}(num == {color:#1750eb}3{color}) {
      {color:#000000}None{color}  } {color:#0033b3}else {color}{
      Some((num + {color:#1750eb}1{color}).toString)
   }
 })

 


> Scala UDF returning string or complex type applied to array members returns 
> wrong data
> --------------------------------------------------------------------------------------
>
>                 Key: SPARK-35371
>                 URL: https://issues.apache.org/jira/browse/SPARK-35371
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.1.1
>            Reporter: David Benedeki
>            Priority: Major
>
> When using a UDF returning a string or a complex type (Struct) on array members, 
> the resulting array consists of copies of the last array member's UDF result.
> h3. *Example code:*
> {code:spark}
> import org.apache.spark.sql.{Column, SparkSession}
> import org.apache.spark.sql.functions.{callUDF, col, transform, udf}
> val sparkBuilder: SparkSession.Builder = SparkSession.builder()
>   .master("local[*]")
>   .appName(s"Udf Bug Demo")
>   .config("spark.ui.enabled", "false")
>   .config("spark.debug.maxToStringFields", 100)
> val spark: SparkSession = sparkBuilder
>   .config("spark.driver.bindAddress", "127.0.0.1")
>   .config("spark.driver.host", "127.0.0.1")
>   .getOrCreate()
> import spark.implicits._
> case class Foo(num: Int, s: String)
> val src  = Seq(
>   (1, 2, Array(1, 2, 3)),
>   (2, 2, Array(2, 2, 2)),
>   (3, 4, Array(3, 4, 3, 4))
> ).toDF("A", "B", "C")
> val udfStringName = "UdfString"
> val udfIntName = "UdfInt"
> val udfStructName = "UdfStruct"
> val udfString = udf((num: Int) => {
>   (num + 1).toString
> })
> spark.udf.register(udfStringName, udfString)
> val udfInt = udf((num: Int) => {
>   num + 1
> })
> spark.udf.register(udfIntName, udfInt)
> val udfStruct = udf((num: Int) => {
>   Foo(num + 1, (num + 1).toString)
> })
> spark.udf.register(udfStructName, udfStruct)
> val lambdaString = (forCol: Column) => callUDF(udfStringName, forCol)
> val lambdaInt = (forCol: Column) => callUDF(udfIntName, forCol)
> val lambdaStruct = (forCol: Column) => callUDF(udfStructName, forCol)
> val cA = callUDF(udfStringName, col("A"))
> val cB = callUDF(udfStringName, col("B"))
> val cCString: Column = transform(col("C"), lambdaString)
> val cCInt: Column = transform(col("C"), lambdaInt)
> val cCStruc: Column = transform(col("C"), lambdaStruct)
> val dest = src.withColumn("AStr", cA)
>   .withColumn("BStr", cB)
>   .withColumn("CString (Wrong)", cCString)
>   .withColumn("CInt (OK)", cCInt)
>   .withColumn("CStruct (Wrong)", cCStruc)
> dest.show(false)
> dest.printSchema()
> {code}
> h3. *Expected:*
> {noformat}
> +---+---+------------+----+----+---------------+------------+--------------------------------+
> |A  |B  |C           |AStr|BStr|CString        |CInt        |CStruct          
>             |
> +---+---+------------+----+----+---------------+------------+--------------------------------+
> |1  |2  |[1, 2, 3]   |2   |3   |[2, 3, 4]      |[2, 3, 4]   |[{2, 2}, {3, 3}, 
> {4, 4}]        |
> |2  |2  |[2, 2, 2]   |3   |3   |[3, 3, 3]      |[3, 3, 3]   |[{3, 3}, {3, 3}, 
> {3, 3}]        |
> |3  |4  |[3, 4, 3, 4]|4   |5   |[4, 5, 4, 5]   |[4, 5, 4, 5]|[{4, 4}, {5, 5}, 
> {4, 4}, {5, 5}]|
> +---+---+------------+----+----+---------------+------------+--------------------------------+
> {noformat}
> h3. *Got:*
> {noformat}
> +---+---+------------+----+----+---------------+------------+--------------------------------+
> |A  |B  |C           |AStr|BStr|CString (Wrong)|CInt (Ok)   |CStruct (Wrong)  
>                |
> +---+---+------------+----+----+---------------+------------+--------------------------------+
> |1  |2  |[1, 2, 3]   |2   |3   |[4, 4, 4]      |[2, 3, 4]   |[{4, 4}, {4, 4}, 
> {4, 4}]        |
> |2  |2  |[2, 2, 2]   |3   |3   |[3, 3, 3]      |[3, 3, 3]   |[{3, 3}, {3, 3}, 
> {3, 3}]        |
> |3  |4  |[3, 4, 3, 4]|4   |5   |[5, 5, 5, 5]   |[4, 5, 4, 5]|[{5, 5}, {5, 5}, 
> {5, 5}, {5, 5}]|
> +---+---+------------+----+----+---------------+------------+--------------------------------+
> {noformat}
> h3. *Observation*
>  * Works correctly on Spark 3.0.2
>  * When the UDF is registered as a Java UDF, it works as expected
>  * The UDF is called the appropriate number of times (regardless of whether the 
> UDF is marked as deterministic or non-deterministic).
>  * When debugged, the correct value is actually saved into the result array 
> at first, but the processing of every subsequent item overwrites the previous result 
> values as well. Therefore the final result is the array filled with the last 
> item's values.
>  * When the UDF returns NULL/None it does not "overwrite" the prior array 
> values, nor is it "overwritten" by subsequent non-NULL values. See the following 
> UDF implementation:
> {color:#0033b3}val {color}{color:#000000}udfString {color}= udf((num: Int) => 
> {
>    {color:#0033b3}if {color}(num == {color:#1750eb}3{color}) {
>       {color:#000000}None{color}  } {color:#0033b3}else {color}{
>       Some((num + {color:#1750eb}1{color}).toString)
>    }
>  })



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to