Repository: mahout Updated Branches: refs/heads/master 8a1978a75 -> 30fe6cc83
MAHOUT-1493: Add CLI options for --overwrite and --alphaI to NB Drivers. closes apache/mahout#111 Project: http://git-wip-us.apache.org/repos/asf/mahout/repo Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/30fe6cc8 Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/30fe6cc8 Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/30fe6cc8 Branch: refs/heads/master Commit: 30fe6cc83f83578c922d8c01742ae7a6939280e8 Parents: 8a1978a Author: Andrew Palumbo <[email protected]> Authored: Sun Apr 5 23:23:29 2015 -0400 Committer: Andrew Palumbo <[email protected]> Committed: Sun Apr 5 23:23:29 2015 -0400 ---------------------------------------------------------------------- examples/bin/classify-20newsgroups.sh | 5 +--- .../mahout/classifier/naivebayes/NBModel.scala | 7 ++++-- .../apache/mahout/common/Hadoop1HDFSUtil.scala | 20 ++++++++++++++- .../apache/mahout/drivers/TestNBDriver.scala | 1 + .../apache/mahout/drivers/TrainNBDriver.scala | 26 +++++++++++++++++++- 5 files changed, 51 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mahout/blob/30fe6cc8/examples/bin/classify-20newsgroups.sh ---------------------------------------------------------------------- diff --git a/examples/bin/classify-20newsgroups.sh b/examples/bin/classify-20newsgroups.sh index ea949e0..d116691 100755 --- a/examples/bin/classify-20newsgroups.sh +++ b/examples/bin/classify-20newsgroups.sh @@ -65,9 +65,6 @@ if [ "x$alg" == "xnaivebayes-Spark" -o "x$alg" == "xcnaivebayes-Spark" ]; then echo "Plese set your MASTER env variable to point to your Spark Master URL. exiting..." 
exit 1 fi - set +e - $HADOOP dfs -rmr ${WORK_DIR}/spark-model - set -e fi if [ "x$alg" != "xclean" ]; then @@ -161,7 +158,7 @@ if ( [ "x$alg" == "xnaivebayes-MapReduce" ] || [ "x$alg" == "xcnaivebayes-MapR echo "Training Naive Bayes model" ./bin/mahout spark-trainnb \ -i ${WORK_DIR}/20news-train-vectors \ - -o ${WORK_DIR}/spark-model $c -ma $MASTER + -o ${WORK_DIR}/spark-model $c -ow -ma $MASTER echo "Self testing on training set" ./bin/mahout spark-testnb \ http://git-wip-us.apache.org/repos/asf/mahout/blob/30fe6cc8/math-scala/src/main/scala/org/apache/mahout/classifier/naivebayes/NBModel.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/classifier/naivebayes/NBModel.scala b/math-scala/src/main/scala/org/apache/mahout/classifier/naivebayes/NBModel.scala index c1935fe..a5121c2 100644 --- a/math-scala/src/main/scala/org/apache/mahout/classifier/naivebayes/NBModel.scala +++ b/math-scala/src/main/scala/org/apache/mahout/classifier/naivebayes/NBModel.scala @@ -100,7 +100,7 @@ class NBModel(val weightsPerLabelAndFeature: Matrix = null, //todo: write something other than a DRM for label Index, is Complementary, alphaI. // add a directory to put all of the DRMs in - val fullPathToModel = pathToModel + "/naiveBayesModel" + val fullPathToModel = pathToModel + NBModel.modelBaseDirectory drmParallelize(weightsPerLabelAndFeature).dfsWrite(fullPathToModel + "/weightsPerLabelAndFeatureDrm.drm") drmParallelize(sparse(weightsPerFeature)).dfsWrite(fullPathToModel + "/weightsPerFeatureDrm.drm") @@ -150,6 +150,9 @@ class NBModel(val weightsPerLabelAndFeature: Matrix = null, } object NBModel extends java.io.Serializable { + + val modelBaseDirectory = "/naiveBayesModel" + /** * Read a trained model in from from the filesystem. 
* @param pathToModel directory from which to read individual model components @@ -159,7 +162,7 @@ object NBModel extends java.io.Serializable { //todo: Takes forever to read we need a more practical method of writing models. Readers/Writers? // read from a base directory for all drms - val fullPathToModel = pathToModel + "/naiveBayesModel" + val fullPathToModel = pathToModel + modelBaseDirectory val weightsPerFeatureDrm = drmDfsRead(fullPathToModel + "/weightsPerFeatureDrm.drm").checkpoint(CacheHint.MEMORY_ONLY) val weightsPerFeature = weightsPerFeatureDrm.collect(0, ::) http://git-wip-us.apache.org/repos/asf/mahout/blob/30fe6cc8/spark/src/main/scala/org/apache/mahout/common/Hadoop1HDFSUtil.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/common/Hadoop1HDFSUtil.scala b/spark/src/main/scala/org/apache/mahout/common/Hadoop1HDFSUtil.scala index 87977ff..047104a 100644 --- a/spark/src/main/scala/org/apache/mahout/common/Hadoop1HDFSUtil.scala +++ b/spark/src/main/scala/org/apache/mahout/common/Hadoop1HDFSUtil.scala @@ -17,6 +17,7 @@ package org.apache.mahout.common + import org.apache.hadoop.io.{Writable, SequenceFile} import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.conf.Configuration @@ -29,7 +30,11 @@ import JavaConversions._ */ object Hadoop1HDFSUtil extends HDFSUtil { - + /** + * Read the header of a sequence file and determine the Key and Value type + * @param path + * @return + */ def readDrmHeader(path: String): DrmMetadata = { val dfsPath = new Path(path) val fs = dfsPath.getFileSystem(new Configuration()) @@ -62,4 +67,17 @@ object Hadoop1HDFSUtil extends HDFSUtil { } + /** + * Delete a path from the filesystem + * @param path + */ + def delete(path: String) { + val dfsPath = new Path(path) + val fs = dfsPath.getFileSystem(new Configuration()) + + if (fs.exists(dfsPath)) { + fs.delete(dfsPath, true) + } + } + } 
http://git-wip-us.apache.org/repos/asf/mahout/blob/30fe6cc8/spark/src/main/scala/org/apache/mahout/drivers/TestNBDriver.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/drivers/TestNBDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/TestNBDriver.scala index ee58f17..ca87d5f 100644 --- a/spark/src/main/scala/org/apache/mahout/drivers/TestNBDriver.scala +++ b/spark/src/main/scala/org/apache/mahout/drivers/TestNBDriver.scala @@ -20,6 +20,7 @@ package org.apache.mahout.drivers import org.apache.mahout.classifier.naivebayes.{SparkNaiveBayes, NBModel} import org.apache.mahout.math.drm import org.apache.mahout.math.drm.DrmLike +import org.apache.mahout.math.drm.RLikeDrmOps.drm2RLikeOps import scala.collection.immutable.HashMap http://git-wip-us.apache.org/repos/asf/mahout/blob/30fe6cc8/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala index 418027f..0eed8d4 100644 --- a/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala +++ b/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala @@ -19,8 +19,10 @@ package org.apache.mahout.drivers import org.apache.mahout.classifier.naivebayes._ import org.apache.mahout.classifier.naivebayes.SparkNaiveBayes +import org.apache.mahout.common.Hadoop1HDFSUtil import org.apache.mahout.math.drm import org.apache.mahout.math.drm.DrmLike +import org.apache.mahout.math.drm.RLikeDrmOps.drm2RLikeOps import scala.collection.immutable.HashMap @@ -50,6 +52,20 @@ object TrainNBDriver extends MahoutSparkDriver { options + ("trainComplementary" -> true) } text ("Train a complementary model, Default: false.") + // Laplace smoothing parameter default is 1.0 + opts = opts + ("alphaI" -> 1.0) + 
opt[Double]("alphaI") abbr ("a") action { (x, options) => + options + ("alphaI" -> x) + } text ("Laplace smoothing factor default is 1.0") validate { x => + if (x > 0) success else failure("Option --alphaI must be > 0") + } + + // Overwrite the output directory (with the model) if it exists? Default: false + opts = opts + ("overwrite" -> false) + opt[Unit]("overwrite") abbr ("ow") action { (_, options) => + options + ("overwrite" -> true) + } text ("Overwrite the output directory (with the model) if it exists? Default: false") + + // Spark config options--not driver specific parseSparkOptions() @@ -74,11 +90,19 @@ object TrainNBDriver extends MahoutSparkDriver { val complementary = parser.opts("trainComplementary").asInstanceOf[Boolean] val outputPath = parser.opts("output").asInstanceOf[String] + val alpha = parser.opts("alphaI").asInstanceOf[Double] + val overwrite = parser.opts("overwrite").asInstanceOf[Boolean] + + val fullPathToModel = outputPath + NBModel.modelBaseDirectory + + if (overwrite) { + Hadoop1HDFSUtil.delete(fullPathToModel) + } val trainingSet = readTrainingSet // Use Spark-Optimized Naive Bayes here to extract labels and aggregate options val (labelIndex, aggregatedObservations) = SparkNaiveBayes.extractLabelsAndAggregateObservations(trainingSet) - val model = SparkNaiveBayes.train(aggregatedObservations, labelIndex, complementary) + val model = SparkNaiveBayes.train(aggregatedObservations, labelIndex, complementary, alpha.toFloat) model.dfsWrite(outputPath)
