Yes, remember that your bandwidth is the maximum number of bytes per second that can be shipped to the driver. So if you've got 5 blocks of that size, it looks like you're basically saturating the network.
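To put rough numbers on that (purely illustrative; the 247 Mbit/s figure is the iperf measurement reported in the thread below, and the 12M-element double array is the benchmark's gradient size):

// Back-of-the-envelope estimate of a flat (depth-1) aggregation, where every
// partition's 12M-double result crosses the same driver link.
val elements            = 12000000L                 // gradient array length
val payloadBits         = elements * 8L * 8L        // 8 bytes per double -> ~768 Mbit
val linkMbitPerSec      = 247.0                     // iperf measurement from the thread
val secondsPerPartition = payloadBits / (linkMbitPerSec * 1e6)     // ~3.1 s
val partitions          = 5
println(f"total: ${partitions * secondsPerPartition}%.1f s")       // ~15.5 s

Since everything funnels into the driver's inbound link, the per-partition transfers add up rather than overlap, which is consistent with the roughly linear growth in the measurements quoted below.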
Aggregation trees help for many partitions/nodes, and butterfly mixing can help use all of the network's resources. I have seen implementations of butterfly mixing on Spark, but I don't know whether we have one in mainline. Zhao and Canny's work [1] details this approach in the context of model fitting. At any rate, for this type of ANN work with huge models in *any* distributed setting, you're going to need to get faster networking (most production deployments I know of have either 10 gigabit Ethernet or 40 gigabit InfiniBand links), or figure out a way to decrease the frequency or density of updates. Both would be even better. (A sketch of asking treeAggregate for a deeper tree is included after the quoted thread below.)

[1] http://www.cs.berkeley.edu/~jfc/papers/13/butterflymixing.pdf

> On Oct 17, 2015, at 12:47 PM, Joseph Bradley <jos...@databricks.com> wrote:
>
> The decrease in running time from N=6 to N=7 makes some sense to me; that
> should be when tree aggregation kicks in. I'd call it an improvement to run
> in the same ~13 sec while increasing from N=6 to N=9.
>
> "Does this mean that for 5 nodes with treeAggregate of depth 1 it will take
> 5*3.1~15.5 seconds?"
> --> I would guess so, since all of that will be aggregated on the driver, but
> I don't know enough about Spark's shuffling/networking to say for sure.
> Others may be able to help more.
>
> Your numbers do make me wonder if we should examine the structure of the tree
> aggregation more carefully and see if we can improve it.
> https://issues.apache.org/jira/browse/SPARK-11168
>
> Joseph
>
>> On Thu, Oct 15, 2015 at 7:01 PM, Ulanov, Alexander <alexander.ula...@hpe.com> wrote:
>>
>> Hi Joseph,
>>
>> There seems to be no improvement if I run it with more partitions or bigger depth:
>>
>> N = 6 Avg time: 13.491579108666668
>> N = 7 Avg time: 8.929480508
>> N = 8 Avg time: 14.507123471999998
>> N = 9 Avg time: 13.854871645333333
>>
>> Depth = 3
>> N = 2 Avg time: 8.853895346333333
>> N = 5 Avg time: 15.991574924666667
>>
>> I also measured the bandwidth of my network with iperf. It shows 247 Mbit/s.
>> So the transfer of a 12M array of doubles should take 64 * 12M / 247M ~ 3.1 s.
>> Does this mean that for 5 nodes with treeAggregate of depth 1 it will take
>> 5*3.1 ~ 15.5 seconds?
>>
>> Best regards, Alexander
>>
>> From: Joseph Bradley [mailto:jos...@databricks.com]
>> Sent: Wednesday, October 14, 2015 11:35 PM
>> To: Ulanov, Alexander
>> Cc: dev@spark.apache.org
>> Subject: Re: Gradient Descent with large model size
>>
>> For those numbers of partitions, I don't think you'll actually use tree
>> aggregation. The number of partitions needs to be over a certain threshold
>> (>= 7) before treeAggregate really operates on a tree structure:
>> https://github.com/apache/spark/blob/9808052b5adfed7dafd6c1b3971b998e45b2799a/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L1100
>>
>> Do you see a slower increase in running time with more partitions? For 5
>> partitions, do you find things improve if you tell treeAggregate to use
>> depth > 2?
>>
>> Joseph
>>
>> On Wed, Oct 14, 2015 at 1:18 PM, Ulanov, Alexander <alexander.ula...@hpe.com> wrote:
>>
>> Dear Spark developers,
>>
>> I have noticed that Gradient Descent in Spark MLlib takes a long time if the
>> model is large. It is implemented with treeAggregate. I've extracted the
>> code from GradientDescent.scala to perform the benchmark.
>> It allocates an Array of a given size and then aggregates it:
>>
>> val dataSize = 12000000
>> val n = 5
>> val maxIterations = 3
>> val rdd = sc.parallelize(0 until n, n).cache()
>> rdd.count()
>> var avgTime = 0.0
>> for (i <- 1 to maxIterations) {
>>   val start = System.nanoTime()
>>   val result = rdd.treeAggregate((new Array[Double](dataSize), 0.0, 0L))(
>>     seqOp = (c, v) => {
>>       // c: (grad, loss, count)
>>       val l = 0.0
>>       (c._1, c._2 + l, c._3 + 1)
>>     },
>>     combOp = (c1, c2) => {
>>       // c: (grad, loss, count)
>>       (c1._1, c1._2 + c2._2, c1._3 + c2._3)
>>     })
>>   avgTime += (System.nanoTime() - start) / 1e9
>>   assert(result._1.length == dataSize)
>> }
>> println("Avg time: " + avgTime / maxIterations)
>>
>> If I run it on my cluster of 1 master and 5 workers, I get the following
>> results (given the array size = 12M):
>>
>> n = 1 Avg time: 4.555709667333333
>> n = 2 Avg time: 7.059724584666667
>> n = 3 Avg time: 9.937117377666667
>> n = 4 Avg time: 12.687526233
>> n = 5 Avg time: 12.939526129666667
>>
>> Could you explain why the time becomes so big? The data transfer of a 12M
>> array of doubles should take ~1 second on a 1 Gbit network. There might be
>> other overheads, but not as big as what I observe.
>>
>> Best regards, Alexander
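Below is a minimal, untested sketch of the depth > 2 variant Joseph suggests above. It assumes a live SparkContext named sc and reuses the benchmark's 12M-double accumulator; depth = 3 is just an example value, and with only 5 partitions the extra tree levels may not actually engage (see the threshold linked above).

// Same aggregation as the quoted benchmark, but requesting a deeper tree so
// that partial combines run on executors before the results reach the driver.
val dataSize = 12000000
val rdd = sc.parallelize(0 until 5, 5).cache()
val result = rdd.treeAggregate((new Array[Double](dataSize), 0.0, 0L))(
  seqOp = (c, v) => (c._1, c._2, c._3 + 1L),
  combOp = (c1, c2) => (c1._1, c1._2 + c2._2, c1._3 + c2._3),
  depth = 3  // treeAggregate's default depth is 2
)

Deeper trees trade extra intermediate stages for less fan-in at the driver, so whether this helps depends on how much of the cost is the final hop to the driver versus the hops between executors.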