use as the base directory for cached files
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/ad22252c Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/ad22252c Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/ad22252c Branch: refs/heads/master Commit: ad22252ca2ab39e22d4df7cc22464af2c6179830 Parents: ad4c32c Author: Andrew Palumbo <[email protected]> Authored: Fri Mar 25 19:05:39 2016 -0400 Committer: Andrew Palumbo <[email protected]> Committed: Fri Mar 25 19:05:39 2016 -0400 ---------------------------------------------------------------------- .../mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mahout/blob/ad22252c/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala index 5246938..0b3d13e 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala @@ -51,10 +51,13 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K], lazy val ncol: Int = if (_ncol >= 0) _ncol else dim._2 // persistance values - var cacheFileName: String = "/a" + var cacheFileName: String = "undefinedCacheName" var isCached: Boolean = false var parallelismDeg: Int = -1 - val persistanceRootDir = "/tmp/" + + // need to make sure that this is actually getting the correct propertirs for {{taskmanager.tmp.dirs}} + val persistanceRootDir = ds.getExecutionEnvironment. + getConfig.getGlobalJobParameters.toMap.getOrDefault("taskmanager.tmp.dirs", "/tmp/") private lazy val dim: (Long, Int) = { // combine computation of ncol and nrow in one pass @@ -92,7 +95,7 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K], datasetWrap(_ds) } - def uncache() = { + def uncache():this.type = { if (isCached) { Hadoop2HDFSUtil.delete(cacheFileName) isCached = false
