move getMahoutHome()

Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/b45f9825
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/b45f9825
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/b45f9825

Branch: refs/heads/master
Commit: b45f982581b2a2bed75b771672b4f2d66ef32840
Parents: b96918b
Author: Andrew Palumbo <[email protected]>
Authored: Sat Mar 26 17:35:37 2016 -0400
Committer: Andrew Palumbo <[email protected]>
Committed: Sat Mar 26 17:38:50 2016 -0400

----------------------------------------------------------------------
 .../apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala | 4 ++--
 .../main/scala/org/apache/mahout/flinkbindings/package.scala   | 6 ++++++
 2 files changed, 8 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/b45f9825/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
index b0ca4c4..9b3a9f5 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
@@ -18,7 +18,7 @@
  */
 package org.apache.mahout.flinkbindings.drm
 
-import org.apache.flink.api.common.functions.{MapFunction, ReduceFunction}
+import org.apache.flink.api.common.functions.{MapFunction, ReduceFunction, RichFunction, RichMapFunction}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.io.{TypeSerializerInputFormat, TypeSerializerOutputFormat}
 import org.apache.flink.api.scala._
@@ -58,7 +58,7 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K],
   var persistanceRootDir: String = _
 
   // need to make sure that this is actually getting the correct propertirs for {{taskmanager.tmp.dirs}}
-  val mahoutHome = System.getProperty("MAHOUT_HOME")
+  val mahoutHome = getMahoutHome()
 
   GlobalConfiguration.loadConfiguration(mahoutHome + "/conf/flink-config.yaml")
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/b45f9825/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
index 10ce545..e769952 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
@@ -105,4 +105,10 @@ package object flinkbindings {
 
   private[flinkbindings] def extractRealClassTag[K: ClassTag](drm: DrmLike[K]): ClassTag[_] = drm.keyClassTag
 
+  private[flinkbindings] def getMahoutHome() = {
+    var mhome = System.getenv("MAHOUT_HOME")
+    if (mhome == null) mhome = System.getProperty("mahout.home")
+    require(mhome != null, "MAHOUT_HOME is required to spawn mahout-based flink jobs")
+    mhome
+  }
 }
\ No newline at end of file

Reply via email to