[ https://issues.apache.org/jira/browse/FLINK-4613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15653562#comment-15653562 ]
ASF GitHub Bot commented on FLINK-4613:
---------------------------------------
Github user gaborhermann commented on a diff in the pull request:
https://github.com/apache/flink/pull/2542#discussion_r87355415
--- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala ---
@@ -675,7 +756,69 @@ object ALS {
collector.collect((blockID, array))
}
}
- }.withForwardedFieldsFirst("0").withForwardedFieldsSecond("0")
+ }
+
+ // broadcasting XtX matrix in the implicit case
+ val updatedFactorMatrix = if (implicitPrefs) {
+ newMatrix.withBroadcastSet(XtXtoBroadcast.get, "XtX")
+ } else {
+ newMatrix
+ }
+
+ updatedFactorMatrix.withForwardedFieldsFirst("0").withForwardedFieldsSecond("0")
+ }
+
+ /**
+ * Computes the XtX matrix for the implicit version before updating the factors.
+ * This matrix is intended to be broadcast, but as we cannot use a sink inside a Flink
+ * iteration, we represent it as a [[DataSet]] with a single element containing the matrix.
+ *
+ * The algorithm computes `X_i^T * X_i` for every block `X_i` of `X`,
+ * then sums all these computed matrices to get `X^T * X`.
+ */
+ private[recommendation] def computeXtX(x: DataSet[(Int, Array[Array[Double]])], factors: Int):
+ DataSet[Array[Double]] = {
+ val triangleSize = factors * (factors - 1) / 2 + factors
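+ // triangleSize is the number of entries in the packed triangle of the
+ // symmetric factors x factors matrix, i.e. factors * (factors + 1) / 2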
+
+ type MtxBlock = (Int, Array[Array[Double]])
+ // construct XtX for all blocks
+ val xtx = x
+ .mapPartition(new MapPartitionFunction[MtxBlock, Array[Double]]() {
+ var xtxForBlock: Array[Double] = null
+
+ override def mapPartition(blocks: Iterable[(Int, Array[Array[Double]])],
+ out: Collector[Array[Double]]): Unit = {
+
+ if (xtxForBlock == null) {
+ // creating the matrix if not yet created
+ xtxForBlock = Array.fill(triangleSize)(0.0)
+ } else {
+ // erasing the matrix
+ var i = 0
+ while (i < xtxForBlock.length) {
--- End diff ---
I tried to avoid object creation, but I'm not sure erasing works as
well. By using `fill`, a new `factors * factors` matrix would be created at
every mapping. Am I right? Maybe that's not a big problem, as there is only one
mapping per partition, and the matrix is not that big. Maybe it was just
premature optimization :) We could use `fill`, because that makes the code
cleaner. What do you think?
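For illustration, here is a minimal standalone sketch of the trade-off under discussion (the accumulator class and the packed-triangle layout are assumptions made for this example, not the PR's actual code): reusing one preallocated buffer and erasing it with `java.util.Arrays.fill` avoids allocating a fresh array on each call, while `Array.fill(triangleSize)(0.0)` allocates a new one every time.

```scala
import java.util

// Hypothetical helper, not the PR's code: accumulates the packed lower
// triangle of X_i^T * X_i for one block, reusing a single buffer instead
// of allocating a new array with Array.fill on every call.
class XtXAccumulator(factors: Int) {
  // number of entries in the packed triangle: factors * (factors + 1) / 2
  private val triangleSize = factors * (factors - 1) / 2 + factors
  private val buffer = new Array[Double](triangleSize)

  def compute(block: Array[Array[Double]]): Array[Double] = {
    // erase the reused buffer; the simpler alternative would be
    // Array.fill(triangleSize)(0.0), which allocates on every call
    util.Arrays.fill(buffer, 0.0)
    for (row <- block) {
      var pos = 0
      var i = 0
      while (i < factors) {
        var j = 0
        while (j <= i) {
          // entry (i, j) of X_i^T * X_i: sum over the block's rows of row(i) * row(j)
          buffer(pos) += row(i) * row(j)
          pos += 1
          j += 1
        }
        i += 1
      }
    }
    // the buffer is shared across calls, so callers must copy it
    // before invoking compute again
    buffer
  }
}
```

If Flink reuses the function instance across supersteps of the bulk iteration, the buffer reuse saves one allocation per partition per superstep; whether that is worth the extra erase loop for a matrix of this size is exactly the question above.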
> Extend ALS to handle implicit feedback datasets
> -----------------------------------------------
>
> Key: FLINK-4613
> URL: https://issues.apache.org/jira/browse/FLINK-4613
> Project: Flink
> Issue Type: New Feature
> Components: Machine Learning Library
> Reporter: Gábor Hermann
> Assignee: Gábor Hermann
>
> The Alternating Least Squares implementation should be extended to handle
> _implicit feedback_ datasets. These datasets do not contain explicit ratings
> by users; rather, they are built by collecting user behavior (e.g. a user
> listened to artist X for Y minutes), and they require a slightly different
> optimization objective. See details by [Hu et
> al|http://dx.doi.org/10.1109/ICDM.2008.22].
> We do not need to modify much in the original ALS algorithm. See [Spark ALS
> implementation|https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala],
> which could serve as a basis for this extension. Only the factor update step is
> modified, and most of the changes are in the local parts of the algorithm
> (i.e. UDFs). In fact, the only modification that is not local is
> precomputing a matrix product Y^T * Y and broadcasting it to all the nodes,
> which we can do with broadcast DataSets.
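For reference, a minimal sketch of the broadcast DataSet mechanism mentioned above, using Flink's batch API (the names, such as "YtY", and the toy computation are illustrative, not taken from the PR):

```scala
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration

object BroadcastSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // stand-in for the precomputed Y^T * Y, a single-element DataSet
    val ytY: DataSet[Array[Double]] = env.fromElements(Array(1.0, 2.0, 3.0))

    // stand-in for the per-block data the update step runs on
    val blocks: DataSet[Int] = env.fromElements(0, 1, 2)

    val result = blocks
      .map(new RichMapFunction[Int, Double] {
        private var ytYLocal: Array[Double] = _

        override def open(parameters: Configuration): Unit = {
          // every parallel instance retrieves the broadcast set by name
          ytYLocal = getRuntimeContext
            .getBroadcastVariable[Array[Double]]("YtY")
            .get(0)
        }

        // toy computation that uses the broadcast matrix
        override def map(blockId: Int): Double = ytYLocal.sum + blockId
      })
      .withBroadcastSet(ytY, "YtY")

    result.print()
  }
}
```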
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)