[ https://issues.apache.org/jira/browse/SPARK-34033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17263841#comment-17263841 ]
Apache Spark commented on SPARK-34033:
--------------------------------------
User 'WamBamBoozle' has created a pull request for this issue:
https://github.com/apache/spark/pull/31162
> SparkR Daemon Initialization
> ----------------------------
>
> Key: SPARK-34033
> URL: https://issues.apache.org/jira/browse/SPARK-34033
> Project: Spark
> Issue Type: Improvement
> Components: R, SparkR
> Affects Versions: 3.2.0
> Environment: Tested on CentOS 7 with Spark 2.3.1, and on my Mac with Spark
> at master
> Reporter: Tom Howland
> Priority: Major
> Original Estimate: 0h
> Remaining Estimate: 0h
>
> Provide a way for users to initialize the SparkR daemon before it forks.
> I'm a contractor to Target, where we have several projects doing ML with
> SparkR. The changes proposed here result in weeks of compute time saved with
> every run.
> (40000 partitions) * (5 seconds to load our R libraries) * (2 calls to gapply
> in our app) / 60 / 60 = about 111 hours.
> (from
> [docs/sparkr.md|https://github.com/WamBamBoozle/spark/blob/daemon_init/docs/sparkr.md#daemon-initialization])
> h3. Daemon Initialization
> If your worker function has a lengthy initialization, and your
> application has lots of partitions, you may find you are spending weeks
> of compute time repeatedly doing something that should have taken a few
> seconds during daemon initialization.
> Every Spark executor spawns a process running an R daemon. The daemon
> "forks a copy" of itself whenever Spark finds work for it to do. It may
> be applying a predefined method such as "max", or it may be applying
> your worker function. SparkR::gapply arranges things so that your worker
> function will be called with each group. A group is the pair
> Key-Seq[Row]. In the absence of partitioning, the daemon will fork for
> every group found. With partitioning, the daemon will fork for every
> partition found. A partition may have several groups in it.
> All the initialization and library loading your worker function performs
> is thrown away when the fork concludes. Every fork has to be
> initialized again.
> The configuration spark.r.daemonInit provides a way to avoid reloading
> packages every time the daemon forks by having the daemon pre-load
> packages. You do this by providing R code to initialize the daemon for
> your application.
> h4. Examples
> Suppose we want library(wow) to be pre-loaded for our workers.
> {{sparkR.session(spark.r.daemonInit = 'library(wow)')}}
> Of course, that only works if the wow package is already installed on the
> executors and on their library path. If we have to ship the library, we
> can use YARN:
> sparkR.session(
>   master = 'yarn',
>   spark.r.daemonInit = '.libPaths(c("wowTarget", .libPaths())); library(wow)',
>   spark.submit.deployMode = 'client',
>   spark.yarn.dist.archives = 'wow.zip#wowTarget')
> YARN creates a directory for the new executor, unzips 'wow.zip' in some
> other directory, and then provides a symlink to it called
> ./wowTarget. When the executor starts the daemon, the daemon loads
> library(wow) from the newly created wowTarget.
> Warning: if your initialization takes longer than 10 seconds, consider
> increasing the configuration
> [spark.r.daemonTimeout](configuration.md#sparkr).
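
The Daemon Initialization text quoted above says that setup done inside a fork is discarded when the fork ends, while anything the daemon sets up before forking is inherited by every fork. That behavior can be illustrated outside Spark. The following is only a sketch under stated assumptions: it uses mcparallel and mccollect from R's bundled parallel package (Unix-alikes only) as a stand-in for the SparkR daemon's own forking, and the environment `state` and its fields are hypothetical names, not part of SparkR.

```r
library(parallel)  # ships with R; forking works on Unix-alikes only

# Hypothetical stand-in for state the daemon sets up once, before forking
state <- new.env()
state$preloaded <- TRUE     # plays the role of work done via spark.r.daemonInit
state$worker_init <- FALSE  # plays the role of setup done by a worker function

# Fork a child process, roughly as the daemon does for each unit of work
job <- mcparallel({
  state$worker_init <- TRUE  # setup performed inside the fork
  list(preloaded = state$preloaded, worker_init = state$worker_init)
})
child <- mccollect(job)[[1]]

# The fork inherited the parent's setup and saw its own setup ...
stopifnot(isTRUE(child$preloaded), isTRUE(child$worker_init))
# ... but the fork's setup did not survive in the parent process
stopifnot(identical(state$worker_init, FALSE))
```

In this sketch, the parent-side setup is paid once and reused by every fork, which is the saving the proposed spark.r.daemonInit configuration is aiming for.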