wip: use properties from /home/andy/sandbox/mahout/conf/flink-config.yaml
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/100d343e Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/100d343e Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/100d343e Branch: refs/heads/master Commit: 100d343e4b6e66b1a7c581455cd1faab7bbdb538 Parents: ad22252 Author: Andrew Palumbo <[email protected]> Authored: Fri Mar 25 20:26:41 2016 -0400 Committer: Andrew Palumbo <[email protected]> Committed: Fri Mar 25 20:29:47 2016 -0400 ---------------------------------------------------------------------- .../drm/CheckpointedFlinkDrm.scala | 22 +++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mahout/blob/100d343e/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala index 0b3d13e..a5bbbb5 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala @@ -25,6 +25,7 @@ import org.apache.flink.api.scala._ import org.apache.flink.core.fs.FileSystem.WriteMode import org.apache.flink.core.fs.Path import org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat +import org.apache.flink.configuration.GlobalConfiguration import org.apache.hadoop.io.{IntWritable, LongWritable, Text, Writable} import org.apache.hadoop.mapred.{FileOutputFormat, JobConf, SequenceFileOutputFormat} import org.apache.mahout.flinkbindings.io.Hadoop2HDFSUtil @@ -56,8 +57,18 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K], var parallelismDeg: Int = -1 
// need to make sure that this is actually getting the correct properties for {{taskmanager.tmp.dirs}} - val persistanceRootDir = ds.getExecutionEnvironment. - getConfig.getGlobalJobParameters.toMap.getOrDefault("taskmanager.tmp.dirs", "/tmp/") + val mahoutHome = System.getProperty("MAHOUT_HOME") + + GlobalConfiguration.loadConfiguration(mahoutHome + "/conf/flink-config.yaml") + + val conf = GlobalConfiguration.getConfiguration() + + if (!(conf == null )) { + val persistanceRootDir = conf.getString("taskmanager.tmp.dirs", "/tmp/") + } else { + val persistanceRootDir = "/tmp/" + } + private lazy val dim: (Long, Int) = { // combine computation of ncol and nrow in one pass @@ -81,8 +92,7 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K], override val keyClassTag: ClassTag[K] = classTag[K] def cache() = { - implicit val typeInformation = createTypeInformation[(K,Vector)] - implicit val inputFormat = (ds.getType) +// implicit val typeInformation = createTypeInformation[(K,Vector)] if (!isCached) { cacheFileName = persistanceRootDir + System.nanoTime().toString parallelismDeg = ds.getParallelism @@ -91,7 +101,9 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K], } val _ds = readPersistedDataSet(cacheFileName, ds) - + if (!(parallelismDeg == _ds.getParallelism)) { + _ds.setParallelism(parallelismDeg).rebalance() + } datasetWrap(_ds) }
