[ https://issues.apache.org/jira/browse/SPARK-34644?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Gavrilescu Laurentiu updated SPARK-34644: ----------------------------------------- Description: *Applying an UDF followed by explode looks to be calling the UDF twice.* Imagine having the following scenario: 1. you have a dataframe with some string columns 2. you have an expensive function that creates a score based on some string input 3. you want to get all the distinct values from all the columns and their score - there is an executor level cache that holds the score values for strings to minimize the execution of the expensive function consider the following code to reproduce it: {code:java} case class RowWithStrings(c1: String, c2: String, c3: String) case class ValueScore(value: String, score: Double) object Bug { val columns: List[String] = List("c1", "c2", "c3") def score(input: String): Double = { // insert expensive function here input.toDouble } def main(args: Array[String]) { lazy val sparkSession: SparkSession = { val sparkSession = SparkSession.builder.master("local[4]") .getOrCreate() sparkSession } // some cache over expensive operation val cache: TrieMap[String, Double] = TrieMap[String, Double]() // get scores for all columns in the row val body = (row: Row) => { val arr = ArrayBuffer[ValueScore]() columns foreach { column => val value = row.getAs[String](column) if (!cache.contains(value)) { val computedScore = score(value) arr += ValueScore(value, computedScore) cache(value) = computedScore } } arr } val basicUdf = udf(body) val values = (1 to 5) map { idx => // repeated values RowWithStrings(idx.toString, idx.toString, idx.toString) } import sparkSession.implicits._ val df = values.toDF("c1", "c2", "c3").persist() val allCols = df.columns.map(col) df.select(basicUdf(struct(allCols: _*)).as("valuesScore")) .select(explode(col("valuesScore"))) .distinct() .show() } } {code} this shows: {code:java} +---+ |col| +---+ +---+ {code} When adding persist before explode, the result is correct: {code:java} df.select(basicUdf(struct(allCols: 
_*)).as("valuesScore")) .persist() .select(explode(col("valuesScore"))) .distinct() .show() {code} => {code:java} +--------+ | col| +--------+ |{2, 2.0}| |{4, 4.0}| |{3, 3.0}| |{5, 5.0}| |{1, 1.0}| +--------+ {code} This is not reproducible using 3.0.2 version. was: *Applying an UDF followed by explode looks to be calling the UDF twice.* Imagine having the following scenario: 1. you have a dataframe with some string columns 2. you have an expensive function that creates a score based on some string input 3. you want to get all the distinct values from all the columns and their score - there is an executor level cache that holds the score values for strings to minimize the execution of the expensive function consider the following code to reproduce it: {code:java} case class RowWithStrings(c1: String, c2: String, c3: String) case class ValueScore(value: String, score: Double) object Bug { val columns: List[String] = List("c1", "c2", "c3") def score(input: String): Double = { // insert expensive function here input.toDouble } def main(args: Array[String]) { lazy val sparkSession: SparkSession = { val sparkSession = SparkSession.builder.master("local[4]") .getOrCreate() sparkSession } // some cache over expensive operation val cache: TrieMap[String, Double] = TrieMap[String, Double]() // get scores for all columns in the row val body = (row: Row) => { val arr = ArrayBuffer[ValueScore]() columns foreach { column => val value = row.getAs[String](column) if (!cache.contains(value)) { val computedScore = score(value) arr += ValueScore(value, computedScore) cache(value) = computedScore } } arr } val basicUdf = udf(body) val values = (1 to 5) map { idx => // repeated values RowWithStrings(idx.toString, idx.toString, idx.toString) } import sparkSession.implicits._ val df = values.toDF("c1", "c2", "c3").persist() val allCols = df.columns.map(col) df.select(basicUdf(struct(allCols: _*)).as("valuesScore")) .select(explode(col("valuesScore"))) .distinct() .show() } } {code} this 
shows: {code:java} +---+ |col| +---+ +---+ {code} When adding persist before explode, the result is correct: {code:java} df.select(basicUdf(struct(allCols: _*)).as("valuesScore")) .persist() .select(explode(col("valuesScore"))) .distinct() .show() {code} => {code:java} +--------+ | col| +--------+ |{2, 2.0}| |{4, 4.0}| |{3, 3.0}| |{5, 5.0}| |{1, 1.0}| +--------+ {code} > UDF returning array followed by explode returns wrong results > ------------------------------------------------------------- > > Key: SPARK-34644 > URL: https://issues.apache.org/jira/browse/SPARK-34644 > Project: Spark > Issue Type: Bug > Components: SQL > Affects Versions: 3.1.1 > Reporter: Gavrilescu Laurentiu > Priority: Major > > *Applying an UDF followed by explode looks to be calling the UDF twice.* > Imagine having the following scenario: > 1. you have a dataframe with some string columns > 2. you have an expensive function that creates a score based on some string > input > 3. you want to get all the distinct values from all the columns and their > score - there is an executor level cache that holds the score values for > strings to minimize the execution of the expensive function > consider the following code to reproduce it: > {code:java} > case class RowWithStrings(c1: String, c2: String, c3: String) > case class ValueScore(value: String, score: Double) > object Bug { > val columns: List[String] = List("c1", "c2", "c3") > def score(input: String): Double = { > // insert expensive function here > input.toDouble > } > def main(args: Array[String]) { > lazy val sparkSession: SparkSession = { > val sparkSession = SparkSession.builder.master("local[4]") > .getOrCreate() > sparkSession > } > // some cache over expensive operation > val cache: TrieMap[String, Double] = TrieMap[String, Double]() > // get scores for all columns in the row > val body = (row: Row) => { > val arr = ArrayBuffer[ValueScore]() > columns foreach { > column => > val value = row.getAs[String](column) > if 
(!cache.contains(value)) { > val computedScore = score(value) > arr += ValueScore(value, computedScore) > cache(value) = computedScore > } > } > arr > } > val basicUdf = udf(body) > val values = (1 to 5) map { > idx => > // repeated values > RowWithStrings(idx.toString, idx.toString, idx.toString) > } > import sparkSession.implicits._ > val df = values.toDF("c1", "c2", "c3").persist() > val allCols = df.columns.map(col) > df.select(basicUdf(struct(allCols: _*)).as("valuesScore")) > .select(explode(col("valuesScore"))) > .distinct() > .show() > } > } > {code} > this shows: > {code:java} > +---+ > |col| > +---+ > +---+ > {code} > When adding persist before explode, the result is correct: > {code:java} > df.select(basicUdf(struct(allCols: _*)).as("valuesScore")) > .persist() > .select(explode(col("valuesScore"))) > .distinct() > .show() > {code} > => > {code:java} > +--------+ > | col| > +--------+ > |{2, 2.0}| > |{4, 4.0}| > |{3, 3.0}| > |{5, 5.0}| > |{1, 1.0}| > +--------+ > {code} > This is not reproducible with version 3.0.2. -- This message was sent by Atlassian Jira (v8.3.4#803005) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org