MAHOUT-1810: Use method taken from FlinkMLTools for CheckpointedFlinkDrm cache persistence; closes apache/mahout#201
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/202b94f8 Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/202b94f8 Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/202b94f8 Branch: refs/heads/master Commit: 202b94f840286d4d0970f0427122697ba27fc1fb Parents: e06fb11 Author: Andrew Palumbo <[email protected]> Authored: Tue Mar 22 19:14:57 2016 -0400 Committer: Andrew Palumbo <[email protected]> Committed: Tue Mar 22 19:14:57 2016 -0400 ---------------------------------------------------------------------- .../mahout/flinkbindings/FlinkEngine.scala | 7 ++- .../drm/CheckpointedFlinkDrm.scala | 51 +++++++++++++++++--- .../apache/mahout/flinkbindings/package.scala | 1 + 3 files changed, 51 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mahout/blob/202b94f8/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala index e606514..843a4a9 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala @@ -357,12 +357,12 @@ object FlinkEngine extends DistributedEngine { res.collect().head } - private def generateTypeInformation[K: ClassTag]: TypeInformation[K] = { + def generateTypeInformation[K: ClassTag]: TypeInformation[K] = { val tag = implicitly[ClassTag[K]] generateTypeInformationFromTag(tag) } - + private def generateTypeInformationFromTag[K](tag: ClassTag[K]): TypeInformation[K] = { if (tag.runtimeClass.equals(classOf[Int])) { createTypeInformation[Int].asInstanceOf[TypeInformation[K]] @@ -376,4 +376,7 @@ object FlinkEngine extends DistributedEngine { throw 
new IllegalArgumentException(s"index type $tag is not supported") } } + object FlinkEngine { + + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/mahout/blob/202b94f8/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala index a6b267b..ea96e88 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala @@ -19,7 +19,11 @@ package org.apache.mahout.flinkbindings.drm import org.apache.flink.api.common.functions.{MapFunction, ReduceFunction} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.io.{TypeSerializerInputFormat, TypeSerializerOutputFormat} import org.apache.flink.api.scala._ +import org.apache.flink.core.fs.FileSystem.WriteMode +import org.apache.flink.core.fs.Path import org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat import org.apache.hadoop.io.{IntWritable, LongWritable, Text, Writable} import org.apache.hadoop.mapred.{FileOutputFormat, JobConf, SequenceFileOutputFormat} @@ -34,7 +38,7 @@ import scala.collection.JavaConverters._ import scala.reflect.{ClassTag, classTag} import scala.util.Random -class CheckpointedFlinkDrm[K: ClassTag](val ds: DrmDataSet[K], +class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K], private var _nrow: Long = CheckpointedFlinkDrm.UNKNOWN, private var _ncol: Int = CheckpointedFlinkDrm.UNKNOWN, override val cacheHint: CacheHint = CacheHint.NONE, @@ -45,7 +49,11 @@ class CheckpointedFlinkDrm[K: ClassTag](val ds: DrmDataSet[K], lazy val nrow: Long = if (_nrow >= 0) _nrow else dim._1 lazy val ncol: Int = if (_ncol >= 0) _ncol 
else dim._2 - var cacheFileName:String = "/tmp/a" + // persistance values + var cacheFileName: String = "/a" + var isCached: Boolean = false + var parallelismDeg: Int = -1 + val persistanceRootDir = "/tmp/" private lazy val dim: (Long, Int) = { // combine computation of ncol and nrow in one pass @@ -69,10 +77,15 @@ class CheckpointedFlinkDrm[K: ClassTag](val ds: DrmDataSet[K], override val keyClassTag: ClassTag[K] = classTag[K] def cache() = { - cacheFileName = System.nanoTime().toString - implicit val context = new FlinkDistributedContext(ds.getExecutionEnvironment) - dfsWrite("/tmp/" + cacheFileName) - drmDfsRead("/tmp/" + cacheFileName).asInstanceOf[CheckpointedDrm[K]] + if (!isCached) { + cacheFileName = System.nanoTime().toString + parallelismDeg = ds.getParallelism + isCached = true + } + implicit val typeInformation = createTypeInformation[(K,Vector)] + + val _ds = persist(ds, persistanceRootDir + cacheFileName) + datasetWrap(_ds) } def uncache() = { @@ -80,6 +93,32 @@ class CheckpointedFlinkDrm[K: ClassTag](val ds: DrmDataSet[K], this } + /** Writes a [[DataSet]] to the specified path and returns it as a DataSource for subsequent + * operations. 
+ * + * @param dataset [[DataSet]] to write to disk + * @param path File path to write dataset to + * @tparam T Type of the [[DataSet]] elements + * @return [[DataSet]] reading the just written file + */ + def persist[T: ClassTag: TypeInformation](dataset: DataSet[T], path: String): DataSet[T] = { + val env = dataset.getExecutionEnvironment + val outputFormat = new TypeSerializerOutputFormat[T] + + val filePath = new Path(path) + + outputFormat.setOutputFilePath(filePath) + outputFormat.setWriteMode(WriteMode.OVERWRITE) + + dataset.output(outputFormat) + env.execute("FlinkTools persist") + + val inputFormat = new TypeSerializerInputFormat[T](dataset.getType) + inputFormat.setFilePath(filePath) + + env.createInput(inputFormat) + } + // Members declared in org.apache.mahout.math.drm.DrmLike protected[mahout] def canHaveMissingRows: Boolean = _canHaveMissingRows http://git-wip-us.apache.org/repos/asf/mahout/blob/202b94f8/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala index b083752..f0dd620 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala @@ -99,6 +99,7 @@ package object flinkbindings { } def datasetWrap[K: ClassTag](dataset: DataSet[(K, Vector)]): CheckpointedDrm[K] = { + implicit val typeInformation = FlinkEngine.generateTypeInformation[K] new CheckpointedFlinkDrm[K](dataset) }
