[ https://issues.apache.org/jira/browse/SPARK-29017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Imran Rashid updated SPARK-29017:
---------------------------------
    Description: 
PySpark has {{setJobGroup}} and {{setLocalProperty}} methods, which are 
intended to set properties that affect only the calling thread.  They try to 
do this by calling the equivalent JVM functions via Py4J.

However, there is nothing ensuring that subsequent py4j calls from a python 
thread are handled by the same thread in java.  In effect, these methods 
might appear to work some of the time, if you happen to get lucky and land on 
the same thread on the java side.  But sometimes they won't work, and in fact 
they're less likely to work if there are multiple threads in python submitting 
jobs.
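The failure mode can be seen with a toy simulation (this only *models* py4j's dispatch with a pair of worker threads; it does not use Spark or py4j): the "JVM" side keeps the job group in a thread-local, but each call from python may be served by a different "JVM" thread, so a later call can miss the property an earlier call set.

{code:python}
# Toy model only: two single-threaded pools stand in for two JVM threads, and
# calls from Python are handed out round-robin, loosely mimicking py4j reusing
# whichever connection/thread happens to be free.
import itertools
import threading
from concurrent.futures import ThreadPoolExecutor

_jvm_state = threading.local()  # per-"JVM"-thread properties


def jvm_set_job_group(group):
    # like SparkContext.setJobGroup: stores on the *current JVM thread* only
    _jvm_state.group = group


def jvm_get_job_group():
    return getattr(_jvm_state, "group", None)


_jvm_threads = [ThreadPoolExecutor(max_workers=1) for _ in range(2)]
_counter = itertools.count()


def call_jvm(fn, *args):
    # each Python->JVM call may land on a different JVM thread
    pool = _jvm_threads[next(_counter) % 2]
    return pool.submit(fn, *args).result()
{code}

With this dispatch, {{call_jvm(jvm_set_job_group, "a")}} runs on one thread, and the next {{call_jvm(jvm_get_job_group)}} runs on the other and sees no group at all, while the call after that happens to land back on the first thread and does see it -- exactly the "works some of the time" behavior described above.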

I think the right way to fix this is to keep a *python* thread-local tracking 
these properties, and then send them through to the JVM on calls to 
submitJob.  This is going to be a headache to get right, though; we've also got 
to handle implicit calls, e.g. {{rdd.collect()}}, {{rdd.foreach()}}, etc.  And 
of course users may have defined their own functions, which will be broken 
until they're updated to use the same thread-locals.
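A minimal sketch of that idea (the names here are illustrative, not the real PySpark internals): properties live in a *python* thread-local and are shipped explicitly with every job submission, so it no longer matters which JVM thread services the call.

{code:python}
# Hypothetical sketch, not actual PySpark code: set_local_property and
# submit_job are stand-ins for the real SparkContext methods.
import threading

_job_props = threading.local()


def set_local_property(key, value):
    # recorded on the calling *Python* thread only
    if not hasattr(_job_props, "props"):
        _job_props.props = {}
    _job_props.props[key] = value


def submit_job(run_job, *args):
    # snapshot this thread's properties and pass them along with the job,
    # instead of assuming the JVM thread already has them set
    props = dict(getattr(_job_props, "props", {}))
    return run_job(props, *args)
{code}

Because the snapshot travels with the job, two python threads setting different groups stay isolated from each other, which is the behavior {{setJobGroup}} promises today but doesn't deliver.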

An alternative might be to use what py4j calls the "Single Threading Model" 
(https://www.py4j.org/advanced_topics.html#the-single-threading-model).  I'd 
want to look more closely at py4j's implementation of that first, though.


I can't think of any guaranteed workaround, but I think you could increase your 
chances of getting the desired behavior if you always set those properties just 
before each call to runJob.  E.g., instead of


{code:python}
# more likely to trigger bug this way
sc.setJobGroup("a")

rdd1.collect()  # or whatever other ways you submit a job
rdd2.collect()
# lots more stuff ...
rddN.collect()
{code}

change it to

{code:python}
# slightly safer, but still no guarantees

sc.setJobGroup("a")
rdd1.collect()  # or whatever other ways you submit a job
sc.setJobGroup("a")
rdd2.collect()
# lots more stuff ...
sc.setJobGroup("a")
rddN.collect()
{code}
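The repetition above can be pushed into a small helper (hypothetical, not part of the PySpark API; note that PySpark's {{setJobGroup}} also takes a description argument) so the property is always re-applied immediately before the action:

{code:python}
# Hypothetical convenience wrapper for the workaround above.  This only
# narrows the window between setJobGroup and the job submission; it still
# does not guarantee both calls hit the same JVM thread.
def collect_in_group(sc, group, rdd):
    sc.setJobGroup(group, "re-set before each action")
    return rdd.collect()
{code}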

> JobGroup and LocalProperty not respected by PySpark
> ---------------------------------------------------
>
>                 Key: SPARK-29017
>                 URL: https://issues.apache.org/jira/browse/SPARK-29017
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.4.4
>            Reporter: Imran Rashid
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.3.2#803003)
