[ 
https://issues.apache.org/jira/browse/SPARK-34033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17263841#comment-17263841
 ] 

Apache Spark commented on SPARK-34033:
--------------------------------------

User 'WamBamBoozle' has created a pull request for this issue:
https://github.com/apache/spark/pull/31162

> SparkR Daemon Initialization
> ----------------------------
>
>                 Key: SPARK-34033
>                 URL: https://issues.apache.org/jira/browse/SPARK-34033
>             Project: Spark
>          Issue Type: Improvement
>          Components: R, SparkR
>    Affects Versions: 3.2.0
>         Environment: tested on centos 7 & spark 2.3.1 and on my mac & spark 
> at master
>            Reporter: Tom Howland
>            Priority: Major
>   Original Estimate: 0h
>  Remaining Estimate: 0h
>
> Provide a way for users to initialize the sparkR daemon before it forks.
> I'm a contractor to Target, where we have several projects doing ML with 
> sparkR. The changes proposed here results in weeks of compute-time saved with 
> every run.
> (40000 partitions) * (5 seconds to load our R libraries) * (2 calls to gapply 
> in our app) / 60 / 60 = 111 hours.
> (from 
> [docs/sparkr.md|https://github.com/WamBamBoozle/spark/blob/daemon_init/docs/sparkr.md#daemon-initialization])
> h3. Daemon Initialization
> If your worker function has a lengthy initialization, and your
>  application has lots of partitions, you may find you are spending weeks
>  of compute time repeatedly doing something that should have taken a few
>  seconds during daemon initialization.
> Every Spark executor spawns a process running an R daemon. The daemon
>  "forks a copy" of itself whenever Spark finds work for it to do. It may
>  be applying a predefined method such as "max", or it may be applying
>  your worker function. SparkR::gapply arranges things so that your worker
>  function will be called with each group. A group is the pair
>  Key-Seq[Row]. In the absence of partitioning, the daemon will fork for
>  every group found. With partitioning, the daemon will fork for every
>  partition found. A partition may have several groups in it.
> All the initializations and library loading your worker function manages
>  is thrown away when the fork concludes. Every fork has to be
>  initialized.
> The configuration spark.r.daemonInit provides a way to avoid reloading
>  packages every time the daemon forks by having the daemon pre-load
>  packages. You do this by providing R code to initialize the daemon for
>  your application.
> h4. Examples
> Suppose we want library(wow) to be pre-loaded for our workers.
> {{sparkR.session(spark.r.daemonInit = 'library(wow)')}}
> of course, that would only work if we knew that library(wow) was on our
>  path and available on the executor. If we have to ship the library, we
>  can use YARN
>     sparkR.session(
>        master = 'yarn',
>        spark.r.daemonInit = '.libPaths(c("wowTarget", .libPaths())); 
> library(wow)',
>        spark.submit.deployMode = 'client',
>        spark.yarn.dist.archives = 'wow.zip#wowTarget')
> YARN creates a directory for the new executor, unzips 'wow.zip' in some
>  other directory, and then provides a symlink to it called
>  ./wowTarget. When the executor starts the daemon, the daemon loads
>  library(wow) from the newly created wowTarget.
> Warning: if your initialization takes longer than 10 seconds, consider
>  increasing the configuration 
> [spark.r.daemonTimeout](configuration.md#sparkr).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to