Repository: mahout
Updated Branches:
  refs/heads/flink-binding 072289a46 -> cd4e3cec3


MAHOUT-1747: Mahout DSL for Flink: add support for different types of indexes 
(String, long, etc)


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/cd4e3cec
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/cd4e3cec
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/cd4e3cec

Branch: refs/heads/flink-binding
Commit: cd4e3cec34aac50a6110839be47f9a1be5550e23
Parents: 072289a
Author: smarthi <[email protected]>
Authored: Tue Mar 15 14:24:04 2016 -0400
Committer: smarthi <[email protected]>
Committed: Tue Mar 15 14:24:04 2016 -0400

----------------------------------------------------------------------
 .../scala/org/apache/mahout/flinkbindings/FlinkEngine.scala   | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/cd4e3cec/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
----------------------------------------------------------------------
diff --git 
a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala 
b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
index f848c3f..6312f47 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
@@ -294,7 +294,12 @@ object FlinkEngine extends DistributedEngine {
   /** Parallelize in-core matrix as flink distributed matrix, using row labels 
as a data set keys. */
   override def drmParallelizeWithRowLabels(m: Matrix, numPartitions: Int = 1)
                                           (implicit dc: DistributedContext): 
CheckpointedDrm[String] = {
-    ???
+
+    val rb = m.getRowLabelBindings
+    val p = for (i: String ← rb.keySet().toIndexedSeq) yield i → m(rb(i), 
::)
+
+    new 
CheckpointedFlinkDrm[String](dc.env.fromCollection(p).setParallelism(numPartitions),
+      _nrow = m.nrow, _ncol = m.ncol, cacheHint = CacheHint.NONE)
   }
 
   /** This creates an empty DRM with specified number of partitions and 
cardinality. */

Reply via email to