[
https://issues.apache.org/jira/browse/SPARK-42910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17704575#comment-17704575
]
Maciej Szymkiewicz edited comment on SPARK-42910 at 3/24/23 11:15 AM:
----------------------------------------------------------------------
It is no longer generic, so that cannot be a problem.
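Assuming "it" refers to the subclasses B and C, the claim can be checked without Spark. The typing module records on every Generic subclass which type variables it declares in __parameters__. A keeps T, while B subclasses A without a type argument and ends up with an empty tuple, so subscripting B raises TypeError. This is a local sketch that reuses the class names from the report. It is not the commenter's own check.

```python
from abc import ABC
from typing import Callable, Generic, TypeVar

T = TypeVar("T")


class Foo:
    ...


class A(ABC, Generic[T]):
    base_record: Callable[..., T]


class B(A):  # subclasses A without a type argument (not A[Foo])
    base_record = Foo


class C(B):
    ...


# typing sets __parameters__ on each Generic subclass to the type variables it declares.
print(A.__parameters__)  # (~T,)
print(B.__parameters__)  # ()
print(C.__parameters__)  # ()

# B has no type parameters, so subscripting it is rejected.
try:
    B[int]
    b_is_generic = True
except TypeError:
    b_is_generic = False
print(b_is_generic)  # False
```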
Additionally, the issue seems to disappear when the classes are defined externally, in a separate importable module:
{code:python}
# foo.py
from abc import ABC
from typing import Generic, TypeVar, Callable

T = TypeVar("T")

class Foo:
    ...

class A(ABC, Generic[T]):
    base_record: Callable[..., T]

class B(A):
    base_record = Foo

class C(B):
    ...

def f(_: int) -> int:
    print(C.base_record)
    return 1
{code}
and then
{code:python}
from operator import add
from foo import C, f
from pyspark.sql import SparkSession
spark = SparkSession\
.builder\
.appName("schema_test")\
.getOrCreate()
spark.sparkContext.parallelize(range(1, 100)).map(f).reduce(add)
{code}
so it makes sense to focus further investigation on how we prepare locally
defined classes (those defined in the driver script's __main__ module) for shipping over the wire.
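The standard pickle module serializes a class by reference only. The payload stores the module and qualified name, and unpickling imports that module to look the class up. cloudpickle treats an importable class such as foo.C the same way. A class that the worker cannot import has no such path. For that reason PySpark's bundled cloudpickle (pyspark.cloudpickle) pickles classes defined in __main__ by value, rebuilding them on the executor from their class namespace and bases. That by-value path is the preparation step the comment proposes to investigate. The sketch below uses only the standard library and illustrates the by-reference half. fractions.Fraction and the helper make_local_class are arbitrary, hypothetical stand-ins and are not part of the report.

```python
import fractions
import pickle

# A class from an importable module is pickled by reference: the payload
# only names the module and the class, and loading re-imports them.
payload = pickle.dumps(fractions.Fraction)
assert b"fractions" in payload and b"Fraction" in payload
restored = pickle.loads(payload)
print(restored is fractions.Fraction)  # True


def make_local_class():
    # Hypothetical helper: a class created inside a function has no
    # importable path (its __qualname__ contains "<locals>").
    class Local:
        value = 1

    return Local


# By-reference pickling cannot name such a class, so plain pickle refuses it
# (the exact exception type differs between Python versions).
try:
    pickle.dumps(make_local_class())
    local_ok = True
except (pickle.PicklingError, AttributeError):
    local_ok = False
print(local_ok)  # False
```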
> Generic annotation of class attribute in abstract class is NOT initialized in
> inherited classes
> ----------------------------------------------------------------------------------------------
>
> Key: SPARK-42910
> URL: https://issues.apache.org/jira/browse/SPARK-42910
> Project: Spark
> Issue Type: Bug
> Components: PySpark
> Affects Versions: 3.3.0, 3.3.2
> Environment: Tested in two environments:
> # Databricks
> Pyspark Version: 3.3.0
> Python Version: 3.9.15
> # Local
> Pyspark Version: 3.3.2
> Python Version: 3.3.10
> Reporter: Jon Farzanfar
> Priority: Minor
>
> We are trying to leverage generics to better type our code base. The example
> below shows the problem we are having: without generics the code works fine in
> PySpark, but with generics it fails in PySpark, even though it still works when
> run locally without PySpark.
> Output for local:
>
> {code:java}
> <class '__main__.Foo'>{code}
>
> Traceback for PySpark:
> {code:java}
> AttributeError: type object 'C' has no attribute 'base_record'
> at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:559)
> at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:765)
> at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:747)
> at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:512)
> at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
> at scala.collection.Iterator.foreach(Iterator.scala:943)
> at scala.collection.Iterator.foreach$(Iterator.scala:943)
> at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
> at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
> at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
> at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
> at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
> at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
> at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
> at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
> at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
> at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
> at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
> at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
> at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
> at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
> at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1021)
> at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2268)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
> at org.apache.spark.scheduler.Task.run(Task.scala:136)
> at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> ... 1 more {code}
>
> Code:
>
> {code:python}
> from abc import ABC
> from typing import Generic, TypeVar, Callable
> from operator import add
> from pyspark.sql import SparkSession
>
> T = TypeVar("T")
>
> class Foo:
>     ...
>
> class A(ABC, Generic[T]):
>     base_record: Callable[..., T]
>
> class B(A):
>     base_record = Foo
>
> class C(B):
>     ...
>
> def f(_: int) -> int:
>     print(C.base_record)
>     return 1
>
> spark = SparkSession\
>     .builder\
>     .appName("schema_test")\
>     .getOrCreate()
> spark.sparkContext.parallelize(range(1, 100)).map(f).reduce(add) {code}
>
>
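The AttributeError can be read against Python's ordinary attribute lookup rules. In the reporter's hierarchy, base_record is assigned only in B's class namespace. The line in A is a bare annotation, which is recorded in A.__annotations__ but creates no class attribute. When the code runs locally, C.base_record is therefore found by walking C.__mro__ up to B. A plausible reading of the traceback is that the copy of the hierarchy reconstructed on the executor no longer exposes B's entry. That is an inference from the error message, not a confirmed diagnosis. The sketch below checks these lookup facts in plain Python, without Spark. The name owner is introduced here only for illustration.

```python
from abc import ABC
from typing import Callable, Generic, TypeVar

T = TypeVar("T")


class Foo:
    ...


class A(ABC, Generic[T]):
    # A bare annotation: stored in A.__annotations__, no attribute is created.
    base_record: Callable[..., T]


class B(A):
    # The only class namespace that actually assigns base_record.
    base_record = Foo


class C(B):
    ...


print("base_record" in A.__annotations__)  # True
print("base_record" in vars(A))            # False
print("base_record" in vars(B))            # True
print("base_record" in vars(C))            # False

# C.base_record resolves to the first class in C's MRO whose namespace defines it.
owner = next(klass for klass in C.__mro__ if "base_record" in vars(klass))
print(owner.__name__)                      # B
print(C.base_record is Foo)                # True
```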
--
This message was sent by Atlassian Jira
(v8.20.10#820010)