Below are the average timings for one iteration of the model update with an RDD
(with caching, as Shivaram suggested):
Model size (RDD[Double].count)   Avg. time per iteration, s
10M                              0.585336926
100M                             1.767947506
1B                               125.6078817
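The growth factors between the rows of the table can be checked with a few lines of Scala (REPL or script style, values copied from the table). This is only arithmetic on the reported averages.

```scala
// Average iteration times in seconds, copied from the table above
val timings = Seq(
  ("10M", 1e7, 0.585336926),
  ("100M", 1e8, 1.767947506),
  ("1B", 1e9, 125.6078817)
)

// For each step to the next model size: (label, size ratio, time ratio)
val growth = timings.zip(timings.tail).map {
  case ((fromLabel, fromSize, fromTime), (toLabel, toSize, toTime)) =>
    (s"$fromLabel -> $toLabel", toSize / fromSize, toTime / fromTime)
}

growth.foreach { case (step, sizeRatio, timeRatio) =>
  println(f"$step: size x$sizeRatio%.0f, time x$timeRatio%.1f")
}
```

This prints a time factor of about 3.0 for the 10M to 100M step and about 71.0 for the 100M to 1B step, so the growth is sublinear up to 100M and strongly superlinear beyond it.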
There is a ~70x increase in time for a 10x increase in model size (from 100
million to 1 billion Doubles). More than half of the time is spent in GC, and
this time varies heavily. Two questions:
1) Can I use Project Tungsten's unsafe features? More specifically, can I reduce
the GC time if I use a DataFrame instead of an RDD and set the Tungsten key
spark.sql.unsafe.enabled=true?
2) An RDD[Double] of one billion elements takes 26.1GB when persisted (as the
Spark UI shows), which is around 26 bytes per element. How many of these bytes
are RDD overhead?
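A rough back-of-the-envelope look at question 2, as a Scala sketch. It rests on assumptions that are not part of the measurements: that the Spark UI's "GB" means 2^30 bytes, and that on a 64-bit HotSpot JVM with compressed oops a boxed java.lang.Double occupies 24 bytes (12-byte header, 4 bytes of padding, 8-byte value) while each reference to it in an object array takes 4 bytes. Both readings of "GB" are computed so the assumption is visible.

```scala
// Observed: 26.1 GB persisted for 1 billion Double elements
val elements = 1e9
val reportedGB = 26.1

// Bytes per element under the two possible meanings of "GB"
val perElementDecimal = reportedGB * 1e9 / elements            // if GB = 10^9 bytes
val perElementBinary = reportedGB * math.pow(2, 30) / elements // if GB = 2^30 bytes (assumed)

// ASSUMED JVM layout (64-bit HotSpot, compressed oops); not measured here
val boxedDoubleBytes = 24   // object header + padding + 8-byte value
val referenceBytes = 4      // compressed reference stored in an object array slot
val primitiveDoubleBytes = 8

// Estimated cost of storing each element as a separate boxed object
val boxedEstimate = boxedDoubleBytes + referenceBytes

println(f"observed: $perElementDecimal%.1f B/element (decimal GB), $perElementBinary%.1f B/element (binary GB)")
println(s"boxed estimate: $boxedEstimate B/element vs $primitiveDoubleBytes B as a primitive double")
println(f"overhead vs primitive (binary GB): ${perElementBinary - primitiveDoubleBytes}%.1f B/element")
```

If these assumptions hold, the observed size is close to what one boxed object per element would cost, which would put most of the overhead in boxing rather than in the RDD abstraction itself.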
The code:
// Run in spark-shell, where the SparkContext `sc` is predefined
val modelSize = 1000000000
val numIterations = 10
val parallelism = 5
// Initial model: modelSize weights, all set to 0.1, marked for caching
var oldRDD = sc.parallelize(1 to modelSize, parallelism).map(x => 0.1).cache()
var i = 0
var avgTime = 0.0
while (i < numIterations) {
  val t = System.nanoTime()
  // Update the weights and materialize the new RDD in the cache
  val newRDD = oldRDD.map(x => x * x)
  newRDD.cache()
  newRDD.count()
  // Only now drop the previous iteration's data (blocking = true)
  oldRDD.unpersist(true)
  // "Check" convergence
  newRDD.mean()
  avgTime += (System.nanoTime() - t) / 1e9
  oldRDD = newRDD
  i += 1
}
println("Avg iteration time: " + avgTime / numIterations)
Best regards, Alexander
From: Shivaram Venkataraman [mailto:[email protected]]
Sent: Friday, July 10, 2015 10:04 PM
To: Ulanov, Alexander
Cc: <[email protected]>; [email protected]
Subject: Re: Model parallelism with RDD
Yeah, I can see that being the case: caching implies creating objects that will
be stored in memory. So there is a trade-off between storing data in memory (and
having to garbage-collect it later) and recomputing the data.
Shivaram
On Fri, Jul 10, 2015 at 9:49 PM, Ulanov, Alexander
<[email protected]> wrote:
Hi Shivaram,
Thank you for the suggestion! If I do .cache and .count, each iteration takes
much more time, and the extra time is spent in GC. Is this normal?
On July 10, 2015, at 21:23, Shivaram Venkataraman <[email protected]>
wrote:
I think you need to call `newRDD.cache()` and `newRDD.count` before you call
`oldRDD.unpersist(true)`. Otherwise, it might be recomputing all the previous
iterations each time.
Thanks
Shivaram
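The recomputation described in Shivaram's reply can be illustrated without a cluster. The Scala sketch below is a toy model, not Spark's implementation: `ToyDataset`, `mapCalls`, and `run` are hypothetical names, and the model assumes that an uncached dataset is recomputed from its source on every action while a cached one is reused. It counts how many element-wise map applications each loop iteration performs, with and without the cache-then-count-then-unpersist order.

```scala
// Toy model of RDD lineage; ToyDataset and mapCalls are hypothetical names,
// not Spark APIs. A dataset recomputes its data from its parent on every
// evaluation unless caching is enabled and a materialised copy exists.
var mapCalls = 0L // number of element-wise map function applications

final class ToyDataset(compute: () => Array[Double]) {
  private var cacheEnabled = false
  private var cachedData: Option[Array[Double]] = None

  def cache(): this.type = { cacheEnabled = true; this }

  def unpersist(): Unit = { cacheEnabled = false; cachedData = None }

  // Stands in for an action such as count() or mean()
  def evaluate(): Array[Double] = cachedData.getOrElse {
    val data = compute()
    if (cacheEnabled) cachedData = Some(data)
    data
  }

  // The child's recipe evaluates this dataset, then applies f to every element
  def map(f: Double => Double): ToyDataset =
    new ToyDataset(() => evaluate().map { x => mapCalls += 1; f(x) })
}

// Mirrors the loops in this thread; returns map applications per iteration
def run(useCache: Boolean, iterations: Int, size: Int): IndexedSeq[Long] = {
  mapCalls = 0L
  var oldDs = new ToyDataset(() => Array.fill(size)(0.1))
  if (useCache) oldDs.cache()
  (0 until iterations).map { _ =>
    val before = mapCalls
    val newDs = oldDs.map(x => x * x)
    if (useCache) {
      newDs.cache()
      newDs.evaluate() // like newRDD.count(): materialise before dropping oldDs
    }
    oldDs.unpersist()
    newDs.evaluate()   // like newRDD.mean
    oldDs = newDs
    mapCalls - before
  }
}

println(run(useCache = false, iterations = 10, size = 1000))
println(run(useCache = true, iterations = 10, size = 1000))
```

Without caching, the count grows as 1000, 2000, ..., 10000 because each action replays the whole chain of maps; with caching, every iteration applies the map 1000 times. The price of the cached version, as Shivaram notes, is that each iteration's data is held in memory as objects the GC must later reclaim.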
On Fri, Jul 10, 2015 at 7:44 PM, Ulanov, Alexander
<[email protected]> wrote:
Hi,
I am interested in how scalable model parallelism within Spark can be.
Suppose the model contains N weights of type Double, and N is so large that the
model does not fit into the memory of a single node. We can then store the model
in an RDD[Double] spread across several nodes. To train the model, one needs to
perform K iterations that update all the weights and check convergence. We also
need to exchange some weights between the nodes to synchronize the model or
update the global state. I've sketched code that does iterative updates with an
RDD (without the global update yet). Surprisingly, each iteration takes more
time than the previous one, as shown below (time in seconds). Could you suggest
the reason for that? I've checked GC; it takes only a few milliseconds.
Configuration: Spark 1.4, 1 master and 5 worker nodes, 5 executors, Intel Xeon
2.2 GHz, 16 GB RAM each
Iteration 0 time:1.127990986
Iteration 1 time:1.391120414
Iteration 2 time:1.6429691381000002
Iteration 3 time:1.9344402954
Iteration 4 time:2.2075294246999997
Iteration 5 time:2.6328659593
Iteration 6 time:2.7911690492999996
Iteration 7 time:3.0850374104
Iteration 8 time:3.4031050061
Iteration 9 time:3.8826580919
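The increments between consecutive logged times are all positive and average about 0.31 (in the log's units), a steady upward trend that fits the recomputation explanation in Shivaram's reply. A small Scala sketch of that arithmetic, using the values exactly as logged; note that the code that produced them divides each elapsed time by numIterations.

```scala
// Iteration times exactly as printed in the log above (elapsed seconds
// divided by numIterations by the code that produced them)
val logged = Vector(
  1.127990986, 1.391120414, 1.6429691381000002, 1.9344402954,
  2.2075294246999997, 2.6328659593, 2.7911690492999996, 3.0850374104,
  3.4031050061, 3.8826580919
)

// Increase from each iteration to the next
val deltas = logged.zip(logged.tail).map { case (prev, next) => next - prev }
val meanDelta = deltas.sum / deltas.size

val deltaText = deltas.map(d => f"$d%.3f").mkString(", ")
println(s"per-iteration increases: $deltaText")
println(f"mean increase: $meanDelta%.3f per iteration")
```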
Code:
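// Run in spark-shell, where the SparkContext `sc` is predefined
val modelSize = 1000000000
val numIterations = 10
val parallelism = 5
var oldRDD = sc.parallelize(1 to modelSize, parallelism).map(x => 0.1)
var i = 0
while (i < numIterations) {
  val t = System.nanoTime()
  // Update the weights
  val newRDD = oldRDD.map(x => x * x)
  // oldRDD was never cached, so this unpersist has no effect
  oldRDD.unpersist(true)
  // "Check" convergence; with nothing cached, this action evaluates the
  // whole lineage from parallelize through every map so far
  newRDD.mean()
  // Note: the elapsed time is divided by numIterations here, so each printed
  // value is 1/numIterations of the iteration's wall-clock time
  println("Iteration " + i + " time:" + (System.nanoTime() - t) / 1e9 / numIterations)
  oldRDD = newRDD
  i += 1
}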
Best regards, Alexander