Repository: flink
Updated Branches:
  refs/heads/master 31e157346 -> 215776b81


[FLINK-2211] [ml] Generalize ALS API

Allows user and item IDs to be of type Long (Int IDs remain supported)

This closes #3265.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/43d2fd23
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/43d2fd23
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/43d2fd23

Branch: refs/heads/master
Commit: 43d2fd23a75a5ac7769d37cb5c2559803bd65800
Parents: 31e1573
Author: Till Rohrmann <[email protected]>
Authored: Fri Feb 3 18:22:13 2017 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Sun Feb 5 21:56:25 2017 +0100

----------------------------------------------------------------------
 .../apache/flink/ml/recommendation/ALS.scala    | 125 +++++++++++++------
 .../flink/ml/recommendation/ALSITSuite.scala    |  51 +++++++-
 .../ml/recommendation/Recommendation.scala      |  63 ++++++++++
 3 files changed, 198 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/43d2fd23/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala
index d8af42f..0454381 100644
--- a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala
+++ b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala
@@ -194,7 +194,7 @@ class ALS extends Predictor[ALS] {
     * @return
     */
   def empiricalRisk(
-      labeledData: DataSet[(Int, Int, Double)],
+      labeledData: DataSet[(Long, Long, Double)],
       riskParameters: ParameterMap = ParameterMap.Empty)
     : DataSet[Double] = {
     val resultingParameters = parameters ++ riskParameters
@@ -293,20 +293,20 @@ object ALS {
     * @param item Item iD of the rated item
     * @param rating Rating value
     */
-  case class Rating(user: Int, item: Int, rating: Double)
+  case class Rating(user: Long, item: Long, rating: Double)
 
   /** Latent factor model vector
     *
     * @param id
     * @param factors
     */
-  case class Factors(id: Int, factors: Array[Double]) {
+  case class Factors(id: Long, factors: Array[Double]) {
     override def toString = s"($id, ${factors.mkString(",")})"
   }
 
   case class Factorization(userFactors: DataSet[Factors], itemFactors: 
DataSet[Factors])
 
-  case class OutBlockInformation(elementIDs: Array[Int], outLinks: OutLinks) {
+  case class OutBlockInformation(elementIDs: Array[Long], outLinks: OutLinks) {
     override def toString: String = {
       s"OutBlockInformation:((${elementIDs.mkString(",")}), ($outLinks))"
     }
@@ -349,7 +349,7 @@ object ALS {
     def apply(idx: Int) = links(idx)
   }
 
-  case class InBlockInformation(elementIDs: Array[Int], ratingsForBlock: 
Array[BlockRating]) {
+  case class InBlockInformation(elementIDs: Array[Long], ratingsForBlock: 
Array[BlockRating]) {
 
     override def toString: String = {
       s"InBlockInformation:((${elementIDs.mkString(",")}), 
(${ratingsForBlock.mkString("\n")}))"
@@ -376,8 +376,8 @@ object ALS {
   }
 
   class BlockIDGenerator(blocks: Int) extends Serializable {
-    def apply(id: Int): Int = {
-      id % blocks
+    def apply(id: Long): Int = {
+      (id % blocks).toInt
     }
   }
 
@@ -390,12 +390,15 @@ object ALS {
   // ===================================== Operations 
==============================================
 
   /** Predict operation which calculates the matrix entry for the given 
indices  */
-  implicit val predictRating = new PredictDataSetOperation[ALS, (Int, Int), 
(Int ,Int, Double)] {
+  implicit val predictRating = new PredictDataSetOperation[
+      ALS,
+      (Long, Long),
+      (Long, Long, Double)] {
     override def predictDataSet(
         instance: ALS,
         predictParameters: ParameterMap,
-        input: DataSet[(Int, Int)])
-      : DataSet[(Int, Int, Double)] = {
+        input: DataSet[(Long, Long)])
+      : DataSet[(Long, Long, Double)] = {
 
       instance.factorsOption match {
         case Some((userFactors, itemFactors)) => {
@@ -425,16 +428,34 @@ object ALS {
     }
   }
 
+  implicit val predictRatingInt = new PredictDataSetOperation[ALS, (Int, Int), 
(Int, Int, Double)] {
+    override def predictDataSet(
+      instance: ALS,
+      predictParameters: ParameterMap,
+      input: DataSet[(Int, Int)])
+    : DataSet[(Int, Int, Double)] = {
+      val longInput = input.map { x => (x._1.toLong, x._2.toLong)}
+
+      val longResult = implicitly[PredictDataSetOperation[ALS, (Long, Long), 
(Long, Long, Double)]]
+        .predictDataSet(
+          instance,
+          predictParameters,
+          longInput)
+
+      longResult.map{ x => (x._1.toInt, x._2.toInt, x._3)}
+    }
+  }
+
   /** Calculates the matrix factorization for the given ratings. A rating is 
defined as
     * a tuple of user ID, item ID and the corresponding rating.
     *
     * @return Factorization containing the user and item matrix
     */
-  implicit val fitALS =  new FitOperation[ALS, (Int, Int, Double)] {
+  implicit val fitALS =  new FitOperation[ALS, (Long, Long, Double)] {
     override def fit(
         instance: ALS,
         fitParameters: ParameterMap,
-        input: DataSet[(Int, Int, Double)])
+        input: DataSet[(Long, Long, Double)])
       : Unit = {
       val resultParameters = instance.parameters ++ fitParameters
 
@@ -457,13 +478,13 @@ object ALS {
 
       val ratingsByUserBlock = ratings.map{
         rating =>
-          val blockID = rating.user % userBlocks
+          val blockID = (rating.user % userBlocks).toInt
           (blockID, rating)
       } partitionCustom(blockIDPartitioner, 0)
 
       val ratingsByItemBlock = ratings map {
         rating =>
-          val blockID = rating.item % itemBlocks
+          val blockID = (rating.item % itemBlocks).toInt
           (blockID, new Rating(rating.item, rating.user, rating.rating))
       } partitionCustom(blockIDPartitioner, 0)
 
@@ -518,6 +539,19 @@ object ALS {
     }
   }
 
+  implicit val fitALSInt =  new FitOperation[ALS, (Int, Int, Double)] {
+    override def fit(
+      instance: ALS,
+      fitParameters: ParameterMap,
+      input: DataSet[(Int, Int, Double)])
+    : Unit = {
+
+      val longInput = input.map { x => (x._1.toLong, x._2.toLong, x._3)}
+
+      implicitly[FitOperation[ALS, (Long, Long, Double)]].fit(instance, 
fitParameters, longInput)
+    }
+  }
+
   /** Calculates a single half step of the ALS optimization. The result is the 
new value for
     * either the user or item matrix, depending with which matrix the method 
was called.
     *
@@ -706,13 +740,13 @@ object ALS {
     * @param ratings
     * @return
     */
-  def createUsersPerBlock(ratings: DataSet[(Int, Rating)]): DataSet[(Int, 
Array[Int])] = {
+  def createUsersPerBlock(ratings: DataSet[(Int, Rating)]): DataSet[(Int, 
Array[Long])] = {
     ratings.map{ x => (x._1, x._2.user)}.withForwardedFields("0").groupBy(0).
       sortGroup(1, Order.ASCENDING).reduceGroup {
       users => {
-        val result = ArrayBuffer[Int]()
+        val result = ArrayBuffer[Long]()
         var id = -1
-        var oldUser = -1
+        var oldUser = -1L
 
         while(users.hasNext) {
           val user = users.next()
@@ -746,7 +780,7 @@ object ALS {
     * @return
     */
   def createOutBlockInformation(ratings: DataSet[(Int, Rating)],
-    usersPerBlock: DataSet[(Int, Array[Int])],
+    usersPerBlock: DataSet[(Int, Array[Long])],
     itemBlocks: Int, blockIDGenerator: BlockIDGenerator):
   DataSet[(Int, OutBlockInformation)] = {
     ratings.coGroup(usersPerBlock).where(0).equalTo(0).apply {
@@ -795,7 +829,7 @@ object ALS {
     * @return
     */
   def createInBlockInformation(ratings: DataSet[(Int, Rating)],
-    usersPerBlock: DataSet[(Int, Array[Int])],
+    usersPerBlock: DataSet[(Int, Array[Long])],
     blockIDGenerator: BlockIDGenerator):
   DataSet[(Int, InBlockInformation)] = {
     // Group for every user block the users which have rated the same item and 
collect their ratings
@@ -803,8 +837,8 @@ object ALS {
       .withForwardedFields("0").groupBy(0, 1).reduceGroup {
       x =>
         var userBlockID = -1
-        var itemID = -1
-        val userIDs = ArrayBuffer[Int]()
+        var itemID = -1L
+        val userIDs = ArrayBuffer[Long]()
         val ratings = ArrayBuffer[Double]()
 
         while (x.hasNext) {
@@ -824,12 +858,12 @@ object ALS {
     // accordingly.
     val collectedPartialInfos = partialInInfos.groupBy(0, 1).sortGroup(2, 
Order.ASCENDING).
       reduceGroup {
-      new GroupReduceFunction[(Int, Int, Int, (Array[Int], Array[Double])), 
(Int,
-        Int, Array[(Array[Int], Array[Double])])](){
-        val buffer = new ArrayBuffer[(Array[Int], Array[Double])]
+      new GroupReduceFunction[(Int, Int, Long, (Array[Long], Array[Double])), 
(Int,
+        Int, Array[(Array[Long], Array[Double])])](){
+        val buffer = new ArrayBuffer[(Array[Long], Array[Double])]
 
-        override def reduce(iterable: lang.Iterable[(Int, Int, Int, 
(Array[Int],
-          Array[Double]))], collector: Collector[(Int, Int, Array[(Array[Int],
+        override def reduce(iterable: lang.Iterable[(Int, Int, Long, 
(Array[Long],
+          Array[Double]))], collector: Collector[(Int, Int, Array[(Array[Long],
           Array[Double])])]): Unit = {
 
           val infos = iterable.iterator()
@@ -858,7 +892,7 @@ object ALS {
             counter += 1
           }
 
-          val array = new Array[(Array[Int], Array[Double])](counter)
+          val array = new Array[(Array[Long], Array[Double])](counter)
 
           buffer.copyToArray(array)
 
@@ -871,13 +905,13 @@ object ALS {
     // respect to their itemBlockID, because the block update messages are 
sorted the same way
     collectedPartialInfos.coGroup(usersPerBlock).where(0).equalTo(0).
       sortFirstGroup(1, Order.ASCENDING).apply{
-      new CoGroupFunction[(Int, Int, Array[(Array[Int], Array[Double])]),
-        (Int, Array[Int]), (Int, InBlockInformation)] {
+      new CoGroupFunction[(Int, Int, Array[(Array[Long], Array[Double])]),
+        (Int, Array[Long]), (Int, InBlockInformation)] {
         val buffer = ArrayBuffer[BlockRating]()
 
         override def coGroup(partialInfosIterable:
-        lang.Iterable[(Int, Int,  Array[(Array[Int], Array[Double])])],
-          userIterable: lang.Iterable[(Int, Array[Int])],
+        lang.Iterable[(Int, Int,  Array[(Array[Long], Array[Double])])],
+          userIterable: lang.Iterable[(Int, Array[Long])],
           collector: Collector[(Int, InBlockInformation)]): Unit = {
 
           val users = userIterable.iterator()
@@ -895,12 +929,21 @@ object ALS {
             // entry contains the ratings and userIDs of a complete item block
             val entry = partialInfo._3
 
+            val blockRelativeIndicesRatings = new Array[(Array[Int], 
Array[Double])](entry.size)
+
             // transform userIDs to positional indices
-            for (row <- 0 until entry.length; col <- 0 until 
entry(row)._1.length) {
-              entry(row)._1(col) = userIDToPos(entry(row)._1(col))
+            for (row <- 0 until entry.length) {
+              val rowEntries = entry(row)._1
+              val rowIndices = new Array[Int](rowEntries.length)
+
+              for (col <- 0 until rowEntries.length) {
+                rowIndices(col) = userIDToPos(rowEntries(col))
+              }
+
+              blockRelativeIndicesRatings(row) = (rowIndices, entry(row)._2)
             }
 
-            buffer(counter).ratings = entry
+            buffer(counter).ratings = blockRelativeIndicesRatings
 
             counter += 1
           }
@@ -909,13 +952,21 @@ object ALS {
             val partialInfo = partialInfos.next()
             // entry contains the ratings and userIDs of a complete item block
             val entry = partialInfo._3
+            val blockRelativeIndicesRatings = new Array[(Array[Int], 
Array[Double])](entry.size)
 
             // transform userIDs to positional indices
-            for (row <- 0 until entry.length; col <- 0 until 
entry(row)._1.length) {
-              entry(row)._1(col) = userIDToPos(entry(row)._1(col))
+            for (row <- 0 until entry.length) {
+              val rowEntries = entry(row)._1
+              val rowIndices = new Array[Int](rowEntries.length)
+
+              for (col <- 0 until rowEntries.length) {
+                rowIndices(col) = userIDToPos(rowEntries(col))
+              }
+
+              blockRelativeIndicesRatings(row) = (rowIndices, entry(row)._2)
             }
 
-            buffer += new BlockRating(entry)
+            buffer += new BlockRating(blockRelativeIndicesRatings)
 
             counter += 1
           }

http://git-wip-us.apache.org/repos/asf/flink/blob/43d2fd23/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITSuite.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITSuite.scala b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITSuite.scala
index 043d8cb..0c85c46 100644
--- a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITSuite.scala
+++ b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITSuite.scala
@@ -42,19 +42,19 @@ class ALSITSuite extends FlatSpec with Matchers with FlinkTestBase {
       .setBlocks(4)
       .setNumFactors(numFactors)
 
-    val inputDS = env.fromCollection(data)
+    val inputDS = env.fromCollection(dataLong)
 
     als.fit(inputDS)
 
-    val testData = env.fromCollection(expectedResult.map {
+    val testData = env.fromCollection(expectedResultLong.map {
       case (userID, itemID, rating) => (userID, itemID)
     })
 
     val predictions = als.predict(testData).collect()
 
-    predictions.length should equal(expectedResult.length)
+    predictions.length should equal(expectedResultLong.length)
 
-    val resultMap = expectedResult.map {
+    val resultMap = expectedResultLong.map {
       case (uID, iID, value) => (uID, iID) -> value
     }.toMap
 
@@ -70,4 +70,47 @@ class ALSITSuite extends FlatSpec with Matchers with FlinkTestBase {
 
     risk should be(expectedEmpiricalRisk +- 1)
   }
+
+  it should "properly factorize a matrix (integer indices)" in {
+    import Recommendation._
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val als = ALS()
+      .setIterations(iterations)
+      .setLambda(lambda)
+      .setBlocks(4)
+      .setNumFactors(numFactors)
+
+    val inputDS = env.fromCollection(data)
+
+    als.fit(inputDS)
+
+
+    val testData = env.fromCollection(expectedResult.map {
+     case (userID, itemID, rating) => (userID, itemID)
+   })
+
+    val predictions = als.predict(testData).collect()
+
+    predictions.length should equal(expectedResult.length)
+
+    val resultMap = expectedResultLong.map {
+      case (uID, iID, value) => (uID, iID) -> value
+    }.toMap
+
+    predictions foreach {
+      case (uID, iID, value) => {
+        resultMap.isDefinedAt((uID, iID)) should be(true)
+
+        value should be(resultMap((uID, iID)) +- 0.1)
+      }
+    }
+
+    val risk = als.empiricalRisk(
+        inputDS.map( x => (x._1.toLong, x._2.toLong, x._3)))
+      .collect().head
+
+    risk should be(expectedEmpiricalRisk +- 1)
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/43d2fd23/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/Recommendation.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/Recommendation.scala b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/Recommendation.scala
index 8d8e4b9..3b466fd 100644
--- a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/Recommendation.scala
+++ b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/Recommendation.scala
@@ -86,5 +86,68 @@ object Recommendation {
     )
   }
 
+  val dataLong: Seq[(Long, Long, Double)] = {
+    Seq(
+      (2,13,534.3937734561154),
+      (6,14,509.63176469621936),
+      (4,14,515.8246770897443),
+      (7,3,495.05234565105),
+      (2,3,532.3281786219485),
+      (5,3,497.1906356844367),
+      (3,3,512.0640508585093),
+      (10,3,500.2906742233019),
+      (1,4,521.9189079662882),
+      (2,4,515.0734651491396),
+      (1,7,522.7532725967008),
+      (8,4,492.65683825096403),
+      (4,8,492.65683825096403),
+      (10,8,507.03319667905413),
+      (7,1,522.7532725967008),
+      (1,1,572.2230209271174),
+      (2,1,563.5849190220224),
+      (6,1,518.4844061038742),
+      (9,1,529.2443732217674),
+      (8,1,543.3202505434103),
+      (7,2,516.0188923307859),
+      (1,2,563.5849190220224),
+      (1,11,515.1023793011227),
+      (8,2,536.8571133978352),
+      (2,11,507.90776961762225),
+      (3,2,532.3281786219485),
+      (5,11,476.24185144363304),
+      (4,2,515.0734651491396),
+      (4,11,469.92049343738233),
+      (3,12,509.4713776280098),
+      (4,12,494.6533165132021),
+      (7,5,482.2907867916308),
+      (6,5,477.5940040923741),
+      (4,5,480.9040684364228),
+      (1,6,518.4844061038742),
+      (6,6,470.6605085832807),
+      (8,6,489.6360564705307),
+      (4,6,472.74052954447046),
+      (7,9,482.5837650471611),
+      (5,9,487.00175463269863),
+      (9,9,500.69514584780944),
+      (4,9,477.71644808419325),
+      (7,10,485.3852917539852),
+      (8,10,507.03319667905413),
+      (3,10,500.2906742233019),
+      (5,15,488.08215944254437),
+      (6,15,480.16929757607346)
+    )
+  }
+
+  val expectedResultLong: Seq[(Long, Long, Double)] = {
+    Seq(
+      (2, 2, 526.1037),
+      (5, 9, 468.5680),
+      (10, 3, 484.8975),
+      (5, 13, 451.6228),
+      (1, 15, 493.4956),
+      (4, 11, 456.3862)
+    )
+  }
+
   val expectedEmpiricalRisk = 505374.1877
 }

Reply via email to