Repository: mahout Updated Branches: refs/heads/flink-binding 072289a46 -> cd4e3cec3
MAHOUT-1747: Mahout DSL for Flink: add support for different types of indexes (String, long, etc) Project: http://git-wip-us.apache.org/repos/asf/mahout/repo Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/cd4e3cec Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/cd4e3cec Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/cd4e3cec Branch: refs/heads/flink-binding Commit: cd4e3cec34aac50a6110839be47f9a1be5550e23 Parents: 072289a Author: smarthi <[email protected]> Authored: Tue Mar 15 14:24:04 2016 -0400 Committer: smarthi <[email protected]> Committed: Tue Mar 15 14:24:04 2016 -0400 ---------------------------------------------------------------------- .../scala/org/apache/mahout/flinkbindings/FlinkEngine.scala | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mahout/blob/cd4e3cec/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala index f848c3f..6312f47 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala @@ -294,7 +294,12 @@ object FlinkEngine extends DistributedEngine { /** Parallelize in-core matrix as flink distributed matrix, using row labels as a data set keys. */ override def drmParallelizeWithRowLabels(m: Matrix, numPartitions: Int = 1) (implicit dc: DistributedContext): CheckpointedDrm[String] = { - ??? 
+ + val rb = m.getRowLabelBindings + val p = for (i: String ← rb.keySet().toIndexedSeq) yield i → m(rb(i), ::) + + new CheckpointedFlinkDrm[String](dc.env.fromCollection(p).setParallelism(numPartitions), + _nrow = m.nrow, _ncol = m.ncol, cacheHint = CacheHint.NONE) } /** This creates an empty DRM with specified number of partitions and cardinality. */
