Repository: mahout Updated Branches: refs/heads/flink-binding d1d6fc0a4 -> f4f42ae4c
MAHOUT-1810: Failing test in flink-bindings: A + B Identically partitioned (mapBlock Checkpointing issue) closes apache/mahout#198 Project: http://git-wip-us.apache.org/repos/asf/mahout/repo Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/f4f42ae4 Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/f4f42ae4 Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/f4f42ae4 Branch: refs/heads/flink-binding Commit: f4f42ae4c4c7555659edcc43669fec82f9537219 Parents: d1d6fc0 Author: Andrew Palumbo <[email protected]> Authored: Mon Mar 21 22:22:09 2016 -0400 Committer: Andrew Palumbo <[email protected]> Committed: Mon Mar 21 22:22:09 2016 -0400 ---------------------------------------------------------------------- .../mahout/flinkbindings/FlinkEngine.scala | 34 +++++++++----------- .../flinkbindings/blas/FlinkOpMapBlock.scala | 2 +- .../drm/CheckpointedFlinkDrm.scala | 30 ++++------------- 3 files changed, 23 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mahout/blob/f4f42ae4/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala index f1e06d0..e606514 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala @@ -72,30 +72,30 @@ object FlinkEngine extends DistributedEngine { if (metadata.keyClassTag == ClassTag.Int) { val ds = env.readSequenceFile(classOf[IntWritable], classOf[VectorWritable], path) - val res = ds.map(new MapFunction[(IntWritable, VectorWritable), (Any, Vector)] { - def map(tuple: (IntWritable, VectorWritable)): (Any, Vector) = { - (unwrapKey(tuple._1), tuple._2.get()) + val res = ds.map(new 
MapFunction[(IntWritable, VectorWritable), (Int, Vector)] { + def map(tuple: (IntWritable, VectorWritable)): (Int, Vector) = { + (unwrapKey(tuple._1).asInstanceOf[Int], tuple._2.get()) } }) - datasetWrap(res)(metadata.keyClassTag.asInstanceOf[ClassTag[Any]]) + datasetWrap(res)(metadata.keyClassTag.asInstanceOf[ClassTag[Int]]) } else if (metadata.keyClassTag == ClassTag.Long) { val ds = env.readSequenceFile(classOf[LongWritable], classOf[VectorWritable], path) - val res = ds.map(new MapFunction[(LongWritable, VectorWritable), (Any, Vector)] { - def map(tuple: (LongWritable, VectorWritable)): (Any, Vector) = { - (unwrapKey(tuple._1), tuple._2.get()) + val res = ds.map(new MapFunction[(LongWritable, VectorWritable), (Long, Vector)] { + def map(tuple: (LongWritable, VectorWritable)): (Long, Vector) = { + (unwrapKey(tuple._1).asInstanceOf[Long], tuple._2.get()) } }) - datasetWrap(res)(metadata.keyClassTag.asInstanceOf[ClassTag[Any]]) + datasetWrap(res)(metadata.keyClassTag.asInstanceOf[ClassTag[Long]]) } else if (metadata.keyClassTag == ClassTag(classOf[String])) { val ds = env.readSequenceFile(classOf[Text], classOf[VectorWritable], path) - val res = ds.map(new MapFunction[(Text, VectorWritable), (Any, Vector)] { - def map(tuple: (Text, VectorWritable)): (Any, Vector) = { - (unwrapKey(tuple._1), tuple._2.get()) + val res = ds.map(new MapFunction[(Text, VectorWritable), (String, Vector)] { + def map(tuple: (Text, VectorWritable)): (String, Vector) = { + (unwrapKey(tuple._1).asInstanceOf[String], tuple._2.get()) } }) - datasetWrap(res)(metadata.keyClassTag.asInstanceOf[ClassTag[Any]]) + datasetWrap(res)(metadata.keyClassTag.asInstanceOf[ClassTag[String]]) } else throw new IllegalArgumentException(s"Unsupported DRM key type:${keyClass.getName}") } @@ -124,7 +124,6 @@ object FlinkEngine extends DistributedEngine { implicit val typeInformation = generateTypeInformation[K] val drm = flinkTranslate(plan) val newcp = new CheckpointedFlinkDrm(ds = drm.asRowWise.ds, _nrow = 
plan.nrow, _ncol = plan.ncol) - // newcp.ds.getExecutionEnvironment.createProgramPlan("plan") newcp.cache() } @@ -135,7 +134,6 @@ object FlinkEngine extends DistributedEngine { case OpAtAnyKey(_) ⇒ throw new IllegalArgumentException("\"A\" must be Int-keyed in this A.t expression.") case op@OpAx(a, x) ⇒ - //implicit val typeInformation = generateTypeInformation[K] FlinkOpAx.blockifiedBroadcastAx(op, flinkTranslate(a)) case op@OpAt(a) if op.keyClassTag == ClassTag.Int ⇒ FlinkOpAt.sparseTrick(op, flinkTranslate(a)).asInstanceOf[FlinkDrm[K]] case op@OpAtx(a, x) if op.keyClassTag == ClassTag.Int ⇒ @@ -180,11 +178,9 @@ object FlinkEngine extends DistributedEngine { FlinkOpRowRange.slice(op, flinkTranslate(a)).asInstanceOf[FlinkDrm[K]] case op@OpABAnyKey(a, b) if a.keyClassTag != b.keyClassTag ⇒ throw new IllegalArgumentException("DRMs A and B have different indices, cannot multiply them") - case op: OpMapBlock[K, _] ⇒ - FlinkOpMapBlock.apply(flinkTranslate(op.A), op.ncol, op).asInstanceOf[FlinkDrm[K]] - case cp: CheckpointedFlinkDrm[K] ⇒ - //implicit val ktag=cp.keyClassTag - new RowsFlinkDrm[K](cp.ds, cp.ncol) + case op: OpMapBlock[_, K] ⇒ + FlinkOpMapBlock.apply(flinkTranslate(op.A), op.ncol, op) + case cp: CheckpointedDrm[K] ⇒ cp case _ ⇒ throw new NotImplementedError(s"operator $oper is not implemented yet") } http://git-wip-us.apache.org/repos/asf/mahout/blob/f4f42ae4/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpMapBlock.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpMapBlock.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpMapBlock.scala index c3918a5..ec4769a 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpMapBlock.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpMapBlock.scala @@ -31,6 +31,7 @@ import org.apache.mahout.math.scalabindings.RLikeOps._ object 
FlinkOpMapBlock { def apply[S, R: TypeInformation](src: FlinkDrm[S], ncol: Int, operator: OpMapBlock[S,R]): FlinkDrm[R] = { + implicit val rtag = operator.keyClassTag val bmf = operator.bmf val ncol = operator.ncol @@ -39,7 +40,6 @@ object FlinkOpMapBlock { val result = bmf(block) assert(result._2.nrow == block._2.nrow, "block mapping must return same number of rows.") assert(result._2.ncol == ncol, s"block map must return $ncol number of columns.") - // printf("Block partition: \n%s\n", block._2) result } http://git-wip-us.apache.org/repos/asf/mahout/blob/f4f42ae4/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala index 84b327a..6f1ba9f 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala @@ -45,6 +45,8 @@ class CheckpointedFlinkDrm[K: ClassTag](val ds: DrmDataSet[K], lazy val nrow: Long = if (_nrow >= 0) _nrow else dim._1 lazy val ncol: Int = if (_ncol >= 0) _ncol else dim._2 + var cacheFileName:String = "/tmp/a" + private lazy val dim: (Long, Int) = { // combine computation of ncol and nrow in one pass @@ -67,8 +69,10 @@ class CheckpointedFlinkDrm[K: ClassTag](val ds: DrmDataSet[K], override val keyClassTag: ClassTag[K] = classTag[K] def cache() = { - // TODO - this + cacheFileName = System.nanoTime().toString + implicit val context = new FlinkDistributedContext(ds.getExecutionEnvironment) + dfsWrite("/tmp/" + cacheFileName) + drmDfsRead("/tmp/" + cacheFileName).asInstanceOf[CheckpointedDrm[K]] } def uncache() = { @@ -81,8 +85,7 @@ class CheckpointedFlinkDrm[K: ClassTag](val ds: DrmDataSet[K], protected[mahout] def canHaveMissingRows: Boolean = 
_canHaveMissingRows def checkpoint(cacheHint: CacheHint.CacheHint): CheckpointedDrm[K] = { - - this + this } def collect: Matrix = { @@ -127,25 +130,6 @@ class CheckpointedFlinkDrm[K: ClassTag](val ds: DrmDataSet[K], def dfsWrite(path: String): Unit = { val env = ds.getExecutionEnvironment - // ds.map is not picking up the correct runtime value of tuple._1 - // WritableType info is throwing an exception - // when asserting that the key is not an actual Writable - // rather a subclass - -// val keyTag = implicitly[ClassTag[K]] -// def convertKey = keyToWritableFunc(keyTag) -// val writableDataset = ds.map { -// tuple => (convertKey(tuple._1), new VectorWritable(tuple._2)) -// } - - - // test output with IntWritable Key. VectorWritable is not a problem, -// val writableDataset = ds.map(new MapFunction[DrmTuple[K], (IntWritable, VectorWritable)] { -// def map(tuple: DrmTuple[K]): (IntWritable, VectorWritable) = -// (new IntWritable(1), new VectorWritable(tuple._2)) -// }) - - val keyTag = implicitly[ClassTag[K]] val job = new JobConf
