Repository: mahout
Updated Branches:
  refs/heads/MAHOUT-1865 [created] c9f9d6b89


MAHOUT-1865: Remove Hadoop 1 Support


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/46cea7ea
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/46cea7ea
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/46cea7ea

Branch: refs/heads/MAHOUT-1865
Commit: 46cea7eac71658143c5b6d1152093a1e2c14a7df
Parents: f4a71d0
Author: smarthi <[email protected]>
Authored: Tue Sep 6 18:30:08 2016 -0400
Committer: smarthi <[email protected]>
Committed: Tue Sep 6 18:30:08 2016 -0400

----------------------------------------------------------------------
 .../flinkbindings/io/Hadoop2HDFSUtil.scala      | 18 ++---
 .../mahout/h2o/common/Hadoop1HDFSUtil.scala     | 63 ---------------
 .../mahout/h2o/common/Hadoop2HDFSUtil.scala     | 63 +++++++++++++++
 .../apache/mahout/h2obindings/H2OEngine.scala   |  4 +-
 pom.xml                                         |  4 +-
 .../apache/mahout/common/Hadoop1HDFSUtil.scala  | 83 --------------------
 .../apache/mahout/common/Hadoop2HDFSUtil.scala  | 83 ++++++++++++++++++++
 .../apache/mahout/drivers/TrainNBDriver.scala   |  4 +-
 .../mahout/sparkbindings/SparkEngine.scala      |  4 +-
 9 files changed, 160 insertions(+), 166 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/46cea7ea/flink/src/main/scala/org/apache/mahout/flinkbindings/io/Hadoop2HDFSUtil.scala
----------------------------------------------------------------------
diff --git 
a/flink/src/main/scala/org/apache/mahout/flinkbindings/io/Hadoop2HDFSUtil.scala 
b/flink/src/main/scala/org/apache/mahout/flinkbindings/io/Hadoop2HDFSUtil.scala
index 9b67913..211088a 100644
--- 
a/flink/src/main/scala/org/apache/mahout/flinkbindings/io/Hadoop2HDFSUtil.scala
+++ 
b/flink/src/main/scala/org/apache/mahout/flinkbindings/io/Hadoop2HDFSUtil.scala
@@ -20,16 +20,14 @@ package org.apache.mahout.flinkbindings.io
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
-import org.apache.hadoop.io.{SequenceFile, Writable}
+import org.apache.hadoop.io.SequenceFile.Reader
+import org.apache.hadoop.io.Writable
 
-/**
- * Deprecated Hadoop 1 api which we currently explicitly import via Mahout 
dependencies.
- */
 object Hadoop2HDFSUtil extends HDFSUtil {
 
   /**
    * Read the header of a sequence file and determine the Key and Value type
-   * @param path
+   * @param path - hdfs path of Sequence File
    * @return
    */
   def readDrmHeader(path: String): DrmMetadata = {
@@ -43,7 +41,7 @@ object Hadoop2HDFSUtil extends HDFSUtil {
 
       // Filter out anything starting with .
       .filter { s =>
-        !s.getPath.getName.startsWith("\\.") && 
!s.getPath.getName.startsWith("_") && !s.isDir
+        !s.getPath.getName.startsWith("\\.") && 
!s.getPath.getName.startsWith("_") && !s.isDirectory
       }
 
       // Take path
@@ -57,12 +55,8 @@ object Hadoop2HDFSUtil extends HDFSUtil {
         throw new IllegalArgumentException(s"No partition files found in 
${dfsPath.toString}.")
       }
 
-    // flink is retiring hadoop 1
-     val reader = new SequenceFile.Reader(fs, partFilePath, fs.getConf)
+     val reader = new Reader(fs.getConf, Reader.file(partFilePath))
 
-    // hadoop 2 reader
-//    val reader: SequenceFile.Reader = new SequenceFile.Reader(fs.getConf,
-//      SequenceFile.Reader.file(partFilePath));
     try {
       new DrmMetadata(
         keyTypeWritable = reader.getKeyClass.asSubclass(classOf[Writable]),
@@ -75,7 +69,7 @@ object Hadoop2HDFSUtil extends HDFSUtil {
 
   /**
    * Delete a path from the filesystem
-   * @param path
+   * @param path - hdfs path
    */
   def delete(path: String) {
     val dfsPath = new Path(path)

http://git-wip-us.apache.org/repos/asf/mahout/blob/46cea7ea/h2o/src/main/scala/org/apache/mahout/h2o/common/Hadoop1HDFSUtil.scala
----------------------------------------------------------------------
diff --git 
a/h2o/src/main/scala/org/apache/mahout/h2o/common/Hadoop1HDFSUtil.scala 
b/h2o/src/main/scala/org/apache/mahout/h2o/common/Hadoop1HDFSUtil.scala
deleted file mode 100644
index a540cb1..0000000
--- a/h2o/src/main/scala/org/apache/mahout/h2o/common/Hadoop1HDFSUtil.scala
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.h2o.common
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.io.{SequenceFile, Writable}
-
-/**
- * Deprecated Hadoop 1 api which we currently explicitly import via Mahout 
dependencies. May not work
- * with Hadoop 2.0
- */
-object Hadoop1HDFSUtil extends HDFSUtil {
-
-  
-  def readDrmHeader(path: String): DrmMetadata = {
-    val dfsPath = new Path(path)
-    val fs = dfsPath.getFileSystem(new Configuration())
-
-    val partFilePath:Path = fs.listStatus(dfsPath)
-
-        // Filter out anything starting with .
-        .filter { s => !s.getPath.getName.startsWith("\\.") && 
!s.getPath.getName.startsWith("_") && !s.isDir }
-
-        // Take path
-        .map(_.getPath)
-
-        // Take only one, if any
-        .headOption
-
-        // Require there's at least one partition file found.
-        .getOrElse {
-      throw new IllegalArgumentException(s"No partition files found in 
${dfsPath.toString}.")
-    }
-
-    val reader = new SequenceFile.Reader(fs, partFilePath, fs.getConf)
-    try {
-      new DrmMetadata(
-        keyTypeWritable = reader.getKeyClass.asSubclass(classOf[Writable]),
-        valueTypeWritable = reader.getValueClass.asSubclass(classOf[Writable])
-      )
-    } finally {
-      reader.close()
-    }
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/mahout/blob/46cea7ea/h2o/src/main/scala/org/apache/mahout/h2o/common/Hadoop2HDFSUtil.scala
----------------------------------------------------------------------
diff --git 
a/h2o/src/main/scala/org/apache/mahout/h2o/common/Hadoop2HDFSUtil.scala 
b/h2o/src/main/scala/org/apache/mahout/h2o/common/Hadoop2HDFSUtil.scala
new file mode 100644
index 0000000..4053d09
--- /dev/null
+++ b/h2o/src/main/scala/org/apache/mahout/h2o/common/Hadoop2HDFSUtil.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.h2o.common
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.{SequenceFile, Writable}
+
+/**
+ * Deprecated Hadoop 1 api which we currently explicitly import via Mahout 
dependencies. May not work
+ * with Hadoop 2.0
+ */
+object Hadoop2HDFSUtil extends HDFSUtil {
+
+  
+  def readDrmHeader(path: String): DrmMetadata = {
+    val dfsPath = new Path(path)
+    val fs = dfsPath.getFileSystem(new Configuration())
+
+    val partFilePath:Path = fs.listStatus(dfsPath)
+
+        // Filter out anything starting with .
+        .filter { s => !s.getPath.getName.startsWith("\\.") && 
!s.getPath.getName.startsWith("_") && !s.isDirectory }
+
+        // Take path
+        .map(_.getPath)
+
+        // Take only one, if any
+        .headOption
+
+        // Require there's at least one partition file found.
+        .getOrElse {
+      throw new IllegalArgumentException(s"No partition files found in 
${dfsPath.toString}.")
+    }
+
+    val reader = new SequenceFile.Reader(fs.getConf, 
SequenceFile.Reader.file(partFilePath))
+    try {
+      new DrmMetadata(
+        keyTypeWritable = reader.getKeyClass.asSubclass(classOf[Writable]),
+        valueTypeWritable = reader.getValueClass.asSubclass(classOf[Writable])
+      )
+    } finally {
+      reader.close()
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/46cea7ea/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala
----------------------------------------------------------------------
diff --git a/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala 
b/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala
index 60bf7ac..494e8a8 100644
--- a/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala
+++ b/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala
@@ -25,7 +25,7 @@ import org.apache.mahout.math.drm._
 import org.apache.mahout.math.drm.logical._
 import org.apache.mahout.h2obindings.ops._
 import org.apache.mahout.h2obindings.drm._
-import org.apache.mahout.h2o.common.{Hadoop1HDFSUtil, HDFSUtil}
+import org.apache.mahout.h2o.common.{Hadoop2HDFSUtil, HDFSUtil}
 import org.apache.mahout.logging._
 
 /** H2O specific non-DRM operations */
@@ -34,7 +34,7 @@ object H2OEngine extends DistributedEngine {
   private final implicit val log = getLog(H2OEngine.getClass)
 
   // By default, use Hadoop 1 utils
-  var hdfsUtils: HDFSUtil = Hadoop1HDFSUtil
+  var hdfsUtils: HDFSUtil = Hadoop2HDFSUtil
 
   def colMeans[K](drm: CheckpointedDrm[K]): Vector =
     H2OHelper.colMeans(drm.h2odrm.frame)

http://git-wip-us.apache.org/repos/asf/mahout/blob/46cea7ea/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 58d32b2..0120a8a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -118,11 +118,11 @@
     <mscala.version>3.2.0</mscala.version>
     <hbase.version>1.0.0</hbase.version>
     <lucene.version>5.5.2</lucene.version>
-    <slf4j.version>1.7.19</slf4j.version>
+    <slf4j.version>1.7.21</slf4j.version>
     <scala.compat.version>2.10</scala.compat.version>
     <scala.version>2.10.4</scala.version>
     <spark.version>1.5.2</spark.version>
-    <flink.version>1.1.1</flink.version>
+    <flink.version>1.1.2</flink.version>
     <h2o.version>0.1.25</h2o.version>
     <jackson.version>2.7.4</jackson.version>
   </properties>

http://git-wip-us.apache.org/repos/asf/mahout/blob/46cea7ea/spark/src/main/scala/org/apache/mahout/common/Hadoop1HDFSUtil.scala
----------------------------------------------------------------------
diff --git 
a/spark/src/main/scala/org/apache/mahout/common/Hadoop1HDFSUtil.scala 
b/spark/src/main/scala/org/apache/mahout/common/Hadoop1HDFSUtil.scala
deleted file mode 100644
index 29599b8..0000000
--- a/spark/src/main/scala/org/apache/mahout/common/Hadoop1HDFSUtil.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.common
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.io.{SequenceFile, Writable}
-import org.apache.spark.SparkContext
-
-/**
- * Deprecated Hadoop 1 api which we currently explicitly import via Mahout 
dependencies. May not work
- * with Hadoop 2.0
- */
-object Hadoop1HDFSUtil extends HDFSUtil {
-
-
-  /** Read DRM header information off (H)DFS. */
-  override def readDrmHeader(path: String)(implicit sc: SparkContext): 
DrmMetadata = {
-
-    val dfsPath = new Path(path)
-
-    val fs = dfsPath.getFileSystem(sc.hadoopConfiguration)
-
-    // Apparently getFileSystem() doesn't set conf??
-    fs.setConf(sc.hadoopConfiguration)
-
-    val partFilePath:Path = fs.listStatus(dfsPath)
-
-        // Filter out anything starting with .
-        .filter { s => !s.getPath.getName.startsWith("\\.") && 
!s.getPath.getName.startsWith("_") && !s.isDir }
-
-        // Take path
-        .map(_.getPath)
-
-        // Take only one, if any
-        .headOption
-
-        // Require there's at least one partition file found.
-        .getOrElse {
-      throw new IllegalArgumentException(s"No partition files found in 
${dfsPath.toString}.")
-    }
-
-    val reader = new SequenceFile.Reader(fs, partFilePath, fs.getConf)
-    try {
-      new DrmMetadata(
-        keyTypeWritable = reader.getKeyClass.asSubclass(classOf[Writable]),
-        valueTypeWritable = reader.getValueClass.asSubclass(classOf[Writable])
-      )
-    } finally {
-      reader.close()
-    }
-
-  }
-
-  /**
-   * Delete a path from the filesystem
-   * @param path
-   */
-  def delete(path: String) {
-    val dfsPath = new Path(path)
-    val fs = dfsPath.getFileSystem(new Configuration())
-
-    if (fs.exists(dfsPath)) {
-      fs.delete(dfsPath, true)
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/mahout/blob/46cea7ea/spark/src/main/scala/org/apache/mahout/common/Hadoop2HDFSUtil.scala
----------------------------------------------------------------------
diff --git 
a/spark/src/main/scala/org/apache/mahout/common/Hadoop2HDFSUtil.scala 
b/spark/src/main/scala/org/apache/mahout/common/Hadoop2HDFSUtil.scala
new file mode 100644
index 0000000..de601d5
--- /dev/null
+++ b/spark/src/main/scala/org/apache/mahout/common/Hadoop2HDFSUtil.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.{SequenceFile, Writable}
+import org.apache.spark.SparkContext
+
+/**
+ * Deprecated Hadoop 1 api which we currently explicitly import via Mahout 
dependencies. May not work
+ * with Hadoop 2.0
+ */
+object Hadoop2HDFSUtil extends HDFSUtil {
+
+
+  /** Read DRM header information off (H)DFS. */
+  override def readDrmHeader(path: String)(implicit sc: SparkContext): 
DrmMetadata = {
+
+    val dfsPath = new Path(path)
+
+    val fs = dfsPath.getFileSystem(sc.hadoopConfiguration)
+
+    // Apparently getFileSystem() doesn't set conf??
+    fs.setConf(sc.hadoopConfiguration)
+
+    val partFilePath:Path = fs.listStatus(dfsPath)
+
+        // Filter out anything starting with .
+        .filter { s => !s.getPath.getName.startsWith("\\.") && 
!s.getPath.getName.startsWith("_") && !s.isDirectory }
+
+        // Take path
+        .map(_.getPath)
+
+        // Take only one, if any
+        .headOption
+
+        // Require there's at least one partition file found.
+        .getOrElse {
+      throw new IllegalArgumentException(s"No partition files found in 
${dfsPath.toString}.")
+    }
+
+    val reader = new SequenceFile.Reader(fs.getConf, 
SequenceFile.Reader.file(partFilePath))
+    try {
+      new DrmMetadata(
+        keyTypeWritable = reader.getKeyClass.asSubclass(classOf[Writable]),
+        valueTypeWritable = reader.getValueClass.asSubclass(classOf[Writable])
+      )
+    } finally {
+      reader.close()
+    }
+
+  }
+
+  /**
+   * Delete a path from the filesystem
+   * @param path - hdfs path
+   */
+  def delete(path: String) {
+    val dfsPath = new Path(path)
+    val fs = dfsPath.getFileSystem(new Configuration())
+
+    if (fs.exists(dfsPath)) {
+      fs.delete(dfsPath, true)
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/46cea7ea/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala 
b/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala
index eeed97a..d0e711a 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala
@@ -18,7 +18,7 @@
 package org.apache.mahout.drivers
 
 import org.apache.mahout.classifier.naivebayes.{SparkNaiveBayes, _}
-import org.apache.mahout.common.Hadoop1HDFSUtil
+import org.apache.mahout.common.Hadoop2HDFSUtil
 import org.apache.mahout.math.drm
 import org.apache.mahout.math.drm.DrmLike
 
@@ -95,7 +95,7 @@ object TrainNBDriver extends MahoutSparkDriver {
     val fullPathToModel = outputPath + NBModel.modelBaseDirectory
 
     if (overwrite) {
-       Hadoop1HDFSUtil.delete(fullPathToModel)
+       Hadoop2HDFSUtil.delete(fullPathToModel)
     }
 
     val trainingSet = readTrainingSet()

http://git-wip-us.apache.org/repos/asf/mahout/blob/46cea7ea/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
----------------------------------------------------------------------
diff --git 
a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala 
b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
index 47d14db..ee526c5 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
@@ -18,7 +18,7 @@
 package org.apache.mahout.sparkbindings
 
 import org.apache.hadoop.io._
-import org.apache.mahout.common.{HDFSUtil, Hadoop1HDFSUtil}
+import org.apache.mahout.common.{HDFSUtil, Hadoop2HDFSUtil}
 import org.apache.mahout.drivers.TextDelimitedIndexedDatasetReader
 import org.apache.mahout.math._
 import org.apache.mahout.math.drm._
@@ -39,7 +39,7 @@ import scala.reflect.ClassTag
 object SparkEngine extends DistributedEngine {
 
   // By default, use Hadoop 1 utils
-  var hdfsUtils: HDFSUtil = Hadoop1HDFSUtil
+  var hdfsUtils: HDFSUtil = Hadoop2HDFSUtil
 
   def colSums[K](drm: CheckpointedDrm[K]): Vector = {
     val n = drm.ncol

Reply via email to