[ https://issues.apache.org/jira/browse/SPARK-24458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16514725#comment-16514725 ]
Abdeali Kothari commented on SPARK-24458:
-----------------------------------------

Oddly, adding a random column at the start makes this work, but adding a copy of an existing column does not:

data = data.withColumn(COLNAME, data[data.columns[0]])

Code:
{noformat}
import findspark
findspark.init()

import pyspark
from pyspark.sql import types as T, functions as F

spark_builder = pyspark.sql.SparkSession.Builder()
spark = spark_builder.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

in_path = '/tmp/a.csv'
out_path = '/tmp/out.parquet'

data = spark.read.format('csv').load(in_path)

# ---------------------->>>>>>>>>>>>>>>>>>>>>>>> THIS FIXES IT ???
COLNAME = 'a'
data = data.withColumn(COLNAME, F.rand())
data.show()

def check_1():
    return "PASS"

# Option 2: String output, No inputs --- FAILS
testfunc = F.udf(T.StringType())(check_1)
data = data.withColumn('check_out', testfunc())
data = data.filter(data['check_out'] == 'PASS')
data.write.parquet(out_path, mode='overwrite')
data.show()
{noformat}

OR adding a dummy argument to the UDF and actually passing it a column works:

{noformat}
import findspark
findspark.init()

import pyspark
from pyspark.sql import types as T, functions as F

spark_builder = pyspark.sql.SparkSession.Builder()
spark = spark_builder.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

in_path = '/tmp/a.csv'
out_path = '/tmp/out.parquet'

data = spark.read.format('csv').load(in_path)
data.show()

def check_1(dummy_col):  # ----------->>>>>>>>>>>>>> THIS FIXES IT ???
    return "PASS"

# Option 2: String output, No inputs --- FAILS
testfunc = F.udf(T.StringType())(check_1)
# ------------------------------->>>>>>>>>>>>>> AND GIVE A COLUMN AS INPUT
data = data.withColumn('check_out', testfunc(data[data.columns[0]]))
data = data.filter(data['check_out'] == 'PASS')
data.write.parquet(out_path, mode='overwrite')
data.show()
{noformat}

> Invalid PythonUDF check_1(), requires attributes from more than one child
> -------------------------------------------------------------------------
>
>                 Key: SPARK-24458
>                 URL: https://issues.apache.org/jira/browse/SPARK-24458
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.3.0
>         Environment: Spark 2.3.0 (local mode)
>                      Mac OSX
>            Reporter: Abdeali Kothari
>            Priority: Major
>
> I was trying out a very large query execution plan I have and I got the error:
>
> {code:java}
> py4j.protocol.Py4JJavaError: An error occurred while calling o359.simpleString.
> : java.lang.RuntimeException: Invalid PythonUDF check_1(), requires attributes from more than one child.
> 	at scala.sys.package$.error(package.scala:27)
> 	at org.apache.spark.sql.execution.python.ExtractPythonUDFs$$anonfun$org$apache$spark$sql$execution$python$ExtractPythonUDFs$$extract$2.apply(ExtractPythonUDFs.scala:182)
> 	at org.apache.spark.sql.execution.python.ExtractPythonUDFs$$anonfun$org$apache$spark$sql$execution$python$ExtractPythonUDFs$$extract$2.apply(ExtractPythonUDFs.scala:181)
> 	at scala.collection.immutable.Stream.foreach(Stream.scala:594)
> 	at org.apache.spark.sql.execution.python.ExtractPythonUDFs$.org$apache$spark$sql$execution$python$ExtractPythonUDFs$$extract(ExtractPythonUDFs.scala:181)
> 	at org.apache.spark.sql.execution.python.ExtractPythonUDFs$$anonfun$apply$2.applyOrElse(ExtractPythonUDFs.scala:118)
> 	at org.apache.spark.sql.execution.python.ExtractPythonUDFs$$anonfun$apply$2.applyOrElse(ExtractPythonUDFs.scala:114)
> 	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
> 	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
> 	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
> 	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
> 	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
> 	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
> 	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
> 	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
> 	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
> 	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
> 	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
> 	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
> 	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
> 	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
> 	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
> 	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
> 	at org.apache.spark.sql.execution.python.ExtractPythonUDFs$.apply(ExtractPythonUDFs.scala:114)
> 	at org.apache.spark.sql.execution.python.ExtractPythonUDFs$.apply(ExtractPythonUDFs.scala:94)
> 	at org.apache.spark.sql.execution.QueryExecution$$anonfun$prepareForExecution$1.apply(QueryExecution.scala:87)
> 	at org.apache.spark.sql.execution.QueryExecution$$anonfun$prepareForExecution$1.apply(QueryExecution.scala:87)
> 	at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
> 	at scala.collection.immutable.List.foldLeft(List.scala:84)
> 	at org.apache.spark.sql.execution.QueryExecution.prepareForExecution(QueryExecution.scala:87)
> 	at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
> 	at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77)
> 	at org.apache.spark.sql.execution.QueryExecution$$anonfun$simpleString$1.apply(QueryExecution.scala:187)
> 	at org.apache.spark.sql.execution.QueryExecution$$anonfun$simpleString$1.apply(QueryExecution.scala:187)
> 	at org.apache.spark.sql.execution.QueryExecution.stringOrError(QueryExecution.scala:100)
> 	at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:187)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:498)
> 	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> 	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> 	at py4j.Gateway.invoke(Gateway.java:282)
> 	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> 	at py4j.commands.CallCommand.execute(CallCommand.java:79)
> 	at py4j.GatewayConnection.run(GatewayConnection.java:214)
> 	at java.lang.Thread.run(Thread.java:748){code}
> I get a dataframe (df) after a lot of PythonUDFs running on a CSV dataset, and I drop some columns in between. Finally, I create 3 Python lists (for example, [0.1, 0.2, 0.3, ...]) which I convert to Spark DataFrames using createDataFrame.
> I join all three list-converted dataframes using crossJoin() and then do a crossJoin with the original data I have. Then I run a Python UDF, check_1. check_1 is something like:
> {code:java}
> def check_1():
>     if 1 == 1:
>         return 'yes'
>     else:
>         return 'no'{code}
> So, it is a Python UDF which takes no arguments and always returns 'yes'. (Note: This UDF is created on the fly... so for testing, I am currently just using this dummy always-'yes' function.)
> After I get check_1's output, I convert all my checks (there could be more than 1, but in my current test I have only 1) into a Map(string, string). Finally, I try to do a filter("checks['first'] = 'yes'") to filter the records I need.
> When I try to do the filter and then do a .explain(), it fails with the above error.
>
> Here is the explain of the dataframe up until before I do the filter():
>
> {noformat}
> *(1) Project [... cols ...]
> +- BatchEvalPython [python_udf_to_create_map([check_1], array(pythonUDF0#1851))], [... cols ...]
>    +- BatchEvalPython [check_1()], [... cols ...]
>       +- InMemoryTableScan [... cols ...]
>          +- InMemoryRelation [... cols ...], true, 10000, StorageLevel(disk, 1 replicas)
>             +- BroadcastNestedLoopJoin BuildLeft, Cross
>                :- BroadcastExchange IdentityBroadcastMode
>                :  +- *(5) Project [... cols ...]
>                :     +- BatchEvalPython [... Python UDF ...], [... cols ...]
>                :        +- *(4) Project [... cols ...]
>                :           +- BatchEvalPython [... Python UDFs ...], [... cols ...]
>                :              +- *(3) Project [... cols ...]
>                :                 +- BatchEvalPython [... Python UDFs ...], [... cols ...]
>                :                    +- *(2) Project [... cols ...]
>                :                       +- BatchEvalPython [... Python UDFs ...], [... cols ...]
>                :                          +- *(1) FileScan csv [... cols ...] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/abdealijk/Documents/data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<...
>                +- CartesianProduct
>                   :- *(6) Project [value#1261 AS computed_v1#1263]
>                   :  +- Scan ExistingRDD[value#1261]
>                   +- CartesianProduct
>                      :- *(7) Project [value#1265 AS computed_v2#1267]
>                      :  +- Scan ExistingRDD[value#1265]
>                      +- *(8) Project [value#1269 AS computed_v3#1271]
>                         +- Scan ExistingRDD[value#1269]{noformat}
> I have simplified the explain() output. Let me know if I have deleted some data you may need.
>
> I tried creating a simpler reproducible example, but wasn't able to make anything simpler ....

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
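For anyone experimenting with the dummy-argument workaround described in the comment, it can be sketched as a plain function plus a small helper. This is a sketch only: `add_check_column` is an illustrative name not in the original report, the PySpark import is deferred so the snippet loads without a running Spark session, and the behavior it works around is the one reported on Spark 2.3.0.

```python
def check_1(dummy_col=None):
    # The UDF logic ignores its input entirely; the optional dummy
    # argument exists only so Spark sees a column dependency instead
    # of a zero-argument PythonUDF.
    return "PASS"


def add_check_column(data):
    # Hypothetical helper (not from the original report) wiring check_1
    # into a DataFrame the way the second workaround does: register it
    # as a string-returning UDF and feed it the first column of the
    # DataFrame as a throwaway input.
    from pyspark.sql import functions as F, types as T

    testfunc = F.udf(check_1, T.StringType())
    data = data.withColumn('check_out', testfunc(data[data.columns[0]]))
    return data.filter(data['check_out'] == 'PASS')
```

Because the dummy argument defaults to None, the same `check_1` can still be called directly (outside Spark) with no arguments for testing.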