[
https://issues.apache.org/jira/browse/MAHOUT-1570?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14966755#comment-14966755
]
ASF GitHub Bot commented on MAHOUT-1570:
----------------------------------------
Github user tillrohrmann commented on the pull request:
https://github.com/apache/mahout/pull/161#issuecomment-149884474
The problem of the `OutOfMemoryError` in `dals` of the
`DistributedDecompositionSuite` is the following:
In order to parallelize the input matrix, the `FlinkEngine` creates a
`Collection[DrmTuple[Int]]` which is given to Flink's `CollectionInputFormat`.
The `CollectionInputFormat` is simply a wrapper for a collection which is
serialized and shipped to the corresponding data source at runtime. The problem
is that the matrix rows in the collection of `DrmTuple[Int]` are of type
`MatrixVectorView`. Instances of this class hold a reference to the original
matrix. Therefore, serializing an instance of `MatrixVectorView` effectively
boils down to serializing the complete input matrix once per row. As a
result, the `CollectionInputFormat` needs memory on the order of
`size(inputMatrix) * rows(inputMatrix)` to serialize the given collection of
`DrmTuple[Int]` instances.
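A minimal sketch of that reference chain (hypothetical, simplified classes;
the real types are Mahout's `Matrix` and `MatrixVectorView`):
```scala
// A row "view" stores no row data of its own, only a back-reference to the
// full matrix plus a row index.
class DenseMatrix(val data: Array[Array[Double]]) extends Serializable

class RowView(val matrix: DenseMatrix, val row: Int) extends Serializable {
  def apply(col: Int): Double = matrix.data(row)(col)
}

// Java serialization follows object graphs, so each serialized RowView
// embeds a complete copy of `matrix`. With one view per row, the total
// serialized size grows as size(matrix) * rows(matrix).
val m = new DenseMatrix(Array.fill(500, 500)(1.0))
val rowViews = (0 until 500).map(i => new RowView(m, i))
```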
The input for the `dals` test case is a `500 * 500` matrix, so the input
matrix has a size of roughly 2 MB. The serialized `CollectionInputFormat` then
has a size of roughly 1 GB, because we serialize the matrix once per row. If
you replace `FlinkEngine.scala:231` with `val rows = (0 until m.nrow).map(i =>
(i, dvec(m(i, ::)).asInstanceOf[Vector]))`, the test gets past the
serialization point (but fails with another error :-(). The reason 1 GB of
memory is already enough to crash Flink, even though the max limit is set to
4 GB, is that Flink by default reserves 70% of the available memory for its
managed memory (used for sorting and hashing).
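A quick back-of-the-envelope check of those numbers (assuming 8-byte doubles
and ignoring serialization overhead):
```scala
val n = 500
val matrixBytes = n.toLong * n * 8 // 2,000,000 bytes, i.e. roughly 2 MB
val totalBytes  = matrixBytes * n  // one matrix copy per row: roughly 1 GB
// With a 4 GB limit, managed memory claims 70% (~2.8 GB), so the remaining
// ~1.2 GB cannot comfortably hold the ~1 GB serialized collection.
```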
I think it would make sense to convert any `View`s to their materialized form
before distributing the data across the cluster. That way you avoid creating
unnecessary copies of the data; sharing the backing matrix may be fine for the
local use case, but it is wasteful once every row has to be serialized on its
own.
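A minimal sketch of that materialization step, based on the one-liner quoted
above (the helper name is made up; imports assume Mahout's math-scala
bindings):
```scala
import org.apache.mahout.math.{Matrix, Vector}
import org.apache.mahout.math.scalabindings._
import RLikeOps._

// Hypothetical helper: copy every row view into a standalone dense vector so
// that no element of the distributed collection keeps a back-reference to
// the full input matrix.
def materializeRows(m: Matrix): IndexedSeq[(Int, Vector)] =
  (0 until m.nrow).map { i =>
    // dvec(...) copies the elements of the row view into a fresh
    // DenseVector, severing the link to the parent matrix.
    (i, dvec(m(i, ::)).asInstanceOf[Vector])
  }
```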
I hope this sheds a little light on the `OutOfMemoryError` you've
encountered.
> Adding support for Apache Flink as a backend for the Mahout DSL
> ---------------------------------------------------------------
>
> Key: MAHOUT-1570
> URL: https://issues.apache.org/jira/browse/MAHOUT-1570
> Project: Mahout
> Issue Type: Improvement
> Reporter: Till Rohrmann
> Assignee: Alexey Grigorev
> Labels: DSL, flink, scala
> Fix For: 0.11.1
>
>
> With the finalized abstraction of the Mahout DSL plans from the backend
> operations (MAHOUT-1529), it should be possible to integrate further backends
> for the Mahout DSL. Apache Flink would be a suitable candidate for an
> execution backend.
> With respect to the implementation, the biggest difference between Spark and
> Flink at the moment is probably the incremental rollout of plans, which is
> triggered by Spark's actions and which is not supported by Flink yet.
> However, the Flink community is working on this issue. For the moment, it
> should be possible to circumvent this problem by writing intermediate results
> required by an action to HDFS and reading them back from there, as sketched
> below.
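> A minimal sketch of that workaround, assuming Flink's batch `DataSet` API
> (path and plan contents are illustrative only):
>
> ```scala
> import org.apache.flink.api.scala._
>
> val env = ExecutionEnvironment.getExecutionEnvironment
>
> // First plan: compute up to the point where an "action" would be needed
> // and persist the intermediate result to HDFS.
> val intermediate: DataSet[(Int, Double)] = env.fromElements(0 -> 1.0, 1 -> 2.0)
> intermediate.writeAsCsv("hdfs:///tmp/mahout-intermediate")
> env.execute("materialize intermediate result")
>
> // Second plan: read the materialized result back and continue from there,
> // emulating Spark's incremental plan rollout.
> val resumed = env.readCsvFile[(Int, Double)]("hdfs:///tmp/mahout-intermediate")
> ```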
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)