[ https://issues.apache.org/jira/browse/MAHOUT-1570?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14966755#comment-14966755 ]

ASF GitHub Bot commented on MAHOUT-1570:
----------------------------------------

Github user tillrohrmann commented on the pull request:

    https://github.com/apache/mahout/pull/161#issuecomment-149884474
  
    The problem of the `OutOfMemoryError` in `dals` of the 
`DistributedDecompositionSuite` is the following: 
    
    In order to parallelize the input matrix, the `FlinkEngine` creates a 
    `Collection[DrmTuple[Int]]` which is given to Flink's 
    `CollectionInputFormat`. The `CollectionInputFormat` is simply a wrapper 
    around a collection which is serialized and shipped to the corresponding 
    data source at runtime. The problem is that the matrix rows in the 
    collection of `DrmTuple[Int]` are of type `MatrixVectorView`. Instances of 
    this class hold a reference to the original matrix, so serializing a 
    `MatrixVectorView` effectively boils down to serializing the complete 
    input matrix, once per row. This means that the `CollectionInputFormat` 
    needs memory of order `size(inputMatrix) * rows(inputMatrix)` to serialize 
    the given collection of `DrmTuple[Int]` instances.
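    
    To make the blow-up concrete, here is a toy sketch in plain Scala (with a 
    hypothetical `RowView` class standing in for `MatrixVectorView`) showing 
    how a per-row view that keeps a back-reference to its matrix inflates the 
    serialized size:
    
        import java.io._
        
        // Toy stand-in for MatrixVectorView: a row view that holds a
        // reference to the full backing matrix.
        case class Matrix(data: Array[Array[Double]]) extends Serializable
        case class RowView(matrix: Matrix, row: Int) extends Serializable
        
        def serializedSize(obj: AnyRef): Int = {
          val bytes = new ByteArrayOutputStream()
          val out = new ObjectOutputStream(bytes)
          out.writeObject(obj)
          out.close()
          bytes.size()
        }
        
        val m = Matrix(Array.fill(500, 500)(1.0))  // ~2 MB of doubles
        serializedSize(RowView(m, 0))              // ~2 MB: drags the matrix along
        // Serializing all 500 row views, as CollectionInputFormat does element
        // by element, therefore needs roughly 500 * 2 MB ~ 1 GB.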
    
    The input size for the dals test case is `500 * 500`. This means that the 
    input matrix has a size of roughly 2 MB (250,000 doubles at 8 bytes each). 
    The serialized `CollectionInputFormat` then has a size of roughly 1 GB, 
    because we serialize the 2 MB matrix once for each of the 500 rows. If you 
    replace `FlinkEngine.scala:231` with `val rows = (0 until m.nrow).map(i => 
    (i, dvec(m(i, ::)).asInstanceOf[Vector]))`, the test will get past the 
    serialization point (but fail with another error :-(). The reason why 1 GB 
    of memory is already enough to crash Flink, given that you set the max 
    limit to 4 GB, is that Flink by default reserves 70% of the available 
    memory for its managed memory (used for sorting and hashing).
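    
    Spelled out, the proposed replacement at `FlinkEngine.scala:231` would 
    look roughly like this (a sketch based on the snippet above; `dvec` is the 
    dense-vector constructor from Mahout's scalabindings):
    
        // Materialize each row view into a standalone DenseVector before the
        // collection is handed to Flink's CollectionInputFormat.
        val rows = (0 until m.nrow).map { i =>
          // m(i, ::) yields a MatrixVectorView; dvec copies it into a
          // DenseVector of length ncol, so serializing one tuple costs
          // O(ncol) instead of O(nrow * ncol).
          (i, dvec(m(i, ::)).asInstanceOf[Vector])
        }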
    
    I think it would make sense to convert possible `Views` to their 
    materialized form before distributing the data in the cluster. That way, 
    you avoid the creation of unnecessary data; sharing the backing matrix may 
    be fine for the local use case, but not when every row has to be 
    serialized independently, as shown in the sketch below.
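    
    A minimal sketch of such a conversion (a hypothetical helper, not an 
    existing Mahout API; it assumes `DenseVector`'s copy constructor, which is 
    also what `dvec` uses under the hood):
    
        import org.apache.mahout.math.{DenseVector, MatrixVectorView, Vector}
        
        // Hypothetical helper: deep-copy view-backed rows into self-contained
        // dense vectors before distributing them; pass anything else through.
        def materialize(v: Vector): Vector = v match {
          case view: MatrixVectorView => new DenseVector(view) // drops the matrix reference
          case other                  => other
        }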
    
    I hope this sheds a little bit of light on the `OutOfMemoryError` you've 
    encountered.


> Adding support for Apache Flink as a backend for the Mahout DSL
> ---------------------------------------------------------------
>
>                 Key: MAHOUT-1570
>                 URL: https://issues.apache.org/jira/browse/MAHOUT-1570
>             Project: Mahout
>          Issue Type: Improvement
>            Reporter: Till Rohrmann
>            Assignee: Alexey Grigorev
>              Labels: DSL, flink, scala
>             Fix For: 0.11.1
>
>
> With the finalized abstraction of the Mahout DSL plans from the backend 
> operations (MAHOUT-1529), it should be possible to integrate further backends 
> for the Mahout DSL. Apache Flink would be a suitable candidate for such an 
> execution backend. 
> With respect to the implementation, the biggest difference between Spark and 
> Flink at the moment is probably the incremental rollout of plans, which is 
> triggered by Spark's actions and which is not supported by Flink yet. 
> However, the Flink community is working on this issue. For the moment, it 
> should be possible to circumvent this problem by writing intermediate results 
> required by an action to HDFS and reading from there.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
