Repository: mahout Updated Branches: refs/heads/master c5934c2f7 -> e73fdb869
MAHOUT-1865: Remove Hadoop1 Profile, this closes apache/mahout#253 Project: http://git-wip-us.apache.org/repos/asf/mahout/repo Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/e73fdb86 Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/e73fdb86 Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/e73fdb86 Branch: refs/heads/master Commit: e73fdb8694e80e1e95a1213097434749726fd8af Parents: c5934c2 Author: smarthi <[email protected]> Authored: Wed Sep 7 00:32:50 2016 -0400 Committer: smarthi <[email protected]> Committed: Wed Sep 7 00:32:50 2016 -0400 ---------------------------------------------------------------------- .../flinkbindings/io/Hadoop2HDFSUtil.scala | 18 ++--- .../mahout/h2o/common/Hadoop1HDFSUtil.scala | 63 --------------- .../mahout/h2o/common/Hadoop2HDFSUtil.scala | 63 +++++++++++++++ .../apache/mahout/h2obindings/H2OEngine.scala | 4 +- .../apache/mahout/cf/taste/hadoop/als/ALS.java | 5 +- .../hadoop/als/FactorizationEvaluator.java | 2 +- .../cf/taste/hadoop/item/RecommenderJob.java | 2 +- .../recommender/svd/AbstractFactorizer.java | 2 +- .../classifier/naivebayes/BayesUtils.java | 11 ++- .../training/IndexInstancesMapper.java | 2 +- pom.xml | 4 +- .../apache/mahout/common/Hadoop1HDFSUtil.scala | 83 -------------------- .../apache/mahout/common/Hadoop2HDFSUtil.scala | 83 ++++++++++++++++++++ .../apache/mahout/drivers/TrainNBDriver.scala | 4 +- .../mahout/sparkbindings/SparkEngine.scala | 4 +- 15 files changed, 173 insertions(+), 177 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mahout/blob/e73fdb86/flink/src/main/scala/org/apache/mahout/flinkbindings/io/Hadoop2HDFSUtil.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/io/Hadoop2HDFSUtil.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/io/Hadoop2HDFSUtil.scala index 9b67913..211088a 100644 --- 
a/flink/src/main/scala/org/apache/mahout/flinkbindings/io/Hadoop2HDFSUtil.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/io/Hadoop2HDFSUtil.scala @@ -20,16 +20,14 @@ package org.apache.mahout.flinkbindings.io import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path -import org.apache.hadoop.io.{SequenceFile, Writable} +import org.apache.hadoop.io.SequenceFile.Reader +import org.apache.hadoop.io.Writable -/** - * Deprecated Hadoop 1 api which we currently explicitly import via Mahout dependencies. - */ object Hadoop2HDFSUtil extends HDFSUtil { /** * Read the header of a sequence file and determine the Key and Value type - * @param path + * @param path - hdfs path of Sequence File * @return */ def readDrmHeader(path: String): DrmMetadata = { @@ -43,7 +41,7 @@ object Hadoop2HDFSUtil extends HDFSUtil { // Filter out anything starting with . .filter { s => - !s.getPath.getName.startsWith("\\.") && !s.getPath.getName.startsWith("_") && !s.isDir + !s.getPath.getName.startsWith(".") && !s.getPath.getName.startsWith("_") && !s.isDirectory } // Take path @@ -57,12 +55,8 @@ object Hadoop2HDFSUtil extends HDFSUtil { throw new IllegalArgumentException(s"No partition files found in ${dfsPath.toString}.") } - // flink is retiring hadoop 1 - val reader = new SequenceFile.Reader(fs, partFilePath, fs.getConf) + val reader = new Reader(fs.getConf, Reader.file(partFilePath)) - // hadoop 2 reader -// val reader: SequenceFile.Reader = new SequenceFile.Reader(fs.getConf, -// SequenceFile.Reader.file(partFilePath)); try { new DrmMetadata( keyTypeWritable = reader.getKeyClass.asSubclass(classOf[Writable]), @@ -75,7 +69,7 @@ object Hadoop2HDFSUtil extends HDFSUtil { /** * Delete a path from the filesystem - * @param path + * @param path - hdfs path */ def delete(path: String) { val dfsPath = new Path(path) http://git-wip-us.apache.org/repos/asf/mahout/blob/e73fdb86/h2o/src/main/scala/org/apache/mahout/h2o/common/Hadoop1HDFSUtil.scala 
---------------------------------------------------------------------- diff --git a/h2o/src/main/scala/org/apache/mahout/h2o/common/Hadoop1HDFSUtil.scala b/h2o/src/main/scala/org/apache/mahout/h2o/common/Hadoop1HDFSUtil.scala deleted file mode 100644 index a540cb1..0000000 --- a/h2o/src/main/scala/org/apache/mahout/h2o/common/Hadoop1HDFSUtil.scala +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.mahout.h2o.common - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.Path -import org.apache.hadoop.io.{SequenceFile, Writable} - -/** - * Deprecated Hadoop 1 api which we currently explicitly import via Mahout dependencies. May not work - * with Hadoop 2.0 - */ -object Hadoop1HDFSUtil extends HDFSUtil { - - - def readDrmHeader(path: String): DrmMetadata = { - val dfsPath = new Path(path) - val fs = dfsPath.getFileSystem(new Configuration()) - - val partFilePath:Path = fs.listStatus(dfsPath) - - // Filter out anything starting with . 
- .filter { s => !s.getPath.getName.startsWith("\\.") && !s.getPath.getName.startsWith("_") && !s.isDir } - - // Take path - .map(_.getPath) - - // Take only one, if any - .headOption - - // Require there's at least one partition file found. - .getOrElse { - throw new IllegalArgumentException(s"No partition files found in ${dfsPath.toString}.") - } - - val reader = new SequenceFile.Reader(fs, partFilePath, fs.getConf) - try { - new DrmMetadata( - keyTypeWritable = reader.getKeyClass.asSubclass(classOf[Writable]), - valueTypeWritable = reader.getValueClass.asSubclass(classOf[Writable]) - ) - } finally { - reader.close() - } - - } - -} http://git-wip-us.apache.org/repos/asf/mahout/blob/e73fdb86/h2o/src/main/scala/org/apache/mahout/h2o/common/Hadoop2HDFSUtil.scala ---------------------------------------------------------------------- diff --git a/h2o/src/main/scala/org/apache/mahout/h2o/common/Hadoop2HDFSUtil.scala b/h2o/src/main/scala/org/apache/mahout/h2o/common/Hadoop2HDFSUtil.scala new file mode 100644 index 0000000..4053d09 --- /dev/null +++ b/h2o/src/main/scala/org/apache/mahout/h2o/common/Hadoop2HDFSUtil.scala @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mahout.h2o.common + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.apache.hadoop.io.{SequenceFile, Writable} + +/** + * HDFS helpers for the H2O bindings, built on the Hadoop 2 option-based SequenceFile.Reader API + * (replaces the removed Hadoop1HDFSUtil). + */ +object Hadoop2HDFSUtil extends HDFSUtil { + + + def readDrmHeader(path: String): DrmMetadata = { + val dfsPath = new Path(path) + val fs = dfsPath.getFileSystem(new Configuration()) + + val partFilePath:Path = fs.listStatus(dfsPath) + + // Filter out anything starting with . + .filter { s => !s.getPath.getName.startsWith(".") && !s.getPath.getName.startsWith("_") && !s.isDirectory } + + // Take path + .map(_.getPath) + + // Take only one, if any + .headOption + + // Require there's at least one partition file found. + .getOrElse { + throw new IllegalArgumentException(s"No partition files found in ${dfsPath.toString}.") + } + + val reader = new SequenceFile.Reader(fs.getConf, SequenceFile.Reader.file(partFilePath)) + try { + new DrmMetadata( + keyTypeWritable = reader.getKeyClass.asSubclass(classOf[Writable]), + valueTypeWritable = reader.getValueClass.asSubclass(classOf[Writable]) + ) + } finally { + reader.close() + } + + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/e73fdb86/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala ---------------------------------------------------------------------- diff --git a/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala b/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala index 60bf7ac..494e8a8 100644 --- a/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala +++ b/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala @@ -25,7 +25,7 @@ import org.apache.mahout.math.drm._ import org.apache.mahout.math.drm.logical._ import org.apache.mahout.h2obindings.ops._ import org.apache.mahout.h2obindings.drm._ -import 
org.apache.mahout.h2o.common.{Hadoop1HDFSUtil, HDFSUtil} +import org.apache.mahout.h2o.common.{Hadoop2HDFSUtil, HDFSUtil} import org.apache.mahout.logging._ /** H2O specific non-DRM operations */ @@ -34,7 +34,7 @@ object H2OEngine extends DistributedEngine { private final implicit val log = getLog(H2OEngine.getClass) // By default, use Hadoop 1 utils - var hdfsUtils: HDFSUtil = Hadoop1HDFSUtil + var hdfsUtils: HDFSUtil = Hadoop2HDFSUtil def colMeans[K](drm: CheckpointedDrm[K]): Vector = H2OHelper.colMeans(drm.h2odrm.frame) http://git-wip-us.apache.org/repos/asf/mahout/blob/e73fdb86/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ALS.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ALS.java b/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ALS.java index 1c07b67..4bb95ae 100644 --- a/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ALS.java +++ b/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ALS.java @@ -17,12 +17,11 @@ package org.apache.mahout.cf.taste.hadoop.als; +import com.google.common.base.Preconditions; import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; - -import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalFileSystem; @@ -64,7 +63,7 @@ final class ALS { LocalFileSystem localFs = FileSystem.getLocal(conf); for (Path cachedFile : cachedFiles) { - try (SequenceFile.Reader reader = new SequenceFile.Reader(localFs, cachedFile, conf)){ + try (SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(localFs.makeQualified(cachedFile)))) { while (reader.next(rowIndex, row)) { featureMatrix.put(rowIndex.get(), row.get()); } http://git-wip-us.apache.org/repos/asf/mahout/blob/e73fdb86/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/als/FactorizationEvaluator.java 
---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/als/FactorizationEvaluator.java b/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/als/FactorizationEvaluator.java index e69053c..4e6aaf5 100644 --- a/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/als/FactorizationEvaluator.java +++ b/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/als/FactorizationEvaluator.java @@ -112,7 +112,7 @@ public class FactorizationEvaluator extends AbstractJob { return 0; } - double computeRmse(Path errors) { + private double computeRmse(Path errors) { RunningAverage average = new FullRunningAverage(); for (Pair<DoubleWritable,NullWritable> entry : new SequenceFileDirIterable<DoubleWritable, NullWritable>(errors, PathType.LIST, PathFilters.logsCRCFilter(), http://git-wip-us.apache.org/repos/asf/mahout/blob/e73fdb86/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java b/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java index 643b2c3..129db1d 100644 --- a/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java +++ b/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java @@ -219,7 +219,7 @@ public final class RecommenderJob extends AbstractJob { //start the multiplication of the co-occurrence matrix by the user vectors if (shouldRunNextPhase(parsedArgs, currentPhase)) { - Job partialMultiply = new Job(getConf(), "partialMultiply"); + Job partialMultiply = Job.getInstance(getConf(), "partialMultiply"); Configuration partialMultiplyConf = partialMultiply.getConfiguration(); MultipleInputs.addInputPath(partialMultiply, similarityMatrixPath, SequenceFileInputFormat.class, 
http://git-wip-us.apache.org/repos/asf/mahout/blob/e73fdb86/mr/src/main/java/org/apache/mahout/cf/taste/impl/recommender/svd/AbstractFactorizer.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/cf/taste/impl/recommender/svd/AbstractFactorizer.java b/mr/src/main/java/org/apache/mahout/cf/taste/impl/recommender/svd/AbstractFactorizer.java index 5225222..0a39a1d 100644 --- a/mr/src/main/java/org/apache/mahout/cf/taste/impl/recommender/svd/AbstractFactorizer.java +++ b/mr/src/main/java/org/apache/mahout/cf/taste/impl/recommender/svd/AbstractFactorizer.java @@ -78,7 +78,7 @@ public abstract class AbstractFactorizer implements Factorizer { } private static FastByIDMap<Integer> createIDMapping(int size, LongPrimitiveIterator idIterator) { - FastByIDMap<Integer> mapping = new FastByIDMap<Integer>(size); + FastByIDMap<Integer> mapping = new FastByIDMap<>(size); int index = 0; while (idIterator.hasNext()) { mapping.put(idIterator.nextLong(), index++); http://git-wip-us.apache.org/repos/asf/mahout/blob/e73fdb86/mr/src/main/java/org/apache/mahout/classifier/naivebayes/BayesUtils.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/classifier/naivebayes/BayesUtils.java b/mr/src/main/java/org/apache/mahout/classifier/naivebayes/BayesUtils.java index c09dd83..4db8b17 100644 --- a/mr/src/main/java/org/apache/mahout/classifier/naivebayes/BayesUtils.java +++ b/mr/src/main/java/org/apache/mahout/classifier/naivebayes/BayesUtils.java @@ -17,14 +17,13 @@ package org.apache.mahout.classifier.naivebayes; +import com.google.common.base.Preconditions; import java.io.IOException; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.regex.Pattern; - -import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; @@ -102,7 +101,9 @@ public final class BayesUtils { throws IOException { FileSystem fs = FileSystem.get(indexPath.toUri(), conf); int i = 0; - try (SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, indexPath, Text.class, IntWritable.class)) { + try (SequenceFile.Writer writer = + SequenceFile.createWriter(conf, SequenceFile.Writer.file(fs.makeQualified(indexPath)), + SequenceFile.Writer.keyClass(Text.class), SequenceFile.Writer.valueClass(IntWritable.class))) { for (String label : labels) { writer.append(new Text(label), new IntWritable(i++)); } @@ -115,7 +116,9 @@ public final class BayesUtils { FileSystem fs = FileSystem.get(indexPath.toUri(), conf); Collection<String> seen = new HashSet<>(); int i = 0; - try (SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, indexPath, Text.class, IntWritable.class)){ + try (SequenceFile.Writer writer = + SequenceFile.createWriter(conf, SequenceFile.Writer.file(fs.makeQualified(indexPath)), + SequenceFile.Writer.keyClass(Text.class), SequenceFile.Writer.valueClass(IntWritable.class))){ for (Object label : labels) { String theLabel = SLASH.split(((Pair<?, ?>) label).getFirst().toString())[1]; if (!seen.contains(theLabel)) { http://git-wip-us.apache.org/repos/asf/mahout/blob/e73fdb86/mr/src/main/java/org/apache/mahout/classifier/naivebayes/training/IndexInstancesMapper.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/classifier/naivebayes/training/IndexInstancesMapper.java b/mr/src/main/java/org/apache/mahout/classifier/naivebayes/training/IndexInstancesMapper.java index 40ca2e9..4df869e 100644 --- a/mr/src/main/java/org/apache/mahout/classifier/naivebayes/training/IndexInstancesMapper.java +++ b/mr/src/main/java/org/apache/mahout/classifier/naivebayes/training/IndexInstancesMapper.java @@ -31,7 +31,7 @@ public class IndexInstancesMapper extends Mapper<Text, VectorWritable, IntWritab private static final Pattern SLASH 
= Pattern.compile("/"); - public enum Counter { SKIPPED_INSTANCES } + enum Counter { SKIPPED_INSTANCES } private OpenObjectIntHashMap<String> labelIndex; http://git-wip-us.apache.org/repos/asf/mahout/blob/e73fdb86/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 58d32b2..0120a8a 100644 --- a/pom.xml +++ b/pom.xml @@ -118,11 +118,11 @@ <mscala.version>3.2.0</mscala.version> <hbase.version>1.0.0</hbase.version> <lucene.version>5.5.2</lucene.version> - <slf4j.version>1.7.19</slf4j.version> + <slf4j.version>1.7.21</slf4j.version> <scala.compat.version>2.10</scala.compat.version> <scala.version>2.10.4</scala.version> <spark.version>1.5.2</spark.version> - <flink.version>1.1.1</flink.version> + <flink.version>1.1.2</flink.version> <h2o.version>0.1.25</h2o.version> <jackson.version>2.7.4</jackson.version> </properties> http://git-wip-us.apache.org/repos/asf/mahout/blob/e73fdb86/spark/src/main/scala/org/apache/mahout/common/Hadoop1HDFSUtil.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/common/Hadoop1HDFSUtil.scala b/spark/src/main/scala/org/apache/mahout/common/Hadoop1HDFSUtil.scala deleted file mode 100644 index 29599b8..0000000 --- a/spark/src/main/scala/org/apache/mahout/common/Hadoop1HDFSUtil.scala +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.mahout.common - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.Path -import org.apache.hadoop.io.{SequenceFile, Writable} -import org.apache.spark.SparkContext - -/** - * Deprecated Hadoop 1 api which we currently explicitly import via Mahout dependencies. May not work - * with Hadoop 2.0 - */ -object Hadoop1HDFSUtil extends HDFSUtil { - - - /** Read DRM header information off (H)DFS. */ - override def readDrmHeader(path: String)(implicit sc: SparkContext): DrmMetadata = { - - val dfsPath = new Path(path) - - val fs = dfsPath.getFileSystem(sc.hadoopConfiguration) - - // Apparently getFileSystem() doesn't set conf?? - fs.setConf(sc.hadoopConfiguration) - - val partFilePath:Path = fs.listStatus(dfsPath) - - // Filter out anything starting with . - .filter { s => !s.getPath.getName.startsWith("\\.") && !s.getPath.getName.startsWith("_") && !s.isDir } - - // Take path - .map(_.getPath) - - // Take only one, if any - .headOption - - // Require there's at least one partition file found. 
- .getOrElse { - throw new IllegalArgumentException(s"No partition files found in ${dfsPath.toString}.") - } - - val reader = new SequenceFile.Reader(fs, partFilePath, fs.getConf) - try { - new DrmMetadata( - keyTypeWritable = reader.getKeyClass.asSubclass(classOf[Writable]), - valueTypeWritable = reader.getValueClass.asSubclass(classOf[Writable]) - ) - } finally { - reader.close() - } - - } - - /** - * Delete a path from the filesystem - * @param path - */ - def delete(path: String) { - val dfsPath = new Path(path) - val fs = dfsPath.getFileSystem(new Configuration()) - - if (fs.exists(dfsPath)) { - fs.delete(dfsPath, true) - } - } - -} http://git-wip-us.apache.org/repos/asf/mahout/blob/e73fdb86/spark/src/main/scala/org/apache/mahout/common/Hadoop2HDFSUtil.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/common/Hadoop2HDFSUtil.scala b/spark/src/main/scala/org/apache/mahout/common/Hadoop2HDFSUtil.scala new file mode 100644 index 0000000..de601d5 --- /dev/null +++ b/spark/src/main/scala/org/apache/mahout/common/Hadoop2HDFSUtil.scala @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mahout.common + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.apache.hadoop.io.{SequenceFile, Writable} +import org.apache.spark.SparkContext + +/** + * HDFS helpers for the Spark bindings, built on the Hadoop 2 option-based SequenceFile.Reader API + * (replaces the removed Hadoop1HDFSUtil). + */ +object Hadoop2HDFSUtil extends HDFSUtil { + + + /** Read DRM header information off (H)DFS. */ + override def readDrmHeader(path: String)(implicit sc: SparkContext): DrmMetadata = { + + val dfsPath = new Path(path) + + val fs = dfsPath.getFileSystem(sc.hadoopConfiguration) + + // getFileSystem() may return a cached FileSystem created with another Configuration, so set it explicitly. + fs.setConf(sc.hadoopConfiguration) + + val partFilePath:Path = fs.listStatus(dfsPath) + + // Filter out anything starting with . + .filter { s => !s.getPath.getName.startsWith(".") && !s.getPath.getName.startsWith("_") && !s.isDirectory } + + // Take path + .map(_.getPath) + + // Take only one, if any + .headOption + + // Require there's at least one partition file found. 
+ .getOrElse { + throw new IllegalArgumentException(s"No partition files found in ${dfsPath.toString}.") + } + + val reader = new SequenceFile.Reader(fs.getConf, SequenceFile.Reader.file(partFilePath)) + try { + new DrmMetadata( + keyTypeWritable = reader.getKeyClass.asSubclass(classOf[Writable]), + valueTypeWritable = reader.getValueClass.asSubclass(classOf[Writable]) + ) + } finally { + reader.close() + } + + } + + /** + * Delete a path from the filesystem + * @param path - hdfs path + */ + def delete(path: String) { + val dfsPath = new Path(path) + val fs = dfsPath.getFileSystem(new Configuration()) + + if (fs.exists(dfsPath)) { + fs.delete(dfsPath, true) + } + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/e73fdb86/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala index eeed97a..d0e711a 100644 --- a/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala +++ b/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala @@ -18,7 +18,7 @@ package org.apache.mahout.drivers import org.apache.mahout.classifier.naivebayes.{SparkNaiveBayes, _} -import org.apache.mahout.common.Hadoop1HDFSUtil +import org.apache.mahout.common.Hadoop2HDFSUtil import org.apache.mahout.math.drm import org.apache.mahout.math.drm.DrmLike @@ -95,7 +95,7 @@ object TrainNBDriver extends MahoutSparkDriver { val fullPathToModel = outputPath + NBModel.modelBaseDirectory if (overwrite) { - Hadoop1HDFSUtil.delete(fullPathToModel) + Hadoop2HDFSUtil.delete(fullPathToModel) } val trainingSet = readTrainingSet() http://git-wip-us.apache.org/repos/asf/mahout/blob/e73fdb86/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala ---------------------------------------------------------------------- diff --git 
a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala index 47d14db..ee526c5 100644 --- a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala @@ -18,7 +18,7 @@ package org.apache.mahout.sparkbindings import org.apache.hadoop.io._ -import org.apache.mahout.common.{HDFSUtil, Hadoop1HDFSUtil} +import org.apache.mahout.common.{HDFSUtil, Hadoop2HDFSUtil} import org.apache.mahout.drivers.TextDelimitedIndexedDatasetReader import org.apache.mahout.math._ import org.apache.mahout.math.drm._ @@ -39,7 +39,7 @@ import scala.reflect.ClassTag object SparkEngine extends DistributedEngine { // By default, use Hadoop 1 utils - var hdfsUtils: HDFSUtil = Hadoop1HDFSUtil + var hdfsUtils: HDFSUtil = Hadoop2HDFSUtil def colSums[K](drm: CheckpointedDrm[K]): Vector = { val n = drm.ncol
