[ https://issues.apache.org/jira/browse/SPARK-20347?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15970933#comment-15970933 ]

Maciej Szymkiewicz edited comment on SPARK-20347 at 4/17/17 10:17 AM:
----------------------------------------------------------------------

This is a nice idea, but I wonder what the benefit would be over using 
{{concurrent.futures}}? These work just fine with PySpark (at least in simple 
cases) and the only overhead is creating the executor. It is not like we have 
to worry about the GIL here.

For my own internal usage I went with a monkey patch like this (plus some 
{{SparkContext.stop}} patching, sketched below the snippet):

{code}
from pyspark.rdd import RDD
from pyspark import SparkConf, SparkContext
from concurrent.futures import ThreadPoolExecutor


def _get_executor():
    # Lazily attach a ThreadPoolExecutor to the active SparkContext,
    # sized to spark.driver.cores (falling back to a single worker).
    sc = SparkContext.getOrCreate()
    if not hasattr(sc, "_thread_pool_executor"):
        max_workers = int(sc.getConf().get("spark.driver.cores") or 1)
        sc._thread_pool_executor = ThreadPoolExecutor(max_workers=max_workers)
    return sc._thread_pool_executor

def asyncCount(self):
    # Submit the blocking action and return a concurrent.futures.Future.
    return _get_executor().submit(self.count)

def foreachAsync(self, f):
    return _get_executor().submit(self.foreach, f)

RDD.asyncCount = asyncCount
RDD.foreachAsync = foreachAsync

sc = SparkContext(
    master="local[*]", conf=SparkConf().set("spark.driver.cores", 3))
rdd = sc.parallelize(range(100))  # example RDD

f = rdd.asyncCount()  # returns immediately
print(f.result())     # block only when the value is needed
{code}
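
For completeness, the {{SparkContext.stop}} patching I mentioned can be as simple as shutting the executor down together with the context, so pending futures don't outlive it. A rough sketch (details up to taste; the attribute name matches the patch above):

{code}
from pyspark import SparkContext

_original_stop = SparkContext.stop

def stop(self):
    # Tear down the lazily created executor before stopping the context.
    executor = getattr(self, "_thread_pool_executor", None)
    if executor is not None:
        executor.shutdown(wait=False)
    _original_stop(self)

SparkContext.stop = stop
{code}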



> Provide AsyncRDDActions in Python
> ---------------------------------
>
>                 Key: SPARK-20347
>                 URL: https://issues.apache.org/jira/browse/SPARK-20347
>             Project: Spark
>          Issue Type: Improvement
>          Components: PySpark
>    Affects Versions: 2.2.0
>            Reporter: holdenk
>            Priority: Minor
>
> In core Spark, AsyncRDDActions allows people to perform non-blocking RDD 
> actions. In Python, where threading is a bit more involved, there could be 
> value in exposing this; the easiest way might involve using the Py4J callback 
> server on the driver.


