Gavrilescu Laurentiu created SPARK-34644:
--------------------------------------------

             Summary: UDF returning array followed by explode returns wrong results
                 Key: SPARK-34644
                 URL: https://issues.apache.org/jira/browse/SPARK-34644
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 3.1.1
            Reporter: Gavrilescu Laurentiu


*Applying a UDF followed by {{explode}} appears to call the UDF twice.*

Imagine the following scenario:

1. You have a DataFrame with some string columns.
2. You have an expensive function that computes a score from a string input.
3. You want all the distinct values across all the columns, together with their scores. An executor-level cache holds the scores already computed for each string, to minimize calls to the expensive function.

Consider the following code, which reproduces the issue:
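{code:scala}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions.{col, explode, struct, udf}

import scala.collection.concurrent.TrieMap
import scala.collection.mutable.ArrayBuffer

case class RowWithStrings(c1: String, c2: String, c3: String)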
case class ValueScore(value: String, score: Double)

object Bug {

  val columns: List[String] = List("c1", "c2", "c3")

  def score(input: String): Double = {
    // insert expensive function here
    input.toDouble
  }

  def main(args: Array[String]): Unit = {
    lazy val sparkSession: SparkSession = {
      val sparkSession = SparkSession.builder.master("local[4]")
        .getOrCreate()

      sparkSession
    }

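    // cache of already-computed scores, to avoid calling the expensive function again
    val cache: TrieMap[String, Double] = TrieMap[String, Double]()

    // get scores for all columns in the row; this mutates `cache` and only emits
    // values not cached yet, so a second call on the same row returns an empty buffer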
    val body = (row: Row) => {
      val arr = ArrayBuffer[ValueScore]()
      columns foreach {
        column =>
          val value = row.getAs[String](column)
          if (!cache.contains(value)) {
            val computedScore = score(value)
            arr += ValueScore(value, computedScore)
            cache(value) = computedScore
          }
      }
      arr
    }

    val basicUdf = udf(body)

    val values = (1 to 5) map {
      idx =>
        // repeated values
        RowWithStrings(idx.toString, idx.toString, idx.toString)
    }

    import sparkSession.implicits._
    val df = values.toDF("c1", "c2", "c3").persist()
    val allCols = df.columns.map(col)

    df.select(basicUdf(struct(allCols: _*)).as("valuesScore"))
      .select(explode(col("valuesScore")))
      .distinct()
      .show()
  }
}
{code}
This prints an empty result:
{code:java}
+---+
|col|
+---+
+---+
{code}
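The empty table is consistent with the UDF body running twice per row. Here is a plain-Scala sketch with no Spark involved. It assumes that double evaluation is really what happens; the sketch shows the consequence but does not prove the cause. The first call fills {{cache}} and returns the score. The second call finds every value already cached and returns an empty buffer, so {{explode}} has nothing to emit. {{DoubleEvaluation}} is a hypothetical name. The row is simplified to a {{Seq[String]}} instead of a Spark {{Row}}.

{code:scala}
import scala.collection.concurrent.TrieMap
import scala.collection.mutable.ArrayBuffer

// Same shape as the case class in the report.
case class ValueScore(value: String, score: Double)

object DoubleEvaluation {
  // Mirrors the side-effecting UDF body from the report, taking the row's
  // column values as a Seq instead of a Spark Row (simplification).
  def body(cache: TrieMap[String, Double])(row: Seq[String]): ArrayBuffer[ValueScore] = {
    val arr = ArrayBuffer[ValueScore]()
    row.foreach { value =>
      if (!cache.contains(value)) {
        val computedScore = value.toDouble // stand-in for the expensive score function
        arr += ValueScore(value, computedScore)
        cache(value) = computedScore
      }
    }
    arr
  }

  def main(args: Array[String]): Unit = {
    val cache = TrieMap[String, Double]()
    val row = Seq("1", "1", "1")
    val first = body(cache)(row)  // computes and caches "1"
    val second = body(cache)(row) // "1" is already cached, nothing is emitted
    println(first)  // prints ArrayBuffer(ValueScore(1,1.0))
    println(second) // prints ArrayBuffer()
  }
}
{code}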
When {{persist()}} is added before {{explode}}, the result is correct:
{code:java}
df.select(basicUdf(struct(allCols: _*)).as("valuesScore"))
  .persist()
  .select(explode(col("valuesScore")))
  .distinct()
  .show()
{code}
=>
{code:java}
+--------+
|     col|
+--------+
|{2, 2.0}|
|{4, 4.0}|
|{3, 3.0}|
|{5, 5.0}|
|{1, 1.0}|
+--------+
{code}
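One way to make the query independent of how many times the UDF is evaluated is to keep the cache only for memoizing scores. The cache would no longer decide which values to emit. The body always returns a score for every distinct value in the row, and {{distinct()}} removes repeats across rows. Below is a minimal plain-Scala sketch of such a body, not tested inside Spark. {{IdempotentScores}}, {{cachedScore}} and {{idempotentBody}} are hypothetical names, and the row is again simplified to a {{Seq[String]}}. Separately, Spark's {{UserDefinedFunction.asNondeterministic()}} marks a UDF as nondeterministic, which may keep the optimizer from evaluating it more than once. Whether that alone fixes this case is an assumption, not verified here.

{code:scala}
import scala.collection.concurrent.TrieMap

case class ValueScore(value: String, score: Double)

object IdempotentScores {
  // Stand-in for the expensive scoring function from the report.
  def score(input: String): Double = input.toDouble

  // Memoized lookup: reuses a cached score when present. Concurrent callers may
  // occasionally compute the same score twice, which is harmless since score is pure.
  def cachedScore(cache: TrieMap[String, Double])(value: String): Double =
    cache.getOrElseUpdate(value, score(value))

  // The output depends only on the row, not on what is already cached,
  // so calling it twice on the same row returns the same result.
  def idempotentBody(cache: TrieMap[String, Double])(row: Seq[String]): Seq[ValueScore] =
    row.distinct.map(v => ValueScore(v, cachedScore(cache)(v)))

  def main(args: Array[String]): Unit = {
    val cache = TrieMap[String, Double]()
    val row = Seq("1", "1", "1")
    println(idempotentBody(cache)(row)) // prints List(ValueScore(1,1.0))
    println(idempotentBody(cache)(row)) // prints List(ValueScore(1,1.0)) again
  }
}
{code}

The cache still saves repeated calls to {{score}}, but a duplicated evaluation can no longer drop values from the result.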
 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
