[ https://issues.apache.org/jira/browse/MAHOUT-1570?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14967623#comment-14967623 ]

ASF GitHub Bot commented on MAHOUT-1570:
----------------------------------------

Github user dlyubimov commented on the pull request:

    https://github.com/apache/mahout/pull/161#issuecomment-149987779
  
    On Wed, Oct 21, 2015 at 5:48 AM, Till Rohrmann <[email protected]>
    wrote:
    
    > The problem of the OutOfMemoryError in dals of the
    > DistributedDecompositionSuite is the following:
    >
    > In order to parallelize the input matrix, the FlinkEngine creates a
    > Collection[DrmTuple[Int]] which is given to Flink's CollectionInputFormat.
    > The CollectionInputFormat is simply a wrapper for a collection which is
    > serialized and shipped to the corresponding data source at runtime. The
    > problem is that the matrix rows in the collection of DrmTuple[Int] are of
    > type MatrixVectorView. Instances of this class hold a reference to the
    > original matrix. Therefore, serializing an instance of MatrixVectorView
    > effectively boils down to serializing the complete input matrix for each
    > row.
    >
    This has never been a problem for other mappings and should not be a
    problem here. What you serialize is either (1) a collection of vectors, or
    (2) a matrix view.
    
    Java serialization of mahout-math is not supported, so it has to be custom
    serialization in each case. Mahout core provides two serialization means:
    (1) Hadoop Writable, (2) Kryo.
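
    For illustration, the Writable route for a single row looks roughly like
    this (just a sketch, not code from this PR; the helper name is made up):

        import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
        import org.apache.mahout.math.{Vector, VectorWritable}

        // Hypothetical helper: round-trip one row through VectorWritable. Only the
        // vector's elements are written, so it works for views as well.
        def writableRoundTrip(v: Vector): Vector = {
          val bos = new ByteArrayOutputStream()
          val out = new DataOutputStream(bos)
          new VectorWritable(v).write(out)
          out.flush()

          val in = new DataInputStream(new ByteArrayInputStream(bos.toByteArray))
          val w = new VectorWritable()
          w.readFields(in)
          w.get()  // comes back as a concrete vector, with no reference to any backing matrix
        }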
    
    In particular, the Spark backend uses Kryo serialization.
    
    In this case I guess we are speaking of a collection of vectors, so the
    classes supporting these types of serialization would be VectorWritable and
    [1]. Kryo is generally the preferred way. We can move Kryo support into
    math-scala, out of the spark package, if needed.
    
    If neither Kryo nor Writable fits the bill, then additional thought is
    required. I vaguely remember talking to Stephan, and he told me that
    supporting Writable serialization would suffice as a minimum requirement. So
    serializing vectors in either of these two ways should be acceptable
    regardless of the actual implementation (view/non-view doesn't matter; they
    serialize any Vector implementation).
    
    [1]
    
https://github.com/apache/mahout/blob/master/spark/src/main/scala/org/apache/mahout/sparkbindings/io/VectorKryoSerializer.scala
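
    And a sketch of the Kryo route, assuming [1] (or a math-scala successor of
    it) can be instantiated with its default constructor and registered as the
    default serializer for Vector (that wiring is an assumption here, not
    something taken from this PR):

        import java.io.ByteArrayOutputStream
        import com.esotericsoftware.kryo.Kryo
        import com.esotericsoftware.kryo.io.{Input, Output}
        import org.apache.mahout.math.Vector
        import org.apache.mahout.sparkbindings.io.VectorKryoSerializer

        val kryo = new Kryo()
        // Assumed wiring: make Mahout's serializer the default for all Vector
        // subtypes (dense, sparse, views).
        kryo.addDefaultSerializer(classOf[Vector], new VectorKryoSerializer())

        def kryoRoundTrip(v: Vector): Vector = {
          val bos = new ByteArrayOutputStream()
          val out = new Output(bos)
          kryo.writeClassAndObject(out, v)
          out.close()
          kryo.readClassAndObject(new Input(bos.toByteArray)).asInstanceOf[Vector]
        }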
    
    > This means that the CollectionInputFormat needs memory of order
    > size(inputMatrix) * rows(inputMatrix) to serialize the given collection
    > of DrmTuple[Int] instances.
    >
    > The input size for the dals test case is 500 * 500. This means that the
    > input matrix has a size of roughly 2 MB. The serialized
    > CollectionInputFormat then has a size of roughly 1 GB, because we
    > serialize the matrix for each row. If you replace FlinkEngine.scala:231
    > with val rows = (0 until m.nrow).map(i => (i, dvec(m(i,
    > ::)).asInstanceOf[Vector])), the test will pass the serialization point
    > (but fail with another error :-( ). The reason why 1 GB of memory is
    > already enough to crash Flink, given that you set the max limit to 4 GB,
    > is that Flink by default reserves 70% of the available memory for its
    > managed memory (for sorting and hashing).
    >
    > I think it would make sense to convert possible views to their
    > materialized form before distributing the data in the cluster. That way,
    > you avoid the creation of unnecessary data, which might be OK for the
    > local use case where the data is shared.
    >
    > I hope this sheds a little bit of light in the OutOfMemoryError you've
    > encountered.
    >
    > —
    > Reply to this email directly or view it on GitHub
    > <https://github.com/apache/mahout/pull/161#issuecomment-149884474>.
    >
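
    For reference, the numbers in the quoted message check out (500 * 500
    doubles is about 2 MB, and 500 serialized copies of the backing matrix come
    to about 1 GB), and the materialization Till suggests amounts to something
    like the following sketch (a hypothetical helper, not the actual
    FlinkEngine code):

        import org.apache.mahout.math.{DenseVector, Matrix, Vector}

        // Hypothetical helper: copy each row view into a standalone DenseVector before
        // handing the collection to CollectionInputFormat, so serializing a row no
        // longer drags the whole backing matrix along with it.
        def materializedRows(m: Matrix): IndexedSeq[(Int, Vector)] =
          (0 until m.numRows()).map { i =>
            // m.viewRow(i) is a view backed by m; new DenseVector(...) copies the data.
            i -> (new DenseVector(m.viewRow(i)): Vector)
          }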



> Adding support for Apache Flink as a backend for the Mahout DSL
> ---------------------------------------------------------------
>
>                 Key: MAHOUT-1570
>                 URL: https://issues.apache.org/jira/browse/MAHOUT-1570
>             Project: Mahout
>          Issue Type: Improvement
>            Reporter: Till Rohrmann
>            Assignee: Alexey Grigorev
>              Labels: DSL, flink, scala
>             Fix For: 0.11.1
>
>
> With the finalized abstraction of the Mahout DSL plans from the backend 
> operations (MAHOUT-1529), it should be possible to integrate further backends 
> for the Mahout DSL. Apache Flink would be a suitable candidate to act as a 
> good execution backend. 
> With respect to the implementation, the biggest difference between Spark and 
> Flink at the moment is probably the incremental rollout of plans, which is 
> triggered by Spark's actions and which is not supported by Flink yet. 
> However, the Flink community is working on this issue. For the moment, it 
> should be possible to circumvent this problem by writing intermediate results 
> required by an action to HDFS and reading from there.



