[
https://issues.apache.org/jira/browse/SPARK-34033?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Tom Howland updated SPARK-34033:
--------------------------------
Description:
Provide a way for users to initialize the SparkR daemon before it forks.
I'm a contractor to Target, where we have several projects doing ML with
SparkR. The changes proposed here result in weeks of compute time saved on
every run:
(40000 partitions) * (5 seconds to load our R libraries) * (2 calls to gapply
in our app) / 60 / 60 = 111 hours.
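As a sanity check, the estimate works out as follows (the partition count, load time, and call count are the figures quoted above):

```r
# Sanity check of the compute-time estimate above.
partitions   <- 40000  # partitions per gapply call
load_seconds <- 5      # time to load our R libraries in each fork
gapply_calls <- 2      # calls to gapply in the application

total_hours <- partitions * load_seconds * gapply_calls / 60 / 60
total_hours  # about 111 hours of cumulative compute time
```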
(from
[docs/sparkr.md|https://github.com/WamBamBoozle/spark/blob/daemon_init/docs/sparkr.md#daemon-initialization])
h3. Daemon Initialization
If your worker function has a lengthy initialization, and your
application has lots of partitions, you may find you are spending weeks
of compute time repeatedly doing something that should have taken a few
seconds during daemon initialization.
Every Spark executor spawns a process running an R daemon. The daemon
"forks a copy" of itself whenever Spark finds work for it to do. It may
be applying a predefined method such as "max", or it may be applying
your worker function. SparkR::gapply arranges things so that your worker
function is called once for each group, where a group is a (key,
sequence-of-rows) pair. In the absence of partitioning, the daemon forks
for every group found. With partitioning, the daemon forks for every
partition found; a partition may contain several groups.
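For concreteness, here is a minimal gapply sketch; the toy data frame and the summing worker are invented for illustration. Each invocation of the anonymous function below runs in a freshly forked daemon, so any library() calls inside it would be repeated for every group (or partition):

```r
library(SparkR)
sparkR.session()

# A toy grouped dataset: column g is the key, column v the values.
df <- createDataFrame(data.frame(g = c(1, 1, 2), v = c(10, 20, 30)))

# The worker function is called once per group with (key, rows).
totals <- gapply(
  df, "g",
  function(key, rows) {
    data.frame(g = key[[1]], total = sum(rows$v))
  },
  structType("g double, total double"))  # schema of the result rows

head(collect(totals))
```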
All the initialization and library loading your worker function performs
is thrown away when the fork concludes. Every fork has to be initialized
anew.
The configuration spark.r.daemonInit provides a way to avoid reloading
packages every time the daemon forks by having the daemon pre-load
packages. You do this by providing R code to initialize the daemon for
your application.
h4. Examples
Suppose we want library(wow) to be pre-loaded for our workers.
{{sparkR.session(spark.r.daemonInit = 'library(wow)')}}
Of course, that only works if we know library(wow) is already on the
library path and available on the executor. If we have to ship the
library ourselves, we can use YARN:
{{sparkR.session(
  master = 'yarn',
  spark.r.daemonInit = '.libPaths(c("wowTarget", .libPaths())); library(wow)',
  spark.submit.deployMode = 'client',
  spark.yarn.dist.archives = 'wow.zip#wowTarget')}}
YARN creates a directory for the new executor, unzips 'wow.zip' in some
other directory, and then provides a symlink to it called
./wowTarget. When the executor starts the daemon, the daemon loads
library(wow) from the newly created wowTarget.
Warning: if your initialization takes longer than 10 seconds, consider
increasing the configuration [spark.r.daemonTimeout|configuration.md#sparkr].
> SparkR Daemon Initialization
> ----------------------------
>
> Key: SPARK-34033
> URL: https://issues.apache.org/jira/browse/SPARK-34033
> Project: Spark
> Issue Type: Improvement
> Components: R, SparkR
> Affects Versions: 3.2.0
> Environment: tested on centos 7 & spark 2.3.1 and on my mac & spark
> at master
> Reporter: Tom Howland
> Priority: Major
> Original Estimate: 0h
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)