[ https://issues.apache.org/jira/browse/SPARK-42910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17704403#comment-17704403 ]

Hyukjin Kwon commented on SPARK-42910:
--------------------------------------

cc [~zero323] FYI

> Generic annotation of class attribute in abstract class is NOT initialized in 
> inherited classes
> ----------------------------------------------------------------------------------------------
>
>                 Key: SPARK-42910
>                 URL: https://issues.apache.org/jira/browse/SPARK-42910
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 3.3.0, 3.3.2
>         Environment: Tested in two environments:
>  # Databricks
> Pyspark Version: 3.3.0
> Python Version: 3.9.15
>  # Local
> Pyspark Version: 3.3.2
> Python Version: 3.3.10
>            Reporter: Jon Farzanfar
>            Priority: Minor
>
> We are trying to leverage generics to better type our code base. The example 
> below shows the problem: without generics the class attribute resolves fine in 
> PySpark, but with generics the lookup fails on the executors, even though the 
> same code works locally without PySpark.  
> Output for local: 
>  
> {code:java}
> <class '__main__.Foo'>{code}
>  
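> For reference, a driver-only sketch (an assumption, reconstructed from the 
> reproduction code further below) of the check that produces this output; the 
> inherited class attribute resolves fine when no Spark executor is involved:
>
> {code:python}
> from abc import ABC
> from typing import Generic, TypeVar, Callable
>
> T = TypeVar("T")
>
> class Foo:
>     ...
>
> class A(ABC, Generic[T]):
>     base_record: Callable[..., T]  # annotation only on the abstract base
>
> class B(A):
>     base_record = Foo  # value assigned in the subclass
>
> class C(B):
>     ...
>
> # Looked up in the driver process only; prints <class '__main__.Foo'>
> print(C.base_record)
> {code}
>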
> Traceback for PySpark: 
> {code:java}
> AttributeError: type object 'C' has no attribute 'base_record'
>       at 
> org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:559)
>       at 
> org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:765)
>       at 
> org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:747)
>       at 
> org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:512)
>       at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
>       at scala.collection.Iterator.foreach(Iterator.scala:943)
>       at scala.collection.Iterator.foreach$(Iterator.scala:943)
>       at 
> org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
>       at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
>       at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
>       at 
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
>       at 
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
>       at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
>       at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
>       at 
> org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
>       at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
>       at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
>       at 
> org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
>       at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
>       at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
>       at 
> org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
>       at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1021)
>       at 
> org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2268)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>       at org.apache.spark.scheduler.Task.run(Task.scala:136)
>       at 
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
>       at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       ... 1 more {code}
>  
> Code:
>  
> {code:python}
> from abc import ABC
> from typing import Generic, TypeVar, Callable
> from operator import add
> from pyspark.sql import SparkSession
>
> T = TypeVar("T")
>
> class Foo:
>     ...
>
> class A(ABC, Generic[T]):
>     base_record: Callable[..., T]  # annotation only; no value assigned here
>
> class B(A):
>     base_record = Foo  # value assigned in the subclass
>
> class C(B):
>     ...
>
> def f(_: int) -> int:
>     print(C.base_record)  # fails on the executors with AttributeError
>     return 1
>
> spark = SparkSession \
>     .builder \
>     .appName("schema_test") \
>     .getOrCreate()
>
> spark.sparkContext.parallelize(range(1, 100)).map(f).reduce(add) {code}
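>
> One possible mitigation (a sketch only, not verified for this report): resolve 
> the attribute on the driver and close over the resulting object, so the 
> pickled function ships the value itself instead of looking up C.base_record on 
> the executors:
>
> {code:python}
> # Hedged workaround sketch, continuing from the reproduction code above:
> # bind C.base_record on the driver, where the attribute is visible, and let
> # cloudpickle capture the object in the closure of the mapped function.
> record_cls = C.base_record
>
> def g(_: int) -> int:
>     print(record_cls)  # no class-attribute lookup happens on the worker
>     return 1
>
> spark.sparkContext.parallelize(range(1, 100)).map(g).reduce(add)
> {code}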
>  
>  


