A 60M-element vector of doubles takes 480MB of memory (60,000,000 * 8 bytes).
You have 12 of them to be reduced on the driver, so you need ~6GB of memory,
not counting the temporary vectors generated by `_ + _`. You need to increase
the driver memory to make it work. That being said, ~10^7 elements is about
the limit for the current implementation of GLMs. -Xiangrui
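The arithmetic above, as a quick sanity check (raw double storage only; JVM object overhead and the `_ + _` temporaries come on top of this):

```scala
// Raw memory needed to hold p dense vectors of n doubles on the driver.
val n = 60000000L           // elements per vector
val p = 12L                 // number of partial results to merge
val perVector = n * 8L      // 8 bytes per Double -> 480,000,000 bytes (~480MB)
val total = p * perVector   // 5,760,000,000 bytes, i.e. the ~6GB quoted above
println(s"per vector: ${perVector / 1000000}MB, total: ${total / 1000000000.0}GB")
```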
On Jan 23, 2015 2:19 PM, "DB Tsai" <dbt...@dbtsai.com> wrote:

> Hi Alexander,
>
> For `reduce`, it's an action that collects all the data from the
> mappers to the driver and performs the aggregation on the driver. As a
> result, if the output of each mapper is very large and the number of
> mapper partitions is large, it can cause a problem.
>
> For `treeReduce`, as the name indicates, the first layer aggregates
> the mapper outputs two by two, halving the number of partial results.
> The aggregation then continues layer by layer. The final aggregation
> is still done on the driver, but by that time the amount of data is
> small.
>
> By default, a depth of 2 is used, so if you have many partitions of
> large vectors, this may still cause an issue. You can increase the
> depth so that the final reduce on the driver only has to merge a very
> small number of partial results.
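> As a toy, Spark-free sketch of the idea (plain Scala; not the actual
> treeReduce implementation): each layer merges partial results two by
> two, so after `depth` layers only a small remainder reaches the final
> merge on the driver.

```scala
// Simulate tree reduction: each layer merges partials two by two, halving
// their count; only what survives `depth` layers is merged on the "driver".
def treeReduceSketch[A](parts: Seq[A], depth: Int)(merge: (A, A) => A): (A, Int) = {
  var layer = parts
  for (_ <- 0 until depth if layer.size > 1)
    layer = layer.grouped(2).map(_.reduce(merge)).toSeq
  (layer.reduce(merge), layer.size) // (result, partials seen by the driver)
}

val partials = Seq.fill(12)(1) // 12 mapper outputs, value 1 each
val (sum, atDriver) = treeReduceSketch(partials, depth = 2)(_ + _)
// 12 -> 6 -> 3: with depth 2 the driver merges 3 partials instead of 12.
```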
>
> Sincerely,
>
> DB Tsai
> -------------------------------------------------------
> Blog: https://www.dbtsai.com
> LinkedIn: https://www.linkedin.com/in/dbtsai
>
>
>
> On Fri, Jan 23, 2015 at 12:07 PM, Ulanov, Alexander
> <alexander.ula...@hp.com> wrote:
> > Hi DB Tsai,
> >
> > Thank you for your suggestion. Actually, I started my experiments
> > with "treeReduce". Originally, I had "vv.treeReduce(_ + _, 2)" in my
> > script precisely because the MLlib optimizers use it, as you pointed
> > out with LBFGS. However, it leads to the same problems as "reduce",
> > though presumably less directly. As far as I understand, treeReduce
> > limits the number of communications between the workers and the
> > master by forcing the workers to partially compute the reduce
> > operation.
> >
> > Are you sure that the driver will first collect all results (or all
> > partial results in treeReduce) and ONLY then perform the aggregation?
> > If that is the problem, then how can I force it to aggregate each
> > portion of data as it is received from the workers?
> >
> > Best regards, Alexander
> >
> > -----Original Message-----
> > From: DB Tsai [mailto:dbt...@dbtsai.com]
> > Sent: Friday, January 23, 2015 11:53 AM
> > To: Ulanov, Alexander
> > Cc: dev@spark.apache.org
> > Subject: Re: Maximum size of vector that reduce can handle
> >
> > Hi Alexander,
> >
> > When you use `reduce` to aggregate the vectors, they will actually be
> > pulled into the driver and merged there. Obviously, that is not
> > scalable given that you are doing deep neural networks, which have so
> > many coefficients.
> >
> > Please try treeReduce instead, which is what we do in linear
> > regression and logistic regression.
> >
> > See
> > https://github.com/apache/spark/blob/branch-1.1/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala
> > for example.
> >
> > val (gradientSum, lossSum) = data.treeAggregate((Vectors.zeros(n), 0.0))(
> >   seqOp = (c, v) => (c, v) match {
> >     case ((grad, loss), (label, features)) =>
> >       val l = localGradient.compute(features, label, bcW.value, grad)
> >       (grad, loss + l)
> >   },
> >   combOp = (c1, c2) => (c1, c2) match {
> >     case ((grad1, loss1), (grad2, loss2)) =>
> >       axpy(1.0, grad2, grad1)
> >       (grad1, loss1 + loss2)
> >   })
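> > The seqOp/combOp contract above can be illustrated with a plain-Scala
> > sketch. The scalar "gradient" = label * feature and "loss" = feature
> > below are made up purely for illustration; the real code accumulates
> > Breeze vectors via axpy.

```scala
// Toy analogue of the treeAggregate pattern: fold (label, feature) pairs
// into a running (gradientSum, lossSum) per "partition" with seqOp, then
// merge the partial accumulators with combOp (like axpy(1.0, grad2, grad1)).
type Acc = (Double, Double) // (gradientSum, lossSum)
val seqOp = (c: Acc, v: (Double, Double)) => (c, v) match {
  case ((grad, loss), (label, feature)) => (grad + label * feature, loss + feature)
}
val combOp = (c1: Acc, c2: Acc) => (c1, c2) match {
  case ((g1, l1), (g2, l2)) => (g1 + g2, l1 + l2)
}

val data = Seq((1.0, 2.0), (0.0, 3.0), (1.0, 4.0)) // (label, feature) pairs
// Each "partition" folds with seqOp; the partials are merged with combOp:
val partials = data.grouped(2).map(_.foldLeft((0.0, 0.0): Acc)(seqOp)).toSeq
val (gradSum, lossSum) = partials.reduce(combOp)
// gradSum = 1*2 + 0*3 + 1*4 = 6.0; lossSum = 2 + 3 + 4 = 9.0
```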
> >
> > Sincerely,
> >
> > DB Tsai
> > -------------------------------------------------------
> > Blog: https://www.dbtsai.com
> > LinkedIn: https://www.linkedin.com/in/dbtsai
> >
> >
> >
> > On Fri, Jan 23, 2015 at 10:00 AM, Ulanov, Alexander <alexander.ula...@hp.com> wrote:
> >> Dear Spark developers,
> >>
> >> I am trying to measure Spark's reduce performance for big vectors.
> >> My motivation is related to machine learning gradients. The gradient
> >> is a vector that is computed on each worker, and then all the results
> >> need to be summed up and broadcast back to the workers. Present
> >> machine learning applications involve very long parameter vectors;
> >> for deep neural networks they can reach 2 billion parameters. So, I
> >> want to measure the time this operation takes depending on the size
> >> of the vector and the number of workers. I wrote a few lines of code
> >> that assume Spark will distribute the partitions among all available
> >> workers. I have a 6-machine cluster (Xeon 3.3GHz 4 cores, 16GB RAM),
> >> each running 2 Workers.
> >>
> >> import org.apache.spark.mllib.rdd.RDDFunctions._
> >> import breeze.linalg._
> >> import org.apache.log4j._
> >> Logger.getRootLogger.setLevel(Level.OFF)
> >> val n = 60000000
> >> val p = 12
> >> val vv = sc.parallelize(0 until p, p).map(i => DenseVector.rand[Double](n))
> >> vv.reduce(_ + _)
> >>
> >> When executing in the shell with a 60M vector, it crashes after some
> >> period of time. One of the nodes contains the following in stdout:
> >>
> >> Java HotSpot(TM) 64-Bit Server VM warning: INFO:
> >> os::commit_memory(0x0000000755500000, 2863661056, 0) failed;
> >> error='Cannot allocate memory' (errno=12)
> >> #
> >> # There is insufficient memory for the Java Runtime Environment to continue.
> >> # Native memory allocation (malloc) failed to allocate 2863661056 bytes
> >> for committing reserved memory.
> >>
> >> I ran the shell with --executor-memory 8G --driver-memory 8G, so
> >> handling a 60M vector of Doubles should not be a problem. Are there
> >> any big overheads involved? What is the maximum size of vector that
> >> reduce can handle?
> >>
> >> Best regards, Alexander
> >>
> >> P.S.
> >>
> >> "spark.driver.maxResultSize 0" needs to be set in order to run this
> >> code. I also needed to change the "java.io.tmpdir" and
> >> "spark.local.dir" folders, because my /tmp folder, which is the
> >> default, was too small, and Spark swaps heavily into it. Without
> >> these settings I get either "no space left on device" or "out of
> >> memory" exceptions.
> >>
> >> I also submitted a bug
> >> https://issues.apache.org/jira/browse/SPARK-5386
> >>
> >> ---------------------------------------------------------------------
> >> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org For
> >> additional commands, e-mail: dev-h...@spark.apache.org
> >>
>
