[
https://issues.apache.org/jira/browse/SPARK-20347?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15970933#comment-15970933
]
Maciej Szymkiewicz edited comment on SPARK-20347 at 4/17/17 10:16 AM:
----------------------------------------------------------------------
This is a nice idea, but I wonder what the benefit would be over using
{{concurrent.futures}}? These work just fine with PySpark (at least in simple
cases) and the only overhead is creating the executor. It is not like we have
to worry about the GIL here.
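To illustrate the point, here is a minimal Spark-free sketch: a plain callable stands in for a blocking RDD action such as {{count}}, since the {{Future}} mechanics are identical either way.

```python
from concurrent.futures import ThreadPoolExecutor

# Stand-in for a blocking driver-side action such as rdd.count().
def blocking_count():
    return sum(1 for _ in range(100))

executor = ThreadPoolExecutor(max_workers=2)
future = executor.submit(blocking_count)  # returns immediately with a Future
# ... the driver thread is free to do other work here ...
result = future.result()                  # blocks until the action finishes
executor.shutdown()
```

The future can be polled with {{done()}} or waited on with {{result()}}, exactly what an async action API would hand back anyway.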
For my own internal usage I went with a monkey patch like this (plus some
{{SparkContext.stop}} patching):
{code}
from pyspark import SparkConf, SparkContext
from pyspark.rdd import RDD
from concurrent.futures import ThreadPoolExecutor


def _get_executor():
    sc = SparkContext.getOrCreate()
    if not hasattr(sc, "_thread_pool_executor"):
        max_workers = int(sc.getConf().get("spark.driver.cores") or 1)
        sc._thread_pool_executor = ThreadPoolExecutor(max_workers=max_workers)
    return sc._thread_pool_executor


def asyncCount(self):
    return _get_executor().submit(self.count)


def foreachAsync(self, f):
    return _get_executor().submit(self.foreach, f)


RDD.asyncCount = asyncCount
RDD.foreachAsync = foreachAsync

sc = SparkContext(
    master="local[*]", conf=SparkConf().set("spark.driver.cores", 3))
rdd = sc.parallelize(range(100))
f = rdd.asyncCount()  # concurrent.futures.Future; f.result() blocks
{code}
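As a side note, the futures returned this way already support completion callbacks, so AsyncRDDActions-style hooks come for free. Another Spark-free sketch, with a plain callable in place of an action:

```python
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=1)
results = []

# add_done_callback fires when the future completes, giving the same
# "do something when the action finishes" behavior as AsyncRDDActions.
future = executor.submit(lambda: 42)
future.add_done_callback(lambda fut: results.append(fut.result()))

executor.shutdown(wait=True)  # worker has exited, so the callback has run
```

The callback runs in the worker thread that completed the future, so anything touching driver state should be thread-safe.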
> Provide AsyncRDDActions in Python
> ---------------------------------
>
> Key: SPARK-20347
> URL: https://issues.apache.org/jira/browse/SPARK-20347
> Project: Spark
> Issue Type: Improvement
> Components: PySpark
> Affects Versions: 2.2.0
> Reporter: holdenk
> Priority: Minor
>
> In core Spark, AsyncRDDActions allows people to perform non-blocking RDD
> actions. In Python, where threading is a bit more involved, there could be
> value in exposing this; the easiest way might involve using the Py4J callback
> server on the driver.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]