[ https://issues.apache.org/jira/browse/SPARK-13773?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sean Owen updated SPARK-13773:
------------------------------
    Component/s: SQL

> UDF being applied to filtered data 
> -----------------------------------
>
>                 Key: SPARK-13773
>                 URL: https://issues.apache.org/jira/browse/SPARK-13773
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.6.0
>            Reporter: James Hammerton
>
> Given the following code:
> {code:title=ReproduceSparkBug.scala|borderStyle=solid}
> import scala.reflect.runtime.universe
> import org.apache.spark.SparkContext
> import org.apache.spark.sql.SQLContext
> import org.apache.spark.sql.functions.udf
> import org.apache.spark.sql.types.DataTypes
> import org.apache.spark.sql.types.StructField
> import org.apache.spark.sql.types.StructType
> import org.apache.spark.SparkConf
> object ReproduceSparkBug {
>   def main(args: Array[String]): Unit = {
>     val conf = new SparkConf().setMaster("local")
>       .setAppName("ReproduceSparkBug")
>       .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>     val schema = StructType(Array(
>       StructField("userId", DataTypes.StringType),
>       StructField("objectId", DataTypes.StringType),
>       StructField("eventName", DataTypes.StringType),
>       StructField("eventJson", DataTypes.StringType),
>       StructField("timestamp", DataTypes.LongType)))
>     val sc = new SparkContext(conf)
>     val sqlContext = new SQLContext(sc)
>     val df = sqlContext.read.format("com.databricks.spark.csv")
>       .option("delimiter", "\t")
>       .option("header", "false")
>       .schema(schema).load("src/test/resources/foo.txt")
>     val filtered = df.filter((df("eventName")).endsWith("Created"))
>     val extracted = filtered.select(filtered("objectId"),
>       extractorUDF(filtered("eventJson"), filtered("objectId"),
>         filtered("userId")) as "info")
>     extracted.filter(extracted("info").notEqual("NO REFS")).collect
>       .map(r => (r.getString(0), r.getString(1))).foreach(println(_))
>     sc.stop()
>   }
>   def extractorUDF = udf(extractor(_: String, _: String, _: String))
>   def extractor(eventJson: String, objectId: String, userId: String): String = {
>     println(eventJson + ":" + objectId + ":" + userId)
>     eventJson + ":" + objectId + ":" + userId
>   }
> }
> {code}
> If "src/test/resources" contains the following:
> {noformat}
> 113   0c38c6c327224e43a46f663b6424612f        Created {"field":"value"}       1000
> 113   0c38c6c327224e43a46f663b6424612f        LabelRemoved    {"this":"should not appear"}    1000
> {noformat}
> Then the code outputs the following to stdout:
> {noformat}
> {"field":"value"}:0c38c6c327224e43a46f663b6424612f:113
> {"field":"value"}:0c38c6c327224e43a46f663b6424612f:113
> {"this":"should not appear"}:0c38c6c327224e43a46f663b6424612f:113
> (0c38c6c327224e43a46f663b6424612f,{"field":"value"}:0c38c6c327224e43a46f663b6424612f:113)
> {noformat}
> If the first filter is cached (i.e. we write 'val filtered = df.filter(df("eventName").endsWith("Created")).cache'), then only the first and last lines of the above output appear.
> What I think is happening is that the UDF is applied to the unfiltered data and the filtering is only applied afterwards, so the correct rows are returned in the end. The UDF also seems to get applied more than once to the rows that are not filtered out, for some reason.
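> One way to check this is to print the query plan of the final DataFrame; the extended output shows where each Filter sits relative to the Project that invokes the UDF, i.e. the order in which they are actually evaluated. A minimal sketch, reusing the df/filtered/extracted definitions from the code above:
> {code}
> // Print the parsed, analyzed, optimized and physical plans; the position of
> // the Filter on eventName relative to the Project containing extractorUDF
> // shows whether the UDF runs before or after the filter.
> extracted.filter(extracted("info").notEqual("NO REFS")).explain(true)
> {code}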
> This caused problems in my original code, where some JSON parsing was done in the UDF and threw exceptions because it was applied to data that should have been filtered out. The original code was reading from Parquet, but I switched to a tab-separated format here to make things easier to see and post.
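> As a workaround sketch (not the original code), the extractor can be made defensive so that rows which should already have been filtered out cannot fail the task; here any parsing failure is mapped to the "NO REFS" marker that the later filter already drops:
> {code}
> import scala.util.Try
> 
> // Hypothetical defensive variant of the extractor: any exception thrown while
> // handling eventJson is swallowed and turned into "NO REFS", which the
> // notEqual("NO REFS") filter removes downstream.
> def safeExtractor(eventJson: String, objectId: String, userId: String): String =
>   Try {
>     // real JSON parsing would go here
>     eventJson + ":" + objectId + ":" + userId
>   }.getOrElse("NO REFS")
> 
> def safeExtractorUDF = udf(safeExtractor(_: String, _: String, _: String))
> {code}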
> I suspect the bug hasn't been noticed before now because the correct results do get produced in the end; the UDF would have to cause task failures when applied to the rows that should have been filtered out for people to notice.
> Note that I could not reproduce this unless the data was read in from a file. 
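> For reference, this is the kind of in-memory construction I mean (one possible way to build the same rows without reading a file, reusing the schema above):
> {code}
> import org.apache.spark.sql.Row
> 
> // Same two rows as foo.txt, built from a local collection instead of a file;
> // per the note above, the extra/pre-filter UDF calls could not be reproduced
> // when the data did not come from a file.
> val rows = Seq(
>   Row("113", "0c38c6c327224e43a46f663b6424612f", "Created",
>     """{"field":"value"}""", 1000L),
>   Row("113", "0c38c6c327224e43a46f663b6424612f", "LabelRemoved",
>     """{"this":"should not appear"}""", 1000L))
> val inMemoryDf = sqlContext.createDataFrame(sc.parallelize(rows), schema)
> {code}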



