move getMahoutHome()
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/b45f9825 Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/b45f9825 Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/b45f9825 Branch: refs/heads/master Commit: b45f982581b2a2bed75b771672b4f2d66ef32840 Parents: b96918b Author: Andrew Palumbo <[email protected]> Authored: Sat Mar 26 17:35:37 2016 -0400 Committer: Andrew Palumbo <[email protected]> Committed: Sat Mar 26 17:38:50 2016 -0400 ---------------------------------------------------------------------- .../apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala | 4 ++-- .../main/scala/org/apache/mahout/flinkbindings/package.scala | 6 ++++++ 2 files changed, 8 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mahout/blob/b45f9825/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala index b0ca4c4..9b3a9f5 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala @@ -18,7 +18,7 @@ */ package org.apache.mahout.flinkbindings.drm -import org.apache.flink.api.common.functions.{MapFunction, ReduceFunction} +import org.apache.flink.api.common.functions.{MapFunction, ReduceFunction, RichFunction, RichMapFunction} import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.io.{TypeSerializerInputFormat, TypeSerializerOutputFormat} import org.apache.flink.api.scala._ @@ -58,7 +58,7 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K], var persistanceRootDir: String = _ // need to make sure that this is actually getting the correct propertirs for {{taskmanager.tmp.dirs}} - val mahoutHome = System.getProperty("MAHOUT_HOME") + val mahoutHome = getMahoutHome() GlobalConfiguration.loadConfiguration(mahoutHome + "/conf/flink-config.yaml") http://git-wip-us.apache.org/repos/asf/mahout/blob/b45f9825/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala index 10ce545..e769952 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala @@ -105,4 +105,10 @@ package object flinkbindings { private[flinkbindings] def extractRealClassTag[K: ClassTag](drm: DrmLike[K]): ClassTag[_] = drm.keyClassTag + private[flinkbindings] def getMahoutHome() = { + var mhome = System.getenv("MAHOUT_HOME") + if (mhome == null) mhome = System.getProperty("mahout.home") + require(mhome != null, "MAHOUT_HOME is required to spawn mahout-based flink jobs") + mhome + } } \ No newline at end of file
