Comments, cleanup
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/e420485e
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/e420485e
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/e420485e

Branch: refs/heads/master
Commit: e420485e0b82908919985a892f85c7d6dff9d24b
Parents: f0ee522
Author: Andrew Palumbo <[email protected]>
Authored: Sat Mar 26 23:49:14 2016 -0400
Committer: Andrew Palumbo <[email protected]>
Committed: Sat Mar 26 23:49:14 2016 -0400

----------------------------------------------------------------------
 .../org/apache/mahout/flinkbindings/FlinkEngine.scala |  2 +-
 .../flinkbindings/drm/CheckpointedFlinkDrm.scala      | 14 ++++++++++----
 2 files changed, 11 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/e420485e/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
index 9adec7e..dd28e9d 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
@@ -267,7 +267,7 @@ object FlinkEngine extends DistributedEngine {
   private[flinkbindings] def parallelize(m: Matrix, parallelismDegree: Int)
                                         (implicit dc: DistributedContext): DrmDataSet[Int] = {
 
-    val rows = (0 until m.nrow).map(i => (i, m(i, ::))) //.toSeq.sortWith((ii, jj) => ii._1 < jj._1)
+    val rows = (0 until m.nrow).map(i => (i, m(i, ::)))
     val dataSetType = TypeExtractor.getForObject(rows.head)
     //TODO: Make Sure that this is the correct partitioning scheme
     dc.env.fromCollection(rows)

http://git-wip-us.apache.org/repos/asf/mahout/blob/e420485e/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
index 9b3a9f5..8424856 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
@@ -18,7 +18,7 @@
  */
 package org.apache.mahout.flinkbindings.drm
 
-import org.apache.flink.api.common.functions.{MapFunction, ReduceFunction, RichFunction, RichMapFunction}
+import org.apache.flink.api.common.functions.{MapFunction, ReduceFunction}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.io.{TypeSerializerInputFormat, TypeSerializerOutputFormat}
 import org.apache.flink.api.scala._
@@ -60,6 +60,8 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K],
   // need to make sure that this is actually getting the correct propertirs for {{taskmanager.tmp.dirs}}
   val mahoutHome = getMahoutHome()
 
+  // this is extra I/O for each cache call. this needs to be moved somewhere where it is called
+  // only once. Possibly FlinkDistributedEngine.
   GlobalConfiguration.loadConfiguration(mahoutHome + "/conf/flink-config.yaml")
 
   val conf = GlobalConfiguration.getConfiguration()
@@ -92,16 +94,20 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K],
 
   override val keyClassTag: ClassTag[K] = classTag[K]
 
+  /** Note as of Flink 1.0.0, no direct flink caching exists so we save
+    * the dataset to the filesystem and read it back when cache is called */
   def cache() = {
-//    implicit val typeInformation = createTypeInformation[(K,Vector)]
     if (!isCached) {
       cacheFileName = persistanceRootDir + System.nanoTime().toString
       parallelismDeg = ds.getParallelism
       isCached = true
       persist(ds, cacheFileName)
     }
-    val _ds = readPersistedDataSet(cacheFileName, ds)
+
+    // We may want to look more closely at this:
+    // since we've cached a drm, triggering a computation
+    // it may not make sense to keep the same parallelism degree
     if (!(parallelismDeg == _ds.getParallelism)) {
       _ds.setParallelism(parallelismDeg).rebalance()
     }
 
@@ -141,7 +147,7 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K],
    * @param path File path to read dataset from
    * @param ds persisted ds to retrieve type information and environment forom
    * @tparam T key Type of the [[DataSet]] elements
-   * @return [[DataSet]] reading the just written file
+   * @return [[DataSet]] the persisted dataset
    */
   def readPersistedDataSet[T: ClassTag : TypeInformation]
     (path: String, ds: DataSet[T]): DataSet[T] = {
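
The new scaladoc on cache() records the workaround this class relies on: Flink's batch DataSet API (as of 1.0.0) has no explicit cache operator, so a "cached" DRM is written out once in Flink's serializer format and read back on later use. The snippet below is a minimal, self-contained sketch of that persist-and-read-back pattern against the plain Flink Scala API; it is not the Mahout code itself, and names such as CacheBySpillSketch, path and cached are illustrative only.

  import org.apache.flink.api.scala._
  import org.apache.flink.api.java.io.{TypeSerializerInputFormat, TypeSerializerOutputFormat}
  import org.apache.flink.core.fs.FileSystem.WriteMode

  object CacheBySpillSketch {

    def main(args: Array[String]): Unit = {
      val env = ExecutionEnvironment.getExecutionEnvironment

      // stand-in for a DRM's row dataset: (key, payload) pairs
      val ds: DataSet[(Int, String)] = env.fromElements((0, "a"), (1, "b"), (2, "c"))

      // "persist": write the rows once in Flink's binary serializer format
      val path = System.getProperty("java.io.tmpdir") + "/drm-cache-" + System.nanoTime()
      ds.write(new TypeSerializerOutputFormat[(Int, String)], path, WriteMode.OVERWRITE)
      env.execute("persist dataset")

      // "cache hit": read the spilled rows back with the matching input format,
      // reusing the original dataset's type information for the serializer
      val inputFormat = new TypeSerializerInputFormat[(Int, String)](ds.getType())
      val cached: DataSet[(Int, String)] = env.readFile(inputFormat, path)

      cached.collect().foreach(println)
    }
  }

In spirit this is what persist() and readPersistedDataSet() do for CheckpointedFlinkDrm, except that there the key type is the generic K and the TypeInformation comes from the wrapped DRM dataset rather than being fixed to (Int, String).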

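The added comment about GlobalConfiguration.loadConfiguration notes that re-reading MAHOUT_HOME/conf/flink-config.yaml on every cache() call is redundant I/O. One hedged way to get the "called only once" behavior, reusing the same GlobalConfiguration calls that appear in the diff, is to hoist the load into a lazily initialized JVM-wide holder; the object name FlinkConfigOnce and the MAHOUT_HOME lookup below are illustrative, not part of this commit.

  import org.apache.flink.configuration.{Configuration, GlobalConfiguration}

  object FlinkConfigOnce {
    // evaluated at most once per JVM; later callers reuse the same Configuration
    lazy val conf: Configuration = {
      val mahoutHome = sys.env.getOrElse("MAHOUT_HOME", ".")
      GlobalConfiguration.loadConfiguration(mahoutHome + "/conf/flink-config.yaml")
      GlobalConfiguration.getConfiguration()
    }
  }

cache() could then read FlinkConfigOnce.conf instead of reloading the file each time; moving the load into FlinkDistributedEngine, as the comment suggests, would be the equivalent engine-level home for it.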