http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 2a82c5a..054e96b 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -2,6 +2,8 @@ Mahout Change Log
 
 Release 0.10.0 - unreleased
 
+  MAHOUT-1655: Refactors mr-legacy into mahout-hdfs and mahout-mr, Spark now 
depends on much reduced mahout-hdfs
+
   MAHOUT-1522: Handle logging levels via log4j.xml (akm)
 
   MAHOUT-1602: Euclidean Distance Similarity Math (Leonardo Fernandez Sanchez, 
smarthi)

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/bin/mahout
----------------------------------------------------------------------
diff --git a/bin/mahout b/bin/mahout
index c51c239..772c184 100755
--- a/bin/mahout
+++ b/bin/mahout
@@ -182,7 +182,7 @@ then
   done
 
   if [ "$H2O" == "1" ]; then
-    for f in $MAHOUT_HOME/mrlegacy/target/mahout-mrlegacy-*.jar; do
+    for f in $MAHOUT_HOME/hdfs/target/mahout-hdfs-*.jar; do
        CLASSPATH=${CLASSPATH}:$f;
     done
 
@@ -194,7 +194,7 @@ then
   # add jars for running from the command line if we requested shell or spark 
CLI driver
   if [ "$SPARK" == "1" ]; then
 
-    for f in $MAHOUT_HOME/mrlegacy/target/mahout-mrlegacy-*.jar ; do
+    for f in $MAHOUT_HOME/hdfs/target/mahout-hdfs-*.jar ; do
       CLASSPATH=${CLASSPATH}:$f;
     done
 
@@ -227,11 +227,11 @@ then
   done
 else
   CLASSPATH=${CLASSPATH}:$MAHOUT_HOME/math/target/classes
-  CLASSPATH=${CLASSPATH}:$MAHOUT_HOME/mrlegacy/target/classes
+  CLASSPATH=${CLASSPATH}:$MAHOUT_HOME/hdfs/target/classes
+  CLASSPATH=${CLASSPATH}:$MAHOUT_HOME/mr/target/classes
   CLASSPATH=${CLASSPATH}:$MAHOUT_HOME/integration/target/classes
   CLASSPATH=${CLASSPATH}:$MAHOUT_HOME/examples/target/classes
   CLASSPATH=${CLASSPATH}:$MAHOUT_HOME/math-scala/target/classes
-  #CLASSPATH=${CLASSPATH}:$MAHOUT_HOME/mrlegacy/src/main/resources
   CLASSPATH=${CLASSPATH}:$MAHOUT_HOME/spark/target/classes
   CLASSPATH=${CLASSPATH}:$MAHOUT_HOME/spark-shell/target/classes
   CLASSPATH=${CLASSPATH}:$MAHOUT_HOME/h2o/target/classes

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/distribution/pom.xml
----------------------------------------------------------------------
diff --git a/distribution/pom.xml b/distribution/pom.xml
index b2e5071..f3e6336 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -91,7 +91,11 @@
     </dependency>
     <dependency>
       <groupId>org.apache.mahout</groupId>
-      <artifactId>mahout-mrlegacy</artifactId>
+      <artifactId>mahout-hdfs</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.mahout</groupId>
+      <artifactId>mahout-mr</artifactId>
     </dependency>
     <dependency>
       <groupId>org.apache.mahout</groupId>

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/distribution/src/main/assembly/bin.xml
----------------------------------------------------------------------
diff --git a/distribution/src/main/assembly/bin.xml 
b/distribution/src/main/assembly/bin.xml
index a9b933e..d6c9076 100644
--- a/distribution/src/main/assembly/bin.xml
+++ b/distribution/src/main/assembly/bin.xml
@@ -65,7 +65,20 @@
       <outputDirectory/>
     </fileSet>
     <fileSet>
-      <directory>${project.basedir}/../mrlegacy/target</directory>
+      <directory>${project.basedir}/../hdfs/target</directory>
+      <includes>
+        <include>mahout-*.job</include>
+        <include>mahout-*.jar</include>
+      </includes>
+      <excludes>
+        <exclude>*sources.jar</exclude>
+        <exclude>*javadoc.jar</exclude>
+        <exclude>*tests.jar</exclude>
+      </excludes>
+      <outputDirectory/>
+    </fileSet>
+    <fileSet>
+      <directory>${project.basedir}/../mr/target</directory>
       <includes>
         <include>mahout-*.job</include>
         <include>mahout-*.jar</include>
@@ -112,8 +125,12 @@
       <outputDirectory>docs/mahout-math</outputDirectory>
     </fileSet>
     <fileSet>
-      <directory>${project.basedir}/../mrlegacy/target/apidocs</directory>
-      <outputDirectory>docs/mahout-mrlegacy</outputDirectory>
+      <directory>${project.basedir}/../hdfs/target/apidocs</directory>
+      <outputDirectory>docs/mahout-hdfs</outputDirectory>
+    </fileSet>
+    <fileSet>
+      <directory>${project.basedir}/../mr/target/apidocs</directory>
+      <outputDirectory>docs/mahout-mr</outputDirectory>
     </fileSet>
     <fileSet>
       <directory>${project.basedir}/../integration/target/apidocs</directory>

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/pom.xml b/examples/pom.xml
index dbf4bd5..b710388 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -108,11 +108,21 @@
     <!-- our modules -->
     <dependency>
       <groupId>${project.groupId}</groupId>
-      <artifactId>mahout-mrlegacy</artifactId>
+      <artifactId>mahout-hdfs</artifactId>
     </dependency>
     <dependency>
       <groupId>${project.groupId}</groupId>
-      <artifactId>mahout-mrlegacy</artifactId>
+      <artifactId>mahout-mr</artifactId>
+    </dependency>
+   <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>mahout-hdfs</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>mahout-mr</artifactId>
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/examples/src/main/java/org/apache/mahout/classifier/df/mapreduce/TestForest.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java/org/apache/mahout/classifier/df/mapreduce/TestForest.java
 
b/examples/src/main/java/org/apache/mahout/classifier/df/mapreduce/TestForest.java
index 9ce2104..411c68f 100644
--- 
a/examples/src/main/java/org/apache/mahout/classifier/df/mapreduce/TestForest.java
+++ 
b/examples/src/main/java/org/apache/mahout/classifier/df/mapreduce/TestForest.java
@@ -227,7 +227,7 @@ public class TestForest extends Configured implements Tool {
     Random rng = RandomUtils.getRandom();
 
     List<double[]> resList = Lists.newArrayList();
-    if (dataFS.getFileStatus(dataPath).isDirectory()) {
+    if (dataFS.getFileStatus(dataPath).isDir()) {
       //the input is a directory of files
       testDirectory(outputPath, converter, forest, dataset, resList, rng);
     }  else {

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/h2o/pom.xml
----------------------------------------------------------------------
diff --git a/h2o/pom.xml b/h2o/pom.xml
index 9dc4e62..92beeca 100644
--- a/h2o/pom.xml
+++ b/h2o/pom.xml
@@ -132,10 +132,10 @@
       <version>${project.version}</version>
     </dependency>
 
+    <!-- for MatrixWritable and VectorWritable -->
     <dependency>
-      <!-- for MatrixWritable and VectorWritable -->
       <groupId>org.apache.mahout</groupId>
-      <artifactId>mahout-mrlegacy</artifactId>
+      <artifactId>mahout-hdfs</artifactId>
       <version>${project.version}</version>
     </dependency>
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/hdfs/pom.xml b/hdfs/pom.xml
new file mode 100644
index 0000000..7e77162
--- /dev/null
+++ b/hdfs/pom.xml
@@ -0,0 +1,216 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.mahout</groupId>
+    <artifactId>mahout</artifactId>
+    <version>1.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <!-- modules inherit parent's group id and version. -->
+  <artifactId>mahout-hdfs</artifactId>
+  <name>Mahout HDFS</name>
+  <description>Scalable machine learning libraries</description>
+
+  <packaging>jar</packaging>
+
+  <build>
+    <resources>
+      <resource>
+        <directory>src/main/resources</directory>
+      </resource>
+      <resource>
+        <directory>../src/conf</directory>
+        <includes>
+          <include>driver.classes.default.props</include>
+        </includes>
+      </resource>
+    </resources>
+    <plugins>
+      <!-- create test jar so other modules can reuse the core test utility 
classes. -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+
+      <plugin>
+        <artifactId>maven-javadoc-plugin</artifactId>
+      </plugin>
+
+      <plugin>
+        <artifactId>maven-source-plugin</artifactId>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-remote-resources-plugin</artifactId>
+        <configuration>
+          
<appendedResourcesDirectory>../src/main/appended-resources</appendedResourcesDirectory>
+          <resourceBundles>
+            
<resourceBundle>org.apache:apache-jar-resource-bundle:1.4</resourceBundle>
+          </resourceBundles>
+          <supplementalModels>
+            <supplementalModel>supplemental-models.xml</supplementalModel>
+          </supplementalModels>
+        </configuration>
+      </plugin>
+
+    </plugins>
+  </build>
+
+  <dependencies>
+
+    <!-- our modules -->
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>mahout-math</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>mahout-math</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+
+    <!-- Third Party -->
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.codehaus.jackson</groupId>
+      <artifactId>jackson-core-asl</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.codehaus.jackson</groupId>
+      <artifactId>jackson-mapper-asl</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-jcl</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>commons-cli</groupId>
+      <artifactId>commons-cli</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.thoughtworks.xstream</groupId>
+      <artifactId>xstream</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.lucene</groupId>
+      <artifactId>lucene-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.lucene</groupId>
+      <artifactId>lucene-analyzers-common</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.mahout.commons</groupId>
+      <artifactId>commons-cli</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-math3</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>com.carrotsearch.randomizedtesting</groupId>
+      <artifactId>randomizedtesting-runner</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.easymock</groupId>
+      <artifactId>easymock</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.mrunit</groupId>
+      <artifactId>mrunit</artifactId>
+      <version>1.0.0</version>
+      <classifier>${hadoop.classifier}</classifier>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>commons-httpclient</groupId>
+      <artifactId>commons-httpclient</artifactId>
+      <version>3.0.1</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.solr</groupId>
+      <artifactId>solr-commons-csv</artifactId>
+      <version>3.5.0</version>
+    </dependency>
+
+  </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/hdfs/src/main/java/org/apache/mahout/common/IOUtils.java
----------------------------------------------------------------------
diff --git a/hdfs/src/main/java/org/apache/mahout/common/IOUtils.java 
b/hdfs/src/main/java/org/apache/mahout/common/IOUtils.java
new file mode 100644
index 0000000..0372ed4
--- /dev/null
+++ b/hdfs/src/main/java/org/apache/mahout/common/IOUtils.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Collection;
+
+import org.apache.hadoop.mapred.lib.MultipleOutputs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <p>
+ * I/O-related utility methods that don't have a better home.
+ * </p>
+ */
+public final class IOUtils {
+  
+  private static final Logger log = LoggerFactory.getLogger(IOUtils.class);
+  
+  private IOUtils() { }
+
+  // Sheez, why can't ResultSet, Statement and Connection implement Closeable?
+  
+  public static void quietClose(ResultSet closeable) {
+    if (closeable != null) {
+      try {
+        closeable.close();
+      } catch (SQLException sqle) {
+        log.warn("Unexpected exception while closing; continuing", sqle);
+      }
+    }
+  }
+  
+  public static void quietClose(Statement closeable) {
+    if (closeable != null) {
+      try {
+        closeable.close();
+      } catch (SQLException sqle) {
+        log.warn("Unexpected exception while closing; continuing", sqle);
+      }
+    }
+  }
+  
+  public static void quietClose(Connection closeable) {
+    if (closeable != null) {
+      try {
+        closeable.close();
+      } catch (SQLException sqle) {
+        log.warn("Unexpected exception while closing; continuing", sqle);
+      }
+    }
+  }
+  
+  /**
+   * Closes a {@link ResultSet}, {@link Statement} and {@link Connection} (if 
not null) and logs (but does not
+   * rethrow) any resulting {@link SQLException}. This is useful for cleaning 
up after a database query.
+   * 
+   * @param resultSet
+   *          {@link ResultSet} to close
+   * @param statement
+   *          {@link Statement} to close
+   * @param connection
+   *          {@link Connection} to close
+   */
+  public static void quietClose(ResultSet resultSet, Statement statement, 
Connection connection) {
+    quietClose(resultSet);
+    quietClose(statement);
+    quietClose(connection);
+  }
+  
+  /**
+   * make sure to close all sources, log all of the problems occurred, clear
+   * {@code closeables} (to prevent repeating close attempts), re-throw the
+   * last one at the end. Helps resource scope management (e.g. compositions of
+   * {@link Closeable}s objects)
+   * <P>
+   * <p/>
+   * Typical pattern:
+   * <p/>
+   *
+   * <pre>
+   *   LinkedList<Closeable> closeables = new LinkedList<Closeable>();
+   *   try {
+   *      InputStream stream1 = new FileInputStream(...);
+   *      closeables.addFirst(stream1);
+   *      ...
+   *      InputStream streamN = new FileInputStream(...);
+   *      closeables.addFirst(streamN);
+   *      ...
+   *   } finally {
+   *      IOUtils.close(closeables);
+   *   }
+   * </pre>
+   *
+   * @param closeables
+   *          must be a modifiable collection of {@link Closeable}s
+   * @throws IOException
+   *           the last exception (if any) of all closed resources
+   */
+  public static void close(Collection<? extends Closeable> closeables)
+    throws IOException {
+    Throwable lastThr = null;
+
+    for (Closeable closeable : closeables) {
+      try {
+        closeable.close();
+      } catch (Throwable thr) {
+        log.error(thr.getMessage(), thr);
+        lastThr = thr;
+      }
+    }
+
+    // make sure we don't double-close
+    // but that has to be modifiable collection
+    closeables.clear();
+
+    if (lastThr != null) {
+      if (lastThr instanceof IOException) {
+        throw (IOException) lastThr;
+      } else if (lastThr instanceof RuntimeException) {
+        throw (RuntimeException) lastThr;
+      } else {
+        throw (Error) lastThr;
+      }
+    }
+
+  }
+
+
+  /**
+   * for temporary files, a file may be considered as a {@link Closeable} too,
+   * where file is wiped on close and thus the disk resource is released
+   * ('closed').
+   * 
+   * 
+   */
+  public static class DeleteFileOnClose implements Closeable {
+
+    private final File file;
+
+    public DeleteFileOnClose(File file) {
+      this.file = file;
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (file.isFile()) {
+        file.delete();
+      }
+    }
+  }
+
+  /**
+   * MultipleOutputs to closeable adapter.
+   * 
+   */
+  public static class MultipleOutputsCloseableAdapter implements Closeable {
+    private final MultipleOutputs mo;
+
+    public MultipleOutputsCloseableAdapter(MultipleOutputs mo) {
+      this.mo = mo;
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (mo != null) {
+        mo.close();
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/hdfs/src/main/java/org/apache/mahout/math/MatrixWritable.java
----------------------------------------------------------------------
diff --git a/hdfs/src/main/java/org/apache/mahout/math/MatrixWritable.java 
b/hdfs/src/main/java/org/apache/mahout/math/MatrixWritable.java
new file mode 100644
index 0000000..c521f3e
--- /dev/null
+++ b/hdfs/src/main/java/org/apache/mahout/math/MatrixWritable.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.math.list.IntArrayList;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+
+public class MatrixWritable implements Writable {
+
+  private static final int FLAG_DENSE = 0x01;
+  private static final int FLAG_SEQUENTIAL = 0x02;
+  private static final int FLAG_LABELS = 0x04;
+  private static final int FLAG_SPARSE_ROW = 0x08;
+  private static final int NUM_FLAGS = 4;
+
+  private Matrix matrix;
+
+  public MatrixWritable() {}
+
+  public MatrixWritable(Matrix m) {
+    this.matrix = m;
+  }
+
+  public Matrix get() {
+    return matrix;
+  }
+
+  public void set(Matrix matrix) {
+    this.matrix = matrix;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    writeMatrix(out, matrix);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    matrix = readMatrix(in);
+  }
+
+  public static void readLabels(DataInput in,
+                                Map<String, Integer> columnLabelBindings,
+                                Map<String, Integer> rowLabelBindings) throws 
IOException {
+    int colSize = in.readInt();
+    if (colSize > 0) {
+      for (int i = 0; i < colSize; i++) {
+        columnLabelBindings.put(in.readUTF(), in.readInt());
+      }
+    }
+    int rowSize = in.readInt();
+    if (rowSize > 0) {
+      for (int i = 0; i < rowSize; i++) {
+        rowLabelBindings.put(in.readUTF(), in.readInt());
+      }
+    }
+  }
+
+  public static void writeLabelBindings(DataOutput out,
+                                        Map<String, Integer> 
columnLabelBindings,
+                                        Map<String, Integer> rowLabelBindings) 
throws IOException {
+    if (columnLabelBindings == null) {
+      out.writeInt(0);
+    } else {
+      out.writeInt(columnLabelBindings.size());
+      for (Map.Entry<String, Integer> stringIntegerEntry : 
columnLabelBindings.entrySet()) {
+        out.writeUTF(stringIntegerEntry.getKey());
+        out.writeInt(stringIntegerEntry.getValue());
+      }
+    }
+    if (rowLabelBindings == null) {
+      out.writeInt(0);
+    } else {
+      out.writeInt(rowLabelBindings.size());
+      for (Map.Entry<String, Integer> stringIntegerEntry : 
rowLabelBindings.entrySet()) {
+        out.writeUTF(stringIntegerEntry.getKey());
+        out.writeInt(stringIntegerEntry.getValue());
+      }
+    }
+  }
+
+  /** Reads a typed Matrix instance from the input stream */
+  public static Matrix readMatrix(DataInput in) throws IOException {
+    int flags = in.readInt();
+    Preconditions.checkArgument(flags >> NUM_FLAGS == 0, "Unknown flags set: 
%d", Integer.toString(flags, 2));
+    boolean dense = (flags & FLAG_DENSE) != 0;
+    boolean sequential = (flags & FLAG_SEQUENTIAL) != 0;
+    boolean hasLabels = (flags & FLAG_LABELS) != 0;
+    boolean isSparseRowMatrix = (flags & FLAG_SPARSE_ROW) != 0;
+
+    int rows = in.readInt();
+    int columns = in.readInt();
+
+    byte vectorFlags = in.readByte();
+
+    Matrix matrix;
+
+    if (dense) {
+      matrix = new DenseMatrix(rows, columns);
+      for (int row = 0; row < rows; row++) {
+        matrix.assignRow(row, VectorWritable.readVector(in, vectorFlags, 
columns));
+      }
+    } else if (isSparseRowMatrix) {
+      Vector[] rowVectors = new Vector[rows];
+      for (int row = 0; row < rows; row++) {
+        rowVectors[row] = VectorWritable.readVector(in, vectorFlags, columns);
+      }
+      matrix = new SparseRowMatrix(rows, columns, rowVectors, true, 
!sequential);
+    } else {
+      matrix = new SparseMatrix(rows, columns);
+      int numNonZeroRows = in.readInt();
+      int rowsRead = 0;
+      while (rowsRead++ < numNonZeroRows) {
+        int rowIndex = in.readInt();
+        matrix.assignRow(rowIndex, VectorWritable.readVector(in, vectorFlags, 
columns));
+      }
+    }
+
+    if (hasLabels) {
+      Map<String,Integer> columnLabelBindings = Maps.newHashMap();
+      Map<String,Integer> rowLabelBindings = Maps.newHashMap();
+      readLabels(in, columnLabelBindings, rowLabelBindings);
+      if (!columnLabelBindings.isEmpty()) {
+        matrix.setColumnLabelBindings(columnLabelBindings);
+      }
+      if (!rowLabelBindings.isEmpty()) {
+        matrix.setRowLabelBindings(rowLabelBindings);
+      }
+    }
+
+    return matrix;
+  }
+
+  /** Writes a typed Matrix instance to the output stream */
+  public static void writeMatrix(final DataOutput out, Matrix matrix) throws 
IOException {
+    int flags = 0;
+    Vector row = matrix.viewRow(0);
+    boolean isDense = row.isDense();
+    if (isDense) {
+      flags |= FLAG_DENSE;
+    }
+    if (row.isSequentialAccess()) {
+      flags |= FLAG_SEQUENTIAL;
+    }
+    if (matrix.getRowLabelBindings() != null || 
matrix.getColumnLabelBindings() != null) {
+      flags |= FLAG_LABELS;
+    }
+    boolean isSparseRowMatrix = matrix instanceof SparseRowMatrix;
+    if (isSparseRowMatrix) {
+      flags |= FLAG_SPARSE_ROW;
+    }
+
+    out.writeInt(flags);
+    out.writeInt(matrix.rowSize());
+    out.writeInt(matrix.columnSize());
+
+    // We only use vectors of the same type, so we write out the type 
information only once!
+    byte vectorFlags = VectorWritable.flags(matrix.viewRow(0), false);
+    out.writeByte(vectorFlags);
+
+    if (isDense || isSparseRowMatrix) {
+      for (int i = 0; i < matrix.rowSize(); i++) {
+        VectorWritable.writeVectorContents(out, matrix.viewRow(i), 
vectorFlags);
+      }
+    } else {
+      IntArrayList rowIndices = ((SparseMatrix) matrix).nonZeroRowIndices();
+      int numNonZeroRows = rowIndices.size();
+      out.writeInt(numNonZeroRows);
+      for (int i = 0; i < numNonZeroRows; i++) {
+        int rowIndex = rowIndices.getQuick(i);
+        out.writeInt(rowIndex);
+        VectorWritable.writeVectorContents(out, matrix.viewRow(rowIndex), 
vectorFlags);
+      }
+    }
+
+    if ((flags & FLAG_LABELS) != 0) {
+      writeLabelBindings(out, matrix.getColumnLabelBindings(), 
matrix.getRowLabelBindings());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/hdfs/src/main/java/org/apache/mahout/math/VarIntWritable.java
----------------------------------------------------------------------
diff --git a/hdfs/src/main/java/org/apache/mahout/math/VarIntWritable.java 
b/hdfs/src/main/java/org/apache/mahout/math/VarIntWritable.java
new file mode 100644
index 0000000..e5cb173
--- /dev/null
+++ b/hdfs/src/main/java/org/apache/mahout/math/VarIntWritable.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+
+public class VarIntWritable implements WritableComparable<VarIntWritable>, 
Cloneable {
+
+  private int value;
+
+  public VarIntWritable() {
+  }
+
+  public VarIntWritable(int value) {
+    this.value = value;
+  }
+
+  public int get() {
+    return value;
+  }
+
+  public void set(int value) {
+    this.value = value;
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    return other instanceof VarIntWritable && ((VarIntWritable) other).value 
== value;
+  }
+
+  @Override
+  public int hashCode() {
+    return value;
+  }
+
+  @Override
+  public String toString() {
+    return String.valueOf(value);
+  }
+
+  @Override
+  public VarIntWritable clone() {
+    return new VarIntWritable(value);
+  }
+
+  @Override
+  public int compareTo(VarIntWritable other) {
+    if (value < other.value) {
+      return -1;
+    }
+    if (value > other.value) {
+      return 1;
+    }
+    return 0;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    Varint.writeSignedVarInt(value, out);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    value = Varint.readSignedVarInt(in);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/hdfs/src/main/java/org/apache/mahout/math/VarLongWritable.java
----------------------------------------------------------------------
diff --git a/hdfs/src/main/java/org/apache/mahout/math/VarLongWritable.java 
b/hdfs/src/main/java/org/apache/mahout/math/VarLongWritable.java
new file mode 100644
index 0000000..7b0d9c4
--- /dev/null
+++ b/hdfs/src/main/java/org/apache/mahout/math/VarLongWritable.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import com.google.common.primitives.Longs;
+import org.apache.hadoop.io.WritableComparable;
+
+public class VarLongWritable implements WritableComparable<VarLongWritable> {
+
+  private long value;
+
+  public VarLongWritable() {
+  }
+
+  public VarLongWritable(long value) {
+    this.value = value;
+  }
+
+  public long get() {
+    return value;
+  }
+
+  public void set(long value) {
+    this.value = value;
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    return other != null && getClass().equals(other.getClass()) && 
((VarLongWritable) other).value == value;
+  }
+
+  @Override
+  public int hashCode() {
+    return Longs.hashCode(value);
+  }
+
+  @Override
+  public String toString() {
+    return String.valueOf(value);
+  }
+
+  @Override
+  public int compareTo(VarLongWritable other) {
+    if (value >= other.value) {
+      if (value > other.value) {
+        return 1;
+      }
+    } else {
+      return -1;
+    }
+    return 0;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    Varint.writeSignedVarLong(value, out);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    value = Varint.readSignedVarLong(in);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/hdfs/src/main/java/org/apache/mahout/math/Varint.java
----------------------------------------------------------------------
diff --git a/hdfs/src/main/java/org/apache/mahout/math/Varint.java 
b/hdfs/src/main/java/org/apache/mahout/math/Varint.java
new file mode 100644
index 0000000..f380c6c
--- /dev/null
+++ b/hdfs/src/main/java/org/apache/mahout/math/Varint.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * <p>Encodes signed and unsigned values using a common variable-length
+ * scheme, found for example in
+ * <a href="http://code.google.com/apis/protocolbuffers/docs/encoding.html";>
+ * Google's Protocol Buffers</a>. It uses fewer bytes to encode smaller values,
+ * but will use slightly more bytes to encode large values.</p>
+ *
+ * <p>Signed values are further encoded using so-called zig-zag encoding
+ * in order to make them "compatible" with variable-length encoding.</p>
+ */
+public final class Varint {
+
+  private Varint() {
+  }
+
+  /**
+   * Encodes a value using the variable-length encoding from
+   * <a href="http://code.google.com/apis/protocolbuffers/docs/encoding.html";>
+   * Google Protocol Buffers</a>. It uses zig-zag encoding to efficiently
+   * encode signed values. If values are known to be nonnegative,
+   * {@link #writeUnsignedVarLong(long, java.io.DataOutput)} should be used.
+   *
+   * @param value value to encode
+   * @param out to write bytes to
+   * @throws java.io.IOException if {@link java.io.DataOutput} throws {@link 
java.io.IOException}
+   */
+  public static void writeSignedVarLong(long value, DataOutput out) throws 
IOException {
+    // Great trick from 
http://code.google.com/apis/protocolbuffers/docs/encoding.html#types
+    writeUnsignedVarLong((value << 1) ^ (value >> 63), out);
+  }
+
+  /**
+   * Encodes a value using the variable-length encoding from
+   * <a href="http://code.google.com/apis/protocolbuffers/docs/encoding.html";>
+   * Google Protocol Buffers</a>. Zig-zag is not used, so input must not be 
negative.
+   * If values can be negative, use {@link #writeSignedVarLong(long, 
java.io.DataOutput)}
+   * instead. This method treats negative input as like a large unsigned value.
+   *
+   * @param value value to encode
+   * @param out to write bytes to
+   * @throws java.io.IOException if {@link java.io.DataOutput} throws {@link 
java.io.IOException}
+   */
+  public static void writeUnsignedVarLong(long value, DataOutput out) throws 
IOException {
+    while ((value & 0xFFFFFFFFFFFFFF80L) != 0L) {
+      out.writeByte(((int) value & 0x7F) | 0x80);
+      value >>>= 7;
+    }
+    out.writeByte((int) value & 0x7F);
+  }
+
+  /**
+   * @see #writeSignedVarLong(long, java.io.DataOutput)
+   */
+  public static void writeSignedVarInt(int value, DataOutput out) throws 
IOException {
+    // Great trick from 
http://code.google.com/apis/protocolbuffers/docs/encoding.html#types
+    writeUnsignedVarInt((value << 1) ^ (value >> 31), out);
+  }
+
+  /**
+   * @see #writeUnsignedVarLong(long, java.io.DataOutput)
+   */
+  public static void writeUnsignedVarInt(int value, DataOutput out) throws 
IOException {
+    while ((value & 0xFFFFFF80) != 0L) {
+      out.writeByte((value & 0x7F) | 0x80);
+      value >>>= 7;
+    }
+    out.writeByte(value & 0x7F);
+  }
+
+  /**
+   * @param in to read bytes from
+   * @return decode value
+   * @throws java.io.IOException if {@link java.io.DataInput} throws {@link 
java.io.IOException}
+   * @throws IllegalArgumentException if variable-length value does not 
terminate
+   *  after 9 bytes have been read
+   * @see #writeSignedVarLong(long, java.io.DataOutput)
+   */
+  public static long readSignedVarLong(DataInput in) throws IOException {
+    long raw = readUnsignedVarLong(in);
+    // This undoes the trick in writeSignedVarLong()
+    long temp = (((raw << 63) >> 63) ^ raw) >> 1;
+    // This extra step lets us deal with the largest signed values by treating
+    // negative results from read unsigned methods as like unsigned values
+    // Must re-flip the top bit if the original read value had it set.
+    return temp ^ (raw & (1L << 63));
+  }
+
+  /**
+   * @param in to read bytes from
+   * @return decode value
+   * @throws java.io.IOException if {@link java.io.DataInput} throws {@link 
java.io.IOException}
+   * @throws IllegalArgumentException if variable-length value does not 
terminate
+   *  after 9 bytes have been read
+   * @see #writeUnsignedVarLong(long, java.io.DataOutput)
+   */
+  public static long readUnsignedVarLong(DataInput in) throws IOException {
+    long value = 0L;
+    int i = 0;
+    long b;
+    while (((b = in.readByte()) & 0x80L) != 0) {
+      value |= (b & 0x7F) << i;
+      i += 7;
+      Preconditions.checkArgument(i <= 63, "Variable length quantity is too 
long (must be <= 63)");
+    }
+    return value | (b << i);
+  }
+
+  /**
+   * @throws IllegalArgumentException if variable-length value does not 
terminate
+   *  after 5 bytes have been read
+   * @throws java.io.IOException if {@link java.io.DataInput} throws {@link 
java.io.IOException}
+   * @see #readSignedVarLong(java.io.DataInput)
+   */
+  public static int readSignedVarInt(DataInput in) throws IOException {
+    int raw = readUnsignedVarInt(in);
+    // This undoes the trick in writeSignedVarInt()
+    int temp = (((raw << 31) >> 31) ^ raw) >> 1;
+    // This extra step lets us deal with the largest signed values by treating
+    // negative results from read unsigned methods as like unsigned values.
+    // Must re-flip the top bit if the original read value had it set.
+    return temp ^ (raw & (1 << 31));
+  }
+
+  /**
+   * @throws IllegalArgumentException if variable-length value does not 
terminate
+   *  after 5 bytes have been read
+   * @throws java.io.IOException if {@link java.io.DataInput} throws {@link 
java.io.IOException}
+   * @see #readUnsignedVarLong(java.io.DataInput)
+   */
+  public static int readUnsignedVarInt(DataInput in) throws IOException {
+    int value = 0;
+    int i = 0;
+    int b;
+    while (((b = in.readByte()) & 0x80) != 0) {
+      value |= (b & 0x7F) << i;
+      i += 7;
+      Preconditions.checkArgument(i <= 35, "Variable length quantity is too 
long (must be <= 35)");
+    }
+    return value | (b << i);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/hdfs/src/main/java/org/apache/mahout/math/VectorWritable.java
----------------------------------------------------------------------
diff --git a/hdfs/src/main/java/org/apache/mahout/math/VectorWritable.java 
b/hdfs/src/main/java/org/apache/mahout/math/VectorWritable.java
new file mode 100644
index 0000000..491ae3b
--- /dev/null
+++ b/hdfs/src/main/java/org/apache/mahout/math/VectorWritable.java
@@ -0,0 +1,267 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.mahout.math;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.math.Vector.Element;
+
+import com.google.common.base.Preconditions;
+
+public final class VectorWritable extends Configured implements Writable {
+
+  public static final int FLAG_DENSE = 0x01;
+  public static final int FLAG_SEQUENTIAL = 0x02;
+  public static final int FLAG_NAMED = 0x04;
+  public static final int FLAG_LAX_PRECISION = 0x08;
+  public static final int NUM_FLAGS = 4;
+
+  private Vector vector;
+  private boolean writesLaxPrecision;
+
+  public VectorWritable() {}
+
+  public VectorWritable(boolean writesLaxPrecision) {
+    setWritesLaxPrecision(writesLaxPrecision);
+  }
+
+  public VectorWritable(Vector vector) {
+    this.vector = vector;
+  }
+
+  public VectorWritable(Vector vector, boolean writesLaxPrecision) {
+    this(vector);
+    setWritesLaxPrecision(writesLaxPrecision);
+  }
+
+  /**
+   * @return {@link org.apache.mahout.math.Vector} that this is to write, or 
has
+   *  just read
+   */
+  public Vector get() {
+    return vector;
+  }
+
+  public void set(Vector vector) {
+    this.vector = vector;
+  }
+
+  /**
+   * @return true if this is allowed to encode {@link 
org.apache.mahout.math.Vector}
+   *  values using fewer bytes, possibly losing precision. In particular this 
means
+   *  that floating point values will be encoded as floats, not doubles.
+   */
+  public boolean isWritesLaxPrecision() {
+    return writesLaxPrecision;
+  }
+
+  public void setWritesLaxPrecision(boolean writesLaxPrecision) {
+    this.writesLaxPrecision = writesLaxPrecision;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    writeVector(out, this.vector, this.writesLaxPrecision);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    int flags = in.readByte();
+    int size = Varint.readUnsignedVarInt(in);
+    readFields(in, (byte) flags, size);
+  }
+
+  private void readFields(DataInput in, byte flags, int size) throws 
IOException {
+
+    Preconditions.checkArgument(flags >> NUM_FLAGS == 0, "Unknown flags set: 
%d", Integer.toString(flags, 2));
+    boolean dense = (flags & FLAG_DENSE) != 0;
+    boolean sequential = (flags & FLAG_SEQUENTIAL) != 0;
+    boolean named = (flags & FLAG_NAMED) != 0;
+    boolean laxPrecision = (flags & FLAG_LAX_PRECISION) != 0;
+
+    Vector v;
+    if (dense) {
+      double[] values = new double[size];
+      for (int i = 0; i < size; i++) {
+        values[i] = laxPrecision ? in.readFloat() : in.readDouble();
+      }
+      v = new DenseVector(values);
+    } else {
+      int numNonDefaultElements = Varint.readUnsignedVarInt(in);
+      v = sequential
+          ? new SequentialAccessSparseVector(size, numNonDefaultElements)
+          : new RandomAccessSparseVector(size, numNonDefaultElements);
+      if (sequential) {
+        int lastIndex = 0;
+        for (int i = 0; i < numNonDefaultElements; i++) {
+          int delta = Varint.readUnsignedVarInt(in);
+          int index = lastIndex + delta;
+          lastIndex = index;
+          double value = laxPrecision ? in.readFloat() : in.readDouble();
+          v.setQuick(index, value);
+        }
+      } else {
+        for (int i = 0; i < numNonDefaultElements; i++) {
+          int index = Varint.readUnsignedVarInt(in);
+          double value = laxPrecision ? in.readFloat() : in.readDouble();
+          v.setQuick(index, value);
+        }
+      }
+    }
+    if (named) {
+      String name = in.readUTF();
+      v = new NamedVector(v, name);
+    }
+    vector = v;
+  }
+
+  /** Write the vector to the output */
+  public static void writeVector(DataOutput out, Vector vector) throws 
IOException {
+    writeVector(out, vector, false);
+  }
+
+  public static byte flags(Vector vector, boolean laxPrecision) {
+    boolean dense = vector.isDense();
+    boolean sequential = vector.isSequentialAccess();
+    boolean named = vector instanceof NamedVector;
+
+    return (byte) ((dense ? FLAG_DENSE : 0)
+            | (sequential ? FLAG_SEQUENTIAL : 0)
+            | (named ? FLAG_NAMED : 0)
+            | (laxPrecision ? FLAG_LAX_PRECISION : 0));
+  }
+
+  /** Write out type information and size of the vector */
+  public static void writeVectorFlagsAndSize(DataOutput out, byte flags, int 
size) throws IOException {
+    out.writeByte(flags);
+    Varint.writeUnsignedVarInt(size, out);
+  }
+
+  public static void writeVector(DataOutput out, Vector vector, boolean 
laxPrecision) throws IOException {
+
+    byte flags = flags(vector, laxPrecision);
+
+    writeVectorFlagsAndSize(out, flags, vector.size());
+    writeVectorContents(out, vector, flags);
+  }
+
+  /** Write out contents of the vector */
+  public static void writeVectorContents(DataOutput out, Vector vector, byte 
flags) throws IOException {
+
+    boolean dense = (flags & FLAG_DENSE) != 0;
+    boolean sequential = (flags & FLAG_SEQUENTIAL) != 0;
+    boolean named = (flags & FLAG_NAMED) != 0;
+    boolean laxPrecision = (flags & FLAG_LAX_PRECISION) != 0;
+
+    if (dense) {
+      for (Element element : vector.all()) {
+        if (laxPrecision) {
+          out.writeFloat((float) element.get());
+        } else {
+          out.writeDouble(element.get());
+        }
+      }
+    } else {
+      Varint.writeUnsignedVarInt(vector.getNumNonZeroElements(), out);
+      Iterator<Element> iter = vector.nonZeroes().iterator();
+      if (sequential) {
+        int lastIndex = 0;
+        while (iter.hasNext()) {
+          Element element = iter.next();
+          if (element.get() == 0) {
+            continue;
+          }
+          int thisIndex = element.index();
+          // Delta-code indices:
+          Varint.writeUnsignedVarInt(thisIndex - lastIndex, out);
+          lastIndex = thisIndex;
+          if (laxPrecision) {
+            out.writeFloat((float) element.get());
+          } else {
+            out.writeDouble(element.get());
+          }
+        }
+      } else {
+        while (iter.hasNext()) {
+          Element element = iter.next();
+          if (element.get() == 0) {
+            // TODO(robinanil): Fix the damn iterator for the zero element.
+            continue;
+          }
+          Varint.writeUnsignedVarInt(element.index(), out);
+          if (laxPrecision) {
+            out.writeFloat((float) element.get());
+          } else {
+            out.writeDouble(element.get());
+          }
+        }
+      }
+    }
+    if (named) {
+      String name = ((NamedVector) vector).getName();
+      out.writeUTF(name == null ? "" : name);
+    }
+  }
+
+  public static Vector readVector(DataInput in) throws IOException {
+    VectorWritable v = new VectorWritable();
+    v.readFields(in);
+    return v.get();
+  }
+
+  public static Vector readVector(DataInput in, byte vectorFlags, int size) 
throws IOException {
+    VectorWritable v = new VectorWritable();
+    v.readFields(in, vectorFlags, size);
+    return v.get();
+  }
+
+  public static VectorWritable merge(Iterator<VectorWritable> vectors) {
+    return new VectorWritable(mergeToVector(vectors));
+  }
+
+  public static Vector mergeToVector(Iterator<VectorWritable> vectors) {
+    Vector accumulator = vectors.next().get();
+    while (vectors.hasNext()) {
+      VectorWritable v = vectors.next();
+      if (v != null) {
+        for (Element nonZeroElement : v.get().nonZeroes()) {
+          accumulator.setQuick(nonZeroElement.index(), nonZeroElement.get());
+        }
+      }
+    }
+    return accumulator;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    return o instanceof VectorWritable && vector.equals(((VectorWritable) 
o).get());
+  }
+
+  @Override
+  public int hashCode() {
+    return vector.hashCode();
+  }
+
+  @Override
+  public String toString() {
+    return vector.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/hdfs/src/test/java/org/apache/mahout/math/MatrixWritableTest.java
----------------------------------------------------------------------
diff --git a/hdfs/src/test/java/org/apache/mahout/math/MatrixWritableTest.java 
b/hdfs/src/test/java/org/apache/mahout/math/MatrixWritableTest.java
new file mode 100644
index 0000000..226d4b1
--- /dev/null
+++ b/hdfs/src/test/java/org/apache/mahout/math/MatrixWritableTest.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Map;
+
+import com.google.common.collect.Maps;
+import com.google.common.io.Closeables;
+import org.apache.hadoop.io.Writable;
+import org.junit.Test;
+
+public final class MatrixWritableTest extends MahoutTestCase {
+
+  @Test
+  public void testSparseMatrixWritable() throws Exception {
+    Matrix m = new SparseMatrix(5, 5);
+    m.set(1, 2, 3.0);
+    m.set(3, 4, 5.0);
+    Map<String, Integer> bindings = Maps.newHashMap();
+    bindings.put("A", 0);
+    bindings.put("B", 1);
+    bindings.put("C", 2);
+    bindings.put("D", 3);
+    bindings.put("default", 4);
+    m.setRowLabelBindings(bindings);
+    m.setColumnLabelBindings(bindings);
+    doTestMatrixWritableEquals(m);
+  }
+
+  @Test
+  public void testSparseRowMatrixWritable() throws Exception {
+    Matrix m = new SparseRowMatrix(5, 5);
+    m.set(1, 2, 3.0);
+    m.set(3, 4, 5.0);
+    Map<String, Integer> bindings = Maps.newHashMap();
+    bindings.put("A", 0);
+    bindings.put("B", 1);
+    bindings.put("C", 2);
+    bindings.put("D", 3);
+    bindings.put("default", 4);
+    m.setRowLabelBindings(bindings);
+    m.setColumnLabelBindings(bindings);
+    doTestMatrixWritableEquals(m);
+  }
+
+  @Test
+  public void testDenseMatrixWritable() throws Exception {
+    Matrix m = new DenseMatrix(5,5);
+    m.set(1, 2, 3.0);
+    m.set(3, 4, 5.0);
+    Map<String, Integer> bindings = Maps.newHashMap();
+    bindings.put("A", 0);
+    bindings.put("B", 1);
+    bindings.put("C", 2);
+    bindings.put("D", 3);
+    bindings.put("default", 4);
+    m.setRowLabelBindings(bindings);
+    m.setColumnLabelBindings(bindings);
+    doTestMatrixWritableEquals(m);
+  }
+
+  private static void doTestMatrixWritableEquals(Matrix m) throws IOException {
+    Writable matrixWritable = new MatrixWritable(m);
+    MatrixWritable matrixWritable2 = new MatrixWritable();
+    writeAndRead(matrixWritable, matrixWritable2);
+    Matrix m2 = matrixWritable2.get();
+    compareMatrices(m, m2); 
+    doCheckBindings(m2.getRowLabelBindings());
+    doCheckBindings(m2.getColumnLabelBindings());    
+  }
+
+  private static void compareMatrices(Matrix m, Matrix m2) {
+    assertEquals(m.numRows(), m2.numRows());
+    assertEquals(m.numCols(), m2.numCols());
+    for (int r = 0; r < m.numRows(); r++) {
+      for (int c = 0; c < m.numCols(); c++) {
+        assertEquals(m.get(r, c), m2.get(r, c), EPSILON);
+      }
+    }
+    Map<String,Integer> bindings = m.getRowLabelBindings();
+    Map<String, Integer> bindings2 = m2.getRowLabelBindings();
+    assertEquals(bindings == null, bindings2 == null);
+    if (bindings != null) {
+      assertEquals(bindings.size(), m.numRows());
+      assertEquals(bindings.size(), bindings2.size());
+      for (Map.Entry<String,Integer> entry : bindings.entrySet()) {
+        assertEquals(entry.getValue(), bindings2.get(entry.getKey()));
+      }
+    }
+    bindings = m.getColumnLabelBindings();
+    bindings2 = m2.getColumnLabelBindings();
+    assertEquals(bindings == null, bindings2 == null);
+    if (bindings != null) {
+      assertEquals(bindings.size(), bindings2.size());
+      for (Map.Entry<String,Integer> entry : bindings.entrySet()) {
+        assertEquals(entry.getValue(), bindings2.get(entry.getKey()));
+      }
+    }
+  }
+
+  private static void doCheckBindings(Map<String,Integer> labels) {
+    assertTrue("Missing label", labels.keySet().contains("A"));
+    assertTrue("Missing label", labels.keySet().contains("B"));
+    assertTrue("Missing label", labels.keySet().contains("C"));
+    assertTrue("Missing label", labels.keySet().contains("D"));
+    assertTrue("Missing label", labels.keySet().contains("default"));
+  }
+
+  private static void writeAndRead(Writable toWrite, Writable toRead) throws 
IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DataOutputStream dos = new DataOutputStream(baos);
+    try {
+      toWrite.write(dos);
+    } finally {
+      Closeables.close(dos, false);
+    }
+
+    ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+    DataInputStream dis = new DataInputStream(bais);
+    try {
+      toRead.readFields(dis);
+    } finally {
+      Closeables.close(dis, true);
+    }
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/hdfs/src/test/java/org/apache/mahout/math/VarintTest.java
----------------------------------------------------------------------
diff --git a/hdfs/src/test/java/org/apache/mahout/math/VarintTest.java 
b/hdfs/src/test/java/org/apache/mahout/math/VarintTest.java
new file mode 100644
index 0000000..0b1a664
--- /dev/null
+++ b/hdfs/src/test/java/org/apache/mahout/math/VarintTest.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math;
+
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+
+/**
+ * Tests {@link Varint}.
+ */
+public final class VarintTest extends MahoutTestCase {
+
+  @Test
+  public void testUnsignedLong() throws Exception {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DataOutput out = new DataOutputStream(baos);
+    Varint.writeUnsignedVarLong(0L, out);
+    for (long i = 1L; i > 0L && i <= (1L << 62); i <<= 1) {
+      Varint.writeUnsignedVarLong(i-1, out);
+      Varint.writeUnsignedVarLong(i, out);
+    }
+    Varint.writeUnsignedVarLong(Long.MAX_VALUE, out);
+
+    DataInput in = new DataInputStream(new 
ByteArrayInputStream(baos.toByteArray()));
+    assertEquals(0L, Varint.readUnsignedVarLong(in));
+    for (long i = 1L; i > 0L && i <= (1L << 62); i <<= 1) {
+      assertEquals(i-1, Varint.readUnsignedVarLong(in));
+      assertEquals(i, Varint.readUnsignedVarLong(in));
+    }
+    assertEquals(Long.MAX_VALUE, Varint.readUnsignedVarLong(in));
+  }
+
+  @Test
+  public void testSignedPositiveLong() throws Exception {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DataOutput out = new DataOutputStream(baos);
+    Varint.writeSignedVarLong(0L, out);
+    for (long i = 1L; i <= (1L << 61); i <<= 1) {
+      Varint.writeSignedVarLong(i-1, out);
+      Varint.writeSignedVarLong(i, out);
+    }
+    Varint.writeSignedVarLong((1L << 62) - 1, out);
+    Varint.writeSignedVarLong((1L << 62), out);
+    Varint.writeSignedVarLong(Long.MAX_VALUE, out);
+
+    DataInput in = new DataInputStream(new 
ByteArrayInputStream(baos.toByteArray()));
+    assertEquals(0L, Varint.readSignedVarLong(in));
+    for (long i = 1L; i <= (1L << 61); i <<= 1) {
+      assertEquals(i-1, Varint.readSignedVarLong(in));
+      assertEquals(i, Varint.readSignedVarLong(in));
+    }
+    assertEquals((1L << 62) - 1, Varint.readSignedVarLong(in));
+    assertEquals((1L << 62), Varint.readSignedVarLong(in));
+    assertEquals(Long.MAX_VALUE, Varint.readSignedVarLong(in));
+  }
+
+  @Test
+  public void testSignedNegativeLong() throws Exception {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DataOutput out = new DataOutputStream(baos);
+    for (long i = -1L; i >= -(1L << 62); i <<= 1) {
+      Varint.writeSignedVarLong(i, out);
+      Varint.writeSignedVarLong(i+1, out);
+    }
+    Varint.writeSignedVarLong(Long.MIN_VALUE, out);
+    Varint.writeSignedVarLong(Long.MIN_VALUE+1, out);
+    DataInput in = new DataInputStream(new 
ByteArrayInputStream(baos.toByteArray()));
+    for (long i = -1L; i >= -(1L << 62); i <<= 1) {
+      assertEquals(i, Varint.readSignedVarLong(in));
+      assertEquals(i+1, Varint.readSignedVarLong(in));
+    }
+    assertEquals(Long.MIN_VALUE, Varint.readSignedVarLong(in));
+    assertEquals(Long.MIN_VALUE+1, Varint.readSignedVarLong(in));
+  }
+
+  @Test
+  public void testUnsignedInt() throws Exception {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DataOutput out = new DataOutputStream(baos);
+    Varint.writeUnsignedVarInt(0, out);
+    for (int i = 1; i > 0 && i <= (1 << 30); i <<= 1) {
+      Varint.writeUnsignedVarLong(i-1, out);
+      Varint.writeUnsignedVarLong(i, out);
+    }
+    Varint.writeUnsignedVarLong(Integer.MAX_VALUE, out);
+
+    DataInput in = new DataInputStream(new 
ByteArrayInputStream(baos.toByteArray()));
+    assertEquals(0, Varint.readUnsignedVarInt(in));
+    for (int i = 1; i > 0 && i <= (1 << 30); i <<= 1) {
+      assertEquals(i-1, Varint.readUnsignedVarInt(in));
+      assertEquals(i, Varint.readUnsignedVarInt(in));
+    }
+    assertEquals(Integer.MAX_VALUE, Varint.readUnsignedVarInt(in));
+  }
+
+  @Test
+  public void testSignedPositiveInt() throws Exception {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DataOutput out = new DataOutputStream(baos);
+    Varint.writeSignedVarInt(0, out);
+    for (int i = 1; i <= (1 << 29); i <<= 1) {
+      Varint.writeSignedVarLong(i-1, out);
+      Varint.writeSignedVarLong(i, out);
+    }
+    Varint.writeSignedVarInt((1 << 30) - 1, out);
+    Varint.writeSignedVarInt((1 << 30), out);
+    Varint.writeSignedVarInt(Integer.MAX_VALUE, out);
+
+    DataInput in = new DataInputStream(new 
ByteArrayInputStream(baos.toByteArray()));
+    assertEquals(0, Varint.readSignedVarInt(in));
+    for (int i = 1; i <= (1 << 29); i <<= 1) {
+      assertEquals(i-1, Varint.readSignedVarInt(in));
+      assertEquals(i, Varint.readSignedVarInt(in));
+    }
+    assertEquals((1L << 30) - 1, Varint.readSignedVarInt(in));
+    assertEquals((1L << 30), Varint.readSignedVarInt(in));
+    assertEquals(Integer.MAX_VALUE, Varint.readSignedVarInt(in));
+  }
+
+  @Test
+  public void testSignedNegativeInt() throws Exception {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DataOutput out = new DataOutputStream(baos);
+    for (int i = -1; i >= -(1 << 30); i <<= 1) {
+      Varint.writeSignedVarInt(i, out);
+      Varint.writeSignedVarInt(i+1, out);
+    }
+    Varint.writeSignedVarInt(Integer.MIN_VALUE, out);
+    Varint.writeSignedVarInt(Integer.MIN_VALUE+1, out);
+    DataInput in = new DataInputStream(new 
ByteArrayInputStream(baos.toByteArray()));
+    for (int i = -1; i >= -(1 << 30); i <<= 1) {
+      assertEquals(i, Varint.readSignedVarInt(in));
+      assertEquals(i+1, Varint.readSignedVarInt(in));
+    }
+    assertEquals(Integer.MIN_VALUE, Varint.readSignedVarInt(in));
+    assertEquals(Integer.MIN_VALUE+1, Varint.readSignedVarInt(in));
+  }
+
+  @Test
+  public void testUnsignedSize() throws Exception {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DataOutput out = new DataOutputStream(baos);
+    int expectedSize = 0;
+    for (int exponent = 0; exponent <= 62; exponent++) {
+      Varint.writeUnsignedVarLong(1L << exponent, out);
+      expectedSize += 1 + exponent / 7;
+      assertEquals(expectedSize, baos.size());
+    }
+  }
+
+  @Test
+  public void testSignedSize() throws Exception {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DataOutput out = new DataOutputStream(baos);
+    int expectedSize = 0;
+    for (int exponent = 0; exponent <= 61; exponent++) {
+      Varint.writeSignedVarLong(1L << exponent, out);
+      expectedSize += 1 + ((exponent + 1) / 7);
+      assertEquals(expectedSize, baos.size());
+    }
+    for (int exponent = 0; exponent <= 61; exponent++) {
+      Varint.writeSignedVarLong(-(1L << exponent)-1, out);
+      expectedSize += 1 + ((exponent + 1) / 7);
+      assertEquals(expectedSize, baos.size());
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/hdfs/src/test/java/org/apache/mahout/math/VectorWritableTest.java
----------------------------------------------------------------------
diff --git a/hdfs/src/test/java/org/apache/mahout/math/VectorWritableTest.java 
b/hdfs/src/test/java/org/apache/mahout/math/VectorWritableTest.java
new file mode 100644
index 0000000..60fb8b4
--- /dev/null
+++ b/hdfs/src/test/java/org/apache/mahout/math/VectorWritableTest.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.mahout.math;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.math.Vector.Element;
+import org.junit.Test;
+
+import com.carrotsearch.randomizedtesting.RandomizedTest;
+import com.carrotsearch.randomizedtesting.annotations.Repeat;
+import com.google.common.io.Closeables;
+
+public final class VectorWritableTest extends RandomizedTest {
+  private static final int MAX_VECTOR_SIZE = 100;
+
+  public void createRandom(Vector v) {
+    int size = randomInt(v.size() - 1);
+    for (int i = 0; i < size; ++i) {
+      v.set(randomInt(v.size() - 1), randomDouble());
+    }
+
+    int zeros = Math.max(2, size / 4);
+    for (Element e : v.nonZeroes()) {
+      if (e.index() % zeros == 0) {
+        e.set(0.0);
+      }
+    }
+  }
+
+  @Test
+  @Repeat(iterations = 20)
+  public void testViewSequentialAccessSparseVectorWritable() throws Exception {
+    Vector v = new SequentialAccessSparseVector(MAX_VECTOR_SIZE);
+    createRandom(v);
+    Vector view = new VectorView(v, 0, v.size());
+    doTestVectorWritableEquals(view);
+  }
+
+  @Test
+  @Repeat(iterations = 20)
+  public void testSequentialAccessSparseVectorWritable() throws Exception {
+    Vector v = new SequentialAccessSparseVector(MAX_VECTOR_SIZE);
+    createRandom(v);
+    doTestVectorWritableEquals(v);
+  }
+
+  @Test
+  @Repeat(iterations = 20)
+  public void testRandomAccessSparseVectorWritable() throws Exception {
+    Vector v = new RandomAccessSparseVector(MAX_VECTOR_SIZE);
+    createRandom(v);
+    doTestVectorWritableEquals(v);
+  }
+
+  @Test
+  @Repeat(iterations = 20)
+  public void testDenseVectorWritable() throws Exception {
+    Vector v = new DenseVector(MAX_VECTOR_SIZE);
+    createRandom(v);
+    doTestVectorWritableEquals(v);
+  }
+
+  @Test
+  @Repeat(iterations = 20)
+  public void testNamedVectorWritable() throws Exception {
+    Vector v = new DenseVector(MAX_VECTOR_SIZE);
+    v = new NamedVector(v, "Victor");
+    createRandom(v);
+    doTestVectorWritableEquals(v);
+  }
+
+  private static void doTestVectorWritableEquals(Vector v) throws IOException {
+    Writable vectorWritable = new VectorWritable(v);
+    VectorWritable vectorWritable2 = new VectorWritable();
+    writeAndRead(vectorWritable, vectorWritable2);
+    Vector v2 = vectorWritable2.get();
+    if (v instanceof NamedVector) {
+      assertTrue(v2 instanceof NamedVector);
+      NamedVector nv = (NamedVector) v;
+      NamedVector nv2 = (NamedVector) v2;
+      assertEquals(nv.getName(), nv2.getName());
+      assertEquals("Victor", nv.getName());
+    }
+    assertEquals(v, v2);
+  }
+
+  private static void writeAndRead(Writable toWrite, Writable toRead) throws 
IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DataOutputStream dos = new DataOutputStream(baos);
+    try {
+      toWrite.write(dos);
+    } finally {
+      Closeables.close(dos, false);
+    }
+
+    ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+    DataInputStream dis = new DataInputStream(bais);
+    try {
+      toRead.readFields(dis);
+    } finally {
+      Closeables.close(dos, true);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/integration/pom.xml
----------------------------------------------------------------------
diff --git a/integration/pom.xml b/integration/pom.xml
index bb7077c..f9c1690 100644
--- a/integration/pom.xml
+++ b/integration/pom.xml
@@ -68,11 +68,21 @@
     <!-- own modules -->
     <dependency>
       <groupId>${project.groupId}</groupId>
-      <artifactId>mahout-mrlegacy</artifactId>
+      <artifactId>mahout-hdfs</artifactId>
     </dependency>
     <dependency>
       <groupId>${project.groupId}</groupId>
-      <artifactId>mahout-mrlegacy</artifactId>
+      <artifactId>mahout-mr</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>mahout-hdfs</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>mahout-mr</artifactId>
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/integration/src/main/java/org/apache/mahout/text/PrefixAdditionFilter.java
----------------------------------------------------------------------
diff --git 
a/integration/src/main/java/org/apache/mahout/text/PrefixAdditionFilter.java 
b/integration/src/main/java/org/apache/mahout/text/PrefixAdditionFilter.java
index 9c0bc11..a13341b 100644
--- a/integration/src/main/java/org/apache/mahout/text/PrefixAdditionFilter.java
+++ b/integration/src/main/java/org/apache/mahout/text/PrefixAdditionFilter.java
@@ -48,7 +48,7 @@ public final class PrefixAdditionFilter extends 
SequenceFilesFromDirectoryFilter
   protected void process(FileStatus fst, Path current) throws IOException {
     FileSystem fs = getFs();
     ChunkedWriter writer = getWriter();
-    if (fst.isDirectory()) {
+    if (fst.isDir()) {
       String dirPath = getPrefix() + Path.SEPARATOR + current.getName() + 
Path.SEPARATOR + fst.getPath().getName();
       fs.listStatus(fst.getPath(),
                     new PrefixAdditionFilter(getConf(), dirPath, getOptions(), 
writer, getCharset(), fs));

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/integration/src/main/java/org/apache/mahout/text/ReadOnlyFileSystemDirectory.java
----------------------------------------------------------------------
diff --git 
a/integration/src/main/java/org/apache/mahout/text/ReadOnlyFileSystemDirectory.java
 
b/integration/src/main/java/org/apache/mahout/text/ReadOnlyFileSystemDirectory.java
index 18c1252..e97e35b 100644
--- 
a/integration/src/main/java/org/apache/mahout/text/ReadOnlyFileSystemDirectory.java
+++ 
b/integration/src/main/java/org/apache/mahout/text/ReadOnlyFileSystemDirectory.java
@@ -79,7 +79,7 @@ public class ReadOnlyFileSystemDirectory extends 
BaseDirectory {
     try {
       FileStatus status = fs.getFileStatus(directory);
       if (status != null) {
-        isDir = status.isDirectory();
+        isDir = status.isDir();
       }
     } catch (IOException e) {
       log.error(e.getMessage(), e);
@@ -99,7 +99,7 @@ public class ReadOnlyFileSystemDirectory extends 
BaseDirectory {
     try {
       FileStatus status = fs.getFileStatus(directory);
       if (status != null) {
-        isDir = status.isDirectory();
+        isDir = status.isDir();
       }
     } catch (IOException e) {
       log.error(e.getMessage(), e);

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/integration/src/main/java/org/apache/mahout/utils/SequenceFileDumper.java
----------------------------------------------------------------------
diff --git 
a/integration/src/main/java/org/apache/mahout/utils/SequenceFileDumper.java 
b/integration/src/main/java/org/apache/mahout/utils/SequenceFileDumper.java
index bf6691f..2dcc8b0 100644
--- a/integration/src/main/java/org/apache/mahout/utils/SequenceFileDumper.java
+++ b/integration/src/main/java/org/apache/mahout/utils/SequenceFileDumper.java
@@ -62,7 +62,7 @@ public final class SequenceFileDumper extends AbstractJob {
     Configuration conf = new Configuration();
     Path input = getInputPath();
     FileSystem fs = input.getFileSystem(conf);
-    if (fs.getFileStatus(input).isDirectory()) {
+    if (fs.getFileStatus(input).isDir()) {
       pathArr = FileUtil.stat2Paths(fs.listStatus(input, 
PathFilters.logsCRCFilter()));
     } else {
       pathArr = new Path[1];

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/integration/src/main/java/org/apache/mahout/utils/SplitInput.java
----------------------------------------------------------------------
diff --git a/integration/src/main/java/org/apache/mahout/utils/SplitInput.java 
b/integration/src/main/java/org/apache/mahout/utils/SplitInput.java
index 834d5cd..af22422 100644
--- a/integration/src/main/java/org/apache/mahout/utils/SplitInput.java
+++ b/integration/src/main/java/org/apache/mahout/utils/SplitInput.java
@@ -289,7 +289,7 @@ public class SplitInput extends AbstractJob {
     if (fs.getFileStatus(inputDir) == null) {
       throw new IOException(inputDir + " does not exist");
     }
-    if (!fs.getFileStatus(inputDir).isDirectory()) {
+    if (!fs.getFileStatus(inputDir).isDir()) {
       throw new IOException(inputDir + " is not a directory");
     }
 
@@ -317,7 +317,7 @@ public class SplitInput extends AbstractJob {
     if (fs.getFileStatus(inputFile) == null) {
       throw new IOException(inputFile + " does not exist");
     }
-    if (fs.getFileStatus(inputFile).isDirectory()) {
+    if (fs.getFileStatus(inputFile).isDir()) {
       throw new IOException(inputFile + " is a directory");
     }
 
@@ -650,10 +650,10 @@ public class SplitInput extends AbstractJob {
       Configuration conf = getConf();
       FileSystem fs = trainingOutputDirectory.getFileSystem(conf);
       FileStatus trainingOutputDirStatus = 
fs.getFileStatus(trainingOutputDirectory);
-      Preconditions.checkArgument(trainingOutputDirStatus != null && 
trainingOutputDirStatus.isDirectory(),
+      Preconditions.checkArgument(trainingOutputDirStatus != null && 
trainingOutputDirStatus.isDir(),
           "%s is not a directory", trainingOutputDirectory);
       FileStatus testOutputDirStatus = fs.getFileStatus(testOutputDirectory);
-      Preconditions.checkArgument(testOutputDirStatus != null && 
testOutputDirStatus.isDirectory(),
+      Preconditions.checkArgument(testOutputDirStatus != null && 
testOutputDirStatus.isDir(),
           "%s is not a directory", testOutputDirectory);
     }
   }

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/integration/src/main/java/org/apache/mahout/utils/clustering/JsonClusterWriter.java
----------------------------------------------------------------------
diff --git 
a/integration/src/main/java/org/apache/mahout/utils/clustering/JsonClusterWriter.java
 
b/integration/src/main/java/org/apache/mahout/utils/clustering/JsonClusterWriter.java
index 63399b5..d564a73 100644
--- 
a/integration/src/main/java/org/apache/mahout/utils/clustering/JsonClusterWriter.java
+++ 
b/integration/src/main/java/org/apache/mahout/utils/clustering/JsonClusterWriter.java
@@ -83,6 +83,7 @@ public class JsonClusterWriter extends AbstractClusterWriter {
     if (dictionary != null) {
       Map<String,Object> fmtStr = cluster.asJson(dictionary);
       res.put("cluster", fmtStr);
+
       // get points
       List<Object> points = getPoints(cluster, dictionary);
       res.put("points", points);

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/integration/src/main/java/org/apache/mahout/utils/vectors/VectorDumper.java
----------------------------------------------------------------------
diff --git 
a/integration/src/main/java/org/apache/mahout/utils/vectors/VectorDumper.java 
b/integration/src/main/java/org/apache/mahout/utils/vectors/VectorDumper.java
index 2a8a42b..9214434 100644
--- 
a/integration/src/main/java/org/apache/mahout/utils/vectors/VectorDumper.java
+++ 
b/integration/src/main/java/org/apache/mahout/utils/vectors/VectorDumper.java
@@ -97,7 +97,7 @@ public final class VectorDumper extends AbstractJob {
     FileSystem fs = FileSystem.get(conf);
     Path input = getInputPath();
     FileStatus fileStatus = fs.getFileStatus(input);
-    if (fileStatus.isDirectory()) {
+    if (fileStatus.isDir()) {
       pathArr = FileUtil.stat2Paths(fs.listStatus(input, 
PathFilters.logsCRCFilter()));
     } else {
       FileStatus[] inputPaths = fs.globStatus(input);

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/integration/src/test/java/org/apache/mahout/utils/vectors/lucene/LuceneIterableTest.java
----------------------------------------------------------------------
diff --git 
a/integration/src/test/java/org/apache/mahout/utils/vectors/lucene/LuceneIterableTest.java
 
b/integration/src/test/java/org/apache/mahout/utils/vectors/lucene/LuceneIterableTest.java
index 1152936..ba49a2d 100644
--- 
a/integration/src/test/java/org/apache/mahout/utils/vectors/lucene/LuceneIterableTest.java
+++ 
b/integration/src/test/java/org/apache/mahout/utils/vectors/lucene/LuceneIterableTest.java
@@ -117,7 +117,7 @@ public final class LuceneIterableTest extends 
MahoutTestCase {
     LuceneIterable iterable = new LuceneIterable(reader, "id", "content",  
termInfo,weight);
 
     Iterator<Vector> iterator = iterable.iterator();
-    Iterators.skip(iterator, 1);
+    Iterators.advance(iterator, 1);
   }
 
   @Test
@@ -157,10 +157,10 @@ public final class LuceneIterableTest extends 
MahoutTestCase {
     //50 percent tolerance
     iterable = new LuceneIterable(reader, "id", "content", termInfo,weight, 
-1, 0.5);
     Iterator<Vector> iterator = iterable.iterator();
-    Iterators.skip(iterator, 5);
+    Iterators.advance(iterator, 5);
 
     try {
-      Iterators.skip(iterator, Iterators.size(iterator));
+      Iterators.advance(iterator, Iterators.size(iterator));
       exceptionThrown = false;
     }
     catch(IllegalStateException ise) {

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/pom.xml
----------------------------------------------------------------------
diff --git a/mr/pom.xml b/mr/pom.xml
new file mode 100644
index 0000000..0a48150
--- /dev/null
+++ b/mr/pom.xml
@@ -0,0 +1,249 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.mahout</groupId>
+    <artifactId>mahout</artifactId>
+    <version>1.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <!-- modules inherit parent's group id and version. -->
+  <artifactId>mahout-mr</artifactId>
+  <name>Mahout Map-Reduce</name>
+  <description>Scalable machine learning libraries</description>
+
+  <packaging>jar</packaging>
+
+  <build>
+    <resources>
+      <resource>
+        <directory>src/main/resources</directory>
+      </resource>
+      <resource>
+        <directory>../src/conf</directory>
+        <includes>
+          <include>driver.classes.default.props</include>
+        </includes>
+      </resource>
+    </resources>
+    <plugins>
+      <!-- create test jar so other modules can reuse the core test utility 
classes. -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+
+      <!-- create core hadoop job jar -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>job</id>
+            <phase>package</phase>
+            <goals>
+              <goal>single</goal>
+            </goals>
+            <configuration>
+              <descriptors>
+                <descriptor>src/main/assembly/job.xml</descriptor>
+              </descriptors>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
+      <plugin>
+        <artifactId>maven-javadoc-plugin</artifactId>
+      </plugin>
+
+      <plugin>
+        <artifactId>maven-source-plugin</artifactId>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-remote-resources-plugin</artifactId>
+        <configuration>
+          
<appendedResourcesDirectory>../src/main/appended-resources</appendedResourcesDirectory>
+          <resourceBundles>
+            
<resourceBundle>org.apache:apache-jar-resource-bundle:1.4</resourceBundle>
+          </resourceBundles>
+          <supplementalModels>
+            <supplementalModel>supplemental-models.xml</supplementalModel>
+          </supplementalModels>
+        </configuration>
+      </plugin>
+
+    </plugins>
+  </build>
+
+  <dependencies>
+  
+    <!-- our modules -->
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>mahout-math</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>mahout-math</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>mahout-hdfs</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>mahout-hdfs</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+
+    <!-- Third Party -->
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <version>11.0.2</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.codehaus.jackson</groupId>
+      <artifactId>jackson-core-asl</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.codehaus.jackson</groupId>
+      <artifactId>jackson-mapper-asl</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-jcl</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>commons-cli</groupId>
+      <artifactId>commons-cli</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.thoughtworks.xstream</groupId>
+      <artifactId>xstream</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.lucene</groupId>
+      <artifactId>lucene-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.lucene</groupId>
+      <artifactId>lucene-analyzers-common</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.mahout.commons</groupId>
+      <artifactId>commons-cli</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-math3</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>com.carrotsearch.randomizedtesting</groupId>
+      <artifactId>randomizedtesting-runner</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.easymock</groupId>
+      <artifactId>easymock</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.mrunit</groupId>
+      <artifactId>mrunit</artifactId>
+      <version>1.0.0</version>
+      <classifier>${hadoop.classifier}</classifier>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>commons-httpclient</groupId>
+      <artifactId>commons-httpclient</artifactId>
+      <version>3.0.1</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.solr</groupId>
+      <artifactId>solr-commons-csv</artifactId>
+      <version>3.5.0</version>
+    </dependency>
+
+  </dependencies>
+  
+</project>

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/assembly/job.xml
----------------------------------------------------------------------
diff --git a/mr/src/main/assembly/job.xml b/mr/src/main/assembly/job.xml
new file mode 100644
index 0000000..2bdb3ce
--- /dev/null
+++ b/mr/src/main/assembly/job.xml
@@ -0,0 +1,61 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<assembly
+  xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0";
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+  
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0
+    http://maven.apache.org/xsd/assembly-1.1.0.xsd";>
+  <id>job</id>
+  <formats>
+   <format>jar</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+  <dependencySets>
+    <dependencySet>
+      <unpack>true</unpack>
+      <unpackOptions>
+        <!-- MAHOUT-1126 -->
+        <excludes>
+          <exclude>META-INF/LICENSE</exclude>
+        </excludes>
+      </unpackOptions>
+      <scope>runtime</scope>
+      <outputDirectory>/</outputDirectory>
+      <useTransitiveFiltering>true</useTransitiveFiltering>
+      <excludes>
+        <exclude>org.apache.hadoop:hadoop-core</exclude>
+      </excludes>
+    </dependencySet>
+  </dependencySets>
+  <fileSets>
+    <fileSet>
+      <directory>${basedir}/target/classes</directory>
+      <outputDirectory>/</outputDirectory>
+      <excludes>
+        <exclude>*.jar</exclude>
+      </excludes>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/target/classes</directory>
+      <outputDirectory>/</outputDirectory>
+      <includes>
+        <include>driver.classes.default.props</include>
+      </includes>
+    </fileSet>
+  </fileSets>
+</assembly>

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/Version.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/Version.java 
b/mr/src/main/java/org/apache/mahout/Version.java
new file mode 100644
index 0000000..5f3c879
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/Version.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Resources;
+
+import java.io.IOException;
+
+public final class Version {
+
+  private Version() {
+  }
+
+  public static String version() {
+    return Version.class.getPackage().getImplementationVersion();
+  }
+
+  public static String versionFromResource() throws IOException {
+    return Resources.toString(Resources.getResource("version"), 
Charsets.UTF_8);
+  }
+
+  public static void main(String[] args) throws IOException {
+    System.out.println(version() + ' ' + versionFromResource());
+  }
+}

Reply via email to