[
https://issues.apache.org/jira/browse/SPARK-18589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15761741#comment-15761741
]
Franklyn Dsouza edited comment on SPARK-18589 at 12/19/16 5:21 PM:
-------------------------------------------------------------------
The sequence of steps that triggers this is:
{code}
join two dataframes A and B
  > make a UDF that uses one column from A and another from B
  > filter on the column produced by the UDF
  > java.lang.RuntimeException: Invalid PythonUDF <lambda>(b#1L, c#6L), requires attributes from more than one child.
{code}
Here are minimal steps to reproduce this issue in PySpark:
{code:python}
from pyspark.sql import types
from pyspark.sql import functions as F

# sqlCtx is the SQLContext provided by the PySpark shell
df1 = sqlCtx.createDataFrame([types.Row(a=1, b=2), types.Row(a=1, b=4)])
df2 = sqlCtx.createDataFrame([types.Row(a=1, c=12)])

# Join, then build a column with a UDF that reads one column from each side
joined = df1.join(df2, df1['a'] == df2['a'])
add_cols = F.udf(lambda a, b: a + b, types.IntegerType())
extra = joined.withColumn('sum', add_cols(joined['b'], joined['c']))

# Filtering on the UDF-produced column raises the RuntimeException
filtered = extra.where(extra['sum'] < F.lit(10)).collect()
{code}
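For reference, a minimal sketch of the {{persist()}} workaround named in the issue title, applied to this repro (same objects as above; this is a workaround, not a fix, and the explanation in the comments is an inference from the optimized plan below):
{code:python}
# Workaround sketch: persist the DataFrame carrying the UDF column before
# filtering on it. This appears to keep the optimizer from pushing the
# filter (and the PythonUDF inside it) down into the join condition.
extra = extra.persist()
filtered = extra.where(extra['sum'] < F.lit(10)).collect()
extra.unpersist()
{code}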
> persist() resolves "java.lang.RuntimeException: Invalid PythonUDF <lambda>(...), requires attributes from more than one child"
> ------------------------------------------------------------------------------------------------------------------------------
>
> Key: SPARK-18589
> URL: https://issues.apache.org/jira/browse/SPARK-18589
> Project: Spark
> Issue Type: Bug
> Components: PySpark, SQL
> Affects Versions: 2.0.2, 2.1.0
> Environment: Python 3.5, Java 8
> Reporter: Nicholas Chammas
> Priority: Minor
>
> Smells like another optimizer bug that's similar to SPARK-17100 and SPARK-18254. I'm seeing this on 2.0.2 and on master at commit {{fb07bbe575aabe68422fd3a31865101fb7fa1722}}.
> I don't have a minimal repro for this yet, but the error I'm seeing is:
> {code}
> py4j.protocol.Py4JJavaError: An error occurred while calling o247.count.
> : java.lang.RuntimeException: Invalid PythonUDF <...>(...), requires attributes from more than one child.
> at scala.sys.package$.error(package.scala:27)
> at org.apache.spark.sql.execution.python.ExtractPythonUDFs$$anonfun$org$apache$spark$sql$execution$python$ExtractPythonUDFs$$extract$2.apply(ExtractPythonUDFs.scala:150)
> at org.apache.spark.sql.execution.python.ExtractPythonUDFs$$anonfun$org$apache$spark$sql$execution$python$ExtractPythonUDFs$$extract$2.apply(ExtractPythonUDFs.scala:149)
> at scala.collection.immutable.Stream.foreach(Stream.scala:594)
> at org.apache.spark.sql.execution.python.ExtractPythonUDFs$.org$apache$spark$sql$execution$python$ExtractPythonUDFs$$extract(ExtractPythonUDFs.scala:149)
> at org.apache.spark.sql.execution.python.ExtractPythonUDFs$$anonfun$apply$2.applyOrElse(ExtractPythonUDFs.scala:114)
> at org.apache.spark.sql.execution.python.ExtractPythonUDFs$$anonfun$apply$2.applyOrElse(ExtractPythonUDFs.scala:113)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$2.apply(TreeNode.scala:312)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$2.apply(TreeNode.scala:312)
> at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
> at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:311)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:305)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:305)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:328)
> at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
> at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:326)
> at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:305)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:305)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:305)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:328)
> at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
> at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:326)
> at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:305)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:305)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:305)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:328)
> at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
> at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:326)
> at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:305)
> at org.apache.spark.sql.execution.python.ExtractPythonUDFs$.apply(ExtractPythonUDFs.scala:113)
> at org.apache.spark.sql.execution.python.ExtractPythonUDFs$.apply(ExtractPythonUDFs.scala:93)
> at org.apache.spark.sql.execution.QueryExecution$$anonfun$prepareForExecution$1.apply(QueryExecution.scala:93)
> at org.apache.spark.sql.execution.QueryExecution$$anonfun$prepareForExecution$1.apply(QueryExecution.scala:93)
> at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
> at scala.collection.immutable.List.foldLeft(List.scala:84)
> at org.apache.spark.sql.execution.QueryExecution.prepareForExecution(QueryExecution.scala:93)
> at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:83)
> at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:83)
> at org.apache.spark.sql.Dataset.withCallback(Dataset.scala:2555)
> at org.apache.spark.sql.Dataset.count(Dataset.scala:2226)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
> at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> at py4j.Gateway.invoke(Gateway.java:280)
> at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> at py4j.commands.CallCommand.execute(CallCommand.java:79)
> at py4j.GatewayConnection.run(GatewayConnection.java:214)
> at java.lang.Thread.run(Thread.java:745)
> {code}
> The extended plan (cleaned of field names) is as follows:
> {code}
> == Parsed Logical Plan ==
> 'Filter NOT ('expected_prediction = 'prediction)
> +- Project [p1, p2, pair_features, rawPrediction, probability, prediction, cast((p1._testing_universal_key = p2._testing_universal_key) as float) AS expected_prediction]
>    +- Project [p1, p2, pair_features, rawPrediction, probability, UDF(rawPrediction) AS prediction]
>       +- Project [p1, p2, pair_features, rawPrediction, UDF(rawPrediction) AS probability]
>          +- Project [p1, p2, pair_features, UDF(pair_features) AS rawPrediction]
>             +- Project [p1, p2, <lambda>(p1.person, p2.person) AS pair_features]
>                +- Project [struct(...) AS p1, struct(...) AS p2]
>                   +- Project [_blocking_key, ..., ...]
>                      +- Join Inner, (_blocking_key = _blocking_key)
>                         :- SubqueryAlias p1
>                         :  +- Project [..., <lambda>(dataset_name, primary_key, person) AS _blocking_key]
>                         :     +- Project [...]
>                         :        +- Project [primary_key, universal_key, _testing_universal_key, struct(...) AS person]
>                         :           +- Project [...]
>                         :              +- Project [_testing_universal_key, primary_key, struct(...) AS person]
>                         :                 +- LogicalRDD [...]
>                         +- SubqueryAlias p2
>                            +- Project [..., <lambda>(dataset_name, primary_key, person) AS _blocking_key]
>                               +- Project [...]
>                                  +- Project [primary_key, universal_key, _testing_universal_key, struct(...) AS person]
>                                     +- Project [...]
>                                        +- Project [_testing_universal_key, primary_key, struct(...) AS person]
>                                           +- LogicalRDD [...]
> == Analyzed Logical Plan ==
> p1: struct<...>, p2: struct<...>, pair_features: vector, rawPrediction: vector, probability: vector, prediction: double, expected_prediction: float
> Filter NOT (cast(expected_prediction as double) = prediction)
> +- Project [p1, p2, pair_features, rawPrediction, probability, prediction, cast((p1._testing_universal_key = p2._testing_universal_key) as float) AS expected_prediction]
>    +- Project [p1, p2, pair_features, rawPrediction, probability, UDF(rawPrediction) AS prediction]
>       +- Project [p1, p2, pair_features, rawPrediction, UDF(rawPrediction) AS probability]
>          +- Project [p1, p2, pair_features, UDF(pair_features) AS rawPrediction]
>             +- Project [p1, p2, <lambda>(p1.person, p2.person) AS pair_features]
>                +- Project [struct(...) AS p1, struct(...) AS p2]
>                   +- Project [_blocking_key, ..., ...]
>                      +- Join Inner, (_blocking_key = _blocking_key)
>                         :- SubqueryAlias p1
>                         :  +- Project [..., <lambda>(dataset_name, primary_key, person) AS _blocking_key]
>                         :     +- Project [...]
>                         :        +- Project [primary_key, universal_key, _testing_universal_key, struct(...) AS person]
>                         :           +- Project [...]
>                         :              +- Project [_testing_universal_key, primary_key, struct(...) AS person]
>                         :                 +- LogicalRDD [...]
>                         +- SubqueryAlias p2
>                            +- Project [..., <lambda>(dataset_name, primary_key, person) AS _blocking_key]
>                               +- Project [...]
>                                  +- Project [primary_key, universal_key, _testing_universal_key, struct(...) AS person]
>                                     +- Project [...]
>                                        +- Project [_testing_universal_key, primary_key, struct(...) AS person]
>                                           +- LogicalRDD [...]
> == Optimized Logical Plan ==
> Project [struct(...) AS p1, struct(...) AS p2, <lambda>(struct(...).person, struct(...).person) AS pair_features, UDF(<lambda>(struct(...).person, struct(...).person)) AS rawPrediction, UDF(UDF(<lambda>(struct(...).person, struct(...).person))) AS probability, UDF(UDF(<lambda>(struct(...).person, struct(...).person))) AS prediction, cast((struct(...)._testing_universal_key = struct(...)._testing_universal_key) as float) AS expected_prediction]
> +- Join Inner, (NOT (cast(cast((struct(...)._testing_universal_key = struct(...)._testing_universal_key) as float) as double) = UDF(UDF(<lambda>(struct(...).person, struct(...).person)))) && (_blocking_key = _blocking_key))
>    :- Project [..., <lambda>(dataset_name, primary_key, person) AS _blocking_key]
>    :  +- Filter isnotnull(<lambda>(dataset_name, primary_key, person))
>    :     +- InMemoryRelation [...], true, 10000, StorageLevel(memory, 1 replicas)
>    :           +- *Project [primary_key, struct(...) AS person, test_people AS dataset_name]
>    :              +- Scan ExistingRDD[...]
>    +- Project [..., <lambda>(dataset_name, primary_key, person) AS _blocking_key]
>       +- Filter isnotnull(<lambda>(dataset_name, primary_key, person))
>          +- InMemoryRelation [...], true, 10000, StorageLevel(memory, 1 replicas)
>                +- *Project [primary_key, struct(...) AS person, test_people AS dataset_name]
>                   +- Scan ExistingRDD[...]
> == Physical Plan ==
> java.lang.RuntimeException: Invalid PythonUDF <lambda>(struct(...).person, struct(...).person), requires attributes from more than one child.
> {code}
> Note the error at the end when Spark tries to print the physical plan. I've scrubbed some Project fields from the plan to simplify the display, but if I've scrubbed anything you think is important, let me know.
> I can get around this problem by adding a {{persist()}} right before the operation that fails. The failing operation is a filter.
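> A hedged sketch of that workaround (the name {{predictions}} is a hypothetical stand-in for my actual DataFrame, which I haven't shown here):
> {code:python}
> # Hypothetical names: 'predictions' stands in for the DataFrame built by
> # the join + UDF pipeline shown in the plans above.
> predictions = predictions.persist()  # materialize before the failing filter
> mismatches = predictions.where(
>     predictions['expected_prediction'] != predictions['prediction'])
> mismatches.count()  # with persist() in place, this no longer raises
> {code}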
> Any clues on how I can boil this down to a minimal repro? Any clues about where the problem is?