[ https://issues.apache.org/jira/browse/SPARK-15282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15281075#comment-15281075 ]
Linbo commented on SPARK-15282:
-------------------------------
There is a related JIRA issue,
https://issues.apache.org/jira/browse/SPARK-13773, though I'm not sure the
problem is the same.
> UDF function executed twice when filtering on new column created by withColumn
> ----------------------------------------------------------------------------
>
> Key: SPARK-15282
> URL: https://issues.apache.org/jira/browse/SPARK-15282
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 1.6.1
> Environment: spark 1.6.1
> Reporter: Linbo
>
> I found this problem on Spark 1.6.1, and according to [~tedyu] the behavior
> is the same on the current master branch.
> Basically, I used a UDF and df.withColumn to create a "new" column, then
> filtered on this new column and called show (an action). The UDF (which
> withColumn uses to create the new column) is called twice for each row that
> passes the filter, and once for each row that is filtered out. If I filter on
> an "old" column instead, the UDF runs only once per surviving row, which is
> expected. Example code is attached below; `filteredOnNewColumnDF.show` shows
> the problem.
> {code:title=spark-shell|borderStyle=solid}
> scala> import org.apache.spark.sql.functions._
> import org.apache.spark.sql.functions._
> scala> val df = sc.parallelize(Seq(("a", "b"), ("a1", "b1"))).toDF("old","old1")
> df: org.apache.spark.sql.DataFrame = [old: string, old1: string]
> scala> val udfFunc = udf((s: String) => {println(s"running udf($s)"); s })
> udfFunc: org.apache.spark.sql.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,List(StringType))
> scala> val newDF = df.withColumn("new", udfFunc(df("old")))
> newDF: org.apache.spark.sql.DataFrame = [old: string, old1: string, new: string]
> scala> newDF.show
> running udf(a)
> running udf(a1)
> +---+----+---+
> |old|old1|new|
> +---+----+---+
> |  a|   b|  a|
> | a1|  b1| a1|
> +---+----+---+
> scala> val filteredOnNewColumnDF = newDF.filter("new <> 'a1'")
> filteredOnNewColumnDF: org.apache.spark.sql.DataFrame = [old: string, old1: string, new: string]
> scala> val filteredOnOldColumnDF = newDF.filter("old <> 'a1'")
> filteredOnOldColumnDF: org.apache.spark.sql.DataFrame = [old: string, old1: string, new: string]
> scala> filteredOnNewColumnDF.show
> running udf(a)
> running udf(a)
> running udf(a1)
> +---+----+---+
> |old|old1|new|
> +---+----+---+
> |  a|   b|  a|
> +---+----+---+
> scala> filteredOnOldColumnDF.show
> running udf(a)
> +---+----+---+
> |old|old1|new|
> +---+----+---+
> |  a|   b|  a|
> +---+----+---+
> {code}
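The call pattern above (udf(a) twice, udf(a1) once) appears consistent with the optimizer pushing the filter below the withColumn projection and, because the predicate references `new`, substituting the UDF expression `udfFunc(old)` into it; the projection then evaluates the UDF again for every row that survives. The plain-Scala sketch below is a simplified, hypothetical model of that rewrite, not Spark's code: the `PushdownModel` object, its `run` helper, and the row-at-a-time iterator pipeline are assumptions made for illustration. It reproduces the call logs of both `show` calls in the transcript.

```scala
// Simplified, hypothetical model of the plan after filter pushdown.
// Plain Scala, no Spark: it only mimics the order of UDF evaluations.
import scala.collection.mutable.ListBuffer

object PushdownModel {
  final case class Row(old: String, old1: String)

  // Same input rows as `df` in the transcript.
  val rows: List[Row] = List(Row("a", "b"), Row("a1", "b1"))

  // Evaluates Project(new = udf(old)) over Filter(pushedPredicate),
  // one row at a time, and returns the result rows plus the UDF call log.
  private def run(
      pushedPredicate: (Row, String => String) => Boolean
  ): (List[(String, String, String)], List[String]) = {
    val log = ListBuffer.empty[String]
    // Stand-in for udfFunc: logs the call and returns its input unchanged.
    val udfFunc: String => String = s => { log += s"running udf($s)"; s }
    val result = rows.iterator
      .filter(r => pushedPredicate(r, udfFunc))  // pushed-down Filter
      .map(r => (r.old, r.old1, udfFunc(r.old))) // Project computing "new"
      .toList
    (result, log.toList)
  }

  // filter("new <> 'a1'"): the alias `new` is replaced by udf(old),
  // so the predicate itself calls the UDF.
  def filterOnNew(): (List[(String, String, String)], List[String]) =
    run((r, udfFunc) => udfFunc(r.old) != "a1")

  // filter("old <> 'a1'"): the predicate never touches the UDF.
  def filterOnOld(): (List[(String, String, String)], List[String]) =
    run((r, _) => r.old != "a1")
}

// Print the call logs for the two show() calls in the transcript.
PushdownModel.filterOnNew()._2.foreach(println) // prints running udf(a), running udf(a), running udf(a1)
PushdownModel.filterOnOld()._2.foreach(println) // prints running udf(a)
```

The iterator makes Filter and Project run row by row, which is why the log interleaves as in the transcript; a strict `List.filter` followed by `map` would make the same three calls in a different order. In Spark itself, `filteredOnNewColumnDF.explain(true)` prints the optimized logical plan, which should show whether the UDF ends up inside the Filter condition.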
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)