use {{taskmanager.tmp.dirs}} as the base directory for cached files

Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/ad22252c
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/ad22252c
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/ad22252c

Branch: refs/heads/master
Commit: ad22252ca2ab39e22d4df7cc22464af2c6179830
Parents: ad4c32c
Author: Andrew Palumbo <[email protected]>
Authored: Fri Mar 25 19:05:39 2016 -0400
Committer: Andrew Palumbo <[email protected]>
Committed: Fri Mar 25 19:05:39 2016 -0400

----------------------------------------------------------------------
 .../mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala     | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/ad22252c/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
index 5246938..0b3d13e 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
@@ -51,10 +51,13 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K],
   lazy val ncol: Int = if (_ncol >= 0) _ncol else dim._2
 
   // persistance values
-  var cacheFileName: String = "/a"
+  var cacheFileName: String = "undefinedCacheName"
   var isCached: Boolean = false
   var parallelismDeg: Int = -1
-  val persistanceRootDir = "/tmp/"
+
+  // need to make sure that this is actually getting the correct propertirs for {{taskmanager.tmp.dirs}}
+  val persistanceRootDir = ds.getExecutionEnvironment.
+    
getConfig.getGlobalJobParameters.toMap.getOrDefault("taskmanager.tmp.dirs", 
"/tmp/")
 
   private lazy val dim: (Long, Int) = {
     // combine computation of ncol and nrow in one pass
@@ -92,7 +95,7 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K],
     datasetWrap(_ds)
   }
 
-  def uncache() = {
+  def uncache():this.type = {
     if (isCached) {
       Hadoop2HDFSUtil.delete(cacheFileName)
       isCached = false

Reply via email to