[ https://issues.apache.org/jira/browse/SPARK-24458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16514725#comment-16514725 ]

Abdeali Kothari commented on SPARK-24458:
-----------------------------------------

Weirdly, adding a random column at the start makes this work.

But adding a copy of an existing column does not make it work:

data = data.withColumn(COLNAME, data[data.columns[0]])

Code:

 
{noformat}
import findspark
findspark.init()
import pyspark

from pyspark.sql import types as T, functions as F

spark_builder = pyspark.sql.SparkSession.Builder()
spark = spark_builder.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")


in_path = '/tmp/a.csv'
out_path = '/tmp/out.parquet'

data = spark.read.format('csv').load(in_path)

# ---------------------->>>>>>>>>>>>>>>>>>>>>>>> THIS FIXES IT ???
COLNAME = 'a'
data = data.withColumn(COLNAME, F.rand())

data.show()

def check_1():
    return "PASS"

# Option 2: String output, No inputs --- FAILS
testfunc = F.udf(T.StringType())(check_1)
data = data.withColumn('check_out', testfunc())

data = data.filter(data['check_out'] == 'PASS')

data.write.parquet(out_path, mode='overwrite')
data.show(){noformat}

Or, adding a dummy argument to the UDF and actually passing a column to it works:

 
{noformat}
import findspark
findspark.init()
import pyspark

from pyspark.sql import types as T, functions as F

spark_builder = pyspark.sql.SparkSession.Builder()
spark = spark_builder.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")


in_path = '/tmp/a.csv'
out_path = '/tmp/out.parquet'

data = spark.read.format('csv').load(in_path)

data.show()

def check_1(dummy_col):  # ----------->>>>>>>>>>>>>> THIS FIXES IT ???
    return "PASS"

# Option 2: String output, No inputs --- FAILS
testfunc = F.udf(T.StringType())(check_1)
# ------------------------------->>>>>>>>>>>>>> AND GIVE A COLUMN AS INPUT
data = data.withColumn('check_out', testfunc(data[data.columns[0]]))

data = data.filter(data['check_out'] == 'PASS')

data.write.parquet(out_path, mode='overwrite')
data.show(){noformat}
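For reference, the failing baseline (the case the "FAILS" comments above refer to) would be the same script with neither workaround applied, i.e. no extra column added first and a zero-argument UDF. This is just the snippets above with the workaround lines removed, not a separately verified script:

{noformat}
import findspark
findspark.init()
import pyspark

from pyspark.sql import types as T, functions as F

spark_builder = pyspark.sql.SparkSession.Builder()
spark = spark_builder.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")


in_path = '/tmp/a.csv'
out_path = '/tmp/out.parquet'

data = spark.read.format('csv').load(in_path)

def check_1():
    return "PASS"

# Zero-argument Python UDF, no extra column added first -- per the notes
# above, this is the combination that hits the
# "requires attributes from more than one child" error on Spark 2.3.0.
testfunc = F.udf(T.StringType())(check_1)
data = data.withColumn('check_out', testfunc())

data = data.filter(data['check_out'] == 'PASS')

data.write.parquet(out_path, mode='overwrite')
data.show(){noformat}

Swapping in the copy-column line from the top of this comment in place of the F.rand() line reportedly still fails in the same way.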
 

> Invalid PythonUDF check_1(), requires attributes from more than one child
> -------------------------------------------------------------------------
>
>                 Key: SPARK-24458
>                 URL: https://issues.apache.org/jira/browse/SPARK-24458
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.3.0
>         Environment: Spark 2.3.0 (local mode)
> Mac OSX
>            Reporter: Abdeali Kothari
>            Priority: Major
>
> I was trying out a very large query execution plan that I have, and I got this error:
>  
> {code:java}
> py4j.protocol.Py4JJavaError: An error occurred while calling o359.simpleString.
> : java.lang.RuntimeException: Invalid PythonUDF check_1(), requires attributes from more than one child.
>  at scala.sys.package$.error(package.scala:27)
>  at org.apache.spark.sql.execution.python.ExtractPythonUDFs$$anonfun$org$apache$spark$sql$execution$python$ExtractPythonUDFs$$extract$2.apply(ExtractPythonUDFs.scala:182)
>  at org.apache.spark.sql.execution.python.ExtractPythonUDFs$$anonfun$org$apache$spark$sql$execution$python$ExtractPythonUDFs$$extract$2.apply(ExtractPythonUDFs.scala:181)
>  at scala.collection.immutable.Stream.foreach(Stream.scala:594)
>  at org.apache.spark.sql.execution.python.ExtractPythonUDFs$.org$apache$spark$sql$execution$python$ExtractPythonUDFs$$extract(ExtractPythonUDFs.scala:181)
>  at org.apache.spark.sql.execution.python.ExtractPythonUDFs$$anonfun$apply$2.applyOrElse(ExtractPythonUDFs.scala:118)
>  at org.apache.spark.sql.execution.python.ExtractPythonUDFs$$anonfun$apply$2.applyOrElse(ExtractPythonUDFs.scala:114)
>  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
>  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
>  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
>  at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
>  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
>  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
>  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
>  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
>  at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
>  at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
>  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
>  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
>  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
>  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
>  at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
>  at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
>  at org.apache.spark.sql.execution.python.ExtractPythonUDFs$.apply(ExtractPythonUDFs.scala:114)
>  at org.apache.spark.sql.execution.python.ExtractPythonUDFs$.apply(ExtractPythonUDFs.scala:94)
>  at org.apache.spark.sql.execution.QueryExecution$$anonfun$prepareForExecution$1.apply(QueryExecution.scala:87)
>  at org.apache.spark.sql.execution.QueryExecution$$anonfun$prepareForExecution$1.apply(QueryExecution.scala:87)
>  at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
>  at scala.collection.immutable.List.foldLeft(List.scala:84)
>  at org.apache.spark.sql.execution.QueryExecution.prepareForExecution(QueryExecution.scala:87)
>  at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
>  at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77)
>  at org.apache.spark.sql.execution.QueryExecution$$anonfun$simpleString$1.apply(QueryExecution.scala:187)
>  at org.apache.spark.sql.execution.QueryExecution$$anonfun$simpleString$1.apply(QueryExecution.scala:187)
>  at org.apache.spark.sql.execution.QueryExecution.stringOrError(QueryExecution.scala:100)
>  at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:187)
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.lang.reflect.Method.invoke(Method.java:498)
>  at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>  at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>  at py4j.Gateway.invoke(Gateway.java:282)
>  at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>  at py4j.commands.CallCommand.execute(CallCommand.java:79)
>  at py4j.GatewayConnection.run(GatewayConnection.java:214)
>  at java.lang.Thread.run(Thread.java:748){code}
> I get a dataframe (df) after running a lot of Python UDFs on a CSV dataset, 
> dropping some columns in between. Then I create 3 Python lists (for example, 
> [0.1, 0.2, 0.3, ...]), each of which I convert to a Spark DataFrame using 
> createDataFrame().
> I join all three list-derived dataframes using crossJoin() and then do a 
> crossJoin with the original data I have. Then I run a Python UDF, check_1, 
> which is something like:
> {code:java}
> def check_1():
>     if 1 == 1:
>         return 'yes'
>     else:
>         return 'no'{code}
> So it is a Python UDF which takes no arguments and always returns 'yes'. 
> (Note: this UDF is created on the fly, so for testing I am currently just 
> using this dummy always-'yes' function.)
> After I get check_1's output, I convert all my checks (there could be more 
> than one, but in my current test I have only one) into a Map(string, string).
> Finally, I do a filter("checks['first'] = 'yes'") to keep only the records I 
> need.
> When I apply the filter and then call .explain(), it fails with the above 
> error.
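> A rough sketch of that flow, for clarity: the names and values below are 
> illustrative (not the real ones), and data stands for the dataframe produced 
> by the earlier chain of Python UDFs on the CSV data:
> {code:java}
> from pyspark.sql import functions as F, types as T
> 
> # Three Python lists converted to single-column DataFrames.
> df1 = spark.createDataFrame([(v,) for v in [0.1, 0.2, 0.3]], ['computed_v1'])
> df2 = spark.createDataFrame([(v,) for v in [0.4, 0.5, 0.6]], ['computed_v2'])
> df3 = spark.createDataFrame([(v,) for v in [0.7, 0.8, 0.9]], ['computed_v3'])
> 
> # Cross join the three list DataFrames, then cross join with the original data.
> df = data.crossJoin(df1.crossJoin(df2).crossJoin(df3))
> 
> # Zero-argument check UDF.
> check_1_udf = F.udf(lambda: 'yes', T.StringType())
> df = df.withColumn('check_1', check_1_udf())
> 
> # Collect the check outputs into a Map(string, string) via another Python UDF
> # (shown as python_udf_to_create_map in the plan below).
> make_map_udf = F.udf(lambda v: {'first': v},
>                      T.MapType(T.StringType(), T.StringType()))
> df = df.withColumn('checks', make_map_udf(df['check_1']))
> 
> # The filter that triggers the error when the plan is built.
> df = df.filter("checks['first'] = 'yes'")
> df.explain(){code}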
>  
> Here is the explain() of the dataframe just before I apply the filter():
>  
> {noformat}
> *(1) Project [... cols ...]
> +- BatchEvalPython [python_udf_to_create_map([check_1], array(pythonUDF0#1851))], [... cols ...]
> +- BatchEvalPython [check_1()], [... cols ...]
> +- InMemoryTableScan [... cols ...]
> +- InMemoryRelation [... cols ...], true, 10000, StorageLevel(disk, 1 replicas)
> +- BroadcastNestedLoopJoin BuildLeft, Cross
> :- BroadcastExchange IdentityBroadcastMode
> : +- *(5) Project [... cols ...]
> : +- BatchEvalPython [... Python UDF ...], [... cols ...]
> : +- *(4) Project [... cols ...]
> : +- BatchEvalPython [... Python UDFs ...], [... cols ...]
> : +- *(3) Project [... cols ...]
> : +- BatchEvalPython [... Python UDFs ...], [... cols ...]
> : +- *(2) Project [... cols ...]
> : +- BatchEvalPython [ ... Python UDFs ... ], [ ... cols ... ]
> : +- *(1) FileScan csv [ ... cols ... ] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/abdealijk/Documents/data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<...
> +- CartesianProduct
> :- *(6) Project [value#1261 AS computed_v1#1263]
> : +- Scan ExistingRDD[value#1261]
> +- CartesianProduct
> :- *(7) Project [value#1265 AS computed_v2#1267]
> : +- Scan ExistingRDD[value#1265]
> +- *(8) Project [value#1269 AS computed_v3#1271]
> +- Scan ExistingRDD[value#1269]{noformat}
> I have simplified the explain() output. Let me know if I have deleted some 
> data you may need.
>  
>  
> I tried creating a simpler reproducible example, but wasn't able to make 
> anything simpler ....


