[
https://issues.apache.org/jira/browse/SPARK-20347?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15970933#comment-15970933
]
Maciej Szymkiewicz edited comment on SPARK-20347 at 4/17/17 10:22 AM:
----------------------------------------------------------------------
This is a nice idea, but I wonder what the benefit would be over using
{{concurrent.futures}}? These work just fine with PySpark (at least in simple
cases) and the only overhead is creating the executor. It is not like we have
to worry about the GIL here.
For my own internal usage I went with a monkey patch like this (plus some
{{SparkContext.stop}} patching):
{code}
from pyspark.rdd import RDD
from pyspark import SparkConf, SparkContext
from concurrent.futures import ThreadPoolExecutor


def _get_executor():
    # Lazily create a single executor shared by all async actions,
    # sized by spark.driver.cores (falling back to 1).
    sc = SparkContext.getOrCreate()
    if not hasattr(sc, "_thread_pool_executor"):
        max_workers = int(sc.getConf().get("spark.driver.cores") or 1)
        sc._thread_pool_executor = ThreadPoolExecutor(max_workers=max_workers)
    return sc._thread_pool_executor


def asyncCount(self):
    return _get_executor().submit(self.count)


def foreachAsync(self, f):
    return _get_executor().submit(self.foreach, f)


RDD.asyncCount = asyncCount
RDD.foreachAsync = foreachAsync

sc = SparkContext(
    master="local[*]", conf=SparkConf().set("spark.driver.cores", 3))
rdd = sc.parallelize(range(100))  # some example RDD
f = rdd.asyncCount()  # returns a concurrent.futures.Future
{code}
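To make the idea concrete, this is how the returned {{Future}} would be consumed. The sketch below uses a plain function as a stand-in for an RDD action so it runs without a Spark cluster; the pattern is identical for {{rdd.asyncCount()}}:

```python
from concurrent.futures import ThreadPoolExecutor

# Stand-in for an RDD action (e.g. count); a plain function is used here
# only so the snippet runs without a cluster.
def slow_count():
    return sum(1 for _ in range(1000))

executor = ThreadPoolExecutor(max_workers=2)
future = executor.submit(slow_count)  # returns immediately, does not block

# Option 1: attach a callback fired when the action finishes.
future.add_done_callback(lambda f: print("count =", f.result()))

# Option 2: block until the result is available.
count = future.result()
executor.shutdown()
```

Either way we get cancellation, timeouts ({{future.result(timeout=...)}}) and exception propagation for free from the standard library.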
One possible caveat is the lack of direct legacy Python support, but there is a
3rd party backport, and I would argue this still beats implementing it from
scratch. Furthermore, we get a solution which integrates with existing
libraries and should require minimal maintenance.
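For completeness, the {{SparkContext.stop}} patching mentioned above could look roughly like the sketch below. A dummy class stands in for {{SparkContext}} so the snippet runs without a cluster; with PySpark you would wrap {{SparkContext.stop}} the same way so the executor is shut down together with the context:

```python
from concurrent.futures import ThreadPoolExecutor

# _DummyContext is illustrative only; substitute SparkContext in practice.
class _DummyContext:
    def stop(self):
        return "stopped"

_original_stop = _DummyContext.stop

def stop(self):
    # Shut down the shared executor (if any) before stopping the context,
    # waiting for in-flight async actions to finish.
    executor = getattr(self, "_thread_pool_executor", None)
    if executor is not None:
        executor.shutdown(wait=True)
        self._thread_pool_executor = None
    return _original_stop(self)

_DummyContext.stop = stop

ctx = _DummyContext()
ctx._thread_pool_executor = ThreadPoolExecutor(max_workers=1)
result = ctx.stop()
```

Without this, a stopped context could leave worker threads alive holding references to dead JVM gateways.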
> Provide AsyncRDDActions in Python
> ---------------------------------
>
> Key: SPARK-20347
> URL: https://issues.apache.org/jira/browse/SPARK-20347
> Project: Spark
> Issue Type: Improvement
> Components: PySpark
> Affects Versions: 2.2.0
> Reporter: holdenk
> Priority: Minor
>
> In core Spark, AsyncRDDActions allows people to perform non-blocking RDD
> actions. In Python, where threading is a bit more involved, there could be
> value in exposing this; the easiest way might involve using the Py4J callback
> server on the driver.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)