Yes. Remember that your bandwidth caps the number of bytes per second that 
can be shipped to the driver. So if you've got 5 blocks of that size, it looks 
like you're basically saturating the network. 
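
To make that arithmetic concrete, here is a rough sketch in plain Scala (no 
Spark needed). The object and method names are made up for illustration, and 
it assumes the driver's link is the only bottleneck, ignoring serialization, 
protocol overhead and latency:

```scala
object TransferEstimate {
  // Seconds to move `numDoubles` 64-bit values over a link of `bitsPerSecond`.
  // Assumption: raw payload only, no serialization or protocol overhead.
  def transferSeconds(numDoubles: Long, bitsPerSecond: Double): Double =
    numDoubles * 64.0 / bitsPerSecond

  // Assumption: the driver's link is the bottleneck, so `blocks` arrays
  // effectively arrive one after another.
  def driverSeconds(blocks: Int, numDoubles: Long, bitsPerSecond: Double): Double =
    blocks * transferSeconds(numDoubles, bitsPerSecond)

  def main(args: Array[String]): Unit = {
    val numDoubles = 12000000L // array size used in the benchmark
    val measured = 247e6       // iperf figure quoted in the thread, in bits/s
    val tenGigabit = 10e9      // nominal 10 gigabit Ethernet rate, for comparison

    val one = transferSeconds(numDoubles, measured)
    val five = driverSeconds(5, numDoubles, measured)
    val fiveFast = driverSeconds(5, numDoubles, tenGigabit)

    println(f"1 block  at 247 Mbit/s: $one%.2f s")
    println(f"5 blocks at 247 Mbit/s: $five%.2f s")
    println(f"5 blocks at 10 Gbit/s:  $fiveFast%.2f s")
  }
}
```

At the measured 247 Mbit/s this gives roughly 3.1 s for one block and 15.5 s 
for five, which is in the same range as the timings reported.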

Aggregation trees help when there are many partitions/nodes, and butterfly 
mixing can help use all of the network resources. I have seen implementations 
of butterfly mixing in Spark but don't know if we've got one in mainline. Zhao 
and Canny's work [1] details this approach in the context of model fitting. 

At any rate, for this type of ANN work with huge models in *any* distributed 
setting, you're going to need to get faster networking (most production 
deployments I know of have either 10 gigabit Ethernet or 40 gigabit InfiniBand 
links), or figure out a way to decrease the frequency or density of updates. 
Doing both would be even better. 
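
As one hypothetical way to decrease the density of updates, the sketch below 
drops gradient entries whose magnitude falls under a threshold and ships only 
(index, value) pairs. It is an illustration only, not something MLlib does; 
the object name, the `threshold` parameter and the 4 + 8 bytes-per-entry cost 
model are all assumptions:

```scala
object SparseUpdateSketch {
  // Keep only entries with |g| > threshold; returns (indices, values).
  // `threshold` is a hypothetical tuning knob, not an MLlib parameter.
  def sparsify(grad: Array[Double], threshold: Double): (Array[Int], Array[Double]) = {
    val kept = grad.indices.filter(i => math.abs(grad(i)) > threshold).toArray
    (kept, kept.map(i => grad(i)))
  }

  // Payload of a dense update: 8 bytes per Double.
  def denseBytes(n: Int): Long = n.toLong * 8L

  // Payload of a sparse update: assumed 4-byte Int index plus 8-byte Double.
  def sparseBytes(nnz: Int): Long = nnz.toLong * (4L + 8L)

  def main(args: Array[String]): Unit = {
    val grad = Array(0.0, 0.5, 0.0, -0.001, 2.0, 0.0)
    val (idx, values) = sparsify(grad, 0.01)
    val idxStr = idx.mkString(",")
    val valStr = values.mkString(",")
    println("kept indices: " + idxStr)
    println("kept values:  " + valStr)
    println("dense bytes:  " + denseBytes(grad.length))
    println("sparse bytes: " + sparseBytes(idx.length))
  }
}
```

Under this cost model the sparse form only pays off when fewer than two thirds 
of the entries survive, and dropping small entries changes the update itself, 
so whether it is acceptable depends on the model.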

[1] http://www.cs.berkeley.edu/~jfc/papers/13/butterflymixing.pdf

> On Oct 17, 2015, at 12:47 PM, Joseph Bradley <jos...@databricks.com> wrote:
> 
> The decrease in running time from N=6 to N=7 makes some sense to me; that 
> should be when tree aggregation kicks in.  I'd call it an improvement to run 
> in the same ~13sec increasing from N=6 to N=9.
> 
> "Does this mean that for 5 nodes with treeAggregate of depth 1 it will take 
> 5 * 3.1 ~ 15.5 seconds?"
> --> I would guess so since all of that will be aggregated on the driver, but 
> I don't know enough about Spark's shuffling/networking to say for sure.  
> Others may be able to help more.
> 
> Your numbers do make me wonder if we should examine the structure of the tree 
> aggregation more carefully and see if we can improve it.  
> https://issues.apache.org/jira/browse/SPARK-11168
> 
> Joseph
> 
>> On Thu, Oct 15, 2015 at 7:01 PM, Ulanov, Alexander 
>> <alexander.ula...@hpe.com> wrote:
>> Hi Joseph,
>> 
>>  
>> 
>> There seems to be no improvement if I run it with more partitions or bigger 
>> depth:
>> 
>> N = 6 Avg time: 13.491579108666668
>> N = 7 Avg time: 8.929480508
>> N = 8 Avg time: 14.507123471999998
>> N = 9 Avg time: 13.854871645333333
>> 
>> Depth = 3
>> N = 2 Avg time: 8.853895346333333
>> N = 5 Avg time: 15.991574924666667
>> 
>>  
>> 
>> I also measured the bandwidth of my network with iperf. It shows 247 Mbit/s. 
>> So transferring a message that holds an array of 12M doubles should take 
>> 64 * 12M / 247M ~ 3.1 s. Does this mean that for 5 nodes with treeAggregate 
>> of depth 1 it will take 5 * 3.1 ~ 15.5 seconds?
>> 
>>  
>> 
>> Best regards, Alexander
>> 
>> From: Joseph Bradley [mailto:jos...@databricks.com] 
>> Sent: Wednesday, October 14, 2015 11:35 PM
>> To: Ulanov, Alexander
>> Cc: dev@spark.apache.org
>> Subject: Re: Gradient Descent with large model size
>> 
>>  
>> 
>> For those numbers of partitions, I don't think you'll actually use tree 
>> aggregation.  The number of partitions needs to be over a certain threshold 
>> (>= 7) before treeAggregate really operates on a tree structure:
>> 
>> https://github.com/apache/spark/blob/9808052b5adfed7dafd6c1b3971b998e45b2799a/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L1100
>> 
>>  
>> 
>> Do you see a slower increase in running time with more partitions?  For 5 
>> partitions, do you find things improve if you tell treeAggregate to use 
>> depth > 2?
>> 
>>  
>> 
>> Joseph
>> 
>>  
>> 
>> On Wed, Oct 14, 2015 at 1:18 PM, Ulanov, Alexander 
>> <alexander.ula...@hpe.com> wrote:
>> 
>> Dear Spark developers,
>> 
>>  
>> 
>> I have noticed that Gradient Descent in Spark MLlib takes a long time if the 
>> model is large. It is implemented with treeAggregate. I've extracted the 
>> code from GradientDescent.scala to perform the benchmark. It allocates an 
>> Array of a given size and then aggregates it:
>> 
>>  
>> 
>> val dataSize = 12000000
>> val n = 5
>> val maxIterations = 3
>> val rdd = sc.parallelize(0 until n, n).cache()
>> rdd.count()
>> var avgTime = 0.0
>> for (i <- 1 to maxIterations) {
>>   val start = System.nanoTime()
>>   val result = rdd.treeAggregate((new Array[Double](dataSize), 0.0, 0L))(
>>     seqOp = (c, v) => {
>>       // c: (grad, loss, count)
>>       val l = 0.0
>>       (c._1, c._2 + l, c._3 + 1)
>>     },
>>     combOp = (c1, c2) => {
>>       // c: (grad, loss, count)
>>       (c1._1, c1._2 + c2._2, c1._3 + c2._3)
>>     })
>>   avgTime += (System.nanoTime() - start) / 1e9
>>   assert(result._1.length == dataSize)
>> }
>> println("Avg time: " + avgTime / maxIterations)
>> 
>>  
>> 
>> If I run on my cluster of 1 master and 5 workers, I get the following 
>> results (given the array size = 12M):
>> 
>> n = 1 Avg time: 4.555709667333333
>> n = 2 Avg time: 7.059724584666667
>> n = 3 Avg time: 9.937117377666667
>> n = 4 Avg time: 12.687526233
>> n = 5 Avg time: 12.939526129666667
>> 
>>  
>> 
>> Could you explain why the time becomes so large? Transferring an array of 
>> 12M doubles should take ~1 second on a 1 Gbit network. There might be 
>> other overheads, but not as large as what I observe.
>> 
>> Best regards, Alexander
>> 
> 
