revert commits: 48a8a8208322ed690d5356a4e0cac7667b080bab e420485e0b82908919985a892f85c7d6dff9d24b f0ee522c69a1f47f0d4da263e9f1669404155b5e b45f982581b2a2bed75b771672b4f2d66ef32840 b96918bba9855fba5cefc11e1c4153b9419509cb 100d343e4b6e66b1a7c581455cd1faab7bbdb538 ad22252ca2ab39e22d4df7cc22464af2c6179830 ad4c32ce871df686267df1f1dbff76a883b8d3fc 9c5ee59214a454f7ae25c762bf04bb30bd7982c8 a1cf7cf56e036ce12d616f8aea0af1e9dcdf2cb6
commit 48a8a8208322ed690d5356a4e0cac7667b080bab Author: Andrew Palumbo <[email protected]> Date: Sun Mar 27 15:16:36 2016 -0400 comment out parallization settting in cache() commit e420485e0b82908919985a892f85c7d6dff9d24b Author: Andrew Palumbo <[email protected]> Date: Sat Mar 26 23:49:14 2016 -0400 Comments, cleanup commit f0ee522c69a1f47f0d4da263e9f1669404155b5e Merge: b45f982 a77f1c1 Author: Andrew Palumbo <[email protected]> Date: Sat Mar 26 23:28:52 2016 -0400 Merge branch 'flink-binding' of https://git-wip-us.apache.org/repos/asf/mahout into MAHOUT-1817 commit a77f1c13de58d462eed7ff224ec333b22ac22bf3 Author: Andrew Palumbo <[email protected]> Date: Sat Mar 26 23:22:44 2016 -0400 MAHOUT-1749 Mahout DSL for Flink: Implement Atx closes apache/mahout#204 commit b45f982581b2a2bed75b771672b4f2d66ef32840 Author: Andrew Palumbo <[email protected]> Date: Sat Mar 26 17:35:37 2016 -0400 move getMahoutHome() commit b96918bba9855fba5cefc11e1c4153b9419509cb Author: Andrew Palumbo <[email protected]> Date: Fri Mar 25 20:33:22 2016 -0400 wip: use properties from /home/andy/sandbox/mahout/conf/flink-config.yaml commit 100d343e4b6e66b1a7c581455cd1faab7bbdb538 Author: Andrew Palumbo <[email protected]> Date: Fri Mar 25 20:26:41 2016 -0400 wip: use properties from /home/andy/sandbox/mahout/conf/flink-config.yaml commit ad22252ca2ab39e22d4df7cc22464af2c6179830 Author: Andrew Palumbo <[email protected]> Date: Fri Mar 25 19:05:39 2016 -0400 use as the base directory for cached files commit ad4c32ce871df686267df1f1dbff76a883b8d3fc Author: Andrew Palumbo <[email protected]> Date: Fri Mar 25 18:31:51 2016 -0400 add unchace commit 9c5ee59214a454f7ae25c762bf04bb30bd7982c8 Author: Andrew Palumbo <[email protected]> Date: Fri Mar 25 18:10:18 2016 -0400 Persist only if the dataset has not been cached. 
Otherwise read back in already cached dataset commit a1cf7cf56e036ce12d616f8aea0af1e9dcdf2cb6 Author: Andrew Palumbo <[email protected]> Date: Fri Mar 25 16:56:20 2016 -0400 Small change addressing DL's comment on apache/mahout#200, also a small fix Project: http://git-wip-us.apache.org/repos/asf/mahout/repo Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/f9111ac2 Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/f9111ac2 Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/f9111ac2 Branch: refs/heads/master Commit: f9111ac27334659b2fb55849319dcd8e113b3c25 Parents: 48a8a82 Author: Andrew Palumbo <[email protected]> Authored: Sun Mar 27 16:06:11 2016 -0400 Committer: Andrew Palumbo <[email protected]> Committed: Sun Mar 27 16:06:11 2016 -0400 ---------------------------------------------------------------------- .../mahout/flinkbindings/FlinkEngine.scala | 18 +++-- .../mahout/flinkbindings/blas/FlinkOpAx.scala | 42 +---------- .../drm/CheckpointedFlinkDrm.scala | 73 ++++---------------- .../apache/mahout/flinkbindings/package.scala | 6 -- 4 files changed, 27 insertions(+), 112 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mahout/blob/f9111ac2/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala index dd28e9d..c355cae 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala @@ -127,7 +127,6 @@ object FlinkEngine extends DistributedEngine { newcp.cache() } - private def flinkTranslate[K](oper: DrmLike[K]): FlinkDrm[K] = { implicit val kTag = oper.keyClassTag implicit val typeInformation = generateTypeInformation[K] @@ -138,7 
+137,13 @@ object FlinkEngine extends DistributedEngine { FlinkOpAx.blockifiedBroadcastAx(op, flinkTranslate(a)) case op@OpAt(a) if op.keyClassTag == ClassTag.Int ⇒ FlinkOpAt.sparseTrick(op, flinkTranslate(a)).asInstanceOf[FlinkDrm[K]] case op@OpAtx(a, x) if op.keyClassTag == ClassTag.Int ⇒ - FlinkOpAx.atx_with_broadcast(op, flinkTranslate(a)).asInstanceOf[FlinkDrm[K]] + // express Atx as (A.t) %*% x + // TODO: create specific implementation of Atx, see MAHOUT-1749 + val opAt = OpAt(a) + val at = FlinkOpAt.sparseTrick(opAt, flinkTranslate(a)) + val atCast = new CheckpointedFlinkDrm(at.asRowWise.ds, _nrow = opAt.nrow, _ncol = opAt.ncol) + val opAx = OpAx(atCast, x) + FlinkOpAx.blockifiedBroadcastAx(opAx, flinkTranslate(atCast)).asInstanceOf[FlinkDrm[K]] case op@OpAtB(a, b) ⇒ FlinkOpAtB.notZippable(op, flinkTranslate(a), flinkTranslate(b)).asInstanceOf[FlinkDrm[K]] case op@OpABt(a, b) ⇒ @@ -267,7 +272,7 @@ object FlinkEngine extends DistributedEngine { private[flinkbindings] def parallelize(m: Matrix, parallelismDegree: Int) (implicit dc: DistributedContext): DrmDataSet[Int] = { - val rows = (0 until m.nrow).map(i => (i, m(i, ::))) + val rows = (0 until m.nrow).map(i => (i, m(i, ::))) //.toSeq.sortWith((ii, jj) => ii._1 < jj._1) val dataSetType = TypeExtractor.getForObject(rows.head) //TODO: Make Sure that this is the correct partitioning scheme dc.env.fromCollection(rows) @@ -353,9 +358,9 @@ object FlinkEngine extends DistributedEngine { } def generateTypeInformation[K: ClassTag]: TypeInformation[K] = { - implicit val ktag = classTag[K] + val tag = implicitly[ClassTag[K]] - generateTypeInformationFromTag(ktag) + generateTypeInformationFromTag(tag) } private def generateTypeInformationFromTag[K](tag: ClassTag[K]): TypeInformation[K] = { @@ -369,4 +374,7 @@ object FlinkEngine extends DistributedEngine { throw new IllegalArgumentException(s"index type $tag is not supported") } } + object FlinkEngine { + + } } \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/mahout/blob/f9111ac2/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala index 8a333c4..ec20b6d 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala @@ -24,12 +24,9 @@ import org.apache.flink.api.common.functions.RichMapFunction import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.scala._ import org.apache.flink.configuration.Configuration -import org.apache.mahout.flinkbindings.FlinkEngine -import org.apache.mahout.flinkbindings.drm.{BlockifiedFlinkDrm, FlinkDrm, RowsFlinkDrm} -import org.apache.mahout.math.drm._ -import org.apache.mahout.math.drm.logical.{OpAtx, OpAx} +import org.apache.mahout.flinkbindings.drm.{BlockifiedFlinkDrm, FlinkDrm} +import org.apache.mahout.math.drm.logical.OpAx import org.apache.mahout.math.scalabindings.RLikeOps._ -import org.apache.mahout.math.scalabindings._ import org.apache.mahout.math.{Matrix, Vector} @@ -61,39 +58,4 @@ object FlinkOpAx { new BlockifiedFlinkDrm(out, op.nrow.toInt) } - - - def atx_with_broadcast(op: OpAtx, srcA: FlinkDrm[Int]): FlinkDrm[Int] = { - implicit val ctx = srcA.context - - val dataSetA = srcA.asBlockified.ds - - // broadcast the vector x to the back end - val bcastX = drmBroadcast(op.x) - - implicit val typeInformation = createTypeInformation[(Array[Int],Matrix)] - val inCoreM = dataSetA.map { - tuple => - tuple._1.zipWithIndex.map { - case (key, idx) => tuple._2(idx, ::) * bcastX.value(key) - } - .reduce(_ += _) - } - // All-reduce - .reduce(_ += _) - - // collect result - .collect()(0) - - // Convert back to mtx - .toColMatrix - - // It is ridiculous, but in this 
scheme we will have to re-parallelize it again in order to plug - // it back as a Flink drm - val res = FlinkEngine.parallelize(inCoreM, parallelismDegree = 1) - - new RowsFlinkDrm[Int](res, 1) - - } - } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/mahout/blob/f9111ac2/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala index e59e5a5..ea96e88 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala @@ -25,10 +25,8 @@ import org.apache.flink.api.scala._ import org.apache.flink.core.fs.FileSystem.WriteMode import org.apache.flink.core.fs.Path import org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat -import org.apache.flink.configuration.GlobalConfiguration import org.apache.hadoop.io.{IntWritable, LongWritable, Text, Writable} import org.apache.hadoop.mapred.{FileOutputFormat, JobConf, SequenceFileOutputFormat} -import org.apache.mahout.flinkbindings.io.Hadoop2HDFSUtil import org.apache.mahout.flinkbindings.{DrmDataSet, _} import org.apache.mahout.math._ import org.apache.mahout.math.drm.CacheHint._ @@ -52,26 +50,10 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K], lazy val ncol: Int = if (_ncol >= 0) _ncol else dim._2 // persistance values - var cacheFileName: String = "undefinedCacheName" + var cacheFileName: String = "/a" var isCached: Boolean = false var parallelismDeg: Int = -1 - var persistanceRootDir: String = _ - - // need to make sure that this is actually getting the correct propertirs for {{taskmanager.tmp.dirs}} - val mahoutHome = getMahoutHome() - - // this is extra I/O for each 
cache call. this needs to be moved somewhere where it is called - // only once. Possibly FlinkDistributedEngine. - GlobalConfiguration.loadConfiguration(mahoutHome + "/conf/flink-config.yaml") - - val conf = GlobalConfiguration.getConfiguration() - - if (!(conf == null )) { - persistanceRootDir = conf.getString("taskmanager.tmp.dirs", "/tmp/") - } else { - persistanceRootDir = "/tmp/" - } - + val persistanceRootDir = "/tmp/" private lazy val dim: (Long, Int) = { // combine computation of ncol and nrow in one pass @@ -94,38 +76,20 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K], override val keyClassTag: ClassTag[K] = classTag[K] - /** Note as of Flink 1.0.0, no direct flink caching exists so we save - * the dataset to the filesystem and read it back when cache is called */ def cache() = { if (!isCached) { - cacheFileName = persistanceRootDir + System.nanoTime().toString + cacheFileName = System.nanoTime().toString parallelismDeg = ds.getParallelism isCached = true - persist(ds, cacheFileName) } - val _ds = readPersistedDataSet(cacheFileName, ds) - - /** Leave the parallelism degree to be set the operators - * TODO: find out a way to set the parallelism degree based on the - * final drm after computation is actually triggered - * - * // We may want to look more closely at this: - * // since we've cached a drm, triggering a computation - * // it may not make sense to keep the same parallelism degree - * if (!(parallelismDeg == _ds.getParallelism)) { - * _ds.setParallelism(parallelismDeg).rebalance() - * } - * - */ + implicit val typeInformation = createTypeInformation[(K,Vector)] + val _ds = persist(ds, persistanceRootDir + cacheFileName) datasetWrap(_ds) } - def uncache(): this.type = { - if (isCached) { - Hadoop2HDFSUtil.delete(cacheFileName) - isCached = false - } + def uncache() = { + // TODO this } @@ -135,10 +99,12 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K], * @param dataset [[DataSet]] to 
write to disk * @param path File path to write dataset to * @tparam T Type of the [[DataSet]] elements + * @return [[DataSet]] reading the just written file */ - def persist[T: ClassTag: TypeInformation](dataset: DataSet[T], path: String): Unit = { + def persist[T: ClassTag: TypeInformation](dataset: DataSet[T], path: String): DataSet[T] = { val env = dataset.getExecutionEnvironment val outputFormat = new TypeSerializerOutputFormat[T] + val filePath = new Path(path) outputFormat.setOutputFilePath(filePath) @@ -146,29 +112,14 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K], dataset.output(outputFormat) env.execute("FlinkTools persist") - } - - /** Read a [[DataSet]] from specified path and returns it as a DataSource for subsequent - * operations. - * - * @param path File path to read dataset from - * @param ds persisted ds to retrieve type information and environment forom - * @tparam T key Type of the [[DataSet]] elements - * @return [[DataSet]] the persisted dataset - */ - def readPersistedDataSet[T: ClassTag : TypeInformation] - (path: String, ds: DataSet[T]): DataSet[T] = { - val env = ds.getExecutionEnvironment - val inputFormat = new TypeSerializerInputFormat[T](ds.getType()) - val filePath = new Path(path) + val inputFormat = new TypeSerializerInputFormat[T](dataset.getType) inputFormat.setFilePath(filePath) env.createInput(inputFormat) } - - // Members declared in org.apache.mahout.math.drm.DrmLike + // Members declared in org.apache.mahout.math.drm.DrmLike protected[mahout] def canHaveMissingRows: Boolean = _canHaveMissingRows http://git-wip-us.apache.org/repos/asf/mahout/blob/f9111ac2/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala index e769952..10ce545 100644 --- 
a/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala @@ -105,10 +105,4 @@ package object flinkbindings { private[flinkbindings] def extractRealClassTag[K: ClassTag](drm: DrmLike[K]): ClassTag[_] = drm.keyClassTag - private[flinkbindings] def getMahoutHome() = { - var mhome = System.getenv("MAHOUT_HOME") - if (mhome == null) mhome = System.getProperty("mahout.home") - require(mhome != null, "MAHOUT_HOME is required to spawn mahout-based flink jobs") - mhome - } } \ No newline at end of file
