[
https://issues.apache.org/jira/browse/SPARK-34644?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Gavrilescu Laurentiu updated SPARK-34644:
-----------------------------------------
Description:
*Applying a UDF followed by explode appears to call the UDF twice.*
Imagine having the following scenario:
1. you have a dataframe with some string columns
2. you have an expensive function that creates a score based on some string
input
3. you want to get all the distinct values from all the columns and their
score - there is an executor level cache that holds the score values for
strings to minimize the execution of the expensive function
Consider the following code to reproduce it:
{code:java}
// Input record for the reproduction: three string fields named c1, c2 and c3.
case class RowWithStrings(c1: String, c2: String, c3: String)
// Element type of the array returned by the UDF: a string value paired with its score.
case class ValueScore(value: String, score: Double)
// Reproduction: a UDF that returns an array of ValueScore, followed by explode and distinct.
object Bug {
// Names of the columns the UDF reads from each input row.
val columns: List[String] = List("c1", "c2", "c3")
// Stand-in for the expensive scoring function; here it only parses the input as a Double.
def score(input: String): Double = {
// insert expensive function here
input.toDouble
}
def main(args: Array[String]) {
// Session built against the "local[4]" master; lazy, so it is created on first use.
lazy val sparkSession: SparkSession = {
val sparkSession = SparkSession.builder.master("local[4]")
.getOrCreate()
sparkSession
}
// some cache over expensive operation
// The map is captured by the closure below, which makes the UDF side-effecting.
val cache: TrieMap[String, Double] = TrieMap[String, Double]()
// get scores for all columns in the row
// A value is appended to the result only the first time it is seen (i.e. when it is not
// yet in the cache). If the closure runs again for the same row against the same map,
// every value is already cached and the returned buffer is empty.
val body = (row: Row) => {
val arr = ArrayBuffer[ValueScore]()
columns foreach {
column =>
val value = row.getAs[String](column)
if (!cache.contains(value)) {
val computedScore = score(value)
arr += ValueScore(value, computedScore)
cache(value) = computedScore
}
}
arr
}
val basicUdf = udf(body)
// Five rows; within each row all three columns hold the same string ("1" .. "5").
val values = (1 to 5) map {
idx =>
// repeated values
RowWithStrings(idx.toString, idx.toString, idx.toString)
}
import sparkSession.implicits._
val df = values.toDF("c1", "c2", "c3").persist()
val allCols = df.columns.map(col)
// Pack all columns into one struct, apply the UDF, then explode the returned array
// directly (no persist between the UDF and explode) and show the distinct elements.
df.select(basicUdf(struct(allCols: _*)).as("valuesScore"))
.select(explode(col("valuesScore")))
.distinct()
.show()
}
}
{code}
this shows:
{code:java}
+---+
|col|
+---+
+---+
{code}
When adding persist before explode, the result is correct:
{code:java}
df.select(basicUdf(struct(allCols: _*)).as("valuesScore"))
.persist()
.select(explode(col("valuesScore")))
.distinct()
.show()
{code}
=>
{code:java}
+--------+
| col|
+--------+
|{2, 2.0}|
|{4, 4.0}|
|{3, 3.0}|
|{5, 5.0}|
|{1, 1.0}|
+--------+
{code}
This is not reproducible with version 3.0.2.
Later edit:
The UDF is indeed evaluated multiple times when persist is not used:
{code:java}
// Reproduction that counts UDF invocations with and without persist() before explode.
object Bug {
def main(args: Array[String]) {
// Session built against the "local[4]" master.
val sparkSession: SparkSession = {
val sparkSession = SparkSession.builder.master("local[4]")
.getOrCreate()
sparkSession
}
// Accumulator incremented once per call to `expensive`.
val invocations = sparkSession.sparkContext.longAccumulator("invocations")
// Resets the accumulator, evaluates the by-name `body`, then prints the accumulator
// value and the elapsed wall-clock time in seconds; returns the result of `body`.
// NOTE(review): the sample output shown after this snippet carries "first:" / "second:"
// prefixes that this println does not emit.
def showTiming[T](body: => T): T = {
val t0 = System.nanoTime()
invocations.reset()
val res = body
val t1 = System.nanoTime()
println(s"invocations=${invocations.value}, time=${(t1 - t0) / 1e9}")
res
}
// Simulated expensive call: sleeps 100 ms, bumps the accumulator and returns 1.
// The argument `n` is not used.
def expensive(n: Int) = {
Thread.sleep(100)
invocations.add(1)
1
}
// Each UDF evaluation calls `expensive` 10 times and returns the 10 results.
val expensiveUdf = udf((x: Int) => (1 to 10) map { _ => expensive(x) })
// 10 input rows, so one UDF evaluation per row amounts to 100 calls to `expensive`.
val df = sparkSession.range(10).toDF()
// Case 1: explode applied directly to the UDF output.
showTiming(df.select(expensiveUdf(col("id")).as("values"))
.select(explode(col("values")).as("value"))
.select(sum("value"))
.show())
// Case 2: identical query, but with persist() between the UDF and explode.
showTiming(df.select(expensiveUdf(col("id")).as("values"))
.persist()
.select(explode(col("values")).as("value"))
.select(sum("value"))
.show())
}
}
{code}
=>
{code:java}
first: invocations=300, time=11.342076635
second: invocations=100, time=3.351682967
{code}
was:
*Applying a UDF followed by explode appears to call the UDF twice.*
Imagine having the following scenario:
1. you have a dataframe with some string columns
2. you have an expensive function that creates a score based on some string
input
3. you want to get all the distinct values from all the columns and their
score - there is an executor level cache that holds the score values for
strings to minimize the execution of the expensive function
consider the following code to reproduce it:
{code:java}
// Input record for the reproduction: three string fields named c1, c2 and c3.
case class RowWithStrings(c1: String, c2: String, c3: String)
// Element type of the array returned by the UDF: a string value paired with its score.
case class ValueScore(value: String, score: Double)
// Reproduction: a UDF that returns an array of ValueScore, followed by explode and distinct.
object Bug {
// Names of the columns the UDF reads from each input row.
val columns: List[String] = List("c1", "c2", "c3")
// Stand-in for the expensive scoring function; here it only parses the input as a Double.
def score(input: String): Double = {
// insert expensive function here
input.toDouble
}
def main(args: Array[String]) {
// Session built against the "local[4]" master; lazy, so it is created on first use.
lazy val sparkSession: SparkSession = {
val sparkSession = SparkSession.builder.master("local[4]")
.getOrCreate()
sparkSession
}
// some cache over expensive operation
// The map is captured by the closure below, which makes the UDF side-effecting.
val cache: TrieMap[String, Double] = TrieMap[String, Double]()
// get scores for all columns in the row
// A value is appended to the result only the first time it is seen (i.e. when it is not
// yet in the cache). If the closure runs again for the same row against the same map,
// every value is already cached and the returned buffer is empty.
val body = (row: Row) => {
val arr = ArrayBuffer[ValueScore]()
columns foreach {
column =>
val value = row.getAs[String](column)
if (!cache.contains(value)) {
val computedScore = score(value)
arr += ValueScore(value, computedScore)
cache(value) = computedScore
}
}
arr
}
val basicUdf = udf(body)
// Five rows; within each row all three columns hold the same string ("1" .. "5").
val values = (1 to 5) map {
idx =>
// repeated values
RowWithStrings(idx.toString, idx.toString, idx.toString)
}
import sparkSession.implicits._
val df = values.toDF("c1", "c2", "c3").persist()
val allCols = df.columns.map(col)
// Pack all columns into one struct, apply the UDF, then explode the returned array
// directly (no persist between the UDF and explode) and show the distinct elements.
df.select(basicUdf(struct(allCols: _*)).as("valuesScore"))
.select(explode(col("valuesScore")))
.distinct()
.show()
}
}
{code}
this shows:
{code:java}
+---+
|col|
+---+
+---+
{code}
When adding persist before explode, the result is correct:
{code:java}
df.select(basicUdf(struct(allCols: _*)).as("valuesScore"))
.persist()
.select(explode(col("valuesScore")))
.distinct()
.show()
{code}
=>
{code:java}
+--------+
| col|
+--------+
|{2, 2.0}|
|{4, 4.0}|
|{3, 3.0}|
|{5, 5.0}|
|{1, 1.0}|
+--------+
{code}
This is not reproducible with version 3.0.2.
> UDF returning array followed by explode returns wrong results
> -------------------------------------------------------------
>
> Key: SPARK-34644
> URL: https://issues.apache.org/jira/browse/SPARK-34644
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 3.1.1
> Reporter: Gavrilescu Laurentiu
> Priority: Major
>
> *Applying a UDF followed by explode appears to call the UDF twice.*
> Imagine having the following scenario:
> 1. you have a dataframe with some string columns
> 2. you have an expensive function that creates a score based on some string
> input
> 3. you want to get all the distinct values from all the columns and their
> score - there is an executor level cache that holds the score values for
> strings to minimize the execution of the expensive function
> consider the following code to reproduce it:
> {code:java}
> case class RowWithStrings(c1: String, c2: String, c3: String)
> case class ValueScore(value: String, score: Double)
> object Bug {
> val columns: List[String] = List("c1", "c2", "c3")
> def score(input: String): Double = {
> // insert expensive function here
> input.toDouble
> }
> def main(args: Array[String]) {
> lazy val sparkSession: SparkSession = {
> val sparkSession = SparkSession.builder.master("local[4]")
> .getOrCreate()
> sparkSession
> }
> // some cache over expensive operation
> val cache: TrieMap[String, Double] = TrieMap[String, Double]()
> // get scores for all columns in the row
> val body = (row: Row) => {
> val arr = ArrayBuffer[ValueScore]()
> columns foreach {
> column =>
> val value = row.getAs[String](column)
> if (!cache.contains(value)) {
> val computedScore = score(value)
> arr += ValueScore(value, computedScore)
> cache(value) = computedScore
> }
> }
> arr
> }
> val basicUdf = udf(body)
> val values = (1 to 5) map {
> idx =>
> // repeated values
> RowWithStrings(idx.toString, idx.toString, idx.toString)
> }
> import sparkSession.implicits._
> val df = values.toDF("c1", "c2", "c3").persist()
> val allCols = df.columns.map(col)
> df.select(basicUdf(struct(allCols: _*)).as("valuesScore"))
> .select(explode(col("valuesScore")))
> .distinct()
> .show()
> }
> }
> {code}
> this shows:
> {code:java}
> +---+
> |col|
> +---+
> +---+
> {code}
> When adding persist before explode, the result is correct:
> {code:java}
> df.select(basicUdf(struct(allCols: _*)).as("valuesScore"))
> .persist()
> .select(explode(col("valuesScore")))
> .distinct()
> .show()
> {code}
> =>
> {code:java}
> +--------+
> | col|
> +--------+
> |{2, 2.0}|
> |{4, 4.0}|
> |{3, 3.0}|
> |{5, 5.0}|
> |{1, 1.0}|
> +--------+
> {code}
> This is not reproducible with version 3.0.2.
> Later edit:
> The UDF is indeed evaluated multiple times when persist is not used:
> {code:java}
> object Bug {
> def main(args: Array[String]) {
> val sparkSession: SparkSession = {
> val sparkSession = SparkSession.builder.master("local[4]")
> .getOrCreate()
> sparkSession
> }
> val invocations = sparkSession.sparkContext.longAccumulator("invocations")
> def showTiming[T](body: => T): T = {
> val t0 = System.nanoTime()
> invocations.reset()
> val res = body
> val t1 = System.nanoTime()
> println(s"invocations=${invocations.value}, time=${(t1 - t0) / 1e9}")
> res
> }
> def expensive(n: Int) = {
> Thread.sleep(100)
> invocations.add(1)
> 1
> }
> val expensiveUdf = udf((x: Int) => (1 to 10) map { _ => expensive(x) })
> val df = sparkSession.range(10).toDF()
> showTiming(df.select(expensiveUdf(col("id")).as("values"))
> .select(explode(col("values")).as("value"))
> .select(sum("value"))
> .show())
> showTiming(df.select(expensiveUdf(col("id")).as("values"))
> .persist()
> .select(explode(col("values")).as("value"))
> .select(sum("value"))
> .show())
>
> }
> }
> {code}
> =>
> {code:java}
> first: invocations=300, time=11.342076635
> second: invocations=100, time=3.351682967
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]