Repository: mahout Updated Branches: refs/heads/MAHOUT-1865 [created] c9f9d6b89
MAHOUT-1865: Remove Hadoop 1 Support Project: http://git-wip-us.apache.org/repos/asf/mahout/repo Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/46cea7ea Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/46cea7ea Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/46cea7ea Branch: refs/heads/MAHOUT-1865 Commit: 46cea7eac71658143c5b6d1152093a1e2c14a7df Parents: f4a71d0 Author: smarthi <[email protected]> Authored: Tue Sep 6 18:30:08 2016 -0400 Committer: smarthi <[email protected]> Committed: Tue Sep 6 18:30:08 2016 -0400 ---------------------------------------------------------------------- .../flinkbindings/io/Hadoop2HDFSUtil.scala | 18 ++--- .../mahout/h2o/common/Hadoop1HDFSUtil.scala | 63 --------------- .../mahout/h2o/common/Hadoop2HDFSUtil.scala | 63 +++++++++++++++ .../apache/mahout/h2obindings/H2OEngine.scala | 4 +- pom.xml | 4 +- .../apache/mahout/common/Hadoop1HDFSUtil.scala | 83 -------------------- .../apache/mahout/common/Hadoop2HDFSUtil.scala | 83 ++++++++++++++++++++ .../apache/mahout/drivers/TrainNBDriver.scala | 4 +- .../mahout/sparkbindings/SparkEngine.scala | 4 +- 9 files changed, 160 insertions(+), 166 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mahout/blob/46cea7ea/flink/src/main/scala/org/apache/mahout/flinkbindings/io/Hadoop2HDFSUtil.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/io/Hadoop2HDFSUtil.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/io/Hadoop2HDFSUtil.scala index 9b67913..211088a 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/io/Hadoop2HDFSUtil.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/io/Hadoop2HDFSUtil.scala @@ -20,16 +20,14 @@ package org.apache.mahout.flinkbindings.io import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path -import org.apache.hadoop.io.{SequenceFile, Writable} +import org.apache.hadoop.io.SequenceFile.Reader +import org.apache.hadoop.io.Writable -/** - * Deprecated Hadoop 1 api which we currently explicitly import via Mahout dependencies. - */ object Hadoop2HDFSUtil extends HDFSUtil { /** * Read the header of a sequence file and determine the Key and Value type - * @param path + * @param path - hdfs path of Sequence File * @return */ def readDrmHeader(path: String): DrmMetadata = { @@ -43,7 +41,7 @@ object Hadoop2HDFSUtil extends HDFSUtil { // Filter out anything starting with . .filter { s => - !s.getPath.getName.startsWith("\\.") && !s.getPath.getName.startsWith("_") && !s.isDir + !s.getPath.getName.startsWith("\\.") && !s.getPath.getName.startsWith("_") && !s.isDirectory } // Take path @@ -57,12 +55,8 @@ object Hadoop2HDFSUtil extends HDFSUtil { throw new IllegalArgumentException(s"No partition files found in ${dfsPath.toString}.") } - // flink is retiring hadoop 1 - val reader = new SequenceFile.Reader(fs, partFilePath, fs.getConf) + val reader = new Reader(fs.getConf, Reader.file(partFilePath)) - // hadoop 2 reader -// val reader: SequenceFile.Reader = new SequenceFile.Reader(fs.getConf, -// SequenceFile.Reader.file(partFilePath)); try { new DrmMetadata( keyTypeWritable = reader.getKeyClass.asSubclass(classOf[Writable]), @@ -75,7 +69,7 @@ object Hadoop2HDFSUtil extends HDFSUtil { /** * Delete a path from the filesystem - * @param path + * @param path - hdfs path */ def delete(path: String) { val dfsPath = new Path(path) http://git-wip-us.apache.org/repos/asf/mahout/blob/46cea7ea/h2o/src/main/scala/org/apache/mahout/h2o/common/Hadoop1HDFSUtil.scala ---------------------------------------------------------------------- diff --git a/h2o/src/main/scala/org/apache/mahout/h2o/common/Hadoop1HDFSUtil.scala b/h2o/src/main/scala/org/apache/mahout/h2o/common/Hadoop1HDFSUtil.scala deleted file mode 100644 index a540cb1..0000000 --- a/h2o/src/main/scala/org/apache/mahout/h2o/common/Hadoop1HDFSUtil.scala +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.mahout.h2o.common - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.Path -import org.apache.hadoop.io.{SequenceFile, Writable} - -/** - * Deprecated Hadoop 1 api which we currently explicitly import via Mahout dependencies. May not work - * with Hadoop 2.0 - */ -object Hadoop1HDFSUtil extends HDFSUtil { - - - def readDrmHeader(path: String): DrmMetadata = { - val dfsPath = new Path(path) - val fs = dfsPath.getFileSystem(new Configuration()) - - val partFilePath:Path = fs.listStatus(dfsPath) - - // Filter out anything starting with . - .filter { s => !s.getPath.getName.startsWith("\\.") && !s.getPath.getName.startsWith("_") && !s.isDir } - - // Take path - .map(_.getPath) - - // Take only one, if any - .headOption - - // Require there's at least one partition file found. - .getOrElse { - throw new IllegalArgumentException(s"No partition files found in ${dfsPath.toString}.") - } - - val reader = new SequenceFile.Reader(fs, partFilePath, fs.getConf) - try { - new DrmMetadata( - keyTypeWritable = reader.getKeyClass.asSubclass(classOf[Writable]), - valueTypeWritable = reader.getValueClass.asSubclass(classOf[Writable]) - ) - } finally { - reader.close() - } - - } - -} http://git-wip-us.apache.org/repos/asf/mahout/blob/46cea7ea/h2o/src/main/scala/org/apache/mahout/h2o/common/Hadoop2HDFSUtil.scala ---------------------------------------------------------------------- diff --git a/h2o/src/main/scala/org/apache/mahout/h2o/common/Hadoop2HDFSUtil.scala b/h2o/src/main/scala/org/apache/mahout/h2o/common/Hadoop2HDFSUtil.scala new file mode 100644 index 0000000..4053d09 --- /dev/null +++ b/h2o/src/main/scala/org/apache/mahout/h2o/common/Hadoop2HDFSUtil.scala @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.h2o.common + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.apache.hadoop.io.{SequenceFile, Writable} + +/** + * Deprecated Hadoop 1 api which we currently explicitly import via Mahout dependencies. May not work + * with Hadoop 2.0 + */ +object Hadoop2HDFSUtil extends HDFSUtil { + + + def readDrmHeader(path: String): DrmMetadata = { + val dfsPath = new Path(path) + val fs = dfsPath.getFileSystem(new Configuration()) + + val partFilePath:Path = fs.listStatus(dfsPath) + + // Filter out anything starting with . + .filter { s => !s.getPath.getName.startsWith("\\.") && !s.getPath.getName.startsWith("_") && !s.isDirectory } + + // Take path + .map(_.getPath) + + // Take only one, if any + .headOption + + // Require there's at least one partition file found. + .getOrElse { + throw new IllegalArgumentException(s"No partition files found in ${dfsPath.toString}.") + } + + val reader = new SequenceFile.Reader(fs.getConf, SequenceFile.Reader.file(partFilePath)) + try { + new DrmMetadata( + keyTypeWritable = reader.getKeyClass.asSubclass(classOf[Writable]), + valueTypeWritable = reader.getValueClass.asSubclass(classOf[Writable]) + ) + } finally { + reader.close() + } + + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/46cea7ea/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala ---------------------------------------------------------------------- diff --git a/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala b/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala index 60bf7ac..494e8a8 100644 --- a/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala +++ b/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala @@ -25,7 +25,7 @@ import org.apache.mahout.math.drm._ import org.apache.mahout.math.drm.logical._ import org.apache.mahout.h2obindings.ops._ import org.apache.mahout.h2obindings.drm._ -import org.apache.mahout.h2o.common.{Hadoop1HDFSUtil, HDFSUtil} +import org.apache.mahout.h2o.common.{Hadoop2HDFSUtil, HDFSUtil} import org.apache.mahout.logging._ /** H2O specific non-DRM operations */ @@ -34,7 +34,7 @@ object H2OEngine extends DistributedEngine { private final implicit val log = getLog(H2OEngine.getClass) // By default, use Hadoop 1 utils - var hdfsUtils: HDFSUtil = Hadoop1HDFSUtil + var hdfsUtils: HDFSUtil = Hadoop2HDFSUtil def colMeans[K](drm: CheckpointedDrm[K]): Vector = H2OHelper.colMeans(drm.h2odrm.frame) http://git-wip-us.apache.org/repos/asf/mahout/blob/46cea7ea/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 58d32b2..0120a8a 100644 --- a/pom.xml +++ b/pom.xml @@ -118,11 +118,11 @@ <mscala.version>3.2.0</mscala.version> <hbase.version>1.0.0</hbase.version> <lucene.version>5.5.2</lucene.version> - <slf4j.version>1.7.19</slf4j.version> + <slf4j.version>1.7.21</slf4j.version> <scala.compat.version>2.10</scala.compat.version> <scala.version>2.10.4</scala.version> <spark.version>1.5.2</spark.version> - <flink.version>1.1.1</flink.version> + <flink.version>1.1.2</flink.version> <h2o.version>0.1.25</h2o.version> <jackson.version>2.7.4</jackson.version> </properties> http://git-wip-us.apache.org/repos/asf/mahout/blob/46cea7ea/spark/src/main/scala/org/apache/mahout/common/Hadoop1HDFSUtil.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/common/Hadoop1HDFSUtil.scala b/spark/src/main/scala/org/apache/mahout/common/Hadoop1HDFSUtil.scala deleted file mode 100644 index 29599b8..0000000 --- a/spark/src/main/scala/org/apache/mahout/common/Hadoop1HDFSUtil.scala +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.mahout.common - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.Path -import org.apache.hadoop.io.{SequenceFile, Writable} -import org.apache.spark.SparkContext - -/** - * Deprecated Hadoop 1 api which we currently explicitly import via Mahout dependencies. May not work - * with Hadoop 2.0 - */ -object Hadoop1HDFSUtil extends HDFSUtil { - - - /** Read DRM header information off (H)DFS. */ - override def readDrmHeader(path: String)(implicit sc: SparkContext): DrmMetadata = { - - val dfsPath = new Path(path) - - val fs = dfsPath.getFileSystem(sc.hadoopConfiguration) - - // Apparently getFileSystem() doesn't set conf?? - fs.setConf(sc.hadoopConfiguration) - - val partFilePath:Path = fs.listStatus(dfsPath) - - // Filter out anything starting with . - .filter { s => !s.getPath.getName.startsWith("\\.") && !s.getPath.getName.startsWith("_") && !s.isDir } - - // Take path - .map(_.getPath) - - // Take only one, if any - .headOption - - // Require there's at least one partition file found. - .getOrElse { - throw new IllegalArgumentException(s"No partition files found in ${dfsPath.toString}.") - } - - val reader = new SequenceFile.Reader(fs, partFilePath, fs.getConf) - try { - new DrmMetadata( - keyTypeWritable = reader.getKeyClass.asSubclass(classOf[Writable]), - valueTypeWritable = reader.getValueClass.asSubclass(classOf[Writable]) - ) - } finally { - reader.close() - } - - } - - /** - * Delete a path from the filesystem - * @param path - */ - def delete(path: String) { - val dfsPath = new Path(path) - val fs = dfsPath.getFileSystem(new Configuration()) - - if (fs.exists(dfsPath)) { - fs.delete(dfsPath, true) - } - } - -} http://git-wip-us.apache.org/repos/asf/mahout/blob/46cea7ea/spark/src/main/scala/org/apache/mahout/common/Hadoop2HDFSUtil.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/common/Hadoop2HDFSUtil.scala b/spark/src/main/scala/org/apache/mahout/common/Hadoop2HDFSUtil.scala new file mode 100644 index 0000000..de601d5 --- /dev/null +++ b/spark/src/main/scala/org/apache/mahout/common/Hadoop2HDFSUtil.scala @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.common + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.apache.hadoop.io.{SequenceFile, Writable} +import org.apache.spark.SparkContext + +/** + * Deprecated Hadoop 1 api which we currently explicitly import via Mahout dependencies. May not work + * with Hadoop 2.0 + */ +object Hadoop2HDFSUtil extends HDFSUtil { + + + /** Read DRM header information off (H)DFS. */ + override def readDrmHeader(path: String)(implicit sc: SparkContext): DrmMetadata = { + + val dfsPath = new Path(path) + + val fs = dfsPath.getFileSystem(sc.hadoopConfiguration) + + // Apparently getFileSystem() doesn't set conf?? + fs.setConf(sc.hadoopConfiguration) + + val partFilePath:Path = fs.listStatus(dfsPath) + + // Filter out anything starting with . + .filter { s => !s.getPath.getName.startsWith("\\.") && !s.getPath.getName.startsWith("_") && !s.isDirectory } + + // Take path + .map(_.getPath) + + // Take only one, if any + .headOption + + // Require there's at least one partition file found. + .getOrElse { + throw new IllegalArgumentException(s"No partition files found in ${dfsPath.toString}.") + } + + val reader = new SequenceFile.Reader(fs.getConf, SequenceFile.Reader.file(partFilePath)) + try { + new DrmMetadata( + keyTypeWritable = reader.getKeyClass.asSubclass(classOf[Writable]), + valueTypeWritable = reader.getValueClass.asSubclass(classOf[Writable]) + ) + } finally { + reader.close() + } + + } + + /** + * Delete a path from the filesystem + * @param path - hdfs path + */ + def delete(path: String) { + val dfsPath = new Path(path) + val fs = dfsPath.getFileSystem(new Configuration()) + + if (fs.exists(dfsPath)) { + fs.delete(dfsPath, true) + } + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/46cea7ea/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala index eeed97a..d0e711a 100644 --- a/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala +++ b/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala @@ -18,7 +18,7 @@ package org.apache.mahout.drivers import org.apache.mahout.classifier.naivebayes.{SparkNaiveBayes, _} -import org.apache.mahout.common.Hadoop1HDFSUtil +import org.apache.mahout.common.Hadoop2HDFSUtil import org.apache.mahout.math.drm import org.apache.mahout.math.drm.DrmLike @@ -95,7 +95,7 @@ object TrainNBDriver extends MahoutSparkDriver { val fullPathToModel = outputPath + NBModel.modelBaseDirectory if (overwrite) { - Hadoop1HDFSUtil.delete(fullPathToModel) + Hadoop2HDFSUtil.delete(fullPathToModel) } val trainingSet = readTrainingSet() http://git-wip-us.apache.org/repos/asf/mahout/blob/46cea7ea/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala index 47d14db..ee526c5 100644 --- a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala @@ -18,7 +18,7 @@ package org.apache.mahout.sparkbindings import org.apache.hadoop.io._ -import org.apache.mahout.common.{HDFSUtil, Hadoop1HDFSUtil} +import org.apache.mahout.common.{HDFSUtil, Hadoop2HDFSUtil} import org.apache.mahout.drivers.TextDelimitedIndexedDatasetReader import org.apache.mahout.math._ import org.apache.mahout.math.drm._ @@ -39,7 +39,7 @@ import scala.reflect.ClassTag object SparkEngine extends DistributedEngine { // By default, use Hadoop 1 utils - var hdfsUtils: HDFSUtil = Hadoop1HDFSUtil + var hdfsUtils: HDFSUtil = Hadoop2HDFSUtil def colSums[K](drm: CheckpointedDrm[K]): Vector = { val n = drm.ncol
