Jon Farzanfar created SPARK-42910:
-------------------------------------

             Summary: Generic annotation of class attribute in abstract class 
is NOT initialized in inherited classes
                 Key: SPARK-42910
                 URL: https://issues.apache.org/jira/browse/SPARK-42910
             Project: Spark
          Issue Type: Bug
          Components: PySpark
    Affects Versions: 3.3.2, 3.3.0
         Environment: Tested in two environments:
 # Databricks
Pyspark Version: 3.3.0
Python Version: 3.9.15
 # Local
Pyspark Version: 3.3.2
Python Version: 3.3.10
            Reporter: Jon Farzanfar


We are trying to use generics to type our code base better. The example 
below shows the problem we are having: without generics, the code works 
fine in PySpark. With generics, it fails in PySpark, although the same code 
works when run locally without PySpark.

Output for local: 

 
{code:java}
<class '__main__.Foo'>{code}
 

Traceback for pyspark: 
{code:java}
AttributeError: type object 'C' has no attribute 'base_record'

        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:559)
        at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:765)
        at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:747)
        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:512)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator.foreach(Iterator.scala:943)
        at scala.collection.Iterator.foreach$(Iterator.scala:943)
        at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
        at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
        at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
        at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
        at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
        at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
        at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
        at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
        at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
        at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
        at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
        at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
        at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1021)
        at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2268)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:136)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        ... 1 more {code}
 

Code:

 
{code:python}
from abc import ABC
from typing import Generic, TypeVar, Callable
from operator import add

from pyspark.sql import SparkSession

T = TypeVar("T")

class Foo:
    ...

class A(ABC, Generic[T]):
    base_record: Callable[..., T]

class B(A):
    base_record = Foo

class C(B):
    ...

def f(_: int) -> int:
    print(C.base_record)
    return 1

spark = SparkSession\
    .builder\
    .appName("schema_test")\
    .getOrCreate()

spark.sparkContext.parallelize(range(1, 100)).map(f).reduce(add) {code}
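
For comparison, here is a minimal sketch of the same class hierarchy with Spark removed, to show what plain Python does with it. The helper name describe is hypothetical and only there for illustration. It does not reproduce the failure. It only shows that the bare annotation on A creates no class attribute, that B is the class that actually defines base_record, and that C finds it through normal inheritance, which is the behaviour one would expect the workers to see as well.

```python
from abc import ABC
from typing import Callable, Generic, TypeVar

T = TypeVar("T")


class Foo:
    ...


class A(ABC, Generic[T]):
    # Annotation only: this records a type hint but assigns no value,
    # so A itself has no base_record attribute.
    base_record: Callable[..., T]


class B(A):
    # The attribute is first given a value here.
    base_record = Foo


class C(B):
    ...


def describe():
    """Hypothetical helper: report where base_record lives in the hierarchy."""
    return {
        "annotated_on_A": "base_record" in A.__annotations__,
        "attribute_on_A": hasattr(A, "base_record"),
        "defined_in_B": "base_record" in vars(B),
        "defined_in_C": "base_record" in vars(C),
        "resolved_from_C": C.base_record,
    }


if __name__ == "__main__":
    for key, value in describe().items():
        print(key, value)
```

Since C has no base_record of its own, the lookup in f depends entirely on C keeping its link to B (and B keeping its class attributes) after the function and classes are shipped to the executors.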
 

 


