MAHOUT-1817: optimize caching workaround for Flink, squashed commit of previously reverted commits
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/b67398f9
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/b67398f9
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/b67398f9

Branch: refs/heads/master
Commit: b67398f933d50d3e4f00ebd7ccd57f17b96604c7
Parents: f9111ac
Author: Andrew Palumbo <[email protected]>
Authored: Sun Mar 27 16:23:41 2016 -0400
Committer: Andrew Palumbo <[email protected]>
Committed: Sun Mar 27 16:23:41 2016 -0400

----------------------------------------------------------------------
 .../mahout/flinkbindings/FlinkEngine.scala      | 18 ++---
 .../mahout/flinkbindings/blas/FlinkOpAx.scala   | 42 ++++++++++-
 .../drm/CheckpointedFlinkDrm.scala              | 73 ++++++++++++++++----
 .../apache/mahout/flinkbindings/package.scala   |  6 ++
 4 files changed, 112 insertions(+), 27 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/mahout/blob/b67398f9/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
index c355cae..dd28e9d 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
@@ -127,6 +127,7 @@ object FlinkEngine extends DistributedEngine {
     newcp.cache()
   }
 
+
   private def flinkTranslate[K](oper: DrmLike[K]): FlinkDrm[K] = {
     implicit val kTag = oper.keyClassTag
     implicit val typeInformation = generateTypeInformation[K]
@@ -137,13 +138,7 @@ object FlinkEngine extends DistributedEngine {
         FlinkOpAx.blockifiedBroadcastAx(op, flinkTranslate(a))
       case op@OpAt(a) if op.keyClassTag == ClassTag.Int ⇒ FlinkOpAt.sparseTrick(op, flinkTranslate(a)).asInstanceOf[FlinkDrm[K]]
       case op@OpAtx(a, x) if op.keyClassTag == ClassTag.Int ⇒
-        // express Atx as (A.t) %*% x
-        // TODO: create specific implementation of Atx, see MAHOUT-1749
-        val opAt = OpAt(a)
-        val at = FlinkOpAt.sparseTrick(opAt, flinkTranslate(a))
-        val atCast = new CheckpointedFlinkDrm(at.asRowWise.ds, _nrow = opAt.nrow, _ncol = opAt.ncol)
-        val opAx = OpAx(atCast, x)
-        FlinkOpAx.blockifiedBroadcastAx(opAx, flinkTranslate(atCast)).asInstanceOf[FlinkDrm[K]]
+        FlinkOpAx.atx_with_broadcast(op, flinkTranslate(a)).asInstanceOf[FlinkDrm[K]]
       case op@OpAtB(a, b) ⇒
         FlinkOpAtB.notZippable(op, flinkTranslate(a), flinkTranslate(b)).asInstanceOf[FlinkDrm[K]]
       case op@OpABt(a, b) ⇒
@@ -272,7 +267,7 @@ object FlinkEngine extends DistributedEngine {
 
   private[flinkbindings] def parallelize(m: Matrix, parallelismDegree: Int)
                                         (implicit dc: DistributedContext): DrmDataSet[Int] = {
-    val rows = (0 until m.nrow).map(i => (i, m(i, ::))) //.toSeq.sortWith((ii, jj) => ii._1 < jj._1)
+    val rows = (0 until m.nrow).map(i => (i, m(i, ::)))
     val dataSetType = TypeExtractor.getForObject(rows.head)
     //TODO: Make Sure that this is the correct partitioning scheme
     dc.env.fromCollection(rows)
@@ -358,9 +353,9 @@ object FlinkEngine extends DistributedEngine {
   }
 
   def generateTypeInformation[K: ClassTag]: TypeInformation[K] = {
-    val tag = implicitly[ClassTag[K]]
+    implicit val ktag = classTag[K]
 
-    generateTypeInformationFromTag(tag)
+    generateTypeInformationFromTag(ktag)
   }
 
   private def generateTypeInformationFromTag[K](tag: ClassTag[K]): TypeInformation[K] = {
@@ -374,7 +369,4 @@ object FlinkEngine extends DistributedEngine {
       throw new IllegalArgumentException(s"index type $tag is not supported")
     }
   }
-
-  object FlinkEngine {
-
-  }
 }
\ No newline at end of file
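For context: the OpAtx case above fires whenever the optimizer sees A.t %*% x over an Int-keyed DRM, so the seven-line workaround is replaced by one call into a dedicated physical operator. A minimal usage sketch that would exercise the new path, assuming the standard Mahout DRM DSL and an implicit Flink DistributedContext already in scope (context creation is not part of this commit):

    import org.apache.mahout.math.drm._
    import org.apache.mahout.math.drm.RLikeDrmOps._
    import org.apache.mahout.math.scalabindings._
    import RLikeOps._

    val drmA = drmParallelize(dense((1, 2), (3, 4), (5, 6)), numPartitions = 2)
    val x = dvec(1.0, 0.5, 2.0)

    // A.t %*% x builds a logical OpAtx, which flinkTranslate now routes
    // to FlinkOpAx.atx_with_broadcast
    val atx = (drmA.t %*% x).collect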
http://git-wip-us.apache.org/repos/asf/mahout/blob/b67398f9/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala
index ec20b6d..8a333c4 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala
@@ -24,9 +24,12 @@ import org.apache.flink.api.common.functions.RichMapFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala._
 import org.apache.flink.configuration.Configuration
-import org.apache.mahout.flinkbindings.drm.{BlockifiedFlinkDrm, FlinkDrm}
-import org.apache.mahout.math.drm.logical.OpAx
+import org.apache.mahout.flinkbindings.FlinkEngine
+import org.apache.mahout.flinkbindings.drm.{BlockifiedFlinkDrm, FlinkDrm, RowsFlinkDrm}
+import org.apache.mahout.math.drm._
+import org.apache.mahout.math.drm.logical.{OpAtx, OpAx}
 import org.apache.mahout.math.scalabindings.RLikeOps._
+import org.apache.mahout.math.scalabindings._
 import org.apache.mahout.math.{Matrix, Vector}
 
@@ -58,4 +61,39 @@ object FlinkOpAx {
     new BlockifiedFlinkDrm(out, op.nrow.toInt)
   }
 
+
+  def atx_with_broadcast(op: OpAtx, srcA: FlinkDrm[Int]): FlinkDrm[Int] = {
+    implicit val ctx = srcA.context
+
+    val dataSetA = srcA.asBlockified.ds
+
+    // broadcast the vector x to the back end
+    val bcastX = drmBroadcast(op.x)
+
+    implicit val typeInformation = createTypeInformation[(Array[Int], Matrix)]
+    val inCoreM = dataSetA.map {
+      tuple =>
+        tuple._1.zipWithIndex.map {
+          case (key, idx) => tuple._2(idx, ::) * bcastX.value(key)
+        }
+        .reduce(_ += _)
+    }
+      // All-reduce
+      .reduce(_ += _)
+
+      // collect result
+      .collect()(0)
+
+      // Convert back to mtx
+      .toColMatrix
+
+    // It is ridiculous, but in this scheme we will have to re-parallelize it again in order to plug
+    // it back as a Flink drm
+    val res = FlinkEngine.parallelize(inCoreM, parallelismDegree = 1)
+
+    new RowsFlinkDrm[Int](res, 1)
+
+  }
+
 }
\ No newline at end of file
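The map/all-reduce above rests on the identity A'x = Σ_i x(i) · A(i, :): each block contributes the x-weighted sum of its rows, the reduce adds the partial vectors, and the single collected vector is converted back to a one-column matrix. A small in-core sanity sketch of that identity using only Mahout's scalabindings (no Flink involved; values are illustrative):

    import org.apache.mahout.math.scalabindings._
    import RLikeOps._

    val A = dense((1, 2), (3, 4), (5, 6))
    val x = dvec(1.0, 0.5, 2.0)

    // x-weighted sum of A's rows, the per-block computation in atx_with_broadcast
    val atx = (0 until A.nrow).map(i => A(i, ::) * x(i)).reduce(_ += _)

    // agrees with the direct product A't x
    assert((A.t %*% x - atx).norm(2) < 1e-10)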
http://git-wip-us.apache.org/repos/asf/mahout/blob/b67398f9/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
index ea96e88..e59e5a5 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
@@ -25,8 +25,10 @@ import org.apache.flink.api.scala._
 import org.apache.flink.core.fs.FileSystem.WriteMode
 import org.apache.flink.core.fs.Path
 import org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat
+import org.apache.flink.configuration.GlobalConfiguration
 import org.apache.hadoop.io.{IntWritable, LongWritable, Text, Writable}
 import org.apache.hadoop.mapred.{FileOutputFormat, JobConf, SequenceFileOutputFormat}
+import org.apache.mahout.flinkbindings.io.Hadoop2HDFSUtil
 import org.apache.mahout.flinkbindings.{DrmDataSet, _}
 import org.apache.mahout.math._
 import org.apache.mahout.math.drm.CacheHint._
@@ -50,10 +52,26 @@ class CheckpointedFlinkDrm[K: ClassTag: TypeInformation](val ds: DrmDataSet[K],
   lazy val ncol: Int = if (_ncol >= 0) _ncol else dim._2
 
   // persistance values
-  var cacheFileName: String = "/a"
+  var cacheFileName: String = "undefinedCacheName"
   var isCached: Boolean = false
   var parallelismDeg: Int = -1
-  val persistanceRootDir = "/tmp/"
+  var persistanceRootDir: String = _
+
+  // need to make sure that this is actually getting the correct properties for {{taskmanager.tmp.dirs}}
+  val mahoutHome = getMahoutHome()
+
+  // this is extra I/O for each cache call. this needs to be moved somewhere where it is called
+  // only once. Possibly FlinkDistributedEngine.
+  GlobalConfiguration.loadConfiguration(mahoutHome + "/conf/flink-config.yaml")
+
+  val conf = GlobalConfiguration.getConfiguration()
+
+  if (!(conf == null)) {
+    persistanceRootDir = conf.getString("taskmanager.tmp.dirs", "/tmp/")
+  } else {
+    persistanceRootDir = "/tmp/"
+  }
+
 
   private lazy val dim: (Long, Int) = {
     // combine computation of ncol and nrow in one pass
@@ -76,20 +94,38 @@ class CheckpointedFlinkDrm[K: ClassTag: TypeInformation](val ds: DrmDataSet[K],
 
   override val keyClassTag: ClassTag[K] = classTag[K]
 
+  /** Note: as of Flink 1.0.0 no direct Flink caching exists, so we save
+    * the dataset to the filesystem and read it back when cache is called */
   def cache() = {
     if (!isCached) {
-      cacheFileName = System.nanoTime().toString
+      cacheFileName = persistanceRootDir + System.nanoTime().toString
      parallelismDeg = ds.getParallelism
       isCached = true
+      persist(ds, cacheFileName)
     }
-    implicit val typeInformation = createTypeInformation[(K,Vector)]
+    val _ds = readPersistedDataSet(cacheFileName, ds)
+
+    /** Leave the parallelism degree to be set by the operators.
+      * TODO: find a way to set the parallelism degree based on the
+      * final drm after computation is actually triggered
+      *
+      * // We may want to look more closely at this:
+      * // since we've cached a drm, triggering a computation,
+      * // it may not make sense to keep the same parallelism degree
+      * if (!(parallelismDeg == _ds.getParallelism)) {
+      *   _ds.setParallelism(parallelismDeg).rebalance()
+      * }
+      */
 
-    val _ds = persist(ds, persistanceRootDir + cacheFileName)
     datasetWrap(_ds)
   }
 
-  def uncache() = {
-    // TODO
+  def uncache(): this.type = {
+    if (isCached) {
+      Hadoop2HDFSUtil.delete(cacheFileName)
+      isCached = false
+    }
     this
   }
 
@@ -99,12 +135,10 @@ class CheckpointedFlinkDrm[K: ClassTag: TypeInformation](val ds: DrmDataSet[K],
    * @param dataset [[DataSet]] to write to disk
    * @param path File path to write dataset to
    * @tparam T Type of the [[DataSet]] elements
-   * @return [[DataSet]] reading the just written file
    */
-  def persist[T: ClassTag: TypeInformation](dataset: DataSet[T], path: String): DataSet[T] = {
+  def persist[T: ClassTag: TypeInformation](dataset: DataSet[T], path: String): Unit = {
     val env = dataset.getExecutionEnvironment
     val outputFormat = new TypeSerializerOutputFormat[T]
-
     val filePath = new Path(path)
     outputFormat.setOutputFilePath(filePath)
 
@@ -112,14 +146,29 @@ class CheckpointedFlinkDrm[K: ClassTag: TypeInformation](val ds: DrmDataSet[K],
     dataset.output(outputFormat)
     env.execute("FlinkTools persist")
+  }
+
+  /** Read a [[DataSet]] from the specified path and return it as a DataSource for subsequent
+    * operations.
+    *
+    * @param path File path to read the dataset from
+    * @param ds persisted ds to retrieve type information and environment from
+    * @tparam T key Type of the [[DataSet]] elements
+    * @return [[DataSet]] the persisted dataset
+    */
+  def readPersistedDataSet[T: ClassTag: TypeInformation]
+      (path: String, ds: DataSet[T]): DataSet[T] = {
 
-    val inputFormat = new TypeSerializerInputFormat[T](dataset.getType)
+    val env = ds.getExecutionEnvironment
+    val inputFormat = new TypeSerializerInputFormat[T](ds.getType())
+
     val filePath = new Path(path)
     inputFormat.setFilePath(filePath)
 
     env.createInput(inputFormat)
   }
 
-  // Members declared in org.apache.mahout.math.drm.DrmLike
+
+  // Members declared in org.apache.mahout.math.drm.DrmLike
   protected[mahout] def canHaveMissingRows: Boolean = _canHaveMissingRows
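The cache()/persist()/readPersistedDataSet() trio above is the whole caching workaround: write the DataSet through its own type serializer, execute the plan once, then hand back a DataSource over the written file. Stripped of the DRM wrapper, the round trip looks roughly like this (a sketch against the Flink 1.0-era APIs used in the diff; the path, sample data, and the explicit OVERWRITE mode are illustrative additions):

    import org.apache.flink.api.java.io.{TypeSerializerInputFormat, TypeSerializerOutputFormat}
    import org.apache.flink.api.scala._
    import org.apache.flink.core.fs.FileSystem.WriteMode
    import org.apache.flink.core.fs.Path

    val env = ExecutionEnvironment.getExecutionEnvironment
    val ds = env.fromElements(1 -> "a", 2 -> "b")

    // write the dataset with its own type serializer
    val out = new TypeSerializerOutputFormat[(Int, String)]
    out.setOutputFilePath(new Path("/tmp/cache-demo"))
    out.setWriteMode(WriteMode.OVERWRITE)
    ds.output(out)
    env.execute("persist")

    // read it back; later plans consume the file instead of recomputing ds
    val in = new TypeSerializerInputFormat[(Int, String)](ds.getType())
    in.setFilePath(new Path("/tmp/cache-demo"))
    val cached = env.createInput(in)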
http://git-wip-us.apache.org/repos/asf/mahout/blob/b67398f9/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
index 10ce545..e769952 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
@@ -105,4 +105,10 @@ package object flinkbindings {
 
   private[flinkbindings] def extractRealClassTag[K: ClassTag](drm: DrmLike[K]): ClassTag[_] = drm.keyClassTag
 
+  private[flinkbindings] def getMahoutHome() = {
+    var mhome = System.getenv("MAHOUT_HOME")
+    if (mhome == null) mhome = System.getProperty("mahout.home")
+    require(mhome != null, "MAHOUT_HOME is required to spawn mahout-based flink jobs")
+    mhome
+  }
 }
\ No newline at end of file
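Taken together, getMahoutHome() and the GlobalConfiguration block in CheckpointedFlinkDrm resolve the cache directory in two steps: find the Mahout install, then read taskmanager.tmp.dirs from its bundled Flink config, falling back to /tmp/. A consolidated sketch of that logic, which could also address the per-cache-call I/O noted in the TODO above (resolvePersistenceRoot is a hypothetical helper name, not in this commit):

    import org.apache.flink.configuration.GlobalConfiguration

    // hypothetical consolidation of the lookup currently done per cache() call
    def resolvePersistenceRoot(): String = {
      var mhome = System.getenv("MAHOUT_HOME")
      if (mhome == null) mhome = System.getProperty("mahout.home")
      require(mhome != null, "MAHOUT_HOME is required to spawn mahout-based flink jobs")

      // reads taskmanager.tmp.dirs from Mahout's bundled Flink config, if present
      GlobalConfiguration.loadConfiguration(mhome + "/conf/flink-config.yaml")
      val conf = GlobalConfiguration.getConfiguration()
      if (conf != null) conf.getString("taskmanager.tmp.dirs", "/tmp/") else "/tmp/"
    }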
