[ 
https://issues.apache.org/jira/browse/SPARK-18589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15696717#comment-15696717
 ] 

Nicholas Chammas commented on SPARK-18589:
------------------------------------------

cc [~davies] [~hvanhovell]

> persist() resolves "java.lang.RuntimeException: Invalid PythonUDF 
> <lambda>(...), requires attributes from more than one child"
> ------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-18589
>                 URL: https://issues.apache.org/jira/browse/SPARK-18589
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark, SQL
>    Affects Versions: 2.0.2, 2.1.0
>         Environment: Python 3.5, Java 8
>            Reporter: Nicholas Chammas
>            Priority: Minor
>
> Smells like another optimizer bug that's similar to SPARK-17100 and 
> SPARK-18254. I'm seeing this on 2.0.2 and on master at commit 
> {{fb07bbe575aabe68422fd3a31865101fb7fa1722}}.
> I don't have a minimal repro for this yet, but the error I'm seeing is:
> {code}
> py4j.protocol.Py4JJavaError: An error occurred while calling o247.count.
> : java.lang.RuntimeException: Invalid PythonUDF <...>(...), requires 
> attributes from more than one child.
>     at scala.sys.package$.error(package.scala:27)
>     at 
> org.apache.spark.sql.execution.python.ExtractPythonUDFs$$anonfun$org$apache$spark$sql$execution$python$ExtractPythonUDFs$$extract$2.apply(ExtractPythonUDFs.scala:150)
>     at 
> org.apache.spark.sql.execution.python.ExtractPythonUDFs$$anonfun$org$apache$spark$sql$execution$python$ExtractPythonUDFs$$extract$2.apply(ExtractPythonUDFs.scala:149)
>     at scala.collection.immutable.Stream.foreach(Stream.scala:594)
>     at 
> org.apache.spark.sql.execution.python.ExtractPythonUDFs$.org$apache$spark$sql$execution$python$ExtractPythonUDFs$$extract(ExtractPythonUDFs.scala:149)
>     at 
> org.apache.spark.sql.execution.python.ExtractPythonUDFs$$anonfun$apply$2.applyOrElse(ExtractPythonUDFs.scala:114)
>     at 
> org.apache.spark.sql.execution.python.ExtractPythonUDFs$$anonfun$apply$2.applyOrElse(ExtractPythonUDFs.scala:113)
>     at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$2.apply(TreeNode.scala:312)
>     at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$2.apply(TreeNode.scala:312)
>     at 
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
>     at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:311)
>     at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:305)
>     at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:305)
>     at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:328)
>     at 
> org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
>     at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:326)
>     at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:305)
>     at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:305)
>     at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:305)
>     at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:328)
>     at 
> org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
>     at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:326)
>     at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:305)
>     at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:305)
>     at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:305)
>     at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:328)
>     at 
> org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
>     at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:326)
>     at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:305)
>     at 
> org.apache.spark.sql.execution.python.ExtractPythonUDFs$.apply(ExtractPythonUDFs.scala:113)
>     at 
> org.apache.spark.sql.execution.python.ExtractPythonUDFs$.apply(ExtractPythonUDFs.scala:93)
>     at 
> org.apache.spark.sql.execution.QueryExecution$$anonfun$prepareForExecution$1.apply(QueryExecution.scala:93)
>     at 
> org.apache.spark.sql.execution.QueryExecution$$anonfun$prepareForExecution$1.apply(QueryExecution.scala:93)
>     at 
> scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
>     at scala.collection.immutable.List.foldLeft(List.scala:84)
>     at 
> org.apache.spark.sql.execution.QueryExecution.prepareForExecution(QueryExecution.scala:93)
>     at 
> org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:83)
>     at 
> org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:83)
>     at org.apache.spark.sql.Dataset.withCallback(Dataset.scala:2555)
>     at org.apache.spark.sql.Dataset.count(Dataset.scala:2226)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
>     at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>     at py4j.Gateway.invoke(Gateway.java:280)
>     at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>     at py4j.commands.CallCommand.execute(CallCommand.java:79)
>     at py4j.GatewayConnection.run(GatewayConnection.java:214)
>     at java.lang.Thread.run(Thread.java:745)
> {code}
> The extended plan (cleaned of field names) is as follows:
> {code}
> == Parsed Logical Plan ==
> 'Filter NOT ('expected_prediction = 'prediction)
> +- Project [p1, p2, pair_features, rawPrediction, probability, prediction, 
> cast((p1._testing_universal_key = p2._testing_universal_key) as float) AS 
> expected_prediction]
>    +- Project [p1, p2, pair_features, rawPrediction, probability, 
> UDF(rawPrediction) AS prediction]
>       +- Project [p1, p2, pair_features, rawPrediction, UDF(rawPrediction) AS 
> probability]
>          +- Project [p1, p2, pair_features, UDF(pair_features) AS 
> rawPrediction]
>             +- Project [p1, p2, <lambda>(p1.person, p2.person) AS 
> pair_features]
>                +- Project [struct(...) AS p1, struct(...) AS p2]
>                   +- Project [_blocking_key, ..., ...]
>                      +- Join Inner, (_blocking_key = _blocking_key)
>                         :- SubqueryAlias p1
>                         :  +- Project [..., <lambda>(dataset_name, 
> primary_key, person) AS _blocking_key]
>                         :     +- Project [...]
>                         :        +- Project [primary_key, universal_key, 
> _testing_universal_key, struct(...) AS person]
>                         :           +- Project [...]
>                         :              +- Project [_testing_universal_key, 
> primary_key, struct(...) AS person]
>                         :                 +- LogicalRDD [...]
>                         +- SubqueryAlias p2
>                            +- Project [..., <lambda>(dataset_name, 
> primary_key, person) AS _blocking_key]
>                               +- Project [...]
>                                  +- Project [primary_key, universal_key, 
> _testing_universal_key, struct(...) AS person]
>                                     +- Project [...]
>                                        +- Project [_testing_universal_key, 
> primary_key, struct(...) AS person]
>                                           +- LogicalRDD [...]
> == Analyzed Logical Plan ==
> p1: struct<...>, p2: struct<...>, pair_features: vector, rawPrediction: 
> vector, probability: vector, prediction: double, expected_prediction: float
> Filter NOT (cast(expected_prediction as double) = prediction)
> +- Project [p1, p2, pair_features, rawPrediction, probability, prediction, 
> cast((p1._testing_universal_key = p2._testing_universal_key) as float) AS 
> expected_prediction]
>    +- Project [p1, p2, pair_features, rawPrediction, probability, 
> UDF(rawPrediction) AS prediction]
>       +- Project [p1, p2, pair_features, rawPrediction, UDF(rawPrediction) AS 
> probability]
>          +- Project [p1, p2, pair_features, UDF(pair_features) AS 
> rawPrediction]
>             +- Project [p1, p2, <lambda>(p1.person, p2.person) AS 
> pair_features]
>                +- Project [struct(...) AS p1, struct(...) AS p2]
>                   +- Project [_blocking_key, ..., ...]
>                      +- Join Inner, (_blocking_key = _blocking_key)
>                         :- SubqueryAlias p1
>                         :  +- Project [..., <lambda>(dataset_name, 
> primary_key, person) AS _blocking_key]
>                         :     +- Project [...]
>                         :        +- Project [primary_key, universal_key, 
> _testing_universal_key, struct(...) AS person]
>                         :           +- Project [...]
>                         :              +- Project [_testing_universal_key, 
> primary_key, struct(...) AS person]
>                         :                 +- LogicalRDD [...]
>                         +- SubqueryAlias p2
>                            +- Project [..., <lambda>(dataset_name, 
> primary_key, person) AS _blocking_key]
>                               +- Project [...]
>                                  +- Project [primary_key, universal_key, 
> _testing_universal_key, struct(...) AS person]
>                                     +- Project [...]
>                                        +- Project [_testing_universal_key, 
> primary_key, struct(...) AS person]
>                                           +- LogicalRDD [...]
> == Optimized Logical Plan ==
> Project [struct(...) AS p1, struct(...) AS p2, <lambda>(struct(...).person, 
> struct(...).person) AS pair_features, UDF(<lambda>(struct(...).person, 
> struct(...).person)) AS rawPrediction, UDF(UDF(<lambda>(struct(...).person, 
> struct(...).person))) AS probability, UDF(UDF(<lambda>(struct(...).person, 
> struct(...).person))) AS prediction, cast((struct(...)._testing_universal_key 
> = struct(...)._testing_universal_key) as float) AS expected_prediction]
> +- Join Inner, (NOT (cast(cast((struct(...)._testing_universal_key = 
> struct(...)._testing_universal_key) as float) as double) = 
> UDF(UDF(<lambda>(struct(...).person, struct(...).person)))) && (_blocking_key 
> = _blocking_key))
>    :- Project [..., <lambda>(dataset_name, primary_key, person) AS 
> _blocking_key]
>    :  +- Filter isnotnull(<lambda>(dataset_name, primary_key, person))
>    :     +- InMemoryRelation [...], true, 10000, StorageLevel(memory, 1 
> replicas)
>    :        :  +- *Project [primary_key, struct(...) AS person, test_people 
> AS dataset_name]
>    :        :     +- Scan ExistingRDD[...]
>    +- Project [..., <lambda>(dataset_name, primary_key, person) AS 
> _blocking_key]
>       +- Filter isnotnull(<lambda>(dataset_name, primary_key, person))
>          +- InMemoryRelation [...], true, 10000, StorageLevel(memory, 1 
> replicas)
>             :  +- *Project [primary_key, struct(...) AS person, test_people 
> AS dataset_name]
>             :     +- Scan ExistingRDD[...]
> == Physical Plan ==
> java.lang.RuntimeException: Invalid PythonUDF <lambda>(struct(...).person, 
> struct(...).person), requires attributes from more than one child.
> {code}
> Note the error at the end when Spark tries to print the physical plan. I've 
> scrubbed some Project fields from the plan to simplify the display, but if 
> I've scrubbed anything you think is important let me know.
> I can get around this problem by adding a {{persist()}} right before the 
> operation that fails. The failing operation is a filter.
> Any clues on how I can boil this down to a minimal repro? Any clues about 
> where the problem is?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to