Hi DB Tsai,

Thank you for your suggestion. I actually started my experiments with 
"treeReduce". Originally, I had "vv.treeReduce(_ + _, 2)" in my script precisely 
because the MLlib optimizers use it, as you pointed out with LBFGS. However, it 
leads to the same problems as "reduce", only presumably less directly. As far as 
I understand, treeReduce limits the number of communications between the workers 
and the master by forcing the workers to compute partial reduce results themselves.
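
For reference, here is a minimal sketch of the two variants I have been comparing 
(assuming the same RDD "vv" of Breeze DenseVectors as in my script below; the 
second argument of treeReduce is the depth of the aggregation tree built on the 
executors before the final merge on the driver):

import breeze.linalg.DenseVector
import org.apache.spark.mllib.rdd.RDDFunctions._  // adds treeReduce/treeAggregate to RDDs

// plain reduce: every partition's partial sum is shipped to the driver and merged there
val sumOnDriver = vv.reduce(_ + _)

// treeReduce: partitions are first merged on the executors in a tree of the given depth,
// so the driver receives only a few already-combined vectors
val sumTree = vv.treeReduce(_ + _, 2)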

Are you sure that the driver will first collect all the results (or all the 
partial results in treeReduce) and ONLY then perform the aggregation? If that is 
the problem, how can I force it to aggregate after receiving each portion of data 
from the Workers?

Best regards, Alexander

-----Original Message-----
From: DB Tsai [mailto:dbt...@dbtsai.com] 
Sent: Friday, January 23, 2015 11:53 AM
To: Ulanov, Alexander
Cc: dev@spark.apache.org
Subject: Re: Maximum size of vector that reduce can handle

Hi Alexander,

When you use `reduce` to aggregate the vectors, they will actually be pulled into 
the driver and merged over there. Obviously, that is not scalable given you are 
working with deep neural networks, which have so many coefficients.

Please try treeReduce instead, which is what we do in linear regression and 
logistic regression.

See 
https://github.com/apache/spark/blob/branch-1.1/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala
for example.

val (gradientSum, lossSum) = data.treeAggregate((Vectors.zeros(n), 0.0))(
  seqOp = (c, v) => (c, v) match {
    case ((grad, loss), (label, features)) =>
      val l = localGradient.compute(features, label, bcW.value, grad)
      (grad, loss + l)
  },
  combOp = (c1, c2) => (c1, c2) match {
    case ((grad1, loss1), (grad2, loss2)) =>
      axpy(1.0, grad2, grad1)
      (grad1, loss1 + loss2)
  })
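
For your benchmark, where you only need to sum Breeze vectors, a minimal sketch 
along the same lines (an illustration only, assuming the RDD vv of 
DenseVector[Double] and the vector length n from your script) could be:

val sum = vv.treeAggregate(DenseVector.zeros[Double](n))(
  seqOp = (acc, v) => acc += v,  // add each vector into the per-partition accumulator in place
  combOp = (a, b) => a += b,     // merge two partial sums, again in place
  depth = 2)

Mutating the accumulator in place is safe here because each task deserializes its 
own copy of the zero value, and it avoids allocating a new 60M-element vector on 
every merge.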

Sincerely,

DB Tsai
-------------------------------------------------------
Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai



On Fri, Jan 23, 2015 at 10:00 AM, Ulanov, Alexander <alexander.ula...@hp.com> 
wrote:
> Dear Spark developers,
>
> I am trying to measure Spark's reduce performance for big vectors. My 
> motivation comes from machine learning gradients: a gradient is a vector that 
> is computed on each worker, and then all the results need to be summed up and 
> broadcast back to the workers. Present machine learning applications involve 
> very long parameter vectors; for deep neural networks they can have up to 2 
> billion elements. So, I want to measure the time this operation takes 
> depending on the size of the vector and the number of workers. I wrote a few 
> lines of code that assume Spark will distribute the partitions among all 
> available workers. I have a 6-machine cluster (Xeon 3.3GHz, 4 cores, 16GB 
> RAM), each machine running 2 Workers.
>
> import org.apache.spark.mllib.rdd.RDDFunctions._
> import breeze.linalg._
> import org.apache.log4j._
> Logger.getRootLogger.setLevel(Level.OFF)
> val n = 60000000
> val p = 12
> val vv = sc.parallelize(0 until p, p).map(i => DenseVector.rand[Double](n))
> vv.reduce(_ + _)
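>
> For completeness, a rough sketch of how I time this in the shell (the count() 
> only materializes the random vectors first, so that the measurement covers the 
> reduce alone):
>
> vv.cache().count()  // force the vectors to be computed and cached before timing
> val t0 = System.nanoTime()
> vv.reduce(_ + _)
> println((System.nanoTime() - t0) / 1e9 + " seconds")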
>
> When executing in the shell with the 60M vector, it crashes after some period 
> of time. One of the nodes contains the following in stdout:
> Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x0000000755500000, 2863661056, 0) failed; error='Cannot allocate memory' (errno=12)
> #
> # There is insufficient memory for the Java Runtime Environment to continue.
> # Native memory allocation (malloc) failed to allocate 2863661056 bytes for committing reserved memory.
>
> I run the shell with --executor-memory 8G --driver-memory 8G, so handling a 
> 60M-element vector of Doubles should not be a problem. Are there any big 
> overheads involved? What is the maximum size of vector that reduce can handle?
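>
> My back-of-the-envelope estimate (assuming 8 bytes per Double and ignoring JVM 
> object overhead):
>
> // one dense vector of 60M doubles: 60,000,000 * 8 bytes ~ 0.48 GB
> // if all p = 12 partial results sit on the driver before merging:
> //   12 * 0.48 GB ~ 5.8 GB, plus any temporary copies created while summing
> val bytesPerVector = 60000000L * 8L
> val driverBytes = 12L * bytesPerVector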
>
> Best regards, Alexander
>
> P.S.
>
> "spark.driver.maxResultSize 0" needs to set in order to run this code. I also 
> needed to change "java.io.tmpdir" and "spark.local.dir" folders because my 
> /tmp folder which is default, was too small and Spark swaps heavily into this 
> folder. Without these settings I get either "no space left on device" or "out 
> of memory" exceptions.
>
> I also submitted a bug 
> https://issues.apache.org/jira/browse/SPARK-5386
>
