[ 
https://issues.apache.org/jira/browse/SPARK-24208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16540612#comment-16540612
 ] 

Apache Spark commented on SPARK-24208:
--------------------------------------

User 'mgaido91' has created a pull request for this issue:
https://github.com/apache/spark/pull/21751

> Cannot resolve column in self join after applying Pandas UDF
> ------------------------------------------------------------
>
>                 Key: SPARK-24208
>                 URL: https://issues.apache.org/jira/browse/SPARK-24208
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.3.0
>         Environment: AWS EMR 5.13.0
> Amazon Hadoop distribution 2.8.3
> Spark 2.3.0
> Pandas 0.22.0
>            Reporter: Rafal Ganczarek
>            Priority: Minor
>
> I noticed that after applying a Pandas UDF, a self join of the resulting 
> DataFrame fails to resolve columns. The workaround that I found is to 
> recreate the DataFrame from its RDD and schema.
> The Python code below reproduces the issue.
> {code:java}
> from pyspark import Row
> import pyspark.sql.functions as F
>
> @F.pandas_udf('key long, col string', F.PandasUDFType.GROUPED_MAP)
> def dummy_pandas_udf(df):
>     return df[['key','col']]
>
> df = spark.createDataFrame([Row(key=1,col='A'), Row(key=1,col='B'), Row(key=2,col='C')])
> # transformation that causes the issue
> df = df.groupBy('key').apply(dummy_pandas_udf)
> # WORKAROUND that fixes the issue
> # df = spark.createDataFrame(df.rdd, df.schema)
>
> df.alias('temp0').join(df.alias('temp1'), F.col('temp0.key') == F.col('temp1.key')).show()
> {code}
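
The workaround in the commented-out line can be wrapped in a small helper. This is only a sketch: the name rebuild_dataframe is hypothetical and not part of PySpark, and the arguments are typed as Any so the block does not import pyspark itself. It assumes spark is a SparkSession and df is the DataFrame returned by groupBy('key').apply(...).

```python
from typing import Any


def rebuild_dataframe(spark: Any, df: Any) -> Any:
    """Recreate `df` from its RDD and schema (the workaround from the report).

    Hypothetical helper: `spark` is assumed to be a SparkSession and `df`
    a DataFrame. Nothing here is specific to Pandas UDFs.
    """
    # Same call as the commented-out workaround line in the reproduction:
    # build a new DataFrame from the existing rows and the existing schema.
    return spark.createDataFrame(df.rdd, df.schema)
```

In the reproduction this would be called as df = rebuild_dataframe(spark, df) right before the join. Going through df.rdd most likely adds a conversion to Python rows, so it is probably best treated as a stopgap rather than a permanent fix.
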
> If the workaround line is left commented out, the code above fails with the 
> following error:
> {code:java}
> AnalysisException                         Traceback (most recent call last)
> <ipython-input-88-8de763656d6d> in <module>()
>      12 # df = spark.createDataFrame(df.rdd, df.schema)
>      13 
> ---> 14 df.alias('temp0').join(df.alias('temp1'), F.col('temp0.key') == F.col('temp1.key')).show()
> /usr/lib/spark/python/pyspark/sql/dataframe.py in join(self, other, on, how)
>     929                 on = self._jseq([])
>     930             assert isinstance(how, basestring), "how should be basestring"
> --> 931             jdf = self._jdf.join(other._jdf, on, how)
>     932         return DataFrame(jdf, self.sql_ctx)
>     933 
> /usr/lib/spark/python/lib/py4j-src.zip/py4j/java_gateway.py in __call__(self, *args)
>    1158         answer = self.gateway_client.send_command(command)
>    1159         return_value = get_return_value(
> -> 1160             answer, self.gateway_client, self.target_id, self.name)
>    1161 
>    1162         for temp_arg in temp_args:
> /usr/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
>      67                                              e.java_exception.getStackTrace()))
>      68             if s.startswith('org.apache.spark.sql.AnalysisException: '):
> ---> 69                 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
>      70             if s.startswith('org.apache.spark.sql.catalyst.analysis'):
>      71                 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
> AnalysisException: u"cannot resolve '`temp0.key`' given input columns: 
> [temp0.key, temp0.col];;\n'Join Inner, ('temp0.key = 'temp1.key)\n:- 
> AnalysisBarrier\n:     +- SubqueryAlias temp0\n:        +- 
> FlatMapGroupsInPandas [key#4099L], dummy_pandas_udf(col#4098, key#4099L), 
> [key#4104L, col#4105]\n:           +- Project [key#4099L, col#4098, 
> key#4099L]\n:              +- LogicalRDD [col#4098, key#4099L], false\n+- 
> AnalysisBarrier\n      +- SubqueryAlias temp1\n         +- 
> FlatMapGroupsInPandas [key#4099L], dummy_pandas_udf(col#4098, key#4099L), 
> [key#4104L, col#4105]\n            +- Project [key#4099L, col#4098, 
> key#4099L]\n               +- LogicalRDD [col#4098, key#4099L], false\n"
> {code}
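
One detail in the plan above may be worth checking: both FlatMapGroupsInPandas nodes end with the same bracketed list, [key#4104L, col#4105]. The sketch below copies those two plan lines verbatim and compares that last list. Treating the last list as the node's output attributes, and treating the shared IDs as the reason temp0.key cannot be told apart from temp1.key, are my assumptions; the report does not state a root cause. The names LEFT, RIGHT and output_attributes are made up for this example.

```python
import re

# The two FlatMapGroupsInPandas lines from the error above, copied verbatim
# (left side of the join first, then the right side).
LEFT = ("FlatMapGroupsInPandas [key#4099L], "
        "dummy_pandas_udf(col#4098, key#4099L), [key#4104L, col#4105]")
RIGHT = ("FlatMapGroupsInPandas [key#4099L], "
         "dummy_pandas_udf(col#4098, key#4099L), [key#4104L, col#4105]")


def output_attributes(plan_line):
    """Return the names in the last [...] list of a plan line.

    Assumption: the last bracketed list holds the node's output attributes.
    """
    last_list = re.findall(r"\[([^\]]*)\]", plan_line)[-1]
    return [name.strip() for name in last_list.split(",")]


# Attribute IDs that appear on both sides of the self join.
shared = sorted(set(output_attributes(LEFT)) & set(output_attributes(RIGHT)))
print(shared)  # ['col#4105', 'key#4104L']
```

If that reading is right, it would also be consistent with the workaround helping, since a DataFrame rebuilt from the RDD and schema starts from a new plan.
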
> The same happens if, instead of the DataFrame API, I use Spark SQL to do the 
> self join:
> {code:java}
> # df is a DataFrame after applying dummy_pandas_udf
> df.createOrReplaceTempView('df')
> spark.sql('''
>     SELECT 
>         *
>     FROM df temp0
>     LEFT JOIN df temp1 ON
>         temp0.key == temp1.key
> ''').show()
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
