Repository: mahout Updated Branches: refs/heads/master fde08a9a5 -> b0ee8e265
simplified driver and made required changes to all, note: left job assembly untouched Project: http://git-wip-us.apache.org/repos/asf/mahout/repo Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/d0f64205 Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/d0f64205 Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/d0f64205 Branch: refs/heads/master Commit: d0f64205a116853aa471dd1361a635167da15fcc Parents: 0f037cb Author: pferrel <[email protected]> Authored: Sat Dec 27 15:43:41 2014 -0800 Committer: pferrel <[email protected]> Committed: Sat Dec 27 15:43:41 2014 -0800 ---------------------------------------------------------------------- .../apache/mahout/drivers/MahoutDriver.scala | 2 +- spark/pom.xml | 13 ++-- spark/src/main/assembly/job.xml | 17 +++++- .../mahout/drivers/ItemSimilarityDriver.scala | 12 +--- .../mahout/drivers/MahoutSparkDriver.scala | 20 +++--- .../mahout/drivers/RowSimilarityDriver.scala | 8 +-- .../apache/mahout/drivers/TestNBDriver.scala | 64 +++++++------------- .../apache/mahout/drivers/TrainNBDriver.scala | 18 ------ 8 files changed, 61 insertions(+), 93 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mahout/blob/d0f64205/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala b/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala index 8c1f8cf..3d9d4e1 100644 --- a/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala +++ b/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala @@ -25,7 +25,7 @@ abstract class MahoutDriver { implicit protected var mc: DistributedContext = _ - protected var parser: MahoutOptionParser = _ + implicit protected var parser: MahoutOptionParser = _ var _useExistingContext: Boolean = false // used in the test suite to reuse one context per suite http://git-wip-us.apache.org/repos/asf/mahout/blob/d0f64205/spark/pom.xml ---------------------------------------------------------------------- diff --git a/spark/pom.xml b/spark/pom.xml index f61f988..bcf9e30 100644 --- a/spark/pom.xml +++ b/spark/pom.xml @@ -157,8 +157,8 @@ </executions> </plugin> - <!-- create job jar to include CLI driver deps--> - <!-- leave this in even though there are no hadoop mapreduce jobs in this module --> + <!-- create an all dependencies job.jar --> + <!-- todo: before release we need a better way to do this MAHOUT-1636 --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-assembly-plugin</artifactId> @@ -171,13 +171,14 @@ </goals> <configuration> <descriptors> - <descriptor>src/main/assembly/job.xml</descriptor> + <descriptor>../spark/src/main/assembly/job.xml</descriptor> </descriptors> </configuration> </execution> </executions> </plugin> + </plugins> </build> <!-- @@ -319,12 +320,6 @@ <!-- 3rd-party --> - <dependency> - <groupId>com.github.scopt</groupId> - <artifactId>scopt_2.10</artifactId> - <version>3.2.0</version> - </dependency> - <!-- scala stuff --> <dependency> http://git-wip-us.apache.org/repos/asf/mahout/blob/d0f64205/spark/src/main/assembly/job.xml ---------------------------------------------------------------------- diff --git a/spark/src/main/assembly/job.xml b/spark/src/main/assembly/job.xml index 0c41f3d..2bdb3ce 100644 --- a/spark/src/main/assembly/job.xml +++ b/spark/src/main/assembly/job.xml @@ -42,5 +42,20 @@ </excludes> </dependencySet> </dependencySets> + <fileSets> + <fileSet> + <directory>${basedir}/target/classes</directory> + <outputDirectory>/</outputDirectory> + <excludes> + <exclude>*.jar</exclude> + </excludes> + </fileSet> + <fileSet> + <directory>${basedir}/target/classes</directory> + <outputDirectory>/</outputDirectory> + <includes> + <include>driver.classes.default.props</include> + </includes> + </fileSet> + </fileSets> </assembly> - \ No newline at end of file http://git-wip-us.apache.org/repos/asf/mahout/blob/d0f64205/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala index 01a18c9..36ba6ef 100644 --- a/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala +++ b/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala @@ -117,15 +117,9 @@ object ItemSimilarityDriver extends MahoutSparkDriver { } } - override def start(masterUrl: String = parser.opts("master").asInstanceOf[String], - appName: String = parser.opts("appName").asInstanceOf[String]): - Unit = { + override protected def start() : Unit = { - if (parser.opts("sparkExecutorMem").asInstanceOf[String] != "") - sparkConf.set("spark.executor.memory", parser.opts("sparkExecutorMem").asInstanceOf[String]) - //else leave as set in Spark config - - super.start(masterUrl, appName) + super.start readSchema1 = new Schema("delim" -> parser.opts("inDelim").asInstanceOf[String], "filter" -> parser.opts("filter1").asInstanceOf[String], @@ -208,7 +202,7 @@ object ItemSimilarityDriver extends MahoutSparkDriver { } override def process: Unit = { - start() + start val indexedDatasets = readIndexedDatasets val idss = SimilarityAnalysis.cooccurrencesIDSs(indexedDatasets, parser.opts("randomSeed").asInstanceOf[Int], http://git-wip-us.apache.org/repos/asf/mahout/blob/d0f64205/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala index e6299fd..ab40c3a 100644 --- a/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala +++ b/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala @@ -34,7 +34,6 @@ import org.apache.mahout.sparkbindings._ * * override def main(args: Array[String]): Unit = { * - * * val parser = new MahoutOptionParser(programName = "shortname") { * head("somedriver", "Mahout 1.0-SNAPSHOT") * @@ -55,7 +54,7 @@ import org.apache.mahout.sparkbindings._ * } * * override def process: Unit = { - * start() + * start // override to change the default Kryo or SparkConf before the distributed context is created * // do the work here * stop * } @@ -70,15 +69,18 @@ abstract class MahoutSparkDriver extends MahoutDriver { /** Creates a Spark context to run the job inside. * Override to set the SparkConf values specific to the job, * these must be set before the context is created. - * @param masterUrl Spark master URL - * @param appName Name to display in Spark UI * */ - protected def start(masterUrl: String, appName: String) : Unit = { - sparkConf.set("spark.kryo.referenceTracking", "false") - .set("spark.kryoserializer.buffer.mb", "200")// this is default for Mahout optimizer, change it with -D option - + protected def start() : Unit = { if (!_useExistingContext) { - mc = mahoutSparkContext(masterUrl, appName, sparkConf = sparkConf) + sparkConf.set("spark.kryo.referenceTracking", "false") + .set("spark.kryoserializer.buffer.mb", "200")// this is default for Mahout optimizer, change it with -D option + + if (parser.opts("sparkExecutorMem").asInstanceOf[String] != "") + sparkConf.set("spark.executor.memory", parser.opts("sparkExecutorMem").asInstanceOf[String]) + //else leave as set in Spark config + mc = mahoutSparkContext(masterUrl = parser.opts("master").asInstanceOf[String], + appName = parser.opts("appName").asInstanceOf[String], + sparkConf = sparkConf) } } http://git-wip-us.apache.org/repos/asf/mahout/blob/d0f64205/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala index 9b44b95..8c1bce4 100644 --- a/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala +++ b/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala @@ -106,11 +106,9 @@ object RowSimilarityDriver extends MahoutSparkDriver { } } - override def start(masterUrl: String = parser.opts("master").asInstanceOf[String], - appName: String = parser.opts("appName").asInstanceOf[String]): - Unit = { + override protected def start() : Unit = { - super.start(masterUrl, appName) + super.start readWriteSchema = new Schema( "rowKeyDelim" -> parser.opts("rowKeyDelim").asInstanceOf[String], @@ -135,7 +133,7 @@ object RowSimilarityDriver extends MahoutSparkDriver { } override def process: Unit = { - start() + start val indexedDataset = readIndexedDataset http://git-wip-us.apache.org/repos/asf/mahout/blob/d0f64205/spark/src/main/scala/org/apache/mahout/drivers/TestNBDriver.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/drivers/TestNBDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/TestNBDriver.scala index 7d0738c..368ee89 100644 --- a/spark/src/main/scala/org/apache/mahout/drivers/TestNBDriver.scala +++ b/spark/src/main/scala/org/apache/mahout/drivers/TestNBDriver.scala @@ -78,54 +78,36 @@ object TestNBDriver extends MahoutSparkDriver { } } - override def start(masterUrl: String = parser.opts("master").asInstanceOf[String], - appName: String = parser.opts("appName").asInstanceOf[String]): - Unit = { - - // will be only specific to this job. - // Note: set a large spark.kryoserializer.buffer.mb if using DSL MapBlock else leave as default - - if (parser.opts("sparkExecutorMem").asInstanceOf[String] != "") - sparkConf.set("spark.executor.memory", parser.opts("sparkExecutorMem").asInstanceOf[String]) - - // Note: set a large akka frame size for DSL NB (20) - //sparkConf.set("spark.akka.frameSize","20") // don't need this for Spark optimized NaiveBayes.. - //else leave as set in Spark config - - super.start(masterUrl, appName) - - } - - /** Read the test set from inputPath/part-x-00000 sequence file of form <Text,VectorWritable> */ - private def readTestSet: DrmLike[_] = { - val inputPath = parser.opts("input").asInstanceOf[String] - val trainingSet= drm.drmDfsRead(inputPath) - trainingSet - } +/** Read the test set from inputPath/part-x-00000 sequence file of form <Text,VectorWritable> */ +private def readTestSet: DrmLike[_] = { + val inputPath = parser.opts("input").asInstanceOf[String] + val trainingSet= drm.drmDfsRead(inputPath) + trainingSet +} - /** read the model from pathToModel using NBModel.DfsRead(...) */ - private def readModel: NBModel = { - val inputPath = parser.opts("pathToModel").asInstanceOf[String] - val model= NBModel.dfsRead(inputPath) - model - } +/** read the model from pathToModel using NBModel.DfsRead(...) */ +private def readModel: NBModel = { + val inputPath = parser.opts("pathToModel").asInstanceOf[String] + val model= NBModel.dfsRead(inputPath) + model +} - override def process: Unit = { - start() +override def process: Unit = { + start() - val testComplementary = parser.opts("testComplementary").asInstanceOf[Boolean] - val outputPath = parser.opts("output").asInstanceOf[String] + val testComplementary = parser.opts("testComplementary").asInstanceOf[Boolean] + val outputPath = parser.opts("output").asInstanceOf[String] - // todo: get the -ow option in to check for a model in the path and overwrite if flagged. + // todo: get the -ow option in to check for a model in the path and overwrite if flagged. - val testSet = readTestSet - val model = readModel - val analyzer= NaiveBayes.test(model, testSet, testComplementary) + val testSet = readTestSet + val model = readModel + val analyzer= NaiveBayes.test(model, testSet, testComplementary) - println(analyzer) + println(analyzer) - stop - } + stop +} } http://git-wip-us.apache.org/repos/asf/mahout/blob/d0f64205/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala index 35ff90b..3d03c1d 100644 --- a/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala +++ b/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala @@ -72,24 +72,6 @@ object TrainNBDriver extends MahoutSparkDriver { } } - override def start(masterUrl: String = parser.opts("master").asInstanceOf[String], - appName: String = parser.opts("appName").asInstanceOf[String]): - Unit = { - - // will be only specific to this job. - // Note: set a large spark.kryoserializer.buffer.mb if using DSL MapBlock else leave as default - - if (parser.opts("sparkExecutorMem").asInstanceOf[String] != "") - sparkConf.set("spark.executor.memory", parser.opts("sparkExecutorMem").asInstanceOf[String]) - - // Note: set a large akka frame size for DSL NB (20) - // sparkConf.set("spark.akka.frameSize","20") // don't need this for Spark optimized NaiveBayes.. - // else leave as set in Spark config - - super.start(masterUrl, appName) - - } - /** Read the training set from inputPath/part-x-00000 sequence file of form <Text,VectorWritable> */ private def readTrainingSet: DrmLike[_]= { val inputPath = parser.opts("input").asInstanceOf[String]
