Repository: mahout Updated Branches: refs/heads/flink-binding 9e9ec79d6 -> 1d9b6322e
1. Reworked FlinkEngine.drmDfsRead 2. small renaming of methods in FlinkDrm.scala Project: http://git-wip-us.apache.org/repos/asf/mahout/repo Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/41dcb425 Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/41dcb425 Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/41dcb425 Branch: refs/heads/flink-binding Commit: 41dcb4253af7fdccee81b2f2acf22e6a904c2b9d Parents: 9e9ec79 Author: smarthi <[email protected]> Authored: Sat Oct 24 23:37:37 2015 -0400 Committer: smarthi <[email protected]> Committed: Sat Oct 24 23:38:15 2015 -0400 ---------------------------------------------------------------------- .../mahout/flinkbindings/FlinkEngine.scala | 28 +++++++++++--------- .../mahout/flinkbindings/blas/FlinkOpAewB.scala | 4 +-- .../flinkbindings/blas/FlinkOpAewScalar.scala | 6 ++--- .../mahout/flinkbindings/blas/FlinkOpAt.scala | 2 +- .../mahout/flinkbindings/blas/FlinkOpAtA.scala | 4 +-- .../mahout/flinkbindings/blas/FlinkOpAtB.scala | 4 +-- .../mahout/flinkbindings/blas/FlinkOpAx.scala | 2 +- .../flinkbindings/blas/FlinkOpCBind.scala | 8 +++--- .../flinkbindings/blas/FlinkOpMapBlock.scala | 2 +- .../flinkbindings/blas/FlinkOpRBind.scala | 2 +- .../flinkbindings/blas/FlinkOpRowRange.scala | 2 +- .../blas/FlinkOpTimesRightMatrix.scala | 2 +- .../mahout/flinkbindings/drm/FlinkDrm.scala | 12 ++++----- .../mahout/flinkbindings/blas/LATestSuite.scala | 22 +++++++-------- .../mahout/sparkbindings/SparkEngine.scala | 3 --- 15 files changed, 52 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mahout/blob/41dcb425/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala index fee3d73..269a928 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala @@ -19,6 +19,7 @@ package org.apache.mahout.flinkbindings import java.util.Collection + import scala.collection.JavaConverters._ import scala.collection.JavaConversions._ import scala.reflect.ClassTag @@ -30,7 +31,6 @@ import org.apache.hadoop.io.Writable import org.apache.hadoop.mapred.FileInputFormat import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.mapred.SequenceFileInputFormat -import org.apache.mahout.flinkbindings._ import org.apache.mahout.flinkbindings.blas._ import org.apache.mahout.flinkbindings.drm._ import org.apache.mahout.flinkbindings.io.HDFSUtil @@ -58,16 +58,20 @@ object FlinkEngine extends DistributedEngine { */ override def drmDfsRead(path: String, parMin: Int = 0) (implicit dc: DistributedContext): CheckpointedDrm[_] = { + + // Require that context is actually Flink context. + require(dc.isInstanceOf[FlinkDistributedContext], "Supplied context must be for the Flink backend.") + + // Extract the Flink Environment variable + implicit val env = dc.asInstanceOf[FlinkDistributedContext].env + val metadata = hdfsUtils.readDrmHeader(path) val unwrapKey = metadata.unwrapKeyFunction - val job = new JobConf - val hadoopInput = new SequenceFileInputFormat[Writable, VectorWritable] - FileInputFormat.addInputPath(job, new org.apache.hadoop.fs.Path(path)) - - val writables = dc.env.createHadoopInput(hadoopInput, classOf[Writable], classOf[VectorWritable], job) + val dataset = env.readHadoopFile(new SequenceFileInputFormat[Writable, VectorWritable], + classOf[Writable], classOf[VectorWritable], path) - val res = writables.map(new MapFunction[Tuple2[Writable, VectorWritable], (Any, Vector)] { + val res = dataset.map(new MapFunction[Tuple2[Writable, VectorWritable], (Any, Vector)] { def map(tuple: Tuple2[Writable, VectorWritable]): (Any, Vector) = { (unwrapKey(tuple.f0), tuple.f1) } @@ -89,7 +93,7 @@ object FlinkEngine extends DistributedEngine { override def toPhysical[K: ClassTag](plan: DrmLike[K], ch: CacheHint.CacheHint): CheckpointedDrm[K] = { // Flink-specific Physical Plan translation. val drm = flinkTranslate(plan) - val newcp = new CheckpointedFlinkDrm(ds = drm.deblockify.ds, _nrow = plan.nrow, _ncol = plan.ncol) + val newcp = new CheckpointedFlinkDrm(ds = drm.asRowWise.ds, _nrow = plan.nrow, _ncol = plan.ncol) newcp.cache() } @@ -101,7 +105,7 @@ object FlinkEngine extends DistributedEngine { // TODO: create specific implementation of Atx, see MAHOUT-1749 val opAt = OpAt(a) val at = FlinkOpAt.sparseTrick(opAt, flinkTranslate(a)(op.classTagA)) - val atCast = new CheckpointedFlinkDrm(at.deblockify.ds, _nrow=opAt.nrow, _ncol=opAt.ncol) + val atCast = new CheckpointedFlinkDrm(at.asRowWise.ds, _nrow=opAt.nrow, _ncol=opAt.ncol) val opAx = OpAx(atCast, x) FlinkOpAx.blockifiedBroadcastAx(opAx, flinkTranslate(atCast)(op.classTagA)) } @@ -112,11 +116,11 @@ object FlinkEngine extends DistributedEngine { // TODO: create specific implementation of ABt, see MAHOUT-1750 val opAt = OpAt(a.asInstanceOf[DrmLike[Int]]) // TODO: casts! val at = FlinkOpAt.sparseTrick(opAt, flinkTranslate(a.asInstanceOf[DrmLike[Int]])) - val c = new CheckpointedFlinkDrm(at.deblockify.ds, _nrow=opAt.nrow, _ncol=opAt.ncol) + val c = new CheckpointedFlinkDrm(at.asRowWise.ds, _nrow=opAt.nrow, _ncol=opAt.ncol) val opBt = OpAt(b.asInstanceOf[DrmLike[Int]]) // TODO: casts! val bt = FlinkOpAt.sparseTrick(opBt, flinkTranslate(b.asInstanceOf[DrmLike[Int]])) - val d = new CheckpointedFlinkDrm(bt.deblockify.ds, _nrow=opBt.nrow, _ncol=opBt.ncol) + val d = new CheckpointedFlinkDrm(bt.asRowWise.ds, _nrow=opBt.nrow, _ncol=opBt.ncol) FlinkOpAtB.notZippable(OpAtB(c, d), flinkTranslate(c), flinkTranslate(d)) .asInstanceOf[FlinkDrm[K]] @@ -167,7 +171,7 @@ object FlinkEngine extends DistributedEngine { override def numNonZeroElementsPerColumn[K: ClassTag](drm: CheckpointedDrm[K]): Vector = { val n = drm.ncol - val result = drm.blockify.ds.map(new MapFunction[(Array[K], Matrix), Vector] { + val result = drm.asBlockified.ds.map(new MapFunction[(Array[K], Matrix), Vector] { def map(tuple: (Array[K], Matrix)): Vector = { val (_, block) = tuple val acc = block(0, ::).like() http://git-wip-us.apache.org/repos/asf/mahout/blob/41dcb425/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewB.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewB.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewB.scala index 460199e..38fe312 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewB.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewB.scala @@ -29,8 +29,8 @@ object FlinkOpAewB { val classTag = extractRealClassTag(op.A) val joiner = selector[Vector, Any](classTag.asInstanceOf[ClassTag[Any]]) - val rowsA = A.deblockify.ds.asInstanceOf[DrmDataSet[Any]] - val rowsB = B.deblockify.ds.asInstanceOf[DrmDataSet[Any]] + val rowsA = A.asRowWise.ds.asInstanceOf[DrmDataSet[Any]] + val rowsB = B.asRowWise.ds.asInstanceOf[DrmDataSet[Any]] val res: DataSet[(Any, Vector)] = rowsA.coGroup(rowsB).where(joiner).equalTo(joiner) http://git-wip-us.apache.org/repos/asf/mahout/blob/41dcb425/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewScalar.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewScalar.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewScalar.scala index a97f8c8..ab434bb 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewScalar.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewScalar.scala @@ -44,7 +44,7 @@ object FlinkOpAewScalar { def opScalarNoSideEffect[K: ClassTag](op: OpAewScalar[K], A: FlinkDrm[K], scalar: Double): FlinkDrm[K] = { val function = EWOpsCloning.strToFunction(op.op) - val res = A.blockify.ds.map(new MapFunction[(Array[K], Matrix), (Array[K], Matrix)] { + val res = A.asBlockified.ds.map(new MapFunction[(Array[K], Matrix), (Array[K], Matrix)] { def map(tuple: (Array[K], Matrix)): (Array[K], Matrix) = tuple match { case (keys, mat) => (keys, function(mat, scalar)) } @@ -58,7 +58,7 @@ object FlinkOpAewScalar { val inplace = isInplace val res = if (op.evalZeros) { - A.blockify.ds.map(new MapFunction[(Array[K], Matrix), (Array[K], Matrix)] { + A.asBlockified.ds.map(new MapFunction[(Array[K], Matrix), (Array[K], Matrix)] { def map(tuple: (Array[K], Matrix)): (Array[K], Matrix) = { val (keys, block) = tuple val newBlock = if (inplace) block else block.cloned @@ -67,7 +67,7 @@ object FlinkOpAewScalar { } }) } else { - A.blockify.ds.map(new MapFunction[(Array[K], Matrix), (Array[K], Matrix)] { + A.asBlockified.ds.map(new MapFunction[(Array[K], Matrix), (Array[K], Matrix)] { def map(tuple: (Array[K], Matrix)): (Array[K], Matrix) = { val (keys, block) = tuple val newBlock = if (inplace) block else block.cloned http://git-wip-us.apache.org/repos/asf/mahout/blob/41dcb425/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAt.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAt.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAt.scala index b859e1f..274b1ca 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAt.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAt.scala @@ -51,7 +51,7 @@ object FlinkOpAt { def sparseTrick(op: OpAt, A: FlinkDrm[Int]): FlinkDrm[Int] = { val ncol = op.ncol // # of rows of A, i.e. # of columns of A^T - val sparseParts = A.blockify.ds.flatMap(new FlatMapFunction[(Array[Int], Matrix), DrmTuple[Int]] { + val sparseParts = A.asBlockified.ds.flatMap(new FlatMapFunction[(Array[Int], Matrix), DrmTuple[Int]] { def flatMap(typle: (Array[Int], Matrix), out: Collector[DrmTuple[Int]]): Unit = typle match { case (keys, block) => { (0 until block.ncol).map(columnIdx => { http://git-wip-us.apache.org/repos/asf/mahout/blob/41dcb425/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtA.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtA.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtA.scala index 0bda805..0e30eff 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtA.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtA.scala @@ -45,7 +45,7 @@ object FlinkOpAtA { } def slim(op: OpAtA[_], A: FlinkDrm[_]): Matrix = { - val ds = A.blockify.ds.asInstanceOf[DataSet[(Array[Any], Matrix)]] + val ds = A.asBlockified.ds.asInstanceOf[DataSet[(Array[Any], Matrix)]] val res = ds.map(new MapFunction[(Array[Any], Matrix), Matrix] { // TODO: optimize it: use upper-triangle matrices like in Spark @@ -62,7 +62,7 @@ object FlinkOpAtA { def fat(op: OpAtA[Any], A: FlinkDrm[Any]): FlinkDrm[Int] = { val nrow = op.A.nrow val ncol = op.A.ncol - val ds = A.blockify.ds + val ds = A.asBlockified.ds val numberOfPartitions: DataSet[Int] = ds.map(new MapFunction[(Array[Any], Matrix), Int] { def map(a: (Array[Any], Matrix)): Int = 1 http://git-wip-us.apache.org/repos/asf/mahout/blob/41dcb425/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala index 362c62f..0dd0dd2 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala @@ -52,8 +52,8 @@ object FlinkOpAtB { val classTag = extractRealClassTag(op.A) val joiner = selector[Vector, Any](classTag.asInstanceOf[ClassTag[Any]]) - val rowsAt = At.deblockify.ds.asInstanceOf[DrmDataSet[Any]] - val rowsB = B.deblockify.ds.asInstanceOf[DrmDataSet[Any]] + val rowsAt = At.asRowWise.ds.asInstanceOf[DrmDataSet[Any]] + val rowsB = B.asRowWise.ds.asInstanceOf[DrmDataSet[Any]] val joined = rowsAt.join(rowsB).where(joiner).equalTo(joiner) val ncol = op.ncol http://git-wip-us.apache.org/repos/asf/mahout/blob/41dcb425/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala index b473a4c..503ab17 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala @@ -44,7 +44,7 @@ object FlinkOpAx { val singletonDataSetX = ctx.env.fromElements(op.x) - val out = A.blockify.ds.map(new RichMapFunction[(Array[K], Matrix), (Array[K], Matrix)] { + val out = A.asBlockified.ds.map(new RichMapFunction[(Array[K], Matrix), (Array[K], Matrix)] { var x: Vector = null override def open(params: Configuration): Unit = { http://git-wip-us.apache.org/repos/asf/mahout/blob/41dcb425/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpCBind.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpCBind.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpCBind.scala index 234937b..49ca7d5 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpCBind.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpCBind.scala @@ -50,8 +50,8 @@ object FlinkOpCBind { val classTag = extractRealClassTag(op.A) val joiner = selector[Vector, Any](classTag.asInstanceOf[ClassTag[Any]]) - val rowsA = A.deblockify.ds.asInstanceOf[DrmDataSet[Any]] - val rowsB = B.deblockify.ds.asInstanceOf[DrmDataSet[Any]] + val rowsA = A.asRowWise.ds.asInstanceOf[DrmDataSet[Any]] + val rowsB = B.asRowWise.ds.asInstanceOf[DrmDataSet[Any]] val res: DataSet[(Any, Vector)] = rowsA.coGroup(rowsB).where(joiner).equalTo(joiner) @@ -102,9 +102,9 @@ object FlinkOpCBind { def cbindScalar[K: ClassTag](op: OpCbindScalar[K], A: FlinkDrm[K], x: Double): FlinkDrm[K] = { val left = op.leftBind - val ds = A.blockify.ds + val ds = A.asBlockified.ds - val out = A.blockify.ds.map(new MapFunction[(Array[K], Matrix), (Array[K], Matrix)] { + val out = A.asBlockified.ds.map(new MapFunction[(Array[K], Matrix), (Array[K], Matrix)] { def map(tuple: (Array[K], Matrix)): (Array[K], Matrix) = tuple match { case (keys, mat) => (keys, cbind(mat, x, left)) } http://git-wip-us.apache.org/repos/asf/mahout/blob/41dcb425/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpMapBlock.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpMapBlock.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpMapBlock.scala index cd745e4..9530d43 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpMapBlock.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpMapBlock.scala @@ -34,7 +34,7 @@ import org.apache.mahout.math.scalabindings.RLikeOps._ object FlinkOpMapBlock { def apply[S, R: ClassTag](src: FlinkDrm[S], ncol: Int, function: BlockMapFunc[S, R]): FlinkDrm[R] = { - val res = src.blockify.ds.map(new MapFunction[(Array[S], Matrix), (Array[R], Matrix)] { + val res = src.asBlockified.ds.map(new MapFunction[(Array[S], Matrix), (Array[R], Matrix)] { def map(block: (Array[S], Matrix)): (Array[R], Matrix) = { val out = function(block) assert(out._2.nrow == block._2.nrow, "block mapping must return same number of rows.") http://git-wip-us.apache.org/repos/asf/mahout/blob/41dcb425/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRBind.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRBind.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRBind.scala index f8fbea0..9ebff51 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRBind.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRBind.scala @@ -30,7 +30,7 @@ object FlinkOpRBind { def rbind[K: ClassTag](op: OpRbind[K], A: FlinkDrm[K], B: FlinkDrm[K]): FlinkDrm[K] = { // note that indexes of B are already re-arranged prior to executing this code - val res = A.deblockify.ds.union(B.deblockify.ds) + val res = A.asRowWise.ds.union(B.asRowWise.ds) new RowsFlinkDrm(res.asInstanceOf[DataSet[(K, Vector)]], ncol = op.ncol) } http://git-wip-us.apache.org/repos/asf/mahout/blob/41dcb425/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRowRange.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRowRange.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRowRange.scala index f720125..6e11892 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRowRange.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRowRange.scala @@ -35,7 +35,7 @@ object FlinkOpRowRange { val rowRange = op.rowRange val firstIdx = rowRange.head - val filtered = A.deblockify.ds.filter(new FilterFunction[(Int, Vector)] { + val filtered = A.asRowWise.ds.filter(new FilterFunction[(Int, Vector)] { def filter(tuple: (Int, Vector)): Boolean = tuple match { case (idx, vec) => rowRange.contains(idx) } http://git-wip-us.apache.org/repos/asf/mahout/blob/41dcb425/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpTimesRightMatrix.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpTimesRightMatrix.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpTimesRightMatrix.scala index 92724d8..af3854d 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpTimesRightMatrix.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpTimesRightMatrix.scala @@ -39,7 +39,7 @@ object FlinkOpTimesRightMatrix { val singletonDataSetB = ctx.env.fromElements(inCoreB) - val res = A.blockify.ds.map(new RichMapFunction[(Array[K], Matrix), (Array[K], Matrix)] { + val res = A.asBlockified.ds.map(new RichMapFunction[(Array[K], Matrix), (Array[K], Matrix)] { var inCoreB: Matrix = null override def open(params: Configuration): Unit = { http://git-wip-us.apache.org/repos/asf/mahout/blob/41dcb425/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala index 27eac4e..d00a335 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala @@ -43,8 +43,8 @@ trait FlinkDrm[K] { def context: FlinkDistributedContext def isBlockified: Boolean - def blockify: BlockifiedFlinkDrm[K] - def deblockify: RowsFlinkDrm[K] + def asBlockified: BlockifiedFlinkDrm[K] + def asRowWise: RowsFlinkDrm[K] def classTag: ClassTag[K] } @@ -56,7 +56,7 @@ class RowsFlinkDrm[K: ClassTag](val ds: DrmDataSet[K], val ncol: Int) extends Fl def isBlockified = false - def blockify(): BlockifiedFlinkDrm[K] = { + def asBlockified(): BlockifiedFlinkDrm[K] = { val ncolLocal = ncol val classTag = implicitly[ClassTag[K]] @@ -81,7 +81,7 @@ class RowsFlinkDrm[K: ClassTag](val ds: DrmDataSet[K], val ncol: Int) extends Fl new BlockifiedFlinkDrm(parts, ncol) } - def deblockify = this + def asRowWise = this def classTag = implicitly[ClassTag[K]] @@ -94,9 +94,9 @@ class BlockifiedFlinkDrm[K: ClassTag](val ds: BlockifiedDrmDataSet[K], val ncol: def isBlockified = true - def blockify = this + def asBlockified = this - def deblockify = { + def asRowWise = { val out = ds.flatMap(new FlatMapFunction[(Array[K], Matrix), DrmTuple[K]] { def flatMap(typle: (Array[K], Matrix), out: Collector[DrmTuple[K]]): Unit = typle match { case (keys, block) => keys.view.zipWithIndex.foreach { http://git-wip-us.apache.org/repos/asf/mahout/blob/41dcb425/flink/src/test/scala/org/apache/mahout/flinkbindings/blas/LATestSuite.scala ---------------------------------------------------------------------- diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/blas/LATestSuite.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/blas/LATestSuite.scala index 786ab5f..a766146 100644 --- a/flink/src/test/scala/org/apache/mahout/flinkbindings/blas/LATestSuite.scala +++ b/flink/src/test/scala/org/apache/mahout/flinkbindings/blas/LATestSuite.scala @@ -41,7 +41,7 @@ class LATestSuite extends FunSuite with DistributedFlinkSuite { val opAx = new OpAx(A, x) val res = FlinkOpAx.blockifiedBroadcastAx(opAx, A) - val drm = new CheckpointedFlinkDrm(res.deblockify.ds) + val drm = new CheckpointedFlinkDrm(res.asRowWise.ds) val output = drm.collect val b = output(::, 0) @@ -54,7 +54,7 @@ class LATestSuite extends FunSuite with DistributedFlinkSuite { val opAt = new OpAt(A) val res = FlinkOpAt.sparseTrick(opAt, A) - val drm = new CheckpointedFlinkDrm(res.deblockify.ds, _nrow=inCoreA.ncol, _ncol=inCoreA.nrow) + val drm = new CheckpointedFlinkDrm(res.asRowWise.ds, _nrow=inCoreA.ncol, _ncol=inCoreA.nrow) val output = drm.collect assert((output - inCoreA.t).norm < 1e-6) @@ -71,7 +71,7 @@ class LATestSuite extends FunSuite with DistributedFlinkSuite { val opAtB = new OpAtB(At, B) val res = FlinkOpAtB.notZippable(opAtB, At, B) - val drm = new CheckpointedFlinkDrm(res.deblockify.ds, _nrow=inCoreAt.ncol, _ncol=inCoreB.ncol) + val drm = new CheckpointedFlinkDrm(res.asRowWise.ds, _nrow=inCoreAt.ncol, _ncol=inCoreB.ncol) val output = drm.collect val expected = inCoreAt.t %*% inCoreB @@ -86,7 +86,7 @@ class LATestSuite extends FunSuite with DistributedFlinkSuite { val op = new OpAewScalar(A, scalar, "*") val res = FlinkOpAewScalar.opScalarNoSideEffect(op, A, scalar) - val drm = new CheckpointedFlinkDrm(res.deblockify.ds, _nrow=inCoreA.nrow, _ncol=inCoreA.ncol) + val drm = new CheckpointedFlinkDrm(res.asRowWise.ds, _nrow=inCoreA.nrow, _ncol=inCoreA.ncol) val output = drm.collect val expected = inCoreA * scalar @@ -100,7 +100,7 @@ class LATestSuite extends FunSuite with DistributedFlinkSuite { val op = new OpAewB(A, A, "*") val res = FlinkOpAewB.rowWiseJoinNoSideEffect(op, A, A) - val drm = new CheckpointedFlinkDrm(res.deblockify.ds, _nrow=inCoreA.nrow, _ncol=inCoreA.ncol) + val drm = new CheckpointedFlinkDrm(res.asRowWise.ds, _nrow=inCoreA.nrow, _ncol=inCoreA.ncol) val output = drm.collect assert((output - (inCoreA * inCoreA)).norm < 1e-6) @@ -115,7 +115,7 @@ class LATestSuite extends FunSuite with DistributedFlinkSuite { val op = new OpCbind(A, B) val res = FlinkOpCBind.cbind(op, A, B) - val drm = new CheckpointedFlinkDrm(res.deblockify.ds, _nrow=inCoreA.nrow, + val drm = new CheckpointedFlinkDrm(res.asRowWise.ds, _nrow=inCoreA.nrow, _ncol=(inCoreA.ncol + inCoreB.ncol)) val output = drm.collect @@ -130,7 +130,7 @@ class LATestSuite extends FunSuite with DistributedFlinkSuite { val op = new OpCbindScalar(A, 1, true) val res = FlinkOpCBind.cbindScalar(op, A, 1) - val drm = new CheckpointedFlinkDrm(res.deblockify.ds, _nrow=inCoreA.nrow, + val drm = new CheckpointedFlinkDrm(res.asRowWise.ds, _nrow=inCoreA.nrow, _ncol=(inCoreA.ncol + 1)) val output = drm.collect @@ -145,7 +145,7 @@ class LATestSuite extends FunSuite with DistributedFlinkSuite { val op = new OpCbindScalar(A, 1, false) val res = FlinkOpCBind.cbindScalar(op, A, 1) - val drm = new CheckpointedFlinkDrm(res.deblockify.ds, _nrow=inCoreA.nrow, + val drm = new CheckpointedFlinkDrm(res.asRowWise.ds, _nrow=inCoreA.nrow, _ncol=(inCoreA.ncol + 1)) val output = drm.collect @@ -161,7 +161,7 @@ class LATestSuite extends FunSuite with DistributedFlinkSuite { val op = new OpRowRange(A, range) val res = FlinkOpRowRange.slice(op, A) - val drm = new CheckpointedFlinkDrm(res.deblockify.ds, _nrow=op.nrow, + val drm = new CheckpointedFlinkDrm(res.asRowWise.ds, _nrow=op.nrow, _ncol=inCoreA.ncol) val output = drm.collect @@ -177,7 +177,7 @@ class LATestSuite extends FunSuite with DistributedFlinkSuite { val op = new OpTimesRightMatrix(A, inCoreB) val res = FlinkOpTimesRightMatrix.drmTimesInCore(op, A, inCoreB) - val drm = new CheckpointedFlinkDrm(res.deblockify.ds, _nrow=op.nrow, + val drm = new CheckpointedFlinkDrm(res.asRowWise.ds, _nrow=op.nrow, _ncol=op.ncol) val output = drm.collect @@ -204,7 +204,7 @@ class LATestSuite extends FunSuite with DistributedFlinkSuite { val op = new OpAtA(Aany) val res = FlinkOpAtA.fat(op, Aany) - val drm = new CheckpointedFlinkDrm(res.deblockify.ds, _nrow=op.nrow, _ncol=op.ncol) + val drm = new CheckpointedFlinkDrm(res.asRowWise.ds, _nrow=op.nrow, _ncol=op.ncol) val output = drm.collect println(output) http://git-wip-us.apache.org/repos/asf/mahout/blob/41dcb425/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala index fdbd950..d89a8de 100644 --- a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala @@ -26,7 +26,6 @@ import RLikeOps._ import org.apache.mahout.math.drm.logical._ import org.apache.mahout.sparkbindings.drm.{cpDrmGeneric2DrmRddInput, CheckpointedDrmSpark, DrmRddInput} import org.apache.mahout.math._ -import scala.Predef import scala.reflect.ClassTag import scala.reflect.classTag import org.apache.spark.storage.StorageLevel @@ -35,8 +34,6 @@ import org.apache.hadoop.io._ import collection._ import JavaConversions._ import org.apache.mahout.math.drm._ -import org.apache.mahout.math.drm.RLikeDrmOps._ -import org.apache.spark.rdd.RDD import org.apache.mahout.common.{Hadoop1HDFSUtil, HDFSUtil}
