[ https://issues.apache.org/jira/browse/SPARK-22446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16387191#comment-16387191 ]
Liang-Chi Hsieh commented on SPARK-22446:
-----------------------------------------

Yeah, sounds good.

Optimizer causing StringIndexerModel's indexer UDF to throw "Unseen label" exception incorrectly for filtered data.
-------------------------------------------------------------------------------------------------------------------

             Key: SPARK-22446
             URL: https://issues.apache.org/jira/browse/SPARK-22446
         Project: Spark
      Issue Type: Bug
      Components: ML, Optimizer
Affects Versions: 2.0.2, 2.1.2, 2.2.1
     Environment: spark-shell, local mode, macOS Sierra 10.12.6
        Reporter: Greg Bellchambers
        Assignee: Liang-Chi Hsieh
        Priority: Major
         Fix For: 2.3.0

In the following, the `indexer` UDF defined inside the `org.apache.spark.ml.feature.StringIndexerModel.transform` method throws an "Unseen label" exception, even though the offending label is not present in the transformed DataFrame.

Here is the definition of the indexer UDF in the transform method:

{code:java}
val indexer = udf { label: String =>
  if (labelToIndex.contains(label)) {
    labelToIndex(label)
  } else {
    throw new SparkException(s"Unseen label: $label.")
  }
}
{code}

We can demonstrate the error with a very simple example DataFrame.

{code:java}
scala> import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.ml.feature.StringIndexer

scala> // first we create a DataFrame with three cities

scala> val df = List(
     |   ("A", "London", "StrA"),
     |   ("B", "Bristol", null),
     |   ("C", "New York", "StrC")
     | ).toDF("ID", "CITY", "CONTENT")
df: org.apache.spark.sql.DataFrame = [ID: string, CITY: string ... 1 more field]

scala> df.show
+---+--------+-------+
| ID|    CITY|CONTENT|
+---+--------+-------+
|  A|  London|   StrA|
|  B| Bristol|   null|
|  C|New York|   StrC|
+---+--------+-------+

scala> // then we remove the row with null in the CONTENT column, which removes Bristol

scala> val dfNoBristol = df.filter($"CONTENT".isNotNull)
dfNoBristol: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [ID: string, CITY: string ... 1 more field]

scala> dfNoBristol.show
+---+--------+-------+
| ID|    CITY|CONTENT|
+---+--------+-------+
|  A|  London|   StrA|
|  C|New York|   StrC|
+---+--------+-------+

scala> // now create a StringIndexer for the CITY column and fit it to dfNoBristol

scala> val model = {
     |   new StringIndexer()
     |     .setInputCol("CITY")
     |     .setOutputCol("CITYIndexed")
     |     .fit(dfNoBristol)
     | }
model: org.apache.spark.ml.feature.StringIndexerModel = strIdx_f5afa23333fb

scala> // the StringIndexerModel has only two labels: "London" and "New York"

scala> model.labels foreach println
London
New York

scala> // transform our DataFrame to add an index column

scala> val dfWithIndex = model.transform(dfNoBristol)
dfWithIndex: org.apache.spark.sql.DataFrame = [ID: string, CITY: string ... 2 more fields]

scala> dfWithIndex.show
+---+--------+-------+-----------+
| ID|    CITY|CONTENT|CITYIndexed|
+---+--------+-------+-----------+
|  A|  London|   StrA|        0.0|
|  C|New York|   StrC|        1.0|
+---+--------+-------+-----------+
{code}

The unexpected behaviour comes when we filter `dfWithIndex` for `CITYIndexed` equal to 1.0 and perform an action: the `indexer` UDF in the `transform` method throws an exception reporting the unseen label "Bristol". This is irrational behaviour as far as the user of the API is concerned, because there is no such value as "Bristol" when we show all rows of `dfWithIndex`:

{code:java}
scala> dfWithIndex.filter($"CITYIndexed" === 1.0).count
17/11/04 00:33:41 ERROR Executor: Exception in task 1.0 in stage 20.0 (TID 40)
org.apache.spark.SparkException: Failed to execute user defined function($anonfun$5: (string) => double)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Unseen label: Bristol. To handle unseen labels, set Param handleInvalid to keep.
    at org.apache.spark.ml.feature.StringIndexerModel$$anonfun$5.apply(StringIndexer.scala:222)
    at org.apache.spark.ml.feature.StringIndexerModel$$anonfun$5.apply(StringIndexer.scala:208)
    ... 13 more
{code}
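As an aside, the error message itself points at a user-level workaround: on Spark 2.2+, setting the model's `handleInvalid` Param to `keep` makes the UDF map unseen labels to an extra index instead of throwing, so it is harmless wherever the optimizer chooses to evaluate it. A minimal sketch, reusing the `model` and `dfNoBristol` values from the session above (this only sidesteps the exception; it does not address the pushdown behaviour analysed below):

{code:java}
// Workaround sketch: assumes `model` and `dfNoBristol` from the session
// above, and Spark 2.2+, where handleInvalid accepts "keep".
val tolerantModel = model.setHandleInvalid("keep")

// With "keep" the indexer UDF never throws: an unseen label such as
// "Bristol" would simply be indexed as labels.length (here 2.0).
tolerantModel.transform(dfNoBristol)
  .filter($"CITYIndexed" === 1.0)
  .count  // 1 (the New York row), no exception
{code}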
To understand what is happening here, note that an action is triggered when we call `StringIndexer.fit()`, before the `CITYIndexed === 1.0` filter is applied, so the StringIndexerModel sees only London and New York, as expected. Now compare the query plans for `dfWithIndex` and `dfWithIndex.filter($"CITYIndexed" === 1.0)`:

{noformat}
scala> dfWithIndex.explain
== Physical Plan ==
*Project [_1#3 AS ID#7, _2#4 AS CITY#8, _3#5 AS CONTENT#9, UDF(_2#4) AS CITYIndexed#159]
+- *Filter isnotnull(_3#5)
   +- LocalTableScan [_1#3, _2#4, _3#5]

scala> dfWithIndex.filter($"CITYIndexed" === 1.0).explain
== Physical Plan ==
*Project [_1#3 AS ID#7, _2#4 AS CITY#8, _3#5 AS CONTENT#9, UDF(_2#4) AS CITYIndexed#159]
+- *Filter (isnotnull(_3#5) && (UDF(_2#4) = 1.0))
   +- LocalTableScan [_1#3, _2#4, _3#5]
{noformat}

Note that in the latter plan, the optimizer has pushed the `$"CITYIndexed" === 1.0` filter down into the same stage as our null filter: `Filter (isnotnull(_3#5) && (UDF(_2#4) = 1.0))`.

With a debugger I have seen that both operands of the `&&` are evaluated for every row of `df`, namely `isnotnull(_3#5)` and `UDF(_2#4) = 1.0`. The UDF is therefore passed the label "Bristol" even though `isnotnull` returns false for that row.

If we cache the DataFrame `dfNoBristol` immediately after creating it, the error no longer occurs, because the optimizer does not attempt to call the UDF on the unseen data. The fact that we get different results depending on whether or not we call `cache` is itself a cause for concern.
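To make that caching workaround concrete, here is a sketch of the cached variant of the session above (same names; the explicit `count` to force materialization is an addition, not part of the original report). Once `dfNoBristol` is backed by a cached relation, the `CITYIndexed` predicate is applied on top of the cached scan rather than being combined with the null filter:

{code:java}
// Caching workaround sketch, reusing the names from the session above.
val dfNoBristol = df.filter($"CONTENT".isNotNull).cache()
dfNoBristol.count  // force materialization so later plans read the cache

val model = new StringIndexer()
  .setInputCol("CITY")
  .setOutputCol("CITYIndexed")
  .fit(dfNoBristol)

// The cached rows no longer contain Bristol, so the pushed-down predicate
// never hands the UDF an unseen label.
model.transform(dfNoBristol)
  .filter($"CITYIndexed" === 1.0)
  .count  // completes without the "Unseen label" exception
{code}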
I have seen similar issues with pure Spark SQL DataFrame operations when the DAG gets complicated (many joins and aggregations). These are harder to isolate in such a simple example, but I plan to report them in the near future.