In a sentence, is this the idea of collecting an RDD to memory on each executor directly?
On Sun, Mar 22, 2015 at 10:56 PM, Sandy Ryza <sandy.r...@cloudera.com> wrote:
> Hi Guillaume,
>
> I've long thought something like this would be useful - i.e. the ability to
> broadcast RDDs directly without first pulling data through the driver. If I
> understand correctly, your requirement to "block" a matrix up and only fetch
> the needed parts could be implemented on top of this by splitting an RDD
> into a set of smaller RDDs and then broadcasting each one on its own.
>
> Unfortunately nobody is working on this currently (and I couldn't promise to
> have bandwidth to review it at the moment either), but I suspect we'll
> eventually need to add something like this for map joins in Hive on Spark
> and Spark SQL.
>
> -Sandy
>
> On Sat, Mar 21, 2015 at 3:11 AM, Guillaume Pitel
> <guillaume.pi...@exensa.com> wrote:
>>
>> Hi,
>>
>> Thanks for your answer. This is precisely the use case I'm interested in,
>> but I already know it; I should have mentioned it. Unfortunately this
>> implementation of BlockMatrix has (in my opinion) some disadvantages (the
>> fact that it splits the matrix by range instead of using a modulo is bad
>> for block skewness). Besides, and more importantly, as I was writing, it
>> uses the join solution (actually a cogroup:
>> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala,
>> line 361). The duplication of the elements of the dense matrix is thus
>> dependent on the block size.
>>
>> Actually I'm wondering whether what I want to achieve could be done with a
>> simple modification to the join, allowing a partition to be weakly cached
>> after being retrieved.
>>
>> Guillaume
>>
>>
>> There is block matrix in Spark 1.3 -
>> http://spark.apache.org/docs/latest/mllib-data-types.html#blockmatrix
>>
>> However I believe it only supports dense matrix blocks.
>>
>> Still, might be possible to use it or extend it.
>>
>> JIRAs:
>> https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-3434
>>
>> Was based on
>> https://github.com/amplab/ml-matrix
>>
>> Another lib:
>> https://github.com/PasaLab/marlin/blob/master/README.md
>>
>> —
>> Sent from Mailbox
>>
>> On Sat, Mar 21, 2015 at 12:24 AM, Guillaume Pitel
>> <guillaume.pi...@exensa.com> wrote:
>>
>> Hi,
>> I have an idea that I would like to discuss with the Spark devs. The
>> idea comes from a very real problem that I have struggled with for almost
>> a year. My problem is very simple: it's a dense matrix * sparse matrix
>> operation. I have a dense matrix RDD[(Int,FloatMatrix)] which is divided
>> into X large blocks (one block per partition), and a sparse matrix
>> RDD[((Int,Int),Array[Array[(Int,Float)]])], divided into X * Y blocks.
>> The most efficient way to perform the operation is to collectAsMap() the
>> dense matrix and broadcast it, then perform the block-local
>> multiplications, and combine the results by column.
>> This is quite fine, unless the matrix is too big to fit in memory
>> (especially since the multiplication is performed several times
>> iteratively, and the broadcasts are not always cleaned from memory as I
>> would naively expect).
>> When the dense matrix is too big, a second solution is to split the big
>> sparse matrix into several RDDs and do several broadcasts. Doing this
>> creates quite a big overhead, but it mostly works, even though I often
>> face some problems with inaccessible broadcast files, for instance.
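Below is a minimal, self-contained sketch of the collect-and-broadcast strategy described just above, with toy row-major Array[Array[Double]] blocks standing in for FloatMatrix; the object name, block sizes, and data are illustrative assumptions, not code from the thread.

import org.apache.spark.{SparkConf, SparkContext}

object BroadcastDenseTimesSparse {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("broadcast-multiply").setMaster("local[*]"))

    // Toy data: dense block k is (k, rows x localCols); sparse block (k, j) lists
    // (localColumn, value) entries for each of its local rows.
    val dense = sc.parallelize(Seq(
      (0, Array(Array(1.0, 2.0), Array(3.0, 4.0)))
    ))
    val sparse = sc.parallelize(Seq(
      ((0, 0), Array(Array((0, 1.0)), Array((1, 2.0))))
    ))

    // Pull the whole dense matrix to the driver and broadcast it -- the step that
    // breaks down once the dense matrix no longer fits in memory.
    val denseMap = sc.broadcast(dense.collectAsMap())

    // Each sparse block multiplies locally against the broadcast dense block it needs.
    val outCols = 2  // width of an output column block (toy size)
    val partial = sparse.map { case ((k, j), rows) =>
      val dBlock = denseMap.value(k)
      val out = Array.fill(dBlock.length, outCols)(0.0)
      for {
        localRow <- rows.indices
        (localCol, v) <- rows(localRow)
        r <- dBlock.indices
      } out(r)(localCol) += dBlock(r)(localRow) * v
      (j, out)
    }

    // Combine partial products that target the same output column block.
    val result = partial.reduceByKey { (a, b) =>
      Array.tabulate(a.length, a(0).length)((r, c) => a(r)(c) + b(r)(c))
    }
    result.collect().foreach { case (j, block) =>
      println(s"column block $j: " + block.map(_.mkString(",")).mkString(" | "))
    }
    sc.stop()
  }
}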
>> Then there is the terrible but apparently very effective good old join.
>> Since X blocks of the sparse matrix use the same block from the dense
>> matrix, I suspect that the dense matrix is somehow replicated X times
>> (either on disk or over the network), which is the reason why the join
>> takes so much time.
>> After this bit of context, here is my idea: would it be possible to
>> somehow "broadcast" (or maybe more accurately, share or serve) a
>> persisted RDD which is distributed on all workers, in a way that would,
>> a bit like the IndexedRDD, allow a task to access a partition or an
>> element of a partition in the closure, with a worker-local memory cache?
>> That is, the information about where each block resides would be
>> distributed on the workers, to allow them to access parts of the RDD
>> directly. I think that's already a bit how RDDs are shuffled?
>> The RDD could stay distributed (no need to collect then broadcast), and
>> only the necessary transfers would be required.
>> Is this a bad idea? Is it already implemented somewhere (I would love
>> it!)? Or is it something that could add efficiency not only for my use
>> case, but maybe for others? Could someone give me a hint about how I
>> could add this capability to Spark? I would probably try to extend RDD
>> into a specific SharedIndexedRDD with a special lookup that would be
>> allowed from tasks as a special case, and that would try to contact the
>> BlockManager and fetch the corresponding data from the right worker.
>> Thanks in advance for your advice,
>> Guillaume
>> --
>> eXenSa
>>
>> *Guillaume PITEL, Président*
>> +33(0)626 222 431
>> eXenSa S.A.S. <http://www.exensa.com/>
>> 41, rue Périer - 92120 Montrouge - FRANCE
>> Tel +33(0)184 163 677 / Fax +33(0)972 283 705
>>
>> --
>> Guillaume PITEL, Président
>> +33(0)626 222 431
>>
>> eXenSa S.A.S.
>> 41, rue Périer - 92120 Montrouge - FRANCE
>> Tel +33(0)184 163 677 / Fax +33(0)972 283 705

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org
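For contrast, here is a minimal sketch of the join-based route discussed in the thread (conceptually what the "good old join" and BlockMatrix's cogroup do): each sparse block (k, j) is matched with dense block k through a shuffle, so a dense block is shipped once per sparse block that needs it. It uses the same toy types and data as the broadcast sketch earlier; names and sizes are assumptions for illustration.

import org.apache.spark.{SparkConf, SparkContext}

object JoinDenseTimesSparse {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("join-multiply").setMaster("local[*]"))

    // Same toy blocks as in the broadcast sketch: dense block k and sparse block (k, j).
    val dense = sc.parallelize(Seq(
      (0, Array(Array(1.0, 2.0), Array(3.0, 4.0)))
    ))
    val sparse = sc.parallelize(Seq(
      ((0, 0), Array(Array((0, 1.0)), Array((1, 2.0))))
    ))

    // Re-key each sparse block by the dense block index it needs, then join.
    // Dense block k travels through the shuffle once per matching sparse block,
    // which is the duplication the thread complains about.
    val joined = dense.join(sparse.map { case ((k, j), rows) => (k, (j, rows)) })

    val outCols = 2  // width of an output column block (toy size)
    val partial = joined.map { case (_, (dBlock, (j, rows))) =>
      val out = Array.fill(dBlock.length, outCols)(0.0)
      for {
        localRow <- rows.indices
        (localCol, v) <- rows(localRow)
        r <- dBlock.indices
      } out(r)(localCol) += dBlock(r)(localRow) * v
      (j, out)
    }

    // Sum partial products that land in the same output column block.
    val result = partial.reduceByKey { (a, b) =>
      Array.tabulate(a.length, a(0).length)((r, c) => a(r)(c) + b(r)(c))
    }
    result.collect().foreach { case (j, block) =>
      println(s"column block $j: " + block.map(_.mkString(",")).mkString(" | "))
    }
    sc.stop()
  }
}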