MAHOUT-1817: optimize caching workaround for Flink,  squashed commit of 
previously reverted commits


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/b67398f9
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/b67398f9
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/b67398f9

Branch: refs/heads/master
Commit: b67398f933d50d3e4f00ebd7ccd57f17b96604c7
Parents: f9111ac
Author: Andrew Palumbo <[email protected]>
Authored: Sun Mar 27 16:23:41 2016 -0400
Committer: Andrew Palumbo <[email protected]>
Committed: Sun Mar 27 16:23:41 2016 -0400

----------------------------------------------------------------------
 .../mahout/flinkbindings/FlinkEngine.scala      | 18 ++---
 .../mahout/flinkbindings/blas/FlinkOpAx.scala   | 42 ++++++++++-
 .../drm/CheckpointedFlinkDrm.scala              | 73 ++++++++++++++++----
 .../apache/mahout/flinkbindings/package.scala   |  6 ++
 4 files changed, 112 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/b67398f9/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
----------------------------------------------------------------------
diff --git 
a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala 
b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
index c355cae..dd28e9d 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
@@ -127,6 +127,7 @@ object FlinkEngine extends DistributedEngine {
     newcp.cache()
   }
 
+
   private def flinkTranslate[K](oper: DrmLike[K]): FlinkDrm[K] = {
     implicit val kTag = oper.keyClassTag
     implicit val typeInformation = generateTypeInformation[K]
@@ -137,13 +138,7 @@ object FlinkEngine extends DistributedEngine {
         FlinkOpAx.blockifiedBroadcastAx(op, flinkTranslate(a))
       case op@OpAt(a) if op.keyClassTag == ClassTag.Int ⇒ 
FlinkOpAt.sparseTrick(op, flinkTranslate(a)).asInstanceOf[FlinkDrm[K]]
       case op@OpAtx(a, x) if op.keyClassTag == ClassTag.Int ⇒
-        // express Atx as (A.t) %*% x
-        // TODO: create specific implementation of Atx, see MAHOUT-1749
-        val opAt = OpAt(a)
-        val at = FlinkOpAt.sparseTrick(opAt, flinkTranslate(a))
-        val atCast = new CheckpointedFlinkDrm(at.asRowWise.ds, _nrow = 
opAt.nrow, _ncol = opAt.ncol)
-        val opAx = OpAx(atCast, x)
-        FlinkOpAx.blockifiedBroadcastAx(opAx, 
flinkTranslate(atCast)).asInstanceOf[FlinkDrm[K]]
+        FlinkOpAx.atx_with_broadcast(op, 
flinkTranslate(a)).asInstanceOf[FlinkDrm[K]]
       case op@OpAtB(a, b) ⇒ FlinkOpAtB.notZippable(op, flinkTranslate(a),
         flinkTranslate(b)).asInstanceOf[FlinkDrm[K]]
       case op@OpABt(a, b) ⇒
@@ -272,7 +267,7 @@ object FlinkEngine extends DistributedEngine {
 
   private[flinkbindings] def parallelize(m: Matrix, parallelismDegree: Int)
                                         (implicit dc: DistributedContext): 
DrmDataSet[Int] = {
-    val rows = (0 until m.nrow).map(i => (i, m(i, ::))) //.toSeq.sortWith((ii, 
jj) => ii._1 < jj._1)
+    val rows = (0 until m.nrow).map(i => (i, m(i, ::)))
     val dataSetType = TypeExtractor.getForObject(rows.head)
     //TODO: Make Sure that this is the correct partitioning scheme
     dc.env.fromCollection(rows)
@@ -358,9 +353,9 @@ object FlinkEngine extends DistributedEngine {
   }
 
   def generateTypeInformation[K: ClassTag]: TypeInformation[K] = {
-    val tag = implicitly[ClassTag[K]]
+    implicit val ktag = classTag[K]
 
-    generateTypeInformationFromTag(tag)
+    generateTypeInformationFromTag(ktag)
   }
 
   private def generateTypeInformationFromTag[K](tag: ClassTag[K]): 
TypeInformation[K] = {
@@ -374,7 +369,4 @@ object FlinkEngine extends DistributedEngine {
       throw new IllegalArgumentException(s"index type $tag is not supported")
     }
   }
-  object FlinkEngine {
-
-  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/mahout/blob/b67398f9/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala
----------------------------------------------------------------------
diff --git 
a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala 
b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala
index ec20b6d..8a333c4 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala
@@ -24,9 +24,12 @@ import org.apache.flink.api.common.functions.RichMapFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala._
 import org.apache.flink.configuration.Configuration
-import org.apache.mahout.flinkbindings.drm.{BlockifiedFlinkDrm, FlinkDrm}
-import org.apache.mahout.math.drm.logical.OpAx
+import org.apache.mahout.flinkbindings.FlinkEngine
+import org.apache.mahout.flinkbindings.drm.{BlockifiedFlinkDrm, FlinkDrm, 
RowsFlinkDrm}
+import org.apache.mahout.math.drm._
+import org.apache.mahout.math.drm.logical.{OpAtx, OpAx}
 import org.apache.mahout.math.scalabindings.RLikeOps._
+import org.apache.mahout.math.scalabindings._
 import org.apache.mahout.math.{Matrix, Vector}
 
 
@@ -58,4 +61,39 @@ object FlinkOpAx {
 
     new BlockifiedFlinkDrm(out, op.nrow.toInt)
   }
+
+
+  def atx_with_broadcast(op: OpAtx, srcA: FlinkDrm[Int]): FlinkDrm[Int] = {
+    implicit val ctx = srcA.context
+
+    val dataSetA = srcA.asBlockified.ds
+
+    // broadcast the vector x to the back end
+    val bcastX = drmBroadcast(op.x)
+
+    implicit val typeInformation = createTypeInformation[(Array[Int],Matrix)]
+    val inCoreM = dataSetA.map {
+      tuple =>
+        tuple._1.zipWithIndex.map {
+          case (key, idx) => tuple._2(idx, ::) * bcastX.value(key)
+        }
+          .reduce(_ += _)
+    }
+      // All-reduce
+      .reduce(_ += _)
+
+      // collect result
+      .collect()(0)
+
+      // Convert back to mtx
+      .toColMatrix
+
+    // It is ridiculous, but in this scheme we will have to re-parallelize it 
again in order to plug
+    // it back as a Flink drm
+    val res = FlinkEngine.parallelize(inCoreM, parallelismDegree = 1)
+
+    new RowsFlinkDrm[Int](res, 1)
+
+  }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/mahout/blob/b67398f9/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
----------------------------------------------------------------------
diff --git 
a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
 
b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
index ea96e88..e59e5a5 100644
--- 
a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
+++ 
b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
@@ -25,8 +25,10 @@ import org.apache.flink.api.scala._
 import org.apache.flink.core.fs.FileSystem.WriteMode
 import org.apache.flink.core.fs.Path
 import org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat
+import org.apache.flink.configuration.GlobalConfiguration
 import org.apache.hadoop.io.{IntWritable, LongWritable, Text, Writable}
 import org.apache.hadoop.mapred.{FileOutputFormat, JobConf, 
SequenceFileOutputFormat}
+import org.apache.mahout.flinkbindings.io.Hadoop2HDFSUtil
 import org.apache.mahout.flinkbindings.{DrmDataSet, _}
 import org.apache.mahout.math._
 import org.apache.mahout.math.drm.CacheHint._
@@ -50,10 +52,26 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val 
ds: DrmDataSet[K],
   lazy val ncol: Int = if (_ncol >= 0) _ncol else dim._2
 
   // persistance values
-  var cacheFileName: String = "/a"
+  var cacheFileName: String = "undefinedCacheName"
   var isCached: Boolean = false
   var parallelismDeg: Int = -1
-  val persistanceRootDir = "/tmp/"
+  var persistanceRootDir: String = _
+
+  // need to make sure that this is actually getting the correct properties 
for {{taskmanager.tmp.dirs}}
+  val mahoutHome = getMahoutHome()
+
+  // this is extra I/O for each cache call.  this needs to be moved somewhere 
where it is called
+  // only once.  Possibly FlinkDistributedEngine.
+  GlobalConfiguration.loadConfiguration(mahoutHome + "/conf/flink-config.yaml")
+
+  val conf = GlobalConfiguration.getConfiguration()
+
+  if (!(conf == null )) {
+     persistanceRootDir = conf.getString("taskmanager.tmp.dirs", "/tmp/")
+  } else {
+     persistanceRootDir = "/tmp/"
+  }
+
 
   private lazy val dim: (Long, Int) = {
     // combine computation of ncol and nrow in one pass
@@ -76,20 +94,38 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val 
ds: DrmDataSet[K],
 
   override val keyClassTag: ClassTag[K] = classTag[K]
 
+  /** Note as of Flink 1.0.0, no direct flink caching exists so we save
+    * the dataset to the filesystem and read it back when cache is called */
   def cache() = {
     if (!isCached) {
-      cacheFileName = System.nanoTime().toString
+      cacheFileName = persistanceRootDir + System.nanoTime().toString
       parallelismDeg = ds.getParallelism
       isCached = true
+      persist(ds, cacheFileName)
     }
-    implicit val typeInformation = createTypeInformation[(K,Vector)]
+    val _ds = readPersistedDataSet(cacheFileName, ds)
+
+    /** Leave the parallelism degree to be set by the operators
+      * TODO: find out a way to set the parallelism degree based on the
+      * final drm after computation is actually triggered
+      *
+      *  // We may want to look more closely at this:
+      *  // since we've cached a drm, triggering a computation
+      *  // it may not make sense to keep the same parallelism degree
+      *  if (!(parallelismDeg == _ds.getParallelism)) {
+      *    _ds.setParallelism(parallelismDeg).rebalance()
+      *  }
+      *
+      */
 
-    val _ds = persist(ds, persistanceRootDir + cacheFileName)
     datasetWrap(_ds)
   }
 
-  def uncache() = {
-    // TODO
+  def uncache(): this.type = {
+    if (isCached) {
+      Hadoop2HDFSUtil.delete(cacheFileName)
+      isCached = false
+    }
     this
   }
 
@@ -99,12 +135,10 @@ class CheckpointedFlinkDrm[K: 
ClassTag:TypeInformation](val ds: DrmDataSet[K],
     * @param dataset [[DataSet]] to write to disk
     * @param path File path to write dataset to
     * @tparam T Type of the [[DataSet]] elements
-    * @return [[DataSet]] reading the just written file
     */
-  def persist[T: ClassTag: TypeInformation](dataset: DataSet[T], path: 
String): DataSet[T] = {
+  def persist[T: ClassTag: TypeInformation](dataset: DataSet[T], path: 
String): Unit = {
     val env = dataset.getExecutionEnvironment
     val outputFormat = new TypeSerializerOutputFormat[T]
-
     val filePath = new Path(path)
 
     outputFormat.setOutputFilePath(filePath)
@@ -112,14 +146,29 @@ class CheckpointedFlinkDrm[K: 
ClassTag:TypeInformation](val ds: DrmDataSet[K],
 
     dataset.output(outputFormat)
     env.execute("FlinkTools persist")
+  }
+
+  /** Read a [[DataSet]] from specified path and returns it as a DataSource 
for subsequent
+    * operations.
+    *
+    * @param path File path to read dataset from
+    * @param ds persisted ds to retrieve type information and environment from
+    * @tparam T key Type of the [[DataSet]] elements
+    * @return [[DataSet]] the persisted dataset
+    */
+  def readPersistedDataSet[T: ClassTag : TypeInformation]
+       (path: String, ds: DataSet[T]): DataSet[T] = {
 
-    val inputFormat = new TypeSerializerInputFormat[T](dataset.getType)
+    val env = ds.getExecutionEnvironment
+    val inputFormat = new TypeSerializerInputFormat[T](ds.getType())
+    val filePath = new Path(path)
     inputFormat.setFilePath(filePath)
 
     env.createInput(inputFormat)
   }
 
-  // Members declared in org.apache.mahout.math.drm.DrmLike   
+
+  // Members declared in org.apache.mahout.math.drm.DrmLike
 
   protected[mahout] def canHaveMissingRows: Boolean = _canHaveMissingRows
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/b67398f9/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala 
b/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
index 10ce545..e769952 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
@@ -105,4 +105,10 @@ package object flinkbindings {
 
   private[flinkbindings] def extractRealClassTag[K: ClassTag](drm: 
DrmLike[K]): ClassTag[_] = drm.keyClassTag
 
+  private[flinkbindings] def getMahoutHome() = {
+    var mhome = System.getenv("MAHOUT_HOME")
+    if (mhome == null) mhome = System.getProperty("mahout.home")
+    require(mhome != null, "MAHOUT_HOME is required to spawn mahout-based 
flink jobs")
+    mhome
+  }
 }
\ No newline at end of file

Reply via email to