Hi,

Thanks for your answer. This is precisely the use case I'm interested in, but I already knew about it; I should have mentioned that. Unfortunately, this implementation of BlockMatrix has (in my opinion) some disadvantages: the fact that it splits the matrix by range instead of using a modulo is bad for block skewness. Besides, and more importantly, as I wrote, it uses the join solution (actually a cogroup: https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala, line 361). The duplication of the elements of the dense matrix thus depends on the block size.
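
To illustrate the point about range versus modulo splitting, here is a minimal sketch in plain Scala. The helper names (rangeBlock, moduloBlock, load) and the row weights are hypothetical and are not taken from MLlib; the sketch only assumes that the number of non-zeros per row decreases with the row index, which is one situation where contiguous ranges produce skewed blocks.

```scala
object BlockSkew {
  // Hypothetical helper: contiguous ranges of rows go to the same block.
  def rangeBlock(row: Int, numRows: Int, numBlocks: Int): Int = {
    val rowsPerBlock = (numRows + numBlocks - 1) / numBlocks // ceiling division
    row / rowsPerBlock
  }

  // Hypothetical helper: rows are dealt to blocks round-robin.
  def moduloBlock(row: Int, numBlocks: Int): Int = row % numBlocks

  // Sum the per-row weights (e.g. non-zero counts) that land in each block.
  def load(weights: Array[Int], assign: Int => Int, numBlocks: Int): Array[Int] = {
    val out = new Array[Int](numBlocks)
    for (row <- weights.indices) out(assign(row)) += weights(row)
    out
  }

  def main(args: Array[String]): Unit = {
    val numRows = 16
    val numBlocks = 4
    // Assumed workload: row r holds (16 - r) non-zeros, so low rows are heavy.
    val nnz = Array.tabulate(numRows)(r => numRows - r)
    val byRange = load(nnz, r => rangeBlock(r, numRows, numBlocks), numBlocks)
    val byModulo = load(nnz, r => moduloBlock(r, numBlocks), numBlocks)
    println(byRange.mkString(","))  // 58,42,26,10
    println(byModulo.mkString(",")) // 40,36,32,28
  }
}
```

With these assumed weights, the heaviest range block carries 58 non-zeros against 40 for the heaviest modulo block, for the same total.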

Actually, I'm wondering whether what I want to achieve could be done with a simple modification to the join, allowing a partition to be weakly cached after being retrieved.

Guillaume


There is a block matrix in Spark 1.3:
http://spark.apache.org/docs/latest/mllib-data-types.html#blockmatrix

However, I believe it only supports dense matrix blocks.

Still, it might be possible to use it or extend it.

JIRA:
https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-3434

It was based on:
https://github.com/amplab/ml-matrix

Another lib:
https://github.com/PasaLab/marlin/blob/master/README.md


On Sat, Mar 21, 2015 at 12:24 AM, Guillaume Pitel
<guillaume.pi...@exensa.com> wrote:

Hi,

I have an idea that I would like to discuss with the Spark devs. The
idea comes from a very real problem that I have struggled with for
almost a year. My problem is very simple: it's a dense matrix * sparse
matrix operation. I have a dense matrix RDD[(Int,FloatMatrix)] which is
divided into X large blocks (one block per partition), and a sparse
matrix RDD[((Int,Int),Array[Array[(Int,Float)]])], divided into X * Y
blocks. The most efficient way to perform the operation is to
collectAsMap() the dense matrix and broadcast it, then perform the
block-local multiplications, and combine the results by column.
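
The broadcast approach described above can be sketched without Spark, using plain Scala collections in place of the RDDs and the broadcast variable. Several things here are assumptions and are not stated in the message: the dense matrix is split by columns into blocks indexed by k, sparse block (k, y) holds the matching rows and is stored as one array of (column, value) pairs per row, Array[Array[Float]] stands in for FloatMatrix, and the names multiply, add and multiplyAll are hypothetical.

```scala
object BroadcastMultiply {
  type DenseBlock = Array[Array[Float]]           // row-major stand-in for FloatMatrix
  type SparseBlock = Array[Array[(Int, Float)]]   // assumed: row r -> (column, value) pairs

  // Block-local product: out(i)(j) += dense(i)(r) * sparse(r)(j).
  def multiply(dense: DenseBlock, sparse: SparseBlock, numCols: Int): DenseBlock = {
    val out = Array.ofDim[Float](dense.length, numCols)
    for (i <- dense.indices; r <- sparse.indices)
      sparse(r).foreach { case (j, v) => out(i)(j) += dense(i)(r) * v }
    out
  }

  // Element-wise sum of two partial results of the same shape.
  def add(a: DenseBlock, b: DenseBlock): DenseBlock =
    a.zip(b).map { case (ra, rb) => ra.zip(rb).map { case (x, y) => x + y } }

  // denseMap plays the role of the broadcast collectAsMap() result;
  // partial products are combined per column block y.
  def multiplyAll(denseMap: Map[Int, DenseBlock],
                  sparseBlocks: Seq[((Int, Int), SparseBlock)],
                  colsPerBlock: Int): Map[Int, DenseBlock] =
    sparseBlocks
      .map { case ((k, y), s) => (y, multiply(denseMap(k), s, colsPerBlock)) }
      .groupBy(_._1)
      .map { case (y, parts) => (y, parts.map(_._2).reduce(add)) }

  def main(args: Array[String]): Unit = {
    // D = [1 2] split into two column blocks; S = [[0 3], [4 0]] split into two row blocks.
    val denseMap = Map(0 -> Array(Array(1f)), 1 -> Array(Array(2f)))
    val sparseBlocks = Seq(
      ((0, 0), Array(Array((1, 3f)))),
      ((1, 0), Array(Array((0, 4f)))))
    val result = multiplyAll(denseMap, sparseBlocks, colsPerBlock = 2)
    println(result(0)(0).mkString(",")) // 8.0,3.0
  }
}
```

In Spark, denseMap would be the value of a broadcast variable read inside a map over the sparse blocks, and the groupBy/reduce step would be a reduceByKey on the column-block index.
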
This works quite well, unless the matrix is too big to fit in memory
(especially since the multiplication is performed several times
iteratively, and the broadcasts are not always cleaned from memory as I
would naively expect).

When the dense matrix is too big, a second solution is to split the big
sparse matrix into several RDDs and do several broadcasts. Doing this
creates quite a big overhead, but it mostly works, even though I often
face problems such as inaccessible broadcast files.

Then there is the terrible but apparently very effective good old join.
Since X blocks of the sparse matrix use the same block from the dense
matrix, I suspect that the dense matrix is somehow replicated X times
(either on disk or over the network), which is the reason why the join
takes so much time.
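
The suspected cost can be put in numbers with a back-of-the-envelope model. This is only a sketch of the suspicion above, not a description of what Spark's join actually does: it assumes each dense block is copied once per sparse block that consumes it, and the names replicate and shippedElements are hypothetical.

```scala
object JoinReplication {
  // Hypothetical model: re-key every dense block once per consumer so it can be
  // joined with the sparse blocks on the (k, y) key.
  def replicate[B](denseBlocks: Map[Int, B], consumersPerBlock: Int): Seq[((Int, Int), B)] =
    for ((k, block) <- denseBlocks.toSeq; y <- 0 until consumersPerBlock)
      yield ((k, y), block)

  // Total number of dense elements moved under that model.
  def shippedElements(numDenseBlocks: Int, elementsPerBlock: Long, consumersPerBlock: Int): Long =
    numDenseBlocks.toLong * elementsPerBlock * consumersPerBlock

  def main(args: Array[String]): Unit = {
    val copies = replicate(Map(0 -> "block0", 1 -> "block1"), consumersPerBlock = 3)
    println(copies.size)                       // 6 copies for 2 original blocks
    println(shippedElements(4, 1000000L, 8))   // 32000000
  }
}
```

Under this model the volume grows linearly with the number of consumers per dense block, which is why the replication factor depends on how the matrices are cut into blocks.
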
After this bit of context, here is my idea: would it be possible to
somehow "broadcast" (or maybe more accurately, share or serve) a
persisted RDD which is distributed on all workers, in a way that would,
a bit like the IndexedRDD, allow a task to access a partition or an
element of a partition in the closure, with a worker-local memory
cache? That is, the information about where each block resides would be
distributed to the workers, to allow them to access parts of the RDD
directly. I think that's already a bit like how RDDs are shuffled?

The RDD could stay distributed (no need to collect then broadcast), and
only the necessary transfers would be performed.

Is this a bad idea? Is it already implemented somewhere (I would love
it!), or is it something that could add efficiency not only for my use
case, but maybe for others? Could someone give me some hints about how I
could add this possibility to Spark? I would probably try to extend an
RDD into a specific SharedIndexedRDD with a special lookup that would be
allowed from tasks as a special case, and that would try to contact the
blockManager and fetch the corresponding data from the right worker.
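
The worker-local cache part of this idea can be sketched independently of Spark. The class below is hypothetical (WeakPartitionCache is not an existing Spark class), and fetchRemote stands in for whatever mechanism would pull a block from the worker that holds it; the sketch only shows how a block could be kept weakly cached after its first retrieval, so that repeated lookups on the same worker do not trigger another transfer while the block is still referenced.

```scala
import java.lang.ref.WeakReference
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.concurrent.TrieMap

// Hypothetical worker-local cache: values are held through weak references,
// so the garbage collector may reclaim a block once no task uses it anymore.
class WeakPartitionCache[K, V <: AnyRef](fetchRemote: K => V) {
  private val cache = TrieMap.empty[K, WeakReference[V]]
  private val fetchCount = new AtomicInteger(0)

  // Number of times a block had to be fetched from its owner.
  def remoteFetches: Int = fetchCount.get

  def get(key: K): V =
    // A cleared reference returns null, which Option(...) turns into None.
    cache.get(key).flatMap(ref => Option(ref.get)) match {
      case Some(value) => value
      case None =>
        val value = fetchRemote(key) // assumed to contact the owning worker
        fetchCount.incrementAndGet()
        cache.put(key, new WeakReference(value))
        value
    }
}

object WeakPartitionCacheDemo {
  def main(args: Array[String]): Unit = {
    // Stand-in for a remote lookup: build the block locally.
    val cache = new WeakPartitionCache[Int, Array[Float]](k => Array.fill(4)(k.toFloat))
    val first = cache.get(3)
    val second = cache.get(3) // served from the cache while `first` is still referenced
    println(first eq second)
    println(cache.remoteFetches)
  }
}
```

Two concurrent misses on the same key may both fetch the block in this sketch; a real implementation would have to decide whether that is acceptable.
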
Thanks in advance for your advice,
Guillaume
--
eXenSa
        
*Guillaume PITEL, Président*
+33(0)626 222 431
eXenSa S.A.S. <http://www.exensa.com/>
41, rue Périer - 92120 Montrouge - FRANCE
Tel +33(0)184 163 677 / Fax +33(0)972 283 705


