[
https://issues.apache.org/jira/browse/MAHOUT-1570?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14967623#comment-14967623
]
ASF GitHub Bot commented on MAHOUT-1570:
----------------------------------------
Github user dlyubimov commented on the pull request:
https://github.com/apache/mahout/pull/161#issuecomment-149987779
On Wed, Oct 21, 2015 at 5:48 AM, Till Rohrmann <[email protected]>
wrote:
> The problem of the OutOfMemoryError in dals of the
> DistributedDecompositionSuite is the following:
>
> In order to parallelize the input matrix, the FlinkEngine creates a
> Collection[DrmTuple[Int]] which is given to Flink's CollectionInputFormat.
> The CollectionInputFormat is simply a wrapper for a collection which is
> serialized and shipped to the corresponding data source at runtime. The
> problem is that the matrix rows in the collection of DrmTuple[Int] are of
> type MatrixVectorView. Instances of this class hold a reference to the
> original matrix. Therefore, serializing an instance of MatrixVectorView
> effectively boils down to serializing the complete input matrix for each
> row.
>
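To make the failure mode above concrete, here is a minimal sketch, assuming the Mahout math-scala in-core bindings. The rows obtained via m(i, ::) are views that hold a reference to the backing matrix (the MatrixVectorView described above), so naively Java-serializing each row tuple would ship the complete matrix once per row:

    import org.apache.mahout.math._
    import scalabindings._
    import RLikeOps._

    val m = dense((1, 2, 3), (4, 5, 6))   // small in-core matrix

    // Each element here is a view over m, not an independent copy of the row.
    val viewRows: Seq[(Int, Vector)] = (0 until m.nrow).map(i => (i, m(i, ::)))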
This has never been a problem for other mappings and should not be one here.
What you serialize is either (1) a collection of vectors, or (2) a matrix
view.
Java serialization of mahout-math types is not supported, so custom
serialization is needed in each case. Mahout core provides two serialization
means: (1) Hadoop Writable, (2) Kryo.
In particular, the Spark backend uses Kryo serialization.
In this case I guess we are speaking of a collection of vectors, so the
classes supporting these types of serialization would be VectorWritable and
[1]. Kryo is generally the preferred way. We can move the Kryo support out of
the spark package into math-scala if needed.
If neither Kryo nor Writable fits the bill, then additional thought is
required. I vaguely remember talking to Stephan, and he told me that
supporting Writable serialization would suffice as a minimum requirement. So
serializing vectors in either of these two ways should be acceptable
regardless of the actual implementation (view/non-view doesn't matter; they
serialize any Vector implementation).
[1]
https://github.com/apache/mahout/blob/master/spark/src/main/scala/org/apache/mahout/sparkbindings/io/VectorKryoSerializer.scala
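As a rough sketch of the Writable route (assuming mahout-math's VectorWritable
is available on the classpath; the round-trip helper below is hypothetical),
serialization through Hadoop's DataOutput/DataInput works for any Vector
implementation, view or not, and deserializes to a concrete vector:

    import java.io._
    import org.apache.mahout.math.{Vector, VectorWritable}

    def writableRoundTrip(v: Vector): Vector = {
      val bytes = new ByteArrayOutputStream()
      new VectorWritable(v).write(new DataOutputStream(bytes))

      val in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray))
      val w = new VectorWritable()
      w.readFields(in)
      w.get()   // comes back as a concrete (non-view) vector
    }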
> This means that the CollectionInputFormat needs memory of order
> size(inputMatrix) * rows(inputMatrix) to serialize the given collection of
> DrmTuple[Int] instances.
>
> The input size for the dals test case is 500 * 500. This means that the
> input matrix has a size of roughly 2 MB. The serialized
> CollectionInputFormat then has a size of roughly 1 GB, because we
> serialize the matrix for each row. If you replace FlinkEngine.scala:231
> with val rows = (0 until m.nrow).map(i => (i, dvec(m(i,
> ::)).asInstanceOf[Vector])), the test will pass the serialization point
> (but fail with another error :-( ). The reason why 1 GB of memory, given
> that you set the max limit to 4 GB, is already enough to crash Flink is
> that Flink reserves by default 70% of the available memory for its
> managed memory (for sorting and hashing).
>
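As a back-of-the-envelope check of those figures (assuming 8-byte doubles and
ignoring serialization overhead):

    val cells       = 500L * 500L         // 250,000 entries
    val matrixBytes = cells * 8L          // ~2 MB per serialized copy of the matrix
    val totalBytes  = matrixBytes * 500L  // ~1 GB if the matrix ships once per row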
> I think it would make sense to convert possible views to their
> materialized form before distributing the data in the cluster. That way,
> you avoid the creation of unnecessary data; the views might be OK for the
> local use case where the data is shared.
>
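A minimal sketch of that materialization step, assuming the math-scala
bindings and mirroring the dvec(...) replacement quoted above (the helper name
is hypothetical); each tuple then carries only a dense copy of its own row:

    import org.apache.mahout.math._
    import scalabindings._
    import RLikeOps._

    def materializedRows(m: Matrix): IndexedSeq[(Int, Vector)] =
      (0 until m.nrow).map { i =>
        // dvec(...) copies the row view into a DenseVector, so serializing
        // the tuple no longer drags the whole backing matrix along.
        (i, dvec(m(i, ::)))
      }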
> I hope this sheds a little bit of light on the OutOfMemoryError you've
> encountered.
>
> —
> Reply to this email directly or view it on GitHub
> <https://github.com/apache/mahout/pull/161#issuecomment-149884474>.
>
> Adding support for Apache Flink as a backend for the Mahout DSL
> ---------------------------------------------------------------
>
> Key: MAHOUT-1570
> URL: https://issues.apache.org/jira/browse/MAHOUT-1570
> Project: Mahout
> Issue Type: Improvement
> Reporter: Till Rohrmann
> Assignee: Alexey Grigorev
> Labels: DSL, flink, scala
> Fix For: 0.11.1
>
>
> With the finalized abstraction of the Mahout DSL plans from the backend
> operations (MAHOUT-1529), it should be possible to integrate further backends
> for the Mahout DSL. Apache Flink would be a suitable candidate for such an
> execution backend.
> With respect to the implementation, the biggest difference between Spark and
> Flink at the moment is probably the incremental rollout of plans, which is
> triggered by Spark's actions and which is not supported by Flink yet.
> However, the Flink community is working on this issue. For the moment, it
> should be possible to circumvent this problem by writing intermediate results
> required by an action to HDFS and reading from there.
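A hedged sketch of that HDFS round trip using the Samsara DRM API's
dfsWrite/drmDfsRead (the helper name and path are hypothetical):

    import org.apache.mahout.math.drm._

    // Runs the plan built so far, persists the intermediate DRM, and reads it
    // back so the next action starts from materialized data.
    def viaDfs(a: DrmLike[Int], path: String)(implicit ctx: DistributedContext): DrmLike[_] = {
      a.checkpoint().dfsWrite(path)
      drmDfsRead(path)
    }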