[
https://issues.apache.org/jira/browse/SPARK-13773?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Sean Owen updated SPARK-13773:
------------------------------
Component/s: SQL
> UDF being applied to filtered data
> -----------------------------------
>
> Key: SPARK-13773
> URL: https://issues.apache.org/jira/browse/SPARK-13773
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 1.6.0
> Reporter: James Hammerton
>
> Given the following code:
> {code:title=ReproduceSparkBug.scala|borderStyle=solid}
> import org.apache.spark.{SparkConf, SparkContext}
> import org.apache.spark.sql.SQLContext
> import org.apache.spark.sql.functions.udf
> import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
>
> object ReproduceSparkBug {
>   def main(args: Array[String]): Unit = {
>     val conf = new SparkConf().setMaster("local")
>       .setAppName("ReproduceSparkBug")
>       .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>     val schema = StructType(Array(
>       StructField("userId", DataTypes.StringType),
>       StructField("objectId", DataTypes.StringType),
>       StructField("eventName", DataTypes.StringType),
>       StructField("eventJson", DataTypes.StringType),
>       StructField("timestamp", DataTypes.LongType)))
>     val sc = new SparkContext(conf)
>     val sqlContext = new SQLContext(sc)
>     val df = sqlContext.read.format("com.databricks.spark.csv")
>       .option("delimiter", "\t")
>       .option("header", "false")
>       .schema(schema)
>       .load("src/test/resources/foo.txt")
>     val filtered = df.filter(df("eventName").endsWith("Created"))
>     val extracted = filtered.select(
>       filtered("objectId"),
>       extractorUDF(filtered("eventJson"), filtered("objectId"), filtered("userId")) as "info")
>     extracted.filter(extracted("info").notEqual("NO REFS"))
>       .collect
>       .map(r => (r.getString(0), r.getString(1)))
>       .foreach(println)
>     sc.stop()
>   }
>
>   def extractorUDF = udf(extractor(_: String, _: String, _: String))
>
>   def extractor(eventJson: String, objectId: String, userId: String): String = {
>     println(eventJson + ":" + objectId + ":" + userId)
>     eventJson + ":" + objectId + ":" + userId
>   }
> }
> {code}
> If "src/test/resources/foo.txt" contains the following tab-separated data:
> {noformat}
> 113	0c38c6c327224e43a46f663b6424612f	Created	{"field":"value"}	1000
> 113	0c38c6c327224e43a46f663b6424612f	LabelRemoved	{"this":"should not appear"}	1000
> {noformat}
> Then the code outputs the following to std out:
> {noformat}
> {"field":"value"}:0c38c6c327224e43a46f663b6424612f:113
> {"field":"value"}:0c38c6c327224e43a46f663b6424612f:113
> {"this":"should not appear"}:0c38c6c327224e43a46f663b6424612f:113
> (0c38c6c327224e43a46f663b6424612f,{"field":"value"}:0c38c6c327224e43a46f663b6424612f:113)
> {noformat}
> If the first filter is cached (i.e. we write 'val filtered =
> df.filter((df("eventName")).endsWith("Created")).cache'), then only the first
> and last lines appear.
> What I think is happening is that the UDF is applied to the unfiltered data,
> with the filter applied afterwards, so the correct rows are ultimately output.
> The UDF also appears to be applied more than once to the rows that survive the
> filter, for some reason.
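> This hypothesis can be checked against the query plan (a minimal sketch;
> `extracted` is the DataFrame built in the snippet above):
> {code}
> // If the Project node evaluating the UDF sits below the Filter in the
> // physical plan, the UDF runs before the rows are filtered out.
> extracted.explain(true)
> {code}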
> This caused problems in my original code, where some JSON parsing was done in
> the UDF: it threw exceptions because it was applied to rows that should have
> been filtered out. The original code read from Parquet, but I switched to a
> tab-separated format here to make things easier to see and post.
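> As a workaround until the evaluation order is fixed, the parsing inside the
> UDF can be made defensive so that rows which should have been filtered out
> cannot throw. A hedged sketch, not the original code; the sentinel "NO REFS"
> is the one already tested for in the snippet above:
> {code}
> import scala.util.Try
>
> def safeExtractor(eventJson: String, objectId: String, userId: String): String =
>   Try {
>     // JSON parsing that may throw when handed a row the filter should have removed
>     eventJson + ":" + objectId + ":" + userId
>   }.getOrElse("NO REFS") // failed rows are then dropped by the later filter on "info"
> {code}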
> I suspect the bug hasn't been noticed hitherto because the correct results do
> get produced in the end; the UDF would need to cause task failures when
> applied to the filtered-out rows for anyone to notice.
> Note that I could not reproduce this unless the data was read in from a file.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)