Gavrilescu Laurentiu created SPARK-34644:
--------------------------------------------
Summary: UDF returning array followed by explode returns wrong results
Key: SPARK-34644
URL: https://issues.apache.org/jira/browse/SPARK-34644
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 3.1.1
Reporter: Gavrilescu Laurentiu

*Applying a UDF followed by explode appears to call the UDF twice.*

Imagine the following scenario:
1. you have a dataframe with some string columns
2. you have an expensive function that computes a score from a string input
3. you want to get all the distinct values from all the columns, together with their scores

There is an executor-level cache that holds the scores of strings already seen, to minimize the number of calls to the expensive function.

Consider the following code to reproduce it:
{code:java}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions.{col, explode, struct, udf}

import scala.collection.concurrent.TrieMap
import scala.collection.mutable.ArrayBuffer

case class RowWithStrings(c1: String, c2: String, c3: String)

case class ValueScore(value: String, score: Double)

object Bug {
  val columns: List[String] = List("c1", "c2", "c3")

  def score(input: String): Double = {
    // insert expensive function here
    input.toDouble
  }

  def main(args: Array[String]): Unit = {
    lazy val sparkSession: SparkSession = {
      val sparkSession = SparkSession.builder.master("local[4]")
        .getOrCreate()

      sparkSession
    }

    // some cache over expensive operation
    val cache: TrieMap[String, Double] = TrieMap[String, Double]()

    // get scores for all columns in the row
    val body = (row: Row) => {
      val arr = ArrayBuffer[ValueScore]()
      columns foreach { column =>
        val value = row.getAs[String](column)
        // values already in the cache are skipped, so calling body twice
        // with the same row returns an empty array the second time
        if (!cache.contains(value)) {
          val computedScore = score(value)
          arr += ValueScore(value, computedScore)
          cache(value) = computedScore
        }
      }
      arr
    }

    val basicUdf = udf(body)

    val values = (1 to 5) map { idx =>
      // repeated values
      RowWithStrings(idx.toString, idx.toString, idx.toString)
    }

    import sparkSession.implicits._
    val df = values.toDF("c1", "c2", "c3").persist()
    val allCols = df.columns.map(col)

    df.select(basicUdf(struct(allCols: _*)).as("valuesScore"))
      .select(explode(col("valuesScore")))
      .distinct()
      .show()
  }
}
{code}
This shows an empty table:
{code:java}
+---+
|col|
+---+
+---+
{code}

When adding {{persist()}} before {{explode}}, the result is correct:
{code:java}
df.select(basicUdf(struct(allCols: _*)).as("valuesScore"))
  .persist()
  .select(explode(col("valuesScore")))
  .distinct()
  .show()
{code}
=>
{code:java}
+--------+
|     col|
+--------+
|{2, 2.0}|
|{4, 4.0}|
|{3, 3.0}|
|{5, 5.0}|
|{1, 1.0}|
+--------+
{code}
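The suspected mechanism (the UDF body running twice per row while sharing an executor-level cache) can be illustrated without Spark. The sketch below is a plain-Scala model under stated assumptions, not Spark's actual execution path: {{evaluateTwice}} is a hypothetical stand-in that calls the function twice per row and keeps only the second result, {{filteringBody}} mirrors the UDF body from the report, and {{memoizingBody}} is a hypothetical side-effect-free alternative that is not part of the report. With the filtering body, the second call finds every value already cached and returns nothing, matching the empty table; a single evaluation per row (the analogue of the {{persist()}} workaround) returns all five values.

```scala
import scala.collection.concurrent.TrieMap
import scala.collection.mutable.ArrayBuffer

// Plain-Scala model of the report; no Spark involved.
case class ValueScore(value: String, score: Double)

// Stand-in for the expensive score function from the report.
def score(input: String): Double = input.toDouble

// Mirrors the reporter's UDF body: a value is emitted only the first time it is
// seen, so the result depends on what earlier calls put into the shared cache.
def filteringBody(cache: TrieMap[String, Double])(row: Seq[String]): Seq[ValueScore] = {
  val arr = ArrayBuffer[ValueScore]()
  row.foreach { value =>
    if (!cache.contains(value)) {
      val computedScore = score(value)
      arr += ValueScore(value, computedScore)
      cache(value) = computedScore
    }
  }
  arr.toList
}

// Hypothetical alternative (not from the report): the cache only memoizes scores,
// so repeated calls with the same row always return the same result.
def memoizingBody(cache: TrieMap[String, Double])(row: Seq[String]): Seq[ValueScore] =
  row.map(value => ValueScore(value, cache.getOrElseUpdate(value, score(value))))

// Hypothetical model of the suspected behaviour: the function runs twice per row
// and only the second result is kept. This is NOT Spark's real execution path.
def evaluateTwice(rows: Seq[Seq[String]], f: Seq[String] => Seq[ValueScore]): Seq[ValueScore] =
  rows.flatMap { row =>
    f(row) // first call: result discarded, but the cache is updated
    f(row) // second call: result kept
  }

// Single evaluation per row, analogous to the persist() workaround.
def evaluateOnce(rows: Seq[Seq[String]], f: Seq[String] => Seq[ValueScore]): Seq[ValueScore] =
  rows.flatMap(f)

// Five rows, each repeating one value in all three columns, as in the report.
val rows: Seq[Seq[String]] = (1 to 5).map(i => Seq.fill(3)(i.toString))

val cacheA = TrieMap.empty[String, Double]
val cacheB = TrieMap.empty[String, Double]
val cacheC = TrieMap.empty[String, Double]

val filtered = evaluateTwice(rows, filteringBody(cacheA))
val once     = evaluateOnce(rows, filteringBody(cacheB))
val memoized = evaluateTwice(rows, memoizingBody(cacheC)).distinct

println(filtered.size) // 0
println(once.size)     // 5
println(memoized.size) // 5
```

Because {{memoizingBody}} uses the cache only to avoid recomputing {{score}}, calling it any number of times per row yields the same output, and {{distinct}} removes the duplicates. The trade-off is that repeated values now flow through {{explode}} and {{distinct}} instead of being dropped inside the UDF.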